Skip to content

Commit

Permalink
add some log
Browse files Browse the repository at this point in the history
  • Loading branch information
georgesFoundation committed Aug 5, 2024
1 parent dd915ef commit d611961
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ bs58 = "0.5"
clap = { version = "4", features = ["cargo"] }
crc = "3"
criterion = { version = "0.4" }
defmt = "0.3"
derive_more = { version = "1.0.0-beta", default-features = false }
embedded-io = "0.6"
embedded-io-async = "0.6"
faster-hex = { version = "0.9", default-features = false }
heapless = { version = "0.8", default-features = false }
itertools = { version = "0.10", default-features = false }
libfuzzer-sys = "0.4"
log = { version = "0.4" }
minicbor = { version = "0.24", features = ["derive"] }
nom = { version = "7", default-features = false }
phf = { version = "0.11", features = ["macros"], default-features = false }
Expand Down
8 changes: 8 additions & 0 deletions stratum-v1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,26 @@ license = "GPL-3.0-or-later AND GPL-3.0-only"
[dependencies]
bitcoin = { version = "0.32", default-features = false }
bitcoin_hashes = { workspace = true }
defmt = { workspace = true, optional = true }
derive_more = { workspace = true, features = ["from"] }
embedded-io = { workspace = true }
embedded-io-async = { workspace = true }
faster-hex = { version = "0.9", default-features = false, git = "https://github.com/Georges760/faster-hex.git", branch = "no-alloc-error-partialeq" }
heapless = { workspace = true, features = ["serde"] }
log = { workspace = true, optional = true }
serde = { workspace = true }
serde-json-core = { workspace = true, features = ["custom-error-messages"] }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
embedded-io-adapters = { version = "0.6", features = ["tokio-1"] }
env_logger = "0.10.1"
log = { workspace = true }

[features]
default = []
defmt = ["dep:defmt", "heapless/defmt-03"]
log = ["dep:log"]

[[example]]
name = "tokio-cli"
Expand Down
2 changes: 1 addition & 1 deletion stratum-v1/examples/tokio-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
nonce: 0,
version_bits: None,
};
client_tx.send_submit(fake_share, 1000.0).await.unwrap();
client_tx.send_submit(fake_share, 1000000.0).await.unwrap();
}
}

Expand Down
6 changes: 5 additions & 1 deletion stratum-v1/src/client/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ use bitcoin::{
};
use heapless::{String, Vec};

#[derive(Debug)]
// #[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Job {
pub job_id: u64,
pub extranonce2: Vec<u8, 8>,
pub version_bits: i32,
pub header: Header,
}
//TODO: implement defmt::Format manually because Header does not implement it

