Skip to content

Commit

Permalink
make a single Client again (it is upt to the app to shared a single r…
Browse files Browse the repository at this point in the history
…essource)
  • Loading branch information
georgesFoundation committed Aug 7, 2024
1 parent fce694b commit 00612fd
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 350 deletions.
2 changes: 1 addition & 1 deletion stratum-v1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ serde = { workspace = true }
serde-json-core = { features = ["custom-error-messages"], git = "https://github.com/rust-embedded-community/serde-json-core.git", branch = "master" }

[features]
default = []
defmt-03 = [
"dep:defmt",
"embedded-io-async/defmt-03",
Expand All @@ -38,6 +37,7 @@ defmt-03 = [
embedded-io = { workspace = true, features = ["std"] }
env_logger = "0.11"
inquire = "0.7"
log = { workspace = true }
tokio = { version = "1", features = ["full"] }

[[example]]
Expand Down
237 changes: 121 additions & 116 deletions stratum-v1/examples/tokio-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@

#![allow(static_mut_refs)]

use stratum_v1::{Client, Extensions, Share, VersionRolling, Work};
use stratum_v1::{Client, Extensions, Message, Share, VersionRolling};

use heapless::{spsc::Queue, String, Vec};
use heapless::{String, Vec};
use inquire::Select;
use log::{debug, error};
use std::{
net::{Ipv4Addr, SocketAddr},
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::{
io::{ReadHalf, WriteHalf},
net::TcpStream,
sync::{watch, Mutex},
};

#[tokio::main]
Expand All @@ -31,95 +33,117 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};

let stream = TcpStream::connect(addr).await?;
let (stream_reader, stream_writer) = tokio::io::split(stream);

let conn_reader = adapter::FromTokioRead::<ReadHalf<TcpStream>>::new(stream_reader);
let conn_writer = adapter::FromTokioWrite::<WriteHalf<TcpStream>>::new(stream_writer);
let conn = adapter::FromTokio::<TcpStream>::new(stream);

let vers_mask_queue: &'static mut Queue<u32, 2> = {
static mut Q: Queue<u32, 2> = Queue::new();
unsafe { &mut Q }
};
let (vers_mask_queue_prod, mut vers_mask_queue_cons) = vers_mask_queue.split();
tokio::spawn(async move {
loop {
if let Some(mask) = vers_mask_queue_cons.dequeue() {
println!("new version mask from Pool: {:x}", mask);
}
}
});
let mut client = Client::<_, 1480, 512>::new(conn);
client.enable_software_rolling(true, false, false);

let work_queue: &'static mut Queue<Work, 2> = {
static mut Q: Queue<Work, 2> = Queue::new();
unsafe { &mut Q }
};
let (work_queue_prod, mut work_queue_cons) = work_queue.split();
tokio::spawn(async move {
loop {
if let Some(work) = work_queue_cons.dequeue() {
println!("new work from Pool: {:?}", work);
}
}
});
let client_tx = Arc::new(Mutex::new(client));
let client_rx = Arc::clone(&client_tx);

let (mut client_rx, mut client_tx) = Client::<_, _, 1480, 512>::new_rx_tx(
conn_reader,
conn_writer,
vers_mask_queue_prod,
work_queue_prod,
);
let (authorized_tx, mut authorized_rx) = watch::channel(false);

tokio::spawn(async move {
client_rx.software_rolling(true, false);
loop {
if let Err(e) = client_rx.run().await {
println!("client_rx error: {:?}", e);
let mut c = client_rx.lock().await;
match c.receive_message().await {
Ok(msg) => match msg {
Message::Configured => {
c.send_connect(Some(String::<32>::from_str("demo").unwrap()))
.await
.unwrap();
}
Message::Connected => {
c.send_authorize(
match pool {
"Public-Pool" => String::<64>::from_str(
"1HLQGxzAQWnLore3fWHc2W8UP1CgMv1GKQ.miner1",
)
.unwrap(),
"Braiins" => String::<64>::from_str("slush.miner1").unwrap(),
_ => unreachable!(),
},
String::<64>::from_str("x").unwrap(),
)
.await
.unwrap();
}
Message::Authorized => {
authorized_tx.send(true).unwrap();
}
Message::Share {
accepted: _,
rejected: _,
} => {
// TODO update the display if any
}
Message::VersionMask(_mask) => {
// TODO use mask for hardware version rolling is available
}
Message::Difficulty(_diff) => {
// TODO use diff to filter ASIC reported hits
}
Message::CleanJobs => {
// TODO clean the job queue and immediately start hashing a new job
}
Message::Other => {
debug!("Received Other Message");
}
},
Err(e) => {
error!("Client receive_message error: {:?}", e);
}
}
}
});

let exts = Extensions {
version_rolling: Some(VersionRolling {
mask: Some(0x1fffe000),
min_bit_count: Some(10),
}),
minimum_difficulty: None,
subscribe_extranonce: None,
info: None,
};
client_tx.send_configure(exts).await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
client_tx
.send_connect(Some(String::<32>::from_str("slush").unwrap()))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
client_tx
.send_authorize(
match pool {
"Public-Pool" => {
String::<64>::from_str("1HLQGxzAQWnLore3fWHc2W8UP1CgMv1GKQ.miner1").unwrap()
}
"Braiins" => String::<64>::from_str("slush.miner1").unwrap(),
_ => unreachable!(),
},
String::<64>::from_str("x").unwrap(),
)
.await
.unwrap();
{
let mut c = client_tx.lock().await;
let exts = Extensions {
version_rolling: Some(VersionRolling {
mask: Some(0x1fffe000),
min_bit_count: Some(10),
}),
minimum_difficulty: None,
subscribe_extranonce: None,
info: None,
};
c.send_configure(exts).await.unwrap();
}
authorized_rx.changed().await.unwrap();
loop {
// TODO: use client.roll_job() to get a new job at the rate the hardware need it
tokio::time::sleep(Duration::from_millis(5000)).await;
let mut extranonce2 = Vec::new();
extranonce2.resize(4, 0).unwrap();
extranonce2[3] = 0x01;
let fake_share = Share {
job_id: String::<64>::from_str("01").unwrap(),
extranonce2,
ntime: 1722789905,
nonce: 0,
version_bits: None,
};
client_tx.send_submit(fake_share, 1000000.0).await.unwrap();
{
let mut c = client_tx.lock().await;
let mut extranonce2 = Vec::new();
extranonce2.resize(4, 0).unwrap();
extranonce2[3] = 0x01;
let fake_share = Share {
job_id: String::<64>::from_str("01").unwrap(), // TODO will come from the Job
extranonce2, // TODO will come from the Job
ntime: 1722789905, // TODO will come from the Job
nonce: 0, // TODO will come from the ASIC hit
version_bits: None, // TODO will come from the ASIC hit if hardware version rolling is enabled
};
c.send_submit(fake_share).await.unwrap();
}
}
}

