Skip to content

Commit

Permalink
Merge #562: Update rust-jsonrpc and switch to using minreq
Browse files Browse the repository at this point in the history
43a0cf2 test_misc.py: linter (Antoine Poinsot)
c421610 qa: test we retry requests to bitcoind when it's overloaded (Antoine Poinsot)
9007947 bitcoind: use minreq as HTTP transport for JSONRPC (Antoine Poinsot)
e3ee50b Update rust-jsonrpc dependency to latest version (Antoine Poinsot)

Pull request description:

  This makes us take advantage of a more robust, but still lightweight, HTTP implementation.

  This PR also cleanups our error handling code and adds a functional test checking we do retry request on transient bitcoind failures.

ACKs for top commit:
  darosior:
    ACK 43a0cf2 -- let's get it in early and try it as we go.

Tree-SHA512: 8b66663079a77df86ce0fc2f6f60c61b2d3fd50a6d5f8f06b35dcb5a8a942210260e68023e90647aaf8da70625b73d024eda2aba284203c0db99842ed4bb0ceb
  • Loading branch information
darosior committed Jul 13, 2023
2 parents 3251e70 + 43a0cf2 commit ebea147
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 79 deletions.
34 changes: 15 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ backtrace = "0.3"
rusqlite = { version = "0.27", features = ["bundled", "unlock_notify"] }

# To talk to bitcoind
jsonrpc = "0.12"
jsonrpc = { version = "0.16", features = ["minreq_http"], default-features = false }

