Skip to content

Commit

Permalink
Build raft cluster from state manager
Browse files Browse the repository at this point in the history
Signed-off-by: James Lovejoy <[email protected]>
  • Loading branch information
metalicjames authored and HalosGhost committed Aug 15, 2022
1 parent 9daab81 commit 0a5635a
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 231 deletions.
19 changes: 9 additions & 10 deletions src/uhs/atomizer/atomizer/atomizer_raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,23 @@
#include "util/serialization/util.hpp"

namespace cbdc::atomizer {
atomizer_raft::atomizer_raft(uint32_t atomizer_id,
const network::endpoint_t& raft_endpoint,
size_t stxo_cache_depth,
std::shared_ptr<logging::log> logger,
config::options opts,
nuraft::cb_func::func_type raft_callback,
bool wait_for_followers)
atomizer_raft::atomizer_raft(
uint32_t atomizer_id,
std::vector<network::endpoint_t> raft_endpoints,
size_t stxo_cache_depth,
std::shared_ptr<logging::log> logger,
config::options opts,
nuraft::cb_func::func_type raft_callback)
: node(static_cast<int>(atomizer_id),
raft_endpoint,
std::move(raft_endpoints),
m_node_type,
false,
nuraft::cs_new<state_machine>(
stxo_cache_depth,
"atomizer_snps_" + std::to_string(atomizer_id)),
0,
logger,
std::move(raft_callback),
wait_for_followers),
std::move(raft_callback)),
m_log(std::move(logger)),
m_opts(std::move(opts)) {}

Expand Down
10 changes: 3 additions & 7 deletions src/uhs/atomizer/atomizer/atomizer_raft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,17 @@ namespace cbdc::atomizer {
public:
/// Constructor.
/// \param atomizer_id ID of the raft node.
/// \param raft_endpoint endpoint for raft communications.
/// \param raft_endpoints node endpoints for raft communications.
/// \param stxo_cache_depth number of blocks in the spent output cache.
/// \param logger log instance.
/// \param opts configuration options.
/// \param raft_callback NuRaft callback for raft events.
/// \param wait_for_followers true if the leader raft node should
/// re-attempt to add all followers to the
/// cluster until success.
atomizer_raft(uint32_t atomizer_id,
const network::endpoint_t& raft_endpoint,
std::vector<network::endpoint_t> raft_endpoints,
size_t stxo_cache_depth,
std::shared_ptr<logging::log> logger,
config::options opts,
nuraft::cb_func::func_type raft_callback,
bool wait_for_followers);
nuraft::cb_func::func_type raft_callback);