trait Readable {
fn poll_read_ready(
&self,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<std::io::Result<()>>;
}

impl Readable for TcpStream {
fn poll_read_ready(
&self,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<std::io::Result<()>> {
self.poll_read_ready(cx)
}
}

Expand All @@ -130,11 +154,11 @@ mod adapter {

/// Adapter from `tokio::io` traits.
#[derive(Clone)]
pub struct FromTokioRead<T: ?Sized> {
pub struct FromTokio<T: ?Sized> {
inner: T,
}

impl<T> FromTokioRead<T> {
impl<T> FromTokio<T> {
/// Create a new adapter.
pub fn new(inner: T) -> Self {
Self { inner }
Expand All @@ -146,7 +170,7 @@ mod adapter {
}
}

impl<T: ?Sized> FromTokioRead<T> {
impl<T: ?Sized> FromTokio<T> {
/// Borrow the inner object.
pub fn inner(&self) -> &T {
&self.inner
Expand All @@ -158,11 +182,11 @@ mod adapter {
}
}

impl<T: ?Sized> embedded_io::ErrorType for FromTokioRead<T> {
impl<T: ?Sized> embedded_io::ErrorType for FromTokio<T> {
type Error = std::io::Error;
}

impl<T: tokio::io::AsyncRead + Unpin + ?Sized> embedded_io_async::Read for FromTokioRead<T> {
impl<T: tokio::io::AsyncRead + Unpin + ?Sized> embedded_io_async::Read for FromTokio<T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
// The current tokio implementation (https://github.com/tokio-rs/tokio/blob/tokio-1.33.0/tokio/src/io/poll_evented.rs#L165)
// does not consider the case of buf.is_empty() as a special case,
Expand All @@ -186,40 +210,21 @@ mod adapter {
}
}

#[derive(Clone)]
pub struct FromTokioWrite<T: ?Sized> {
inner: T,
}

impl<T> FromTokioWrite<T> {
/// Create a new adapter.
pub fn new(inner: T) -> Self {
Self { inner }
}

/// Consume the adapter, returning the inner object.
pub fn into_inner(self) -> T {
self.inner
}
}

impl<T: ?Sized> FromTokioWrite<T> {
/// Borrow the inner object.
pub fn inner(&self) -> &T {
&self.inner
}

/// Mutably borrow the inner object.
pub fn inner_mut(&mut self) -> &mut T {
&mut self.inner
impl<T: super::Readable + Unpin + ?Sized> embedded_io_async::ReadReady for FromTokio<T> {
fn read_ready(&mut self) -> Result<bool, Self::Error> {
// TODO: This crash at runtime :
// Cannot start a runtime from within a runtime. This happens because a function (like `block_on`)
// attempted to block the current thread while the thread is being used to drive asynchronous tasks.
tokio::runtime::Handle::current().block_on(poll_fn(|cx| {
match Pin::new(&mut self.inner).poll_read_ready(cx) {
Poll::Ready(_) => Poll::Ready(Ok(true)),
Poll::Pending => Poll::Ready(Ok(false)),
}
}))
}
}

impl<T: ?Sized> embedded_io::ErrorType for FromTokioWrite<T> {
type Error = std::io::Error;
}

impl<T: tokio::io::AsyncWrite + Unpin + ?Sized> embedded_io_async::Write for FromTokioWrite<T> {
impl<T: tokio::io::AsyncWrite + Unpin + ?Sized> embedded_io_async::Write for FromTokio<T> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
match poll_fn(|cx| Pin::new(&mut self.inner).poll_write(cx, buf)).await {
Ok(0) if !buf.is_empty() => Err(std::io::ErrorKind::WriteZero.into()),
Expand Down
Loading

0 comments on commit 00612fd

Please sign in to comment.