Skip to content

Commit

Permalink
Include endpoint info in retry messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ximinez committed Nov 14, 2024
1 parent 7f74c46 commit 34d918a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 54 deletions.
60 changes: 33 additions & 27 deletions src/test/jtx/impl/JSONRPCClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,6 @@ namespace test {

class JSONRPCClient : public AbstractClient
{
static boost::asio::ip::tcp::endpoint
getEndpoint(BasicConfig const& cfg)
{
auto& log = std::cerr;
ParsedPort common;
parse_Port(common, cfg["server"], log);
for (auto const& name : cfg.section("server").values())
{
if (!cfg.exists(name))
continue;
ParsedPort pp;
parse_Port(pp, cfg[name], log);
if (pp.protocol.count("http") == 0)
continue;
using namespace boost::asio::ip;
if (pp.ip && pp.ip->is_unspecified())
*pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
: address{address_v4::loopback()};
return {*pp.ip, *pp.port};
}
Throw<std::runtime_error>("Missing HTTP port");
return {}; // Silence compiler control paths return value warning
}

template <class ConstBufferSequence>
static std::string
buffer_string(ConstBufferSequence const& b)
Expand All @@ -72,6 +48,7 @@ class JSONRPCClient : public AbstractClient
}

boost::asio::ip::tcp::endpoint ep_;
std::string endpointLabel_;
boost::asio::io_service ios_;
boost::asio::ip::tcp::socket stream_;
boost::beast::multi_buffer bin_;
Expand All @@ -83,6 +60,9 @@ class JSONRPCClient : public AbstractClient
: ep_(getEndpoint(cfg)), stream_(ios_), rpc_version_(rpc_version)
{
stream_.connect(ep_);
std::stringstream ss;
ss << ep_;
endpointLabel_ = ss.str();
}

~JSONRPCClient() override
Expand All @@ -91,6 +71,30 @@ class JSONRPCClient : public AbstractClient
// stream_.close();
}

static boost::asio::ip::tcp::endpoint
getEndpoint(BasicConfig const& cfg)
{
auto& log = std::cerr;
ParsedPort common;
parse_Port(common, cfg["server"], log);
for (auto const& name : cfg.section("server").values())
{
if (!cfg.exists(name))
continue;
ParsedPort pp;
parse_Port(pp, cfg[name], log);
if (pp.protocol.count("http") == 0)
continue;
using namespace boost::asio::ip;
if (pp.ip && pp.ip->is_unspecified())
*pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
: address{address_v4::loopback()};
return {*pp.ip, *pp.port};
}
Throw<std::runtime_error>("Missing HTTP port");
return {}; // Silence compiler control paths return value warning
}

/*
Return value is an Object type with up to three keys:
status
Expand Down Expand Up @@ -137,13 +141,13 @@ class JSONRPCClient : public AbstractClient

Env::retry(
[&]() { write(stream_, req); },
"JSONRPCClient::invoke write",
"JSONRPCClient::invoke write " + endpointLabel_,
10ms);

response<dynamic_body> res;
Env::retry(
[&]() { read(stream_, bin_, res); },
"JSONRPCClient::invoke read",
"JSONRPCClient::invoke read " + endpointLabel_,
10ms);

Json::Reader jr;
Expand All @@ -170,9 +174,11 @@ makeJSONRPCClient(Config const& cfg, unsigned rpc_version)
using namespace ripple::test::jtx;

std::unique_ptr<JSONRPCClient> ret;
std::stringstream endpoint;
endpoint << JSONRPCClient::getEndpoint(cfg);
Env::retry(
[&]() { ret = std::make_unique<JSONRPCClient>(cfg, rpc_version); },
"makeJSONRPCClient",
"makeJSONRPCClient " + endpoint.str(),
250ms);
return ret;
}
Expand Down
60 changes: 33 additions & 27 deletions src/test/jtx/impl/WSClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,6 @@ class WSClientImpl : public WSClient
}
};

static boost::asio::ip::tcp::endpoint
getEndpoint(BasicConfig const& cfg, bool v2)
{
auto& log = std::cerr;
ParsedPort common;
parse_Port(common, cfg["server"], log);
auto const ps = v2 ? "ws2" : "ws";
for (auto const& name : cfg.section("server").values())
{
if (!cfg.exists(name))
continue;
ParsedPort pp;
parse_Port(pp, cfg[name], log);
if (pp.protocol.count(ps) == 0)
continue;
using namespace boost::asio::ip;
if (pp.ip && pp.ip->is_unspecified())
*pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
: address{address_v4::loopback()};
return {*pp.ip, *pp.port};
}
Throw<std::runtime_error>("Missing WebSocket port");
return {}; // Silence compiler control paths return value warning
}

template <class ConstBuffers>
static std::string
buffer_string(ConstBuffers const& b)
Expand All @@ -94,6 +69,7 @@ class WSClientImpl : public WSClient
boost::asio::ip::tcp::socket stream_;
boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
boost::beast::multi_buffer rb_;
std::string endpointLabel_;

bool peerClosed_ = false;

Expand Down Expand Up @@ -153,6 +129,9 @@ class WSClientImpl : public WSClient
rb_,
strand_.wrap(std::bind(
&WSClientImpl::on_read_msg, this, std::placeholders::_1)));
std::stringstream ss;
ss << ep;
endpointLabel_ = ss.str();
}
catch (std::exception&)
{
Expand All @@ -166,6 +145,31 @@ class WSClientImpl : public WSClient
cleanup();
}

static boost::asio::ip::tcp::endpoint
getEndpoint(BasicConfig const& cfg, bool v2)
{
auto& log = std::cerr;
ParsedPort common;
parse_Port(common, cfg["server"], log);
auto const ps = v2 ? "ws2" : "ws";
for (auto const& name : cfg.section("server").values())
{
if (!cfg.exists(name))
continue;
ParsedPort pp;
parse_Port(pp, cfg[name], log);
if (pp.protocol.count(ps) == 0)
continue;
using namespace boost::asio::ip;
if (pp.ip && pp.ip->is_unspecified())
*pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
: address{address_v4::loopback()};
return {*pp.ip, *pp.port};
}
Throw<std::runtime_error>("Missing WebSocket port");
return {}; // Silence compiler control paths return value warning
}

Json::Value
invoke(std::string const& cmd, Json::Value const& params) override
{
Expand All @@ -190,7 +194,7 @@ class WSClientImpl : public WSClient
auto const s = to_string(jp);
Env::retry(
[&]() { ws_.write_some(true, buffer(s)); },
"WSClient::invoke write_some",
"WSClient::invoke write_some " + endpointLabel_,
100ms);
}

Expand Down Expand Up @@ -312,12 +316,14 @@ makeWSClient(
using namespace ripple::test::jtx;

std::unique_ptr<WSClientImpl> ret;
std::stringstream endpoint;
endpoint << WSClientImpl::getEndpoint(cfg, v2);

Env::retry(
[&]() {
ret = std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
},
"makeWSClient",
"makeWSClient " + endpoint.str(),
250ms);
return ret;
}
Expand Down

0 comments on commit 34d918a

Please sign in to comment.