/// Serialize and replicate the given request in the atomizer raft
/// cluster. Return the response asynchronously via the given result
Expand Down
29 changes: 10 additions & 19 deletions src/uhs/atomizer/atomizer/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ namespace cbdc::atomizer {
: m_atomizer_id(atomizer_id),
m_opts(opts),
m_logger(std::move(log)),
m_raft_node(
static_cast<uint32_t>(atomizer_id),
opts.m_atomizer_raft_endpoints[atomizer_id].value(),
m_opts.m_stxo_cache_depth,
m_logger,
opts,
[&](auto&& type, auto&& param) {
return raft_callback(std::forward<decltype(type)>(type),
std::forward<decltype(param)>(param));
},
m_opts.m_wait_for_followers) {}
m_raft_node(static_cast<uint32_t>(atomizer_id),
opts.m_atomizer_raft_endpoints,
m_opts.m_stxo_cache_depth,
m_logger,
m_opts,
[&](auto&& type, auto&& param) {
return raft_callback(
std::forward<decltype(type)>(type),
std::forward<decltype(param)>(param));
}) {}

controller::~controller() {
m_raft_node.stop();
Expand Down Expand Up @@ -81,14 +80,6 @@ namespace cbdc::atomizer {
return false;
}

auto raft_endpoints = std::vector<network::endpoint_t>();
for(const auto& s : m_opts.m_atomizer_raft_endpoints) {
raft_endpoints.push_back(*s);
}
if(!m_raft_node.build_cluster(raft_endpoints)) {
return false;
}

m_tx_notify_thread = std::thread{[&] {
tx_notify_handler();
}};
Expand Down
13 changes: 3 additions & 10 deletions src/uhs/twophase/coordinator/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ namespace cbdc::coordinator {

m_raft_serv = std::make_shared<raft::node>(
static_cast<int>(m_node_id),
m_opts.m_coordinator_raft_endpoints[m_coordinator_id][m_node_id],
m_opts.m_coordinator_raft_endpoints[m_coordinator_id],
"coordinator" + std::to_string(m_coordinator_id),
true,
m_state_machine,
Expand All @@ -94,8 +94,7 @@ namespace cbdc::coordinator {
[&](auto&& res, auto&& err) {
return raft_callback(std::forward<decltype(res)>(res),
std::forward<decltype(err)>(err));
},
m_opts.m_wait_for_followers);
});

// Thread to handle starting and stopping the message handler and dtx
// batch processing threads when triggered by the raft callback
Expand All @@ -107,13 +106,7 @@ namespace cbdc::coordinator {
// Initialize NuRaft with the state machine we just created. Register
// our callback function to notify us when we become a leader or
// follower.
if(!m_raft_serv->init(m_raft_params)) {
return false;
}

// Connect to the other raft nodes in our coordinator cluster
return m_raft_serv->build_cluster(
m_opts.m_coordinator_raft_endpoints[m_coordinator_id]);
return m_raft_serv->init(m_raft_params);
}

auto controller::raft_callback(nuraft::cb_func::Type type,
Expand Down
11 changes: 2 additions & 9 deletions src/uhs/twophase/locking_shard/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ namespace cbdc::locking_shard {

m_raft_serv = std::make_shared<raft::node>(
static_cast<int>(m_node_id),
m_opts.m_locking_shard_raft_endpoints[m_shard_id][m_node_id],
m_opts.m_locking_shard_raft_endpoints[m_shard_id],
"shard" + std::to_string(m_shard_id),
false,
m_state_machine,
Expand All @@ -87,20 +87,13 @@ namespace cbdc::locking_shard {
[&](auto&& res, auto&& err) {
return raft_callback(std::forward<decltype(res)>(res),
std::forward<decltype(err)>(err));
},
m_opts.m_wait_for_followers);
});

if(!m_raft_serv->init(params)) {
m_logger->error("Failed to initialize raft server");
return false;
}

if(!m_raft_serv->build_cluster(
m_opts.m_locking_shard_raft_endpoints[m_shard_id])) {
m_logger->error("Failed to build raft cluster");
return false;
}

auto status_rpc_server = std::make_unique<
cbdc::rpc::blocking_tcp_server<rpc::status_request,
rpc::status_response>>(
Expand Down
10 changes: 5 additions & 5 deletions src/util/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,11 @@ namespace cbdc::config {

const auto endpoint_key = get_atomizer_raft_endpoint_key(i);
const auto endpoint_str = cfg.get_endpoint(endpoint_key);
opts.m_atomizer_raft_endpoints.push_back(endpoint_str);
if(!endpoint_str) {
return "No raft endpoint specified for atomizer "
+ std::to_string(i) + " (" + endpoint_key + ")";
}
opts.m_atomizer_raft_endpoints.push_back(endpoint_str.value());
}

opts.m_target_block_interval
Expand Down Expand Up @@ -584,10 +588,6 @@ namespace cbdc::config {

opts.m_batch_size
= cfg.get_ulong(batch_size_key).value_or(opts.m_batch_size);
auto wait_for_followers = cfg.get_ulong(wait_for_followers_key);
if(wait_for_followers.has_value()) {
opts.m_wait_for_followers = wait_for_followers.value() != 0;
}
}

void read_loadgen_options(options& opts, const parser& cfg) {
Expand Down
8 changes: 1 addition & 7 deletions src/util/common/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ namespace cbdc::config {
static constexpr size_t initial_mint_value{100};
static constexpr size_t watchtower_block_cache_size{100};
static constexpr size_t watchtower_error_cache_size{1000000};
static constexpr bool wait_for_followers{true};
static constexpr size_t input_count{2};
static constexpr size_t output_count{2};
static constexpr double fixed_tx_rate{1.0};
Expand Down Expand Up @@ -162,8 +161,7 @@ namespace cbdc::config {
/// List of shard endpoints, ordered by shard ID.
std::vector<network::endpoint_t> m_shard_endpoints;
/// List of atomizer raft endpoints, ordered by atomizer ID.
std::vector<std::optional<network::endpoint_t>>
m_atomizer_raft_endpoints;
std::vector<network::endpoint_t> m_atomizer_raft_endpoints;
/// Maximum transaction batch size for one log entry in the raft
/// atomizer or one batch in the coordinator.
size_t m_batch_size{defaults::batch_size};
Expand Down Expand Up @@ -252,10 +250,6 @@ namespace cbdc::config {
/// Number of load generators over which to split pre-seeded UTXOs.
size_t m_loadgen_count{0};

/// Flag for whether the raft leader should re-attempt to join
/// followers to the cluster until successful.
bool m_wait_for_followers{defaults::wait_for_followers};

/// Private keys for sentinels.
std::unordered_map<size_t, privkey_t> m_sentinel_private_keys;

Expand Down
100 changes: 6 additions & 94 deletions src/util/raft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,24 @@

namespace cbdc::raft {
node::node(int node_id,
const network::endpoint_t& raft_endpoint,
std::vector<network::endpoint_t> raft_endpoints,
const std::string& node_type,
bool blocking,
nuraft::ptr<nuraft::state_machine> sm,
size_t asio_thread_pool_size,
std::shared_ptr<logging::log> logger,
nuraft::cb_func::func_type raft_cb,
bool wait_for_followers)
nuraft::cb_func::func_type raft_cb)
: m_node_id(static_cast<uint32_t>(node_id)),
m_blocking(blocking),
m_port(raft_endpoint.second),
m_port(raft_endpoints[m_node_id].second),
m_raft_logger(nuraft::cs_new<console_logger>(std::move(logger))),
m_smgr(nuraft::cs_new<state_manager>(
static_cast<uint32_t>(m_node_id + 1),
raft_endpoint.first + ":" + std::to_string(m_port),
node_type + "_raft_log_" + std::to_string(m_node_id),
node_type + "_raft_config_" + std::to_string(m_node_id) + ".dat",
node_type + "_raft_state_" + std::to_string(m_node_id)
+ ".dat")),
m_sm(std::move(sm)),
m_wait_for_followers(wait_for_followers) {
node_type + "_raft_state_" + std::to_string(m_node_id) + ".dat",
std::move(raft_endpoints))),
m_sm(std::move(sm)) {
m_asio_opt.thread_pool_size_ = asio_thread_pool_size;
m_init_opts.raft_callback_ = std::move(raft_cb);
if(m_node_id != 0) {
Expand Down Expand Up @@ -69,91 +66,6 @@ namespace cbdc::raft {
return true;
}

/// Asks the local raft instance to add every other node in the given
/// endpoint list to the cluster, one server at a time, and waits for each
/// one to show up in the cluster configuration before moving on.
/// \param raft_servers endpoints of all nodes, indexed by node ID. The
///                     entry at this node's own index is skipped.
/// \return true once every other server is present in the cluster
///         configuration. false if an add request is rejected for a
///         reason other than the server already joining, or if a server
///         does not appear within the polling window and
///         m_wait_for_followers is false.
auto node::add_cluster_nodes(
    const std::vector<network::endpoint_t>& raft_servers) const -> bool {
    static constexpr auto poll_interval = std::chrono::milliseconds(100);
    static constexpr int max_polls = 50;

    const auto self_idx = static_cast<size_t>(m_node_id);

    for(size_t idx = 0; idx < raft_servers.size(); ++idx) {
        if(idx == self_idx) {
            continue;
        }

        // Raft server IDs are one-based: position in the list plus one.
        const auto srv_id = static_cast<int>(idx + 1);
        const auto& endpoint = raft_servers[idx];
        const auto address
            = endpoint.first + ":" + std::to_string(endpoint.second);
        nuraft::srv_config srv(srv_id, address);

        // Keep re-issuing the add request until the server is visible in
        // the configuration, or until we are told to give up.
        while(true) {
            std::cout << "Adding raft server: " << srv.get_id() << ", "
                      << srv.get_endpoint() << std::flush;

            auto result = m_raft_instance->add_srv(srv);
            const bool rejected = !result->get_accepted();
            const bool already_joining
                = result->get_result_code()
               == nuraft::cmd_result_code::SERVER_IS_JOINING;
            if(rejected && !already_joining) {
                std::cout << "Failed to add raft server: " << srv.get_id()
                          << ", " << srv.get_endpoint()
                          << ", error: " << result->get_result_str()
                          << std::endl;
                return false;
            }

            // Poll the configuration a bounded number of times. Each poll
            // prints a progress dot and then sleeps, including the poll
            // that finally finds the server.
            nuraft::ptr<nuraft::srv_config> joined;
            for(int polls = 0; polls < max_polls; ++polls) {
                joined = m_raft_instance->get_srv_config(srv_id);
                std::cout << "." << std::flush;
                std::this_thread::sleep_for(poll_interval);
                if(joined) {
                    break;
                }
            }

            if(joined) {
                break;
            }

            std::cout << "timed out" << std::endl;
            if(!m_wait_for_followers) {
                return false;
            }
        }

        std::cout << "done" << std::endl;
    }

    return true;
}

/// Ensures the raft cluster configuration contains at least as many
/// servers as the given endpoint list, then restarts the election timer.
/// Node 0 adds the missing servers itself via add_cluster_nodes(); every
/// other node polls the configuration until it is large enough.
/// \param raft_servers endpoints of all nodes expected in the cluster.
/// \return true once the election timer has been restarted; false if
///         node 0 failed to add the other nodes to the cluster.
auto node::build_cluster(
    const std::vector<network::endpoint_t>& raft_servers) -> bool {
    static constexpr auto poll_interval = std::chrono::milliseconds(100);

    const auto expected_count = raft_servers.size();

    std::vector<nuraft::ptr<nuraft::srv_config>> known_servers;
    m_raft_instance->get_srv_config_all(known_servers);

    const bool incomplete = known_servers.size() < expected_count;
    const bool is_first_node = (m_node_id == 0);

    if(incomplete && is_first_node) {
        if(!add_cluster_nodes(raft_servers)) {
            return false;
        }
    } else if(incomplete) {
        std::cout << "Waiting for raft cluster";
        // Always poll at least once; each poll prints a progress dot and
        // sleeps before the configuration size is re-checked.
        for(;;) {
            known_servers.clear();
            m_raft_instance->get_srv_config_all(known_servers);
            std::cout << "." << std::flush;
            std::this_thread::sleep_for(poll_interval);
            if(known_servers.size() >= expected_count) {
                break;
            }
        }
        std::cout << "done" << std::endl;
    }

    m_raft_instance->restart_election_timer();

    return true;
}

/// Reports whether this node is currently the raft leader, as answered by
/// the underlying raft instance.
/// \return the result of the raft instance's is_leader() query.
auto node::is_leader() const -> bool {
    const bool leading = m_raft_instance->is_leader();
    return leading;
}
Expand Down
Loading

0 comments on commit 0a5635a

Please sign in to comment.