#[derive(Debug, Default)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) struct JobCreator {
job_id: u64,
last_job_id: String<32>,
Expand Down Expand Up @@ -116,7 +120,7 @@ impl JobCreator {
header: Header {
version: Version::from_consensus(rolled_version),
prev_blockhash: BlockHash::from_byte_array(work.prev_hash),
merkle_root: TxMerkleNode::from_byte_array(self.merkle_root(&work)?),
merkle_root: TxMerkleNode::from_byte_array(self.merkle_root(work)?),
time: work.ntime,
bits: CompactTarget::from_consensus(work.nbits),
nonce: 0,
Expand Down
56 changes: 32 additions & 24 deletions stratum-v1/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ use heapless::{
FnvIndexMap, HistoryBuffer, String, Vec,
};

#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Client<R: Read, W: Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: usize> {
phantom_read: core::marker::PhantomData<R>,
phantom_write: core::marker::PhantomData<W>,
}

// #[derive(Debug)]
// #[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ClientRx<R: Read, const BUF_SIZE: usize> {
network_reader: R,
buf: [u8; BUF_SIZE],
Expand All @@ -42,6 +46,8 @@ pub struct ClientRx<R: Read, const BUF_SIZE: usize> {
work_queue_prod: Producer<'static, Work, 2>,
}

// #[derive(Debug, PartialEq)]
// #[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ClientTx<W: Write, const BUF_SIZE: usize> {
network_writer: W,
buf: [u8; BUF_SIZE],
Expand All @@ -63,7 +69,6 @@ impl<R: Read, W: Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: usize>
network_reader: R,
network_writer: W,
vers_mask_queue_prod: Producer<'static, u32, 2>,

work_queue_prod: Producer<'static, Work, 2>, // TODO: transform into local Job queue consumed by ClientTx
) -> (ClientRx<R, RX_BUF_SIZE>, ClientTx<W, TX_BUF_SIZE>) {
let req_queue: &'static mut Queue<ReqIdKind, 32> = {
Expand Down Expand Up @@ -124,17 +129,18 @@ impl<R: Read, const RX_BUF_SIZE: usize> ClientRx<R, RX_BUF_SIZE> {

pub async fn run(&mut self) -> Result<()> {
while let Some(req) = self.req_queue_cons.dequeue() {
// println!("dequeue: {:?}", req.clone());
debug!("ClientRx::run dequeue: {:?}", req.clone());
self.reqs.insert(req.0, req.1).map_err(|_| Error::MapFull)?;
}
// TODO: maybe add some garbage collection here to remove old reqs never responded by Pool
while let Some(i) = self.buf[..self.pos].iter().position(|&c| c == b'\n') {
let line = &self.buf[..i];
// println!(
// "pos: {}; i: {i}; line: {:?}",
// self.pos,
// std::str::from_utf8(line)
// );
debug!(
"pos: {}; i: {}; line: {:?}",
self.pos,
i,
core::str::from_utf8(line)
);
if let Some(id) = response::parse_id(line)? {
// it's a Response
match self.reqs.get(&id) {
Expand All @@ -144,7 +150,7 @@ impl<R: Read, const RX_BUF_SIZE: usize> ClientRx<R, RX_BUF_SIZE> {
.enqueue(ReqKind::Configure)
.map_err(|_| Error::QueueFull)?;
self.reqs.remove(&id);
// println!("enqueue: {:?}, reqs: {:?}", ReqKind::Configure, self.reqs);
debug!("enqueue: {:?}, reqs: {:?}", ReqKind::Configure, self.reqs);
}
Some(ReqKind::Connect) => {
let conn = response::parse_connect(line)?;
Expand All @@ -155,15 +161,15 @@ impl<R: Read, const RX_BUF_SIZE: usize> ClientRx<R, RX_BUF_SIZE> {
.enqueue(ReqKind::Connect)
.map_err(|_| Error::QueueFull)?;
self.reqs.remove(&id);
// println!("enqueue: {:?}, reqs: {:?}", ReqKind::Connect, self.reqs);
debug!("enqueue: {:?}, reqs: {:?}", ReqKind::Connect, self.reqs);
}
Some(ReqKind::Authorize) => {
if response::parse_authorize(line)? {
self.state_queue_prod
.enqueue(ReqKind::Authorize)
.map_err(|_| Error::QueueFull)?;
self.reqs.remove(&id);
// println!("enqueue: {:?}, reqs: {:?}", ReqKind::Authorize, self.reqs);
debug!("enqueue: {:?}, reqs: {:?}", ReqKind::Authorize, self.reqs);
}
}
Some(ReqKind::Submit) => {
Expand All @@ -177,7 +183,7 @@ impl<R: Read, const RX_BUF_SIZE: usize> ClientRx<R, RX_BUF_SIZE> {
Err(e) => return Err(e),
}
self.reqs.remove(&id);
// println!("rx sumbit response, reqs: {:?}", self.reqs);
debug!("rx sumbit response, reqs: {:?}", self.reqs);
}
None => return Err(Error::IdNotFound(id)),
}
Expand All @@ -202,9 +208,9 @@ impl<R: Read, const RX_BUF_SIZE: usize> ClientRx<R, RX_BUF_SIZE> {
// while !self.jobs.is_full() {
self.jobs.write(self.job_creator.roll(&work)?);
// }
if work.clean_jobs {
todo!("inform app to immediately change job")
}
// if work.clean_jobs {
// todo!("inform app to immediately change job")
// }
self.work_queue_prod
.enqueue(work)
.map_err(|_| Error::QueueFull)?;
Expand All @@ -221,11 +227,12 @@ impl<R: Read, const RX_BUF_SIZE: usize> ClientRx<R, RX_BUF_SIZE> {
.read(self.buf[self.pos..].as_mut())
.await
.map_err(|_| Error::NetworkError)?;
// println!(
// "read {n} bytes, pos {}: {:?}",
// self.pos,
// std::str::from_utf8(&self.buf[self.pos..self.pos + n])
// );
trace!(
"read {} bytes, pos {}: {:?}",
n,
self.pos,
core::str::from_utf8(&self.buf[self.pos..self.pos + n])
);
self.pos += n;
Ok(())
}
Expand All @@ -234,26 +241,27 @@ impl<R: Read, const RX_BUF_SIZE: usize> ClientRx<R, RX_BUF_SIZE> {
impl<T: Write, const TX_BUF_SIZE: usize> ClientTx<T, TX_BUF_SIZE> {
fn check_queues(&mut self) {
if let Some(state) = self.state_queue_cons.dequeue() {
// println!("dequeue req: {:?}", state);
debug!("dequeue req: {:?}", state);
match state {
ReqKind::Configure => self.configured = true,
ReqKind::Connect => self.connected = true,
ReqKind::Authorize => self.authorized = true,
_ => todo!("add some log here"),
_ => unreachable!("unknown state: {:?}", state),
}
}
if let Some(diff) = self.diff_queue_cons.dequeue() {
// println!("dequeue diff: {:?}", diff);
debug!("dequeue diff: {:?}", diff);
self.pool_target_difficulty = diff;
}
}

fn prepare_req(&mut self, req_kind: ReqKind) -> Result<()> {
self.req_id += 1;
let req_id_kind = ReqIdKind(self.req_id, req_kind);
self.req_queue_prod
.enqueue(ReqIdKind(self.req_id, req_kind))
.enqueue(req_id_kind.clone())
.map_err(|_| Error::QueueFull)?;
// println!("enqueue: {:?}", ReqIdKind(self.req_id, req_kind));
debug!("enqueue: {:?}", req_id_kind);
Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion stratum-v1/src/client/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde::Deserialize;
use super::request::Request;

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Work {
pub job_id: String<32>,
pub prev_hash: [u8; 32],
Expand All @@ -22,14 +23,16 @@ pub struct Work {
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Notification {
SetVersionMask,
Notify,
SetDifficulty,
}

pub(crate) fn parse_method(resp: &[u8]) -> Result<Notification> {
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct MethodOnly {
method: String<32>,
}
Expand Down Expand Up @@ -62,6 +65,7 @@ pub(crate) fn parse_set_version_mask(resp: &[u8]) -> Result<u32> {

pub(crate) fn parse_notify(resp: &[u8]) -> Result<Work> {
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct WorkRaw(
// Job ID. This is included when miners submit a results so work can be matched with proper transactions.
String<32>,
Expand Down
10 changes: 10 additions & 0 deletions stratum-v1/src/client/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use heapless::{String, Vec};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) enum ReqKind {
Configure,
Connect,
Expand All @@ -15,6 +16,7 @@ pub(crate) enum ReqKind {
}

#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) struct ReqIdKind(pub(crate) u64, pub(crate) ReqKind);

///Request representation.
Expand All @@ -29,6 +31,7 @@ pub(crate) struct ReqIdKind(pub(crate) u64, pub(crate) ReqKind);
///- `T` - specifies textual type. By default it uses static buffer of 32 bytes, which is more than enough in normal cases.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Request<P> {
#[serde(skip_serializing_if = "Option::is_none")]
///An identifier established by the Client.
Expand All @@ -46,6 +49,7 @@ pub struct Request<P> {
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct VersionRolling {
/// Bits set to 1 can be changed by the miner.
/// If a miner changes bits with mask value 0, the server will reject the submit.
Expand All @@ -55,6 +59,7 @@ pub struct VersionRolling {
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Info {
/// Exact URL used by the mining software to connect to the stratum server.
pub connection_url: Option<String<32>>,
Expand All @@ -67,6 +72,7 @@ pub struct Info {
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Extensions {
/// This extension allows the miner to change the value of some bits in the version field
/// in the block header. Currently there are no standard bits used for version rolling
Expand All @@ -89,6 +95,7 @@ pub(crate) fn configure(id: u64, exts: Extensions, buf: &mut [u8]) -> Result<usi
type ExtList = Vec<String<32>, 4>;

#[derive(Debug, Serialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct ExtParams {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "version-rolling.mask")]
Expand Down Expand Up @@ -120,6 +127,7 @@ pub(crate) fn configure(id: u64, exts: Extensions, buf: &mut [u8]) -> Result<usi
}

#[derive(Debug, Serialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct ConfigureParams(ExtList, ExtParams);

let mut ext_list = Vec::new();
Expand Down Expand Up @@ -214,6 +222,8 @@ pub(crate) fn authorize(
serde_json_core::to_slice(&req, buf).map_err(|_| Error::JsonBufferFull)
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Share {
pub job_id: String<64>,
pub extranonce2: Vec<u8, 8>,
Expand Down
8 changes: 7 additions & 1 deletion stratum-v1/src/client/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use heapless::{String, Vec};
use serde::{Deserialize, Deserializer};

pub(crate) fn parse_id(resp: &[u8]) -> Result<Option<u64>> {
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct IdOnly {
id: Option<u64>,
}
Expand All @@ -32,6 +33,7 @@ pub(crate) fn parse_id(resp: &[u8]) -> Result<Option<u64>> {
///
///- `R` - Type of payload for successful response
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Response<R> {
///An identifier established by the Client.
///
Expand All @@ -50,6 +52,7 @@ impl<'de, R: Deserialize<'de>> Deserialize<'de> for Response<R> {
use serde::de::{self, Visitor};

#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct RespErr(isize, String<32>, Option<String<32>>);

impl From<RespErr> for Error {
Expand Down Expand Up @@ -180,6 +183,7 @@ impl<'de, R: Deserialize<'de>> Deserialize<'de> for Response<R> {

pub(crate) fn parse_configure(resp: &[u8]) -> Result<Extensions> {
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ConfigureRespRaw {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "version-rolling")]
Expand Down Expand Up @@ -279,6 +283,7 @@ pub(crate) fn parse_configure(resp: &[u8]) -> Result<Extensions> {
pub type Subscription = Vec<String<32>, 2>;

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ConnectResp {
pub subscriptions: Vec<Subscription, 2>,
pub extranonce1: Vec<u8, 8>,
Expand All @@ -287,6 +292,7 @@ pub struct ConnectResp {

pub(crate) fn parse_connect(resp: &[u8]) -> Result<ConnectResp> {
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct ConnectRespRaw(
// Subscriptions details - 2-tuple with name of subscribed notification and subscription ID. Theoretically it may be used for unsubscribing, but obviously miners won't use it.
Vec<Vec<String<32>, 2>, 2>,
Expand Down
Loading

0 comments on commit d611961

Please sign in to comment.