# Used for daemonization
libc = { version = "0.2", optional = true }
Expand Down
90 changes: 60 additions & 30 deletions src/bitcoin/d/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use std::{
use jsonrpc::{
arg,
client::Client,
simple_http::{self, SimpleHttpTransport},
minreq,
minreq_http::{self, MinreqHttpTransport},
};

use miniscript::{
Expand Down Expand Up @@ -74,16 +75,41 @@ impl BitcoindError {

/// Is it a timeout of any kind?
pub fn is_timeout(&self) -> bool {
match self {
BitcoindError::Server(jsonrpc::Error::Transport(ref e)) => {
match e.downcast_ref::<simple_http::Error>() {
Some(simple_http::Error::Timeout) => true,
Some(simple_http::Error::SocketError(e)) => e.kind() == io::ErrorKind::TimedOut,
_ => false,
if let BitcoindError::Server(jsonrpc::Error::Transport(ref e)) = self {
if let Some(minreq_http::Error::Minreq(minreq::Error::IoError(e))) =
e.downcast_ref::<minreq_http::Error>()
{
return e.kind() == io::ErrorKind::TimedOut;
}
}
false
}

/// Is it an error that can be recovered from?
pub fn is_transient(&self) -> bool {
if let BitcoindError::Server(jsonrpc::Error::Transport(ref e)) = self {
if let Some(ref e) = e.downcast_ref::<minreq_http::Error>() {
// Bitcoind is overloaded
if let minreq_http::Error::Http(minreq_http::HttpError { status_code, .. }) = e {
return status_code == &503;
}
// Bitcoind may have been restarted
return matches!(e, minreq_http::Error::Minreq(minreq::Error::IoError(_)));
}
_ => false,
}
false
}

/// Is it an error that has to do with our credentials?
pub fn is_unauthorized(&self) -> bool {
if let BitcoindError::Server(jsonrpc::Error::Transport(ref e)) = self {
if let Some(minreq_http::Error::Http(minreq_http::HttpError { status_code, .. })) =
e.downcast_ref::<minreq_http::Error>()
{
return status_code == &402;
}
}
false
}
}

Expand Down Expand Up @@ -131,8 +157,8 @@ impl From<jsonrpc::error::Error> for BitcoindError {
}
}

impl From<simple_http::Error> for BitcoindError {
fn from(e: simple_http::Error) -> Self {
impl From<minreq_http::Error> for BitcoindError {
fn from(e: minreq_http::Error) -> Self {
jsonrpc::error::Error::Transport(Box::new(e)).into()
}
}
Expand Down Expand Up @@ -204,27 +230,28 @@ impl BitcoinD {
) -> Result<BitcoinD, BitcoindError> {
let cookie_string =
fs::read_to_string(&config.cookie_path).map_err(BitcoindError::CookieFile)?;
let node_url = format!("http://{}", config.addr);
let watchonly_url = format!("http://{}/wallet/{}", config.addr, watchonly_wallet_path);

// Create a dummy bitcoind with clients using a low timeout to sanity check the connection.
let dummy_node_client = Client::with_transport(
SimpleHttpTransport::builder()
.url(&config.addr.to_string())
MinreqHttpTransport::builder()
.url(&node_url)
.map_err(BitcoindError::from)?
.timeout(Duration::from_secs(3))
.cookie_auth(cookie_string.clone())
.build(),
);
let sendonly_client = Client::with_transport(
SimpleHttpTransport::builder()
MinreqHttpTransport::builder()
.url(&watchonly_url)
.map_err(BitcoindError::from)?
.timeout(Duration::from_secs(1))
.cookie_auth(cookie_string.clone())
.build(),
);
let dummy_wo_client = Client::with_transport(
SimpleHttpTransport::builder()
MinreqHttpTransport::builder()
.url(&watchonly_url)
.map_err(BitcoindError::from)?
.timeout(Duration::from_secs(3))
Expand All @@ -242,23 +269,23 @@ impl BitcoinD {

// Now the connection is checked, create the clients with an appropriate timeout.
let node_client = Client::with_transport(
SimpleHttpTransport::builder()
.url(&config.addr.to_string())
MinreqHttpTransport::builder()
.url(&node_url)
.map_err(BitcoindError::from)?
.timeout(Duration::from_secs(RPC_SOCKET_TIMEOUT))
.cookie_auth(cookie_string.clone())
.build(),
);
let sendonly_client = Client::with_transport(
SimpleHttpTransport::builder()
MinreqHttpTransport::builder()
.url(&watchonly_url)
.map_err(BitcoindError::from)?
.timeout(Duration::from_secs(1))
.cookie_auth(cookie_string.clone())
.build(),
);
let watchonly_client = Client::with_transport(
SimpleHttpTransport::builder()
MinreqHttpTransport::builder()
.url(&watchonly_url)
.map_err(BitcoindError::from)?
.timeout(Duration::from_secs(RPC_SOCKET_TIMEOUT))
Expand Down Expand Up @@ -307,20 +334,23 @@ impl BitcoinD {
Ok(res) => return Ok(res),
Err(e) => {
if e.is_warming_up() {
// Always retry when bitcoind is warming up, it'll be available eventually.
std::thread::sleep(Duration::from_secs(1));
error = Some(e)
} else if let BitcoindError::Server(jsonrpc::Error::Transport(ref err)) = e {
match err.downcast_ref::<simple_http::Error>() {
Some(simple_http::Error::Timeout)
| Some(simple_http::Error::SocketError(_))
| Some(simple_http::Error::HttpErrorCode(503)) => {
if i <= self.retries {
std::thread::sleep(Duration::from_secs(1));
log::debug!("Retrying RPC request to bitcoind: attempt #{}", i);
}
error = Some(e);
}
_ => return Err(e),
} else if e.is_unauthorized() {
// FIXME: it should be trivial for us to cache the cookie path and simply
// refresh the credentials when this happens. Unfortunately this means
// making the BitcoinD struct mutable...
log::error!("Denied access to bitcoind. Most likely bitcoind was restarted from under us and the cookie changed.");
return Err(e);
} else if e.is_transient() {
// If we start hitting transient errors retry requests for a limited time.
log::warn!("Transient error when sending request to bitcoind: {}", e);
if i <= self.retries {
std::thread::sleep(Duration::from_secs(1));
log::debug!("Retrying RPC request to bitcoind: attempt #{}", i);
}
error = Some(e);
} else {
return Err(e);
}
Expand Down
56 changes: 32 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,12 @@ mod tests {
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}\n".as_bytes();

// Read the first echo, respond to it
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(echo_resp).unwrap();
stream.flush().unwrap();
{
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(echo_resp).unwrap();
stream.flush().unwrap();
}

// Read the second echo, respond to it
let (mut stream, _) = server.accept().unwrap();
Expand Down Expand Up @@ -549,23 +551,27 @@ mod tests {

// Send them responses for the calls involved when creating a fresh wallet
fn complete_wallet_creation(server: &net::TcpListener) {
let net_resp =
["HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}\n".as_bytes()]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
{
let net_resp =
["HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}\n".as_bytes()]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
}

let net_resp = [
{
let net_resp = [
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"name\":\"dummy\"}}\n"
.as_bytes(),
]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
}

let net_resp = [
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[{\"success\":true}]}\n"
Expand All @@ -580,12 +586,14 @@ mod tests {

// Send them a dummy result to loadwallet.
fn complete_wallet_loading(server: &net::TcpListener) {
let listwallets_resp =
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}\n".as_bytes();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(listwallets_resp).unwrap();
stream.flush().unwrap();
{
let listwallets_resp =
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}\n".as_bytes();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(listwallets_resp).unwrap();
stream.flush().unwrap();
}

let loadwallet_resp =
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"name\":\"dummy\"}}\n"
Expand Down
41 changes: 36 additions & 5 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from test_framework.serializations import PSBT
from test_framework.utils import wait_for, RpcError, OLD_LIANAD_PATH, LIANAD_PATH

from threading import Thread


def receive_and_send(lianad, bitcoind):
n_coins = len(lianad.rpc.listcoins()["coins"])
Expand All @@ -16,11 +18,13 @@ def receive_and_send(lianad, bitcoind):
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == n_coins + 3)

# Create a spend that will create a change output, sign and broadcast it.
outpoints = [next(
c["outpoint"]
for c in lianad.rpc.listcoins()["coins"]
if c["spend_info"] is None
)]
outpoints = [
next(
c["outpoint"]
for c in lianad.rpc.listcoins()["coins"]
if c["spend_info"] is None
)
]
destinations = {
bitcoind.rpc.getnewaddress(): 200_000,
}
Expand Down Expand Up @@ -218,3 +222,30 @@ def test_migration(lianad_multisig, bitcoind):
receive_and_send(lianad, bitcoind)
spend_txs = lianad.rpc.listspendtxs()["spend_txs"]
assert len(spend_txs) == 2 and all(s["updated_at"] is not None for s in spend_txs)


def test_retry_on_workqueue_exceeded(lianad, bitcoind):
"""Make sure we retry requests to bitcoind if it is temporarily overloaded."""
# Start by reducing the work queue to a single slot. Note we need to stop lianad
# as we don't support yet restarting a bitcoind due to the cookie file getting
# overwritten.
lianad.stop()
bitcoind.cmd_line += ["-rpcworkqueue=1", "-rpcthreads=1"]
bitcoind.stop()
bitcoind.start()
lianad.start()

# Stuck the bitcoind RPC server working queue with a command that takes 5 seconds
# to be replied to, and make lianad send it a request. Make sure we detect this is
# a transient HTTP 503 error and we retry the request. Once the 5 seconds are past
# our request succeeds and we get the reply to the lianad RPC command.
t = Thread(target=bitcoind.rpc.waitfornewblock, args=(5_000,))
t.start()
lianad.rpc.getinfo()
lianad.wait_for_logs(
[
"Transient error when sending request to bitcoind.*(status: 503, body: Work queue depth exceeded)",
"Retrying RPC request to bitcoind",
]
)
t.join()

0 comments on commit ebea147

Please sign in to comment.