Skip to content

Commit

Permalink
fix: Remove action timeouts everywhere (#368)
Browse files Browse the repository at this point in the history
Changes
* Remove action timeouts everywhere
* Update action handling collectors so that they work properly with this setup

Why?
As per internal discussion

Trials Performed
* Tests in the QA document
* Tried restarting and cancelling actions during various steps of the action flow
  • Loading branch information
amokfa authored Oct 30, 2024
1 parent 919e200 commit 5ca70a9
Show file tree
Hide file tree
Showing 21 changed files with 527 additions and 993 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ tokio-stream = "0.1.15"
tokio-util = { version = "0.7", features = ["codec", "time"] }

[profile.dev]
opt-level = 1
opt-level = 0
debug = true
strip = true
strip = false
panic = "unwind"

[profile.release]
Expand Down
1 change: 1 addition & 0 deletions tools/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ serde_json = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
structopt = "0.3.26"
uplink = { path = "../../uplink" }
132 changes: 95 additions & 37 deletions tools/utils/src/wait_and_send.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use futures_util::SinkExt;
use serde::{Deserialize, Serialize};
use std::thread::sleep;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use structopt::StructOpt;
use tokio::net::TcpStream;
use tokio::select;
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};
use uplink::Action;
Expand All @@ -18,46 +19,103 @@ struct Response {
errors: Vec<String>,
}

#[derive(StructOpt, Debug)]
pub struct CommandLine {
#[structopt(short = "s", default_value = "Completed")]
pub final_status: String,
#[structopt(short = "w", default_value = "3")]
pub wait_time: u64,
#[structopt(short = "p")]
pub port: String,
}

async fn respond<'a>(
framed: &'a mut Framed<TcpStream, LinesCodec>,
idx: &mut u32,
action_id: &str,
state: &str,
progress: u8,
) {
*idx += 1;
let response = Response {
stream: "action_status".to_string(),
sequence: *idx,
timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
id: action_id.to_string(),
state: state.to_string(),
progress,
errors: vec![],
};
let resp = serde_json::to_string(&response).unwrap();
println!("Sending: {resp}");
framed.send(resp).await.unwrap();
}

struct ActionState {
id: String,
response_counter: u32,
}

#[tokio::main]
async fn main() {
let final_state = std::env::args().nth(1).unwrap_or_else(|| {
println!("Using default value \"Completed\"");
"Completed".to_string()
});
let args: CommandLine = StructOpt::from_args();
let port = std::env::args().nth(2).unwrap_or_else(|| "127.0.0.1:5050".to_string());
let stream = TcpStream::connect(port).await.unwrap();
let mut stream = TcpStream::connect(port.as_str()).await.unwrap();
let mut framed = Framed::new(stream, LinesCodec::new());
async fn respond<'a>(
framed: &'a mut Framed<TcpStream, LinesCodec>,
idx: &mut u32,
action_id: &str,
state: &str,
progress: u8,
) {
*idx += 1;
let response = Response {
stream: "action_status".to_string(),
sequence: *idx,
timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
id: action_id.to_string(),
state: state.to_string(),
progress,
errors: vec![],
};
let resp = serde_json::to_string(&response).unwrap();
println!("Sending: {resp}");
framed.send(resp).await.unwrap();
}
let mut idx = 0;

let mut curr_action: Option<ActionState> = None;

loop {
let action_s = framed.next().await.unwrap().unwrap();
println!("Received: {action_s}");
let action = serde_json::from_str::<Action>(action_s.as_str()).unwrap();
sleep(Duration::from_secs(3));
respond(&mut framed, &mut idx, action.action_id.as_str(), "Working", 33).await;
sleep(Duration::from_secs(3));
respond(&mut framed, &mut idx, action.action_id.as_str(), "Working", 66).await;
sleep(Duration::from_secs(3));
respond(&mut framed, &mut idx, action.action_id.as_str(), final_state.as_str(), 100).await;
select! {
action_t = framed.next() => {
let action_s = if let Some(Ok(action_s)) = action_t {
action_s
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
if let Ok(s) = TcpStream::connect(port.as_str()).await {
stream = s;
framed = Framed::new(stream, LinesCodec::new());
}
continue;
};
println!("Received: {action_s}");
let action = match serde_json::from_str::<Action>(action_s.as_str()) {
Err(e) => {
println!("invalid payload: {e}");
continue;
}
Ok(s) => s,
};
if curr_action.is_some() {
let curr_action_ref = curr_action.as_mut().unwrap();
if action.name == "cancel_action" {
respond(&mut framed, &mut curr_action_ref.response_counter, action.action_id.as_str(), "Completed", 100).await;
respond(&mut framed, &mut curr_action_ref.response_counter, curr_action_ref.id.as_str(), "Failed", 100).await;
curr_action = None;
} else {
respond(&mut framed, &mut curr_action_ref.response_counter, action.action_id.as_str(), "Failed", 100).await;
}
} else {
curr_action = Some(ActionState {
id: action.action_id.clone(),
response_counter: 0,
});
let curr_action = curr_action.as_mut().unwrap();
respond(&mut framed, &mut curr_action.response_counter, action.action_id.as_str(), "ReceivedByClient", 0).await;
}
}
_ = tokio::time::sleep(Duration::from_secs(args.wait_time)), if curr_action.is_some() => {
let curr_action_ref = curr_action.as_mut().unwrap();
if curr_action_ref.response_counter == 2 {
respond(&mut framed, &mut curr_action_ref.response_counter, curr_action_ref.id.as_str(), args.final_status.as_str(), 100).await;
} else {
let progress= (33 * curr_action_ref.response_counter) as u8;
respond(&mut framed, &mut curr_action_ref.response_counter, curr_action_ref.id.as_str(), "Working", progress).await;
}
if curr_action_ref.response_counter == 3 {
curr_action = None;
}
}
}
}
}
Loading

0 comments on commit 5ca70a9

Please sign in to comment.