Skip to content

Commit

Permalink
refactor: directly deserialize as Duration (#303)
Browse files Browse the repository at this point in the history
* refactor: directly deserialize as `Duration`

* fix: android compilation
  • Loading branch information
de-sh authored Nov 4, 2023
1 parent 4a12479 commit 4cbb802
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 31 deletions.
127 changes: 125 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ flume = "0.10"
rumqttc = { git = "https://github.com/bytebeamio/rumqtt" }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
serde_with = "3.3.0"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec", "time"] }
Expand Down
35 changes: 20 additions & 15 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ impl ActionsBridge {

pub fn register_action_route(
&mut self,
ActionRoute { name, timeout }: ActionRoute,
ActionRoute { name, timeout: duration }: ActionRoute,
actions_tx: Sender<Action>,
) -> Result<(), Error> {
let duration = Duration::from_secs(timeout);
let action_router = ActionRouter { actions_tx, duration };
if self.action_routes.insert(name.clone(), action_router).is_some() {
return Err(Error::ActionRouteClash(name));
Expand Down Expand Up @@ -136,7 +135,7 @@ impl ActionsBridge {
}

pub async fn start(&mut self) -> Result<(), Error> {
let mut metrics_timeout = interval(Duration::from_secs(self.config.stream_metrics.timeout));
let mut metrics_timeout = interval(self.config.stream_metrics.timeout);
let mut end: Pin<Box<Sleep>> = Box::pin(time::sleep(Duration::from_secs(u64::MAX)));
self.load_saved_action()?;

Expand Down Expand Up @@ -461,10 +460,13 @@ mod tests {
Config {
stream_metrics: StreamMetricsConfig {
enabled: false,
timeout: 10,
timeout: Duration::from_secs(10),
..Default::default()
},
action_status: StreamConfig {
flush_period: Duration::from_secs(2),
..Default::default()
},
action_status: StreamConfig { flush_period: 2, ..Default::default() },
..Default::default()
}
}
Expand Down Expand Up @@ -510,13 +512,13 @@ mod tests {
std::env::set_current_dir(&tmpdir).unwrap();
let config = Arc::new(default_config());
let (mut bridge, actions_tx, data_rx) = create_bridge(config);
let route_1 = ActionRoute { name: "route_1".to_string(), timeout: 10 };
let route_1 = ActionRoute { name: "route_1".to_string(), timeout: Duration::from_secs(10) };

let (route_tx, route_1_rx) = bounded(1);
bridge.register_action_route(route_1, route_tx).unwrap();

let (route_tx, route_2_rx) = bounded(1);
let route_2 = ActionRoute { name: "route_2".to_string(), timeout: 30 };
let route_2 = ActionRoute { name: "route_2".to_string(), timeout: Duration::from_secs(30) };
bridge.register_action_route(route_2, route_tx).unwrap();

spawn_bridge(bridge);
Expand Down Expand Up @@ -596,7 +598,7 @@ mod tests {
let config = Arc::new(default_config());
let (mut bridge, actions_tx, data_rx) = create_bridge(config);

let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) };

let (route_tx, action_rx) = bounded(1);
bridge.register_action_route(test_route, route_tx).unwrap();
Expand Down Expand Up @@ -648,7 +650,7 @@ mod tests {
let config = Arc::new(default_config());
let (mut bridge, actions_tx, data_rx) = create_bridge(config);

let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) };

let (route_tx, action_rx) = bounded(1);
bridge.register_action_route(test_route, route_tx).unwrap();
Expand Down Expand Up @@ -699,11 +701,12 @@ mod tests {
let bridge_tx_2 = bridge.tx();

let (route_tx, action_rx_1) = bounded(1);
let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) };
bridge.register_action_route(test_route, route_tx).unwrap();

let (route_tx, action_rx_2) = bounded(1);
let redirect_route = ActionRoute { name: "redirect".to_string(), timeout: 30 };
let redirect_route =
ActionRoute { name: "redirect".to_string(), timeout: Duration::from_secs(30) };
bridge.register_action_route(redirect_route, route_tx).unwrap();

spawn_bridge(bridge);
Expand Down Expand Up @@ -766,11 +769,12 @@ mod tests {
let bridge_tx_2 = bridge.tx();

let (route_tx, action_rx_1) = bounded(1);
let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 };
let tunshell_route =
ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: Duration::from_secs(30) };
bridge.register_action_route(tunshell_route, route_tx).unwrap();

let (route_tx, action_rx_2) = bounded(1);
let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) };
bridge.register_action_route(test_route, route_tx).unwrap();

spawn_bridge(bridge);
Expand Down Expand Up @@ -856,11 +860,12 @@ mod tests {
let bridge_tx_2 = bridge.tx();

let (route_tx, action_rx_1) = bounded(1);
let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) };
bridge.register_action_route(test_route, route_tx).unwrap();

let (route_tx, action_rx_2) = bounded(1);
let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 };
let tunshell_route =
ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: Duration::from_secs(30) };
bridge.register_action_route(tunshell_route, route_tx).unwrap();

spawn_bridge(bridge);
Expand Down
4 changes: 2 additions & 2 deletions uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use flume::{bounded, Receiver, RecvError, Sender};
use log::{debug, error};
Expand Down Expand Up @@ -47,7 +47,7 @@ impl DataBridge {
}

pub async fn start(&mut self) -> Result<(), Error> {
let mut metrics_timeout = interval(Duration::from_secs(self.config.stream_metrics.timeout));
let mut metrics_timeout = interval(self.config.stream_metrics.timeout);

loop {
select! {
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
tx: Sender<Box<dyn Package>>,
) -> Stream<T> {
let mut stream = Stream::new(name, &config.topic, config.buf_size, tx, config.compression);
stream.flush_period = Duration::from_secs(config.flush_period);
stream.flush_period = config.flush_period;
stream
}

Expand Down
Loading

0 comments on commit 4cbb802

Please sign in to comment.