diff --git a/stratum-v1/Cargo.toml b/stratum-v1/Cargo.toml index ac37f5f..c735406 100644 --- a/stratum-v1/Cargo.toml +++ b/stratum-v1/Cargo.toml @@ -25,7 +25,6 @@ serde = { workspace = true } serde-json-core = { features = ["custom-error-messages"], git = "https://github.com/rust-embedded-community/serde-json-core.git", branch = "master" } [features] -default = [] defmt-03 = [ "dep:defmt", "embedded-io-async/defmt-03", @@ -38,6 +37,7 @@ defmt-03 = [ embedded-io = { workspace = true, features = ["std"] } env_logger = "0.11" inquire = "0.7" +log = { workspace = true } tokio = { version = "1", features = ["full"] } [[example]] diff --git a/stratum-v1/examples/tokio-cli.rs b/stratum-v1/examples/tokio-cli.rs index 2fc0e73..8e99355 100644 --- a/stratum-v1/examples/tokio-cli.rs +++ b/stratum-v1/examples/tokio-cli.rs @@ -3,18 +3,20 @@ #![allow(static_mut_refs)] -use stratum_v1::{Client, Extensions, Share, VersionRolling, Work}; +use stratum_v1::{Client, Extensions, Message, Share, VersionRolling}; -use heapless::{spsc::Queue, String, Vec}; +use heapless::{String, Vec}; use inquire::Select; +use log::{debug, error}; use std::{ net::{Ipv4Addr, SocketAddr}, str::FromStr, + sync::Arc, time::Duration, }; use tokio::{ - io::{ReadHalf, WriteHalf}, net::TcpStream, + sync::{watch, Mutex}, }; #[tokio::main] @@ -31,95 +33,117 @@ async fn main() -> Result<(), Box> { }; let stream = TcpStream::connect(addr).await?; - let (stream_reader, stream_writer) = tokio::io::split(stream); - let conn_reader = adapter::FromTokioRead::>::new(stream_reader); - let conn_writer = adapter::FromTokioWrite::>::new(stream_writer); + let conn = adapter::FromTokio::::new(stream); - let vers_mask_queue: &'static mut Queue = { - static mut Q: Queue = Queue::new(); - unsafe { &mut Q } - }; - let (vers_mask_queue_prod, mut vers_mask_queue_cons) = vers_mask_queue.split(); - tokio::spawn(async move { - loop { - if let Some(mask) = vers_mask_queue_cons.dequeue() { - println!("new version mask from Pool: {:x}", mask); - } - } - }); + let mut client = Client::<_, 1480, 512>::new(conn); + client.enable_software_rolling(true, false, false); - let work_queue: &'static mut Queue = { - static mut Q: Queue = Queue::new(); - unsafe { &mut Q } - }; - let (work_queue_prod, mut work_queue_cons) = work_queue.split(); - tokio::spawn(async move { - loop { - if let Some(work) = work_queue_cons.dequeue() { - println!("new work from Pool: {:?}", work); - } - } - }); + let client_tx = Arc::new(Mutex::new(client)); + let client_rx = Arc::clone(&client_tx); - let (mut client_rx, mut client_tx) = Client::<_, _, 1480, 512>::new_rx_tx( - conn_reader, - conn_writer, - vers_mask_queue_prod, - work_queue_prod, - ); + let (authorized_tx, mut authorized_rx) = watch::channel(false); tokio::spawn(async move { - client_rx.software_rolling(true, false); loop { - if let Err(e) = client_rx.run().await { - println!("client_rx error: {:?}", e); + let mut c = client_rx.lock().await; + match c.receive_message().await { + Ok(msg) => match msg { + Message::Configured => { + c.send_connect(Some(String::<32>::from_str("demo").unwrap())) + .await + .unwrap(); + } + Message::Connected => { + c.send_authorize( + match pool { + "Public-Pool" => String::<64>::from_str( + "1HLQGxzAQWnLore3fWHc2W8UP1CgMv1GKQ.miner1", + ) + .unwrap(), + "Braiins" => String::<64>::from_str("slush.miner1").unwrap(), + _ => unreachable!(), + }, + String::<64>::from_str("x").unwrap(), + ) + .await + .unwrap(); + } + Message::Authorized => { + authorized_tx.send(true).unwrap(); + } + Message::Share { + accepted: _, + rejected: _, + } => { + // TODO update the display if any + } + Message::VersionMask(_mask) => { + // TODO use mask for hardware version rolling is available + } + Message::Difficulty(_diff) => { + // TODO use diff to filter ASIC reported hits + } + Message::CleanJobs => { + // TODO clean the job queue and immediately start hashing a new job + } + Message::Other => { + debug!("Received Other Message"); + } + }, + Err(e) => { + error!("Client receive_message error: {:?}", e); + } } } }); - - let exts = Extensions { - version_rolling: Some(VersionRolling { - mask: Some(0x1fffe000), - min_bit_count: Some(10), - }), - minimum_difficulty: None, - subscribe_extranonce: None, - info: None, - }; - client_tx.send_configure(exts).await.unwrap(); - tokio::time::sleep(Duration::from_millis(1000)).await; - client_tx - .send_connect(Some(String::<32>::from_str("slush").unwrap())) - .await - .unwrap(); - tokio::time::sleep(Duration::from_millis(1000)).await; - client_tx - .send_authorize( - match pool { - "Public-Pool" => { - String::<64>::from_str("1HLQGxzAQWnLore3fWHc2W8UP1CgMv1GKQ.miner1").unwrap() - } - "Braiins" => String::<64>::from_str("slush.miner1").unwrap(), - _ => unreachable!(), - }, - String::<64>::from_str("x").unwrap(), - ) - .await - .unwrap(); + { + let mut c = client_tx.lock().await; + let exts = Extensions { + version_rolling: Some(VersionRolling { + mask: Some(0x1fffe000), + min_bit_count: Some(10), + }), + minimum_difficulty: None, + subscribe_extranonce: None, + info: None, + }; + c.send_configure(exts).await.unwrap(); + } + authorized_rx.changed().await.unwrap(); loop { + // TODO: use client.roll_job() to get a new job at the rate the hardware need it tokio::time::sleep(Duration::from_millis(5000)).await; - let mut extranonce2 = Vec::new(); - extranonce2.resize(4, 0).unwrap(); - extranonce2[3] = 0x01; - let fake_share = Share { - job_id: String::<64>::from_str("01").unwrap(), - extranonce2, - ntime: 1722789905, - nonce: 0, - version_bits: None, - }; - client_tx.send_submit(fake_share, 1000000.0).await.unwrap(); + { + let mut c = client_tx.lock().await; + let mut extranonce2 = Vec::new(); + extranonce2.resize(4, 0).unwrap(); + extranonce2[3] = 0x01; + let fake_share = Share { + job_id: String::<64>::from_str("01").unwrap(), // TODO will come from the Job + extranonce2, // TODO will come from the Job + ntime: 1722789905, // TODO will come from the Job + nonce: 0, // TODO will come from the ASIC hit + version_bits: None, // TODO will come from the ASIC hit if hardware version rolling is enabled + }; + c.send_submit(fake_share).await.unwrap(); + } + } +} + +trait Readable { + fn poll_read_ready( + &self, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll>; +} + +impl Readable for TcpStream { + fn poll_read_ready( + &self, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + self.poll_read_ready(cx) } } @@ -130,11 +154,11 @@ mod adapter { /// Adapter from `tokio::io` traits. #[derive(Clone)] - pub struct FromTokioRead { + pub struct FromTokio { inner: T, } - impl FromTokioRead { + impl FromTokio { /// Create a new adapter. pub fn new(inner: T) -> Self { Self { inner } @@ -146,7 +170,7 @@ mod adapter { } } - impl FromTokioRead { + impl FromTokio { /// Borrow the inner object. pub fn inner(&self) -> &T { &self.inner @@ -158,11 +182,11 @@ mod adapter { } } - impl embedded_io::ErrorType for FromTokioRead { + impl embedded_io::ErrorType for FromTokio { type Error = std::io::Error; } - impl embedded_io_async::Read for FromTokioRead { + impl embedded_io_async::Read for FromTokio { async fn read(&mut self, buf: &mut [u8]) -> Result { // The current tokio implementation (https://github.com/tokio-rs/tokio/blob/tokio-1.33.0/tokio/src/io/poll_evented.rs#L165) // does not consider the case of buf.is_empty() as a special case, @@ -186,40 +210,21 @@ mod adapter { } } - #[derive(Clone)] - pub struct FromTokioWrite { - inner: T, - } - - impl FromTokioWrite { - /// Create a new adapter. - pub fn new(inner: T) -> Self { - Self { inner } - } - - /// Consume the adapter, returning the inner object. - pub fn into_inner(self) -> T { - self.inner - } - } - - impl FromTokioWrite { - /// Borrow the inner object. - pub fn inner(&self) -> &T { - &self.inner - } - - /// Mutably borrow the inner object. - pub fn inner_mut(&mut self) -> &mut T { - &mut self.inner + impl embedded_io_async::ReadReady for FromTokio { + fn read_ready(&mut self) -> Result { + // TODO: This crash at runtime : + // Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) + // attempted to block the current thread while the thread is being used to drive asynchronous tasks. + tokio::runtime::Handle::current().block_on(poll_fn(|cx| { + match Pin::new(&mut self.inner).poll_read_ready(cx) { + Poll::Ready(_) => Poll::Ready(Ok(true)), + Poll::Pending => Poll::Ready(Ok(false)), + } + })) } } - impl embedded_io::ErrorType for FromTokioWrite { - type Error = std::io::Error; - } - - impl embedded_io_async::Write for FromTokioWrite { + impl embedded_io_async::Write for FromTokio { async fn write(&mut self, buf: &[u8]) -> Result { match poll_fn(|cx| Pin::new(&mut self.inner).poll_write(cx, buf)).await { Ok(0) if !buf.is_empty() => Err(std::io::ErrorKind::WriteZero.into()), diff --git a/stratum-v1/src/client/job.rs b/stratum-v1/src/client/job.rs index dddb8c4..ffe41bd 100644 --- a/stratum-v1/src/client/job.rs +++ b/stratum-v1/src/client/job.rs @@ -1,7 +1,8 @@ // SPDX-FileCopyrightText: © 2024 Foundation Devices, Inc. // SPDX-License-Identifier: GPL-3.0-or-later -use crate::{Error, Result, Work}; +use super::notification::Work; +use crate::{Error, Result}; use bitcoin::{ block::{Header, Version}, @@ -9,7 +10,7 @@ use bitcoin::{ hashes::{sha256d::Hash as DHash, Hash}, CompactTarget, }; -use heapless::{String, Vec}; +use heapless::Vec; #[derive(Debug)] pub struct Job { @@ -42,7 +43,7 @@ impl defmt::Format for Job { #[cfg_attr(feature = "defmt-03", derive(defmt::Format))] pub(crate) struct JobCreator { job_id: u64, - last_job_id: String<32>, + last_work: Option, version_mask: i32, pub(crate) version_rolling: bool, version_bits: u16, @@ -50,24 +51,16 @@ pub(crate) struct JobCreator { extranonce2_size: usize, pub(crate) extranonce2_rolling: bool, extranonce2: Vec, + pub(crate) ntime_rolling: bool, + ntime_bits: u32, } impl JobCreator { - // pub fn new() -> Self { - // Self { - // version_mask: 0, - // version_bits: 0, - // extranonce1: Vec::new(), - // extranonce2: Vec::new(), - // extranonce2_size: 0, - // } - // } - - pub fn set_version_mask(&mut self, mask: u32) { + pub(crate) fn set_version_mask(&mut self, mask: u32) { self.version_mask = mask as i32; } - pub fn set_extranonces( + pub(crate) fn set_extranonces( &mut self, extranonce1: Vec, extranonce2_size: usize, @@ -79,6 +72,16 @@ impl JobCreator { .map_err(|_| Error::VecFull) } + pub(crate) fn set_work(&mut self, work: Work) -> Result<()> { + self.last_work = Some(work); + self.version_bits = 0; + self.extranonce2 + .resize_default(self.extranonce2_size) + .map_err(|_| Error::VecFull)?; + self.extranonce2.fill(0); + Ok(()) + } + fn merkle_root(&self, work: &Work) -> Result<[u8; 32]> { let mut coinbase = Vec::::new(); coinbase @@ -104,20 +107,13 @@ impl JobCreator { Ok(merkle_root) } - pub fn roll(&mut self, work: &Work) -> Result { - if self.last_job_id != work.job_id { - self.version_bits = 0; - self.extranonce2 - .resize_default(self.extranonce2_size) - .map_err(|_| Error::VecFull)?; - self.extranonce2.fill(0); - self.last_job_id = work.job_id.clone(); - } + pub(crate) fn roll(&mut self) -> Result { + let work = self.last_work.as_ref().ok_or(Error::NoWork)?; let rolled_version = if self.version_rolling { self.version_bits = self.version_bits.wrapping_add(1); (work.version & !self.version_mask) | (((self.version_bits as i32) << self.version_mask.trailing_zeros()) - & self.version_mask) + & self.version_mask) // TODO: test this } else { work.version }; @@ -132,6 +128,12 @@ impl JobCreator { } } } + let rolled_ntime = if self.ntime_rolling { + self.ntime_bits = self.ntime_bits.wrapping_add(1); + work.ntime + self.ntime_bits + } else { + work.ntime + }; self.job_id += self.job_id.wrapping_add(1); Ok(Job { job_id: self.job_id, @@ -141,7 +143,7 @@ impl JobCreator { version: Version::from_consensus(rolled_version), prev_blockhash: BlockHash::from_byte_array(work.prev_hash), merkle_root: TxMerkleNode::from_byte_array(self.merkle_root(work)?), - time: work.ntime, + time: rolled_ntime, bits: CompactTarget::from_consensus(work.nbits), nonce: 0, }, diff --git a/stratum-v1/src/client/mod.rs b/stratum-v1/src/client/mod.rs index 19f1028..1a65460 100644 --- a/stratum-v1/src/client/mod.rs +++ b/stratum-v1/src/client/mod.rs @@ -7,183 +7,157 @@ mod request; mod response; use crate::{Error, Result}; -use job::{Job, JobCreator}; -pub use notification::{Notification, Work}; +pub use job::Job; +use job::JobCreator; +use notification::Notification; +use request::ReqKind; pub use request::{Extensions, Info, Share, VersionRolling}; -use request::{ReqIdKind, ReqKind}; use response::Subscription; -use embedded_io_async::{Read, Write}; -use heapless::{ - spsc::{Consumer, Producer, Queue}, - FnvIndexMap, HistoryBuffer, String, Vec, -}; +use embedded_io_async::{Read, ReadReady, Write}; +use heapless::{FnvIndexMap, String, Vec}; #[derive(Debug)] -#[cfg_attr(feature = "defmt-03", derive(defmt::Format))] -pub struct Client { - phantom_read: core::marker::PhantomData, - phantom_write: core::marker::PhantomData, -} - -// #[derive(Debug)] // #[cfg_attr(feature = "defmt-03", derive(defmt::Format))] -pub struct ClientRx { - network_reader: R, - buf: [u8; BUF_SIZE], - pos: usize, +pub struct Client { + network_conn: C, + rx_buf: [u8; RX_BUF_SIZE], + rx_free_pos: usize, + tx_buf: [u8; TX_BUF_SIZE], reqs: FnvIndexMap, job_creator: JobCreator, - jobs: HistoryBuffer, configuration: Option, subscriptions: Vec, shares_accepted: u64, shares_rejected: u64, - req_queue_cons: Consumer<'static, ReqIdKind, 32>, - state_queue_prod: Producer<'static, ReqKind, 2>, - vers_mask_queue_prod: Producer<'static, u32, 2>, - diff_queue_prod: Producer<'static, f64, 2>, - work_queue_prod: Producer<'static, Work, 2>, -} - -// #[derive(Debug, PartialEq)] -// #[cfg_attr(feature = "defmt-03", derive(defmt::Format))] -pub struct ClientTx { - network_writer: W, - buf: [u8; BUF_SIZE], req_id: u64, - configured: bool, connected: bool, authorized: bool, - pool_target_difficulty: f64, user: String<64>, - req_queue_prod: Producer<'static, ReqIdKind, 32>, - state_queue_cons: Consumer<'static, ReqKind, 2>, - diff_queue_cons: Consumer<'static, f64, 2>, } -impl - Client +#[derive(Debug, PartialEq)] +#[cfg_attr(feature = "defmt-03", derive(defmt::Format))] +pub enum Message { + Configured, + Connected, + Authorized, + Share { accepted: u64, rejected: u64 }, + VersionMask(u32), + Difficulty(f64), + CleanJobs, + Other, +} + +impl + Client { - pub fn new_rx_tx( - network_reader: R, - network_writer: W, - vers_mask_queue_prod: Producer<'static, u32, 2>, - work_queue_prod: Producer<'static, Work, 2>, // TODO: transform into local Job queue consumed by ClientTx - ) -> (ClientRx, ClientTx) { - let req_queue: &'static mut Queue = { - static mut Q: Queue = Queue::new(); - unsafe { &mut Q } - }; - let (req_queue_prod, req_queue_cons) = req_queue.split(); - let state_queue: &'static mut Queue = { - static mut Q: Queue = Queue::new(); - unsafe { &mut Q } - }; - let (state_queue_prod, state_queue_cons) = state_queue.split(); - let diff_queue: &'static mut Queue = { - static mut Q: Queue = Queue::new(); - unsafe { &mut Q } - }; - let (diff_queue_prod, diff_queue_cons) = diff_queue.split(); - ( - ClientRx { - network_reader, - buf: [0; RX_BUF_SIZE], - pos: 0, - reqs: FnvIndexMap::new(), - job_creator: JobCreator::default(), - jobs: HistoryBuffer::new(), - configuration: None, - subscriptions: Vec::new(), - shares_accepted: 0, - shares_rejected: 0, - req_queue_cons, - state_queue_prod, - vers_mask_queue_prod, - diff_queue_prod, - work_queue_prod, - }, - ClientTx { - network_writer, - buf: [0; TX_BUF_SIZE], - req_id: 0, - configured: false, - connected: false, - authorized: false, - pool_target_difficulty: 0.0, - user: String::new(), - req_queue_prod, - state_queue_cons, - diff_queue_cons, - }, - ) + pub fn new(network_conn: C) -> Client { + Client { + network_conn, + rx_buf: [0; RX_BUF_SIZE], + rx_free_pos: 0, + tx_buf: [0; TX_BUF_SIZE], + reqs: FnvIndexMap::new(), + job_creator: JobCreator::default(), + configuration: None, + subscriptions: Vec::new(), + shares_accepted: 0, + shares_rejected: 0, + req_id: 0, + connected: false, + authorized: false, + user: String::new(), + } } } -impl ClientRx { - pub fn software_rolling(&mut self, version: bool, extranonce2: bool) { +impl + Client +{ + pub fn enable_software_rolling(&mut self, version: bool, extranonce2: bool, ntime: bool) { self.job_creator.version_rolling = version; self.job_creator.extranonce2_rolling = extranonce2; + self.job_creator.ntime_rolling = ntime; + debug!( + "Software Rolling Enabled : version: {}, extranonce2: {}, ntime: {}", + version, extranonce2, ntime + ); } - pub async fn run(&mut self) -> Result<()> { - while let Some(req) = self.req_queue_cons.dequeue() { - debug!("ClientRx::run dequeue: {:?}", req.clone()); - self.reqs.insert(req.0, req.1).map_err(|_| Error::MapFull)?; - } - // TODO: maybe add some garbage collection here to remove old reqs never responded by Pool - while let Some(i) = self.buf[..self.pos].iter().position(|&c| c == b'\n') { - let line = &self.buf[..i]; - debug!( - "pos: {}; i: {}; line: {:?}", - self.pos, - i, - core::str::from_utf8(line) + pub async fn roll_job(&mut self) -> Result { + self.job_creator.roll() + } + + pub async fn receive_message(&mut self) -> Result { + let mut msg = Message::Other; + let mut start = 0; + while let Some(stop) = self.rx_buf[start..self.rx_free_pos] + .iter() + .position(|&c| c == b'\n') + { + let line = &self.rx_buf[start..stop]; + debug!("Received Message: {}", core::str::from_utf8(line).unwrap()); + trace!( + "^^^ start: {}, stop: {}, free: {} ^^^", + start, + stop, + self.rx_free_pos ); if let Some(id) = response::parse_id(line)? { // it's a Response match self.reqs.get(&id) { Some(ReqKind::Configure) => { self.configuration = Some(response::parse_configure(line)?); - self.state_queue_prod - .enqueue(ReqKind::Configure) - .map_err(|_| Error::QueueFull)?; self.reqs.remove(&id); - debug!("enqueue: {:?}, reqs: {:?}", ReqKind::Configure, self.reqs); + info!("Stratum v1 Client Configured"); + msg = Message::Configured; } Some(ReqKind::Connect) => { let conn = response::parse_connect(line)?; self.subscriptions = conn.subscriptions; self.job_creator .set_extranonces(conn.extranonce1, conn.extranonce2_size)?; - self.state_queue_prod - .enqueue(ReqKind::Connect) - .map_err(|_| Error::QueueFull)?; + self.connected = true; self.reqs.remove(&id); - debug!("enqueue: {:?}, reqs: {:?}", ReqKind::Connect, self.reqs); + info!("Stratum v1 Client Connected"); + msg = Message::Connected; } Some(ReqKind::Authorize) => { if response::parse_authorize(line)? { - self.state_queue_prod - .enqueue(ReqKind::Authorize) - .map_err(|_| Error::QueueFull)?; + self.authorized = true; self.reqs.remove(&id); - debug!("enqueue: {:?}, reqs: {:?}", ReqKind::Authorize, self.reqs); + info!("Stratum v1 Client Authorized"); + msg = Message::Authorized; } } Some(ReqKind::Submit) => { match response::parse_submit(line) { - Ok(_) => self.shares_accepted += 1, + Ok(_) => { + self.shares_accepted += 1; + info!( + "Share #{} Accepted, count: {}/{}", + id, self.shares_accepted, self.shares_rejected + ); + } Err(Error::Pool { code: _c, // TODO: use this code to differentiate why share has been rejected message: _, detail: _, - }) => self.shares_rejected += 1, + }) => { + self.shares_rejected += 1; + info!( + "Share #{} Rejected, count: {}/{}", + id, self.shares_accepted, self.shares_rejected + ); + } Err(e) => return Err(e), } self.reqs.remove(&id); - debug!("rx sumbit response, reqs: {:?}", self.reqs); + msg = Message::Share { + accepted: self.shares_accepted, + rejected: self.shares_rejected, + }; } None => return Err(Error::IdNotFound(id)), } @@ -193,82 +167,63 @@ impl ClientRx { Notification::SetVersionMask => { let mask = notification::parse_set_version_mask(line)?; self.job_creator.set_version_mask(mask); - self.vers_mask_queue_prod - .enqueue(mask) - .map_err(|_| Error::QueueFull)?; + msg = Message::VersionMask(mask); + info!("Set Version Mask: 0x{:x}", mask); } Notification::SetDifficulty => { - self.diff_queue_prod - .enqueue(notification::parse_set_difficulty(line)?) - .map_err(|_| Error::QueueFull)?; + let diff = notification::parse_set_difficulty(line)?; + msg = Message::Difficulty(diff); + info!("Set Difficulty: {}", diff); } Notification::Notify => { let work = notification::parse_notify(line)?; - self.jobs.clear(); - // while !self.jobs.is_full() { - self.jobs.write(self.job_creator.roll(&work)?); - // } - // if work.clean_jobs { - // todo!("inform app to immediately change job") - // } - self.work_queue_prod - .enqueue(work) - .map_err(|_| Error::QueueFull)?; + if work.clean_jobs { + msg = Message::CleanJobs; + } + info!("New Work: {:?}", work); + self.job_creator.set_work(work)?; } } } - if self.pos > i + 1 { - self.buf.copy_within(i + 1..self.pos, 0); - } - self.pos -= i + 1; + start = stop + 1; } - let n = self - .network_reader - .read(self.buf[self.pos..].as_mut()) - .await - .map_err(|_| Error::NetworkError)?; - trace!( - "read {} bytes, pos {}: {:?}", - n, - self.pos, - core::str::from_utf8(&self.buf[self.pos..self.pos + n]) - ); - self.pos += n; - Ok(()) - } -} - -impl ClientTx { - fn check_queues(&mut self) { - if let Some(state) = self.state_queue_cons.dequeue() { - debug!("dequeue req: {:?}", state); - match state { - ReqKind::Configure => self.configured = true, - ReqKind::Connect => self.connected = true, - ReqKind::Authorize => self.authorized = true, - _ => unreachable!("unknown state: {:?}", state), - } + if start > 0 && self.rx_free_pos > start { + self.rx_buf.copy_within(start..self.rx_free_pos, 0); + self.rx_free_pos -= start; } - if let Some(diff) = self.diff_queue_cons.dequeue() { - debug!("dequeue diff: {:?}", diff); - self.pool_target_difficulty = diff; + if self + .network_conn + .read_ready() + .map_err(|_| Error::NetworkError)? + { + let n = self + .network_conn + .read(self.rx_buf[self.rx_free_pos..].as_mut()) + .await + .map_err(|_| Error::NetworkError)?; + trace!( + "read {} bytes, free {}: {}", + n, + self.rx_free_pos, + core::str::from_utf8(&self.rx_buf[self.rx_free_pos..self.rx_free_pos + n]).unwrap() + ); + self.rx_free_pos += n; } + Ok(msg) } fn prepare_req(&mut self, req_kind: ReqKind) -> Result<()> { self.req_id += 1; - let req_id_kind = ReqIdKind(self.req_id, req_kind); - self.req_queue_prod - .enqueue(req_id_kind.clone()) - .map_err(|_| Error::QueueFull)?; - debug!("enqueue: {:?}", req_id_kind); + self.reqs + .insert(self.req_id, req_kind) + .map_err(|_| Error::MapFull)?; Ok(()) } async fn send_req(&mut self, req_len: usize) -> Result<()> { - self.buf[req_len] = 0x0a; - self.network_writer - .write_all(&self.buf[..req_len + 1]) + self.tx_buf[req_len] = 0x0a; + self.network_conn + .write_all(&self.tx_buf[..req_len + 1]) .await .map_err(|_| Error::NetworkError) } @@ -280,12 +235,15 @@ impl ClientTx { /// exts: a list of extensions to configure. /// pub async fn send_configure(&mut self, exts: Extensions) -> Result<()> { - self.check_queues(); - if self.configured { + if self.configuration.is_some() { return Err(Error::AlreadyConfigured); } self.prepare_req(ReqKind::Configure)?; - let n = request::configure(self.req_id, exts, self.buf.as_mut_slice())?; + let n = request::configure(self.req_id, exts, self.tx_buf.as_mut_slice())?; + debug!( + "Send Configure: {}", + core::str::from_utf8(&self.tx_buf[..n]).unwrap() + ); self.send_req(n).await } @@ -296,15 +254,18 @@ impl ClientTx { /// identifier: a string to identify the client to the pool. /// pub async fn send_connect(&mut self, identifier: Option>) -> Result<()> { - self.check_queues(); - if !self.configured { + if self.configuration.is_none() { return Err(Error::NotConfigured); } if self.connected { return Err(Error::AlreadyConnected); } self.prepare_req(ReqKind::Connect)?; - let n = request::connect(self.req_id, identifier, self.buf.as_mut_slice())?; + let n = request::connect(self.req_id, identifier, self.tx_buf.as_mut_slice())?; + debug!( + "Send Connect: {}", + core::str::from_utf8(&self.tx_buf[..n]).unwrap() + ); self.send_req(n).await } @@ -318,7 +279,6 @@ impl ClientTx { /// pass: a string with user password. /// pub async fn send_authorize(&mut self, user: String<64>, pass: String<64>) -> Result<()> { - self.check_queues(); if !self.connected { return Err(Error::NotConnected); } @@ -327,7 +287,11 @@ impl ClientTx { } self.prepare_req(ReqKind::Authorize)?; self.user = user.clone(); - let n = request::authorize(self.req_id, user, pass, self.buf.as_mut_slice())?; + let n = request::authorize(self.req_id, user, pass, self.tx_buf.as_mut_slice())?; + debug!( + "Send Authorize: {}", + core::str::from_utf8(&self.tx_buf[..n]).unwrap() + ); self.send_req(n).await } @@ -345,24 +309,21 @@ impl ClientTx { /// /// version_bits: an optional 32-bits unsigned integer with the share's version_bits. /// - pub async fn send_submit(&mut self, share: Share, diff: f64) -> Result<()> { - self.check_queues(); + pub async fn send_submit(&mut self, share: Share) -> Result<()> { if !self.authorized { return Err(Error::Unauthorized); } - if diff < self.pool_target_difficulty { - return Err(Error::LowDifficulty { - share_diff: diff, - pool_diff: self.pool_target_difficulty, - }); - } self.prepare_req(ReqKind::Submit)?; let n = request::submit( self.req_id, self.user.clone(), share, - self.buf.as_mut_slice(), + self.tx_buf.as_mut_slice(), )?; + debug!( + "Send Submit: {}", + core::str::from_utf8(&self.tx_buf[..n]).unwrap() + ); self.send_req(n).await } } diff --git a/stratum-v1/src/client/notification.rs b/stratum-v1/src/client/notification.rs index 519e94f..27d26c8 100644 --- a/stratum-v1/src/client/notification.rs +++ b/stratum-v1/src/client/notification.rs @@ -162,6 +162,7 @@ pub(crate) fn parse_set_difficulty(resp: &[u8]) -> Result { #[cfg(test)] mod tests { + use core::str::FromStr; use heapless::Vec; use super::*; @@ -182,7 +183,10 @@ mod tests { parse_set_version_mask(resp), Err(Error::JsonError( serde_json_core::de::Error::CustomErrorWithMessage( - "invalid length 9, expected a string no more than 8 bytes long".into() + String::<64>::from_str( + "invalid length 9, expected a string no more than 8 bytes long" + ) + .unwrap() ) )) ); diff --git a/stratum-v1/src/client/request.rs b/stratum-v1/src/client/request.rs index df43566..b4104ba 100644 --- a/stratum-v1/src/client/request.rs +++ b/stratum-v1/src/client/request.rs @@ -15,10 +15,6 @@ pub(crate) enum ReqKind { Submit, } -#[derive(Debug, Clone)] -#[cfg_attr(feature = "defmt-03", derive(defmt::Format))] -pub(crate) struct ReqIdKind(pub(crate) u64, pub(crate) ReqKind); - ///Request representation. /// ///Note that omitting `id` means that request is notification, rather than call, which expects diff --git a/stratum-v1/src/error.rs b/stratum-v1/src/error.rs index 81928b6..c6ad841 100644 --- a/stratum-v1/src/error.rs +++ b/stratum-v1/src/error.rs @@ -43,6 +43,8 @@ pub enum Error { /// Map is full MapFull, + NoWork, + /// Pool reported an error Pool { code: isize, @@ -57,11 +59,6 @@ pub enum Error { IdNotFound(u64), - LowDifficulty { - share_diff: f64, - pool_diff: f64, - }, - /// correspond to serde_json_core::ser:Error::BufferFull JsonBufferFull, /// correspond to all serde_json_core::de:Error diff --git a/stratum-v1/src/lib.rs b/stratum-v1/src/lib.rs index ad680a0..f996beb 100644 --- a/stratum-v1/src/lib.rs +++ b/stratum-v1/src/lib.rs @@ -15,5 +15,5 @@ pub(crate) mod fmt; mod client; mod error; -pub use client::{Client, Extensions, Info, Notification, Share, VersionRolling, Work}; +pub use client::{Client, Extensions, Info, Job, Message, Share, VersionRolling}; pub use error::{Error, Result};