Skip to content
This repository has been archived by the owner on Oct 4, 2019. It is now read-only.

Commit

Permalink
Merge pull request #500 from GolosChain/499-configurable-read-write-l…
Browse files Browse the repository at this point in the history
…ocks

Configurable read write locks. #499
  • Loading branch information
kotbegemot authored Apr 2, 2018
2 parents a2e3455 + a3e0b5f commit 8ed290b
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 55 deletions.
14 changes: 7 additions & 7 deletions libraries/chain/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ namespace golos {

if (chainbase_flags & chainbase::database::read_write) {
if (!find<dynamic_global_property_object>()) {
with_write_lock([&]() {
with_strong_write_lock([&]() {
init_genesis(initial_supply);
});
}
Expand All @@ -111,7 +111,7 @@ namespace golos {
auto log_head = _block_log.head();

// Rewind all undo state. This should return us to the state at the last irreversible block.
with_write_lock([&]() {
with_strong_write_lock([&]() {
undo_all();
FC_ASSERT(revision() ==
head_block_num(), "Chainbase revision does not match head block num",
Expand Down Expand Up @@ -161,7 +161,7 @@ namespace golos {
skip_validate_invariants |
skip_block_log;

with_write_lock([&]() {
with_strong_write_lock([&]() {
auto itr = _block_log.read_block(0);
auto last_block_num = _block_log.head()->block_num();

Expand Down Expand Up @@ -628,7 +628,7 @@ namespace golos {
//fc::time_point begin_time = fc::time_point::now();

bool result;
with_write_lock([&]() {
with_strong_write_lock([&]() {
detail::with_skip_flags(*this, skip, [&]() {
detail::without_pending_transactions(*this, std::move(_pending_tx), [&]() {
try {
Expand Down Expand Up @@ -756,7 +756,7 @@ namespace golos {
void database::push_transaction(const signed_transaction &trx, uint32_t skip) {
try {
FC_ASSERT(fc::raw::pack_size(trx) <= (get_dynamic_global_properties().maximum_block_size - 256));
with_write_lock([&]() {
with_weak_write_lock([&]() {
detail::with_producing(*this, [&]() {
detail::with_skip_flags(*this, skip, [&]() {
_push_transaction(trx);
Expand Down Expand Up @@ -830,7 +830,7 @@ namespace golos {

signed_block pending_block;

with_write_lock([&]() { detail::with_skip_flags(*this, skip, [&]() {
with_strong_write_lock([&]() { detail::with_skip_flags(*this, skip, [&]() {
//
// The following code throws away existing pending_tx_session and
// rebuilds it by re-applying pending transactions.
Expand Down Expand Up @@ -2930,7 +2930,7 @@ namespace golos {


void database::validate_transaction(const signed_transaction &trx) {
database::with_write_lock([&]() {
database::with_weak_write_lock([&]() {
auto session = start_undo_session(true);
_apply_transaction(trx);
session.undo();
Expand Down
2 changes: 1 addition & 1 deletion plugins/chain/include/golos/plugins/chain/plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace golos {

void plugin_shutdown() override;

bool accept_block(const protocol::signed_block &block, bool currently_syncing, uint32_t skip);
bool accept_block(const protocol::signed_block &block, bool currently_syncing = false, uint32_t skip = 0);

void accept_transaction(const protocol::signed_transaction &trx);

Expand Down
167 changes: 145 additions & 22 deletions plugins/chain/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <iostream>
#include <golos/protocol/protocol.hpp>
#include <golos/protocol/types.hpp>
#include <future>

namespace golos {
namespace plugins {
Expand All @@ -31,11 +32,34 @@ namespace chain {

uint32_t allow_future_time = 5;

uint64_t read_wait_micro;
uint32_t max_read_wait_retries;

uint64_t write_wait_micro;
uint32_t max_write_wait_retries;

golos::chain::database db;

bool single_write_thread = false;

plugin_impl() {
// get default settings
read_wait_micro = db.read_wait_micro();
max_read_wait_retries = db.max_read_wait_retries();

write_wait_micro = db.write_wait_micro();
max_write_wait_retries = db.max_write_wait_retries();
}

// HELPERS
golos::chain::database &database() {
return db;
}

boost::asio::io_service& io_service() {
return appbase::app().get_io_service();
}

constexpr const static char *plugin_name = "chain_api";
static const std::string &name() {
static std::string name = plugin_name;
Expand All @@ -45,9 +69,6 @@ namespace chain {
void check_time_in_block(const protocol::signed_block &block);
bool accept_block(const protocol::signed_block &block, bool currently_syncing, uint32_t skip);
void accept_transaction(const protocol::signed_transaction &trx);


golos::chain::database db;
};

void plugin::plugin_impl::check_time_in_block(const protocol::signed_block &block) {
Expand All @@ -66,11 +87,40 @@ namespace chain {

check_time_in_block(block);

return db.push_block(block, skip);
if (single_write_thread) {
std::promise<bool> promise;
auto result = promise.get_future();

io_service().post([&]{
try {
promise.set_value(db.push_block(block, skip));
} catch(...) {
promise.set_exception(std::current_exception());
}
});
return result.get(); // if an exception was, it will be thrown
} else {
return db.push_block(block, skip);
}
}

void plugin::plugin_impl::accept_transaction(const protocol::signed_transaction &trx) {
db.push_transaction(trx);
if (single_write_thread) {
std::promise<bool> promise;
auto wait = promise.get_future();

io_service().post([&]{
try {
db.push_transaction(trx);
promise.set_value(true);
} catch(...) {
promise.set_exception(std::current_exception());
}
});
wait.get(); // if an exception was, it will be thrown
} else {
db.push_transaction(trx);
}
}

plugin::plugin() {
Expand All @@ -89,23 +139,73 @@ namespace chain {

void plugin::set_program_options(boost::program_options::options_description &cli,
boost::program_options::options_description &cfg) {
cfg.add_options()("shared-file-dir", boost::program_options::value<boost::filesystem::path>()->default_value("blockchain"),
"the location of the chain shared memory files (absolute path or relative to application data dir)")(
"shared-file-size", boost::program_options::value<std::string>()->default_value("54G"),
"Size of the shared memory file. Default: 54G")("checkpoint,c",
boost::program_options::value<std::vector<std::string>>()->composing(),
"Pairs of [BLOCK_NUM,BLOCK_ID] that should be enforced as checkpoints.")(
"flush-state-interval", boost::program_options::value<uint32_t>(),
"flush shared memory changes to disk every N blocks");
cli.add_options()("replay-blockchain", boost::program_options::bool_switch()->default_value(false),
"clear chain database and replay all blocks")("resync-blockchain",
boost::program_options::bool_switch()->default_value(
false),
"clear chain database and block log")(
"check-locks", boost::program_options::bool_switch()->default_value(false),
"Check correctness of chainbase locking")("validate-database-invariants",
boost::program_options::bool_switch()->default_value(false),
"Validate all supply invariants check out");
cfg.add_options()
(
"shared-file-dir",
boost::program_options::value<boost::filesystem::path>()->default_value("blockchain"),
"the location of the chain shared memory files (absolute path or relative to application data dir)"
)
(
"shared-file-size",
boost::program_options::value<std::string>()->default_value("64G"),
"Size of the shared memory file. Default: 54G"
)
(
"checkpoint,c",
boost::program_options::value<std::vector<std::string>>()->composing(),
"Pairs of [BLOCK_NUM,BLOCK_ID] that should be enforced as checkpoints."
)
(
"flush-state-interval",
boost::program_options::value<uint32_t>(),
"flush shared memory changes to disk every N blocks"
)
(
"read-wait-micro",
boost::program_options::value<uint64_t>(),
"maximum microseconds for trying to get read lock"
)
(
"max-read-wait-retries",
boost::program_options::value<uint32_t>(),
"maximum number of retries to get read lock"
)
(
"write-wait-micro",
boost::program_options::value<uint64_t>(),
"maximum microseconds for trying to get write lock"
)
(
"max-write-wait-retries",
boost::program_options::value<uint32_t>(),
"maximum number of retries to get write lock"
)
(
"single-write-thread",
boost::program_options::value<bool>()->default_value(false),
"push blocks and transactions from one thread"
);
cli.add_options()
(
"replay-blockchain",
boost::program_options::bool_switch()->default_value(false),
"clear chain database and replay all blocks"
)
(
"resync-blockchain",
boost::program_options::bool_switch()->default_value(false),
"clear chain database and block log"
)
(
"check-locks",
boost::program_options::bool_switch()->default_value(false),
"Check correctness of chainbase locking"
)
(
"validate-database-invariants",
boost::program_options::bool_switch()->default_value(false),
"Validate all supply invariants check out"
);
}

void plugin::plugin_initialize(const boost::program_options::variables_map &options) {
Expand All @@ -122,6 +222,24 @@ namespace chain {
}
}

if (options.count("read-wait-micro")) {
my->read_wait_micro = options.at("read-wait-micro").as<uint64_t>();
}

if (options.count("max-read-wait-retries")) {
my->max_read_wait_retries = options.at("max-read-wait-retries").as<uint32_t>();
}

if (options.count("write-wait-micro")) {
my->write_wait_micro = options.at("write-wait-micro").as<uint64_t>();
}

if (options.count("max-write-wait-retries")) {
my->max_write_wait_retries = options.at("max-write-wait-retries").as<uint32_t>();
}

my->single_write_thread = options.at("single-write-thread").as<bool>();

my->shared_memory_size = fc::parse_size(options.at("shared-file-size").as<std::string>());

my->replay = options.at("replay-blockchain").as<bool>();
Expand Down Expand Up @@ -156,6 +274,11 @@ namespace chain {
my->db.add_checkpoints(my->loaded_checkpoints);
my->db.set_require_locking(my->check_locks);

my->db.read_wait_micro(my->read_wait_micro);
my->db.max_read_wait_retries(my->max_read_wait_retries);
my->db.write_wait_micro(my->write_wait_micro);
my->db.max_write_wait_retries(my->max_write_wait_retries);

if (my->replay) {
ilog("Replaying blockchain on user request.");
my->db.reindex(appbase::app().data_dir() / "blockchain", my->shared_memory_dir, my->shared_memory_size);
Expand Down
14 changes: 8 additions & 6 deletions plugins/database_api/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,15 @@ namespace golos {
if (!_follow_api) {
return 0;
}
msg_pack msg;
msg.args = std::vector<fc::variant>({fc::variant(account), fc::variant(1)});
auto reputations = _follow_api->get_account_reputations(msg);
if (reputations.empty()) {
return 0;
auto &rep_idx = database().get_index<follow::reputation_index>().indices().get<follow::by_account>();
auto itr = rep_idx.find(account);
if (rep_idx.end() != itr) {
return itr->reputation;
}
return reputations[0].reputation;
return 0;
}
template<typename T>
Expand Down
8 changes: 4 additions & 4 deletions plugins/network_broadcast_api/network_broadcast_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace golos {
const auto max_block_age = args.args->at(1).as<uint32_t>();
FC_ASSERT(!check_max_block_age(max_block_age));
}
pimpl->_chain.db().push_transaction(trx);
pimpl->_chain.accept_transaction(trx);
pimpl->_p2p.broadcast_transaction(trx);

return broadcast_transaction_return();
Expand All @@ -73,7 +73,7 @@ namespace golos {
pimpl->_callback_expirations[trx.expiration].push_back(trx.id());
}

pimpl->_chain.db().push_transaction(trx);
pimpl->_chain.accept_transaction(trx);
pimpl->_p2p.broadcast_transaction(trx);
transfer.complete();

Expand All @@ -84,7 +84,7 @@ namespace golos {
const auto n_args = args.args->size();
FC_ASSERT(n_args == 1, "Expected 1 argument, got 0");
auto block = args.args->at(0).as<signed_block>();
pimpl->_chain.db().push_block(block);
pimpl->_chain.accept_block(block);
pimpl->_p2p.broadcast_block(block);
return broadcast_block_return();
}
Expand Down Expand Up @@ -116,7 +116,7 @@ namespace golos {
}


pimpl->_chain.db().push_transaction(trx);
pimpl->_chain.accept_transaction(trx);
pimpl->_p2p.broadcast_transaction(trx);
transfer.complete();

Expand Down
2 changes: 1 addition & 1 deletion plugins/p2p/p2p_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ namespace golos {

void p2p_plugin_impl::handle_transaction(const trx_message &trx_msg) {
try {
chain.db().push_transaction(trx_msg.trx);
chain.accept_transaction(trx_msg.trx);
} FC_CAPTURE_AND_RETHROW((trx_msg))
}

Expand Down
14 changes: 8 additions & 6 deletions plugins/social_network/social_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,15 @@ namespace golos {
if (!follow_api_) {
return 0;
}
msg_pack msg;
msg.args = std::vector<fc::variant>({fc::variant(account), fc::variant(1)});
auto reputations = follow_api_->get_account_reputations(msg);
if (reputations.empty()) {
return 0;
auto &rep_idx = database().get_index<follow::reputation_index>().indices().get<follow::by_account>();
auto itr = rep_idx.find(account);
if (rep_idx.end() != itr) {
return itr->reputation;
}
return reputations[0].reputation;
return 0;
}
comment_object::id_type get_parent(const discussion_query &query) const {
Expand Down
Loading

0 comments on commit 8ed290b

Please sign in to comment.