From 4cbb802d9054e58b1aa45e08ee9b14f01097004e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Nov 2023 11:49:29 +0530 Subject: [PATCH] refactor: directly deserialize as `Duration` (#303) * refactor: directly deserialize as `Duration` * fix: android compilation --- Cargo.lock | 127 ++++++++++++++++++++++++- uplink/Cargo.toml | 1 + uplink/src/base/bridge/actions_lane.rs | 35 ++++--- uplink/src/base/bridge/data_lane.rs | 4 +- uplink/src/base/bridge/stream.rs | 2 +- uplink/src/base/mod.rs | 23 +++-- uplink/src/collector/downloader.rs | 5 +- uplink/src/lib.rs | 14 ++- 8 files changed, 180 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22d22f8e4..a75a02099 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "ansi_term" version = "0.12.1" @@ -227,6 +242,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-targets", +] + [[package]] name = "clap" version = "2.34.0" @@ -474,6 +502,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.2.8" @@ -816,7 +850,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 1.9.3", "slab", "tokio 1.28.2", "tokio-util 0.7.8", @@ -829,6 +863,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" + [[package]] name = "heck" version = "0.3.3" @@ -963,6 +1003,29 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "iana-time-zone" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -986,7 +1049,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", + "serde", +] + +[[package]] +name = "indexmap" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" +dependencies = [ + "equivalent", + "hashbrown 0.14.1", + "serde", ] [[package]] @@ -1338,6 +1413,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.15.0" @@ -2170,6 +2254,35 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" +dependencies = [ + "base64 0.21.2", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.2", + "serde", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e6be15c453eb305019bfa438b1593c731f36a289a7853f7707ee29e870b3b3c" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.18", +] + [[package]] name = "serial" version = "0.4.1" @@ -3030,6 +3143,7 @@ dependencies = [ "rumqttc", "serde", "serde_json", + "serde_with", "signal-hook", "signal-hook-tokio", "storage", @@ -3333,6 +3447,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.42.0" diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index dc7124fec..3c10e3406 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -10,6 +10,7 @@ flume = "0.10" rumqttc = { git = "https://github.com/bytebeamio/rumqtt" } serde = { version = "1", features = ["derive"] } serde_json = "1.0" +serde_with = "3.3.0" thiserror = "1" tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["codec", "time"] } diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index fa2738a64..c64682729 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -103,10 +103,9 @@ impl ActionsBridge { pub fn register_action_route( &mut self, - ActionRoute { name, timeout }: ActionRoute, + ActionRoute { name, timeout: duration }: ActionRoute, actions_tx: Sender, ) -> Result<(), Error> { - let duration = Duration::from_secs(timeout); let action_router = ActionRouter { actions_tx, duration }; if self.action_routes.insert(name.clone(), action_router).is_some() { return Err(Error::ActionRouteClash(name)); @@ -136,7 +135,7 @@ impl ActionsBridge { } pub async fn start(&mut self) -> Result<(), Error> { - let mut metrics_timeout = interval(Duration::from_secs(self.config.stream_metrics.timeout)); + let mut metrics_timeout = interval(self.config.stream_metrics.timeout); let mut end: Pin> = Box::pin(time::sleep(Duration::from_secs(u64::MAX))); self.load_saved_action()?; @@ -461,10 +460,13 @@ mod tests { Config { stream_metrics: StreamMetricsConfig { enabled: false, - timeout: 10, + timeout: Duration::from_secs(10), + ..Default::default() + }, + action_status: StreamConfig { + flush_period: Duration::from_secs(2), ..Default::default() }, - action_status: StreamConfig { flush_period: 2, ..Default::default() }, ..Default::default() } } @@ -510,13 +512,13 @@ mod tests { std::env::set_current_dir(&tmpdir).unwrap(); let config = Arc::new(default_config()); let (mut bridge, actions_tx, data_rx) = create_bridge(config); - let route_1 = ActionRoute { name: "route_1".to_string(), timeout: 10 }; + let route_1 = ActionRoute { name: "route_1".to_string(), timeout: Duration::from_secs(10) }; let (route_tx, route_1_rx) = bounded(1); bridge.register_action_route(route_1, route_tx).unwrap(); let (route_tx, route_2_rx) = bounded(1); - let route_2 = ActionRoute { name: "route_2".to_string(), timeout: 30 }; + let route_2 = ActionRoute { name: "route_2".to_string(), timeout: Duration::from_secs(30) }; bridge.register_action_route(route_2, route_tx).unwrap(); spawn_bridge(bridge); @@ -596,7 +598,7 @@ mod tests { let config = Arc::new(default_config()); let (mut bridge, actions_tx, data_rx) = create_bridge(config); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; let (route_tx, action_rx) = bounded(1); bridge.register_action_route(test_route, route_tx).unwrap(); @@ -648,7 +650,7 @@ mod tests { let config = Arc::new(default_config()); let (mut bridge, actions_tx, data_rx) = create_bridge(config); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; let (route_tx, action_rx) = bounded(1); bridge.register_action_route(test_route, route_tx).unwrap(); @@ -699,11 +701,12 @@ mod tests { let bridge_tx_2 = bridge.tx(); let (route_tx, action_rx_1) = bounded(1); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; bridge.register_action_route(test_route, route_tx).unwrap(); let (route_tx, action_rx_2) = bounded(1); - let redirect_route = ActionRoute { name: "redirect".to_string(), timeout: 30 }; + let redirect_route = + ActionRoute { name: "redirect".to_string(), timeout: Duration::from_secs(30) }; bridge.register_action_route(redirect_route, route_tx).unwrap(); spawn_bridge(bridge); @@ -766,11 +769,12 @@ mod tests { let bridge_tx_2 = bridge.tx(); let (route_tx, action_rx_1) = bounded(1); - let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 }; + let tunshell_route = + ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: Duration::from_secs(30) }; bridge.register_action_route(tunshell_route, route_tx).unwrap(); let (route_tx, action_rx_2) = bounded(1); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; bridge.register_action_route(test_route, route_tx).unwrap(); spawn_bridge(bridge); @@ -856,11 +860,12 @@ mod tests { let bridge_tx_2 = bridge.tx(); let (route_tx, action_rx_1) = bounded(1); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; bridge.register_action_route(test_route, route_tx).unwrap(); let (route_tx, action_rx_2) = bounded(1); - let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 }; + let tunshell_route = + ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: Duration::from_secs(30) }; bridge.register_action_route(tunshell_route, route_tx).unwrap(); spawn_bridge(bridge); diff --git a/uplink/src/base/bridge/data_lane.rs b/uplink/src/base/bridge/data_lane.rs index de2b39834..6e100fc18 100644 --- a/uplink/src/base/bridge/data_lane.rs +++ b/uplink/src/base/bridge/data_lane.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use flume::{bounded, Receiver, RecvError, Sender}; use log::{debug, error}; @@ -47,7 +47,7 @@ impl DataBridge { } pub async fn start(&mut self) -> Result<(), Error> { - let mut metrics_timeout = interval(Duration::from_secs(self.config.stream_metrics.timeout)); + let mut metrics_timeout = interval(self.config.stream_metrics.timeout); loop { select! { diff --git a/uplink/src/base/bridge/stream.rs b/uplink/src/base/bridge/stream.rs index 2e199d6b4..d83c93541 100644 --- a/uplink/src/base/bridge/stream.rs +++ b/uplink/src/base/bridge/stream.rs @@ -76,7 +76,7 @@ where tx: Sender>, ) -> Stream { let mut stream = Stream::new(name, &config.topic, config.buf_size, tx, config.compression); - stream.flush_period = Duration::from_secs(config.flush_period); + stream.flush_period = config.flush_period; stream } diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 5a651390e..c78d1e225 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -1,9 +1,10 @@ use std::env::current_dir; use std::path::PathBuf; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, fmt::Debug}; use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DurationSeconds}; #[cfg(target_os = "linux")] use crate::collector::journalctl::JournalCtlConfig; @@ -21,8 +22,8 @@ pub mod serializer; pub const DEFAULT_TIMEOUT: u64 = 60; #[inline] -fn default_timeout() -> u64 { - DEFAULT_TIMEOUT +fn default_timeout() -> Duration { + Duration::from_secs(DEFAULT_TIMEOUT) } #[inline] @@ -65,15 +66,17 @@ pub enum Compression { Lz4, } +#[serde_as] #[derive(Debug, Clone, Deserialize)] pub struct StreamConfig { pub topic: String, #[serde(default = "max_buf_size")] pub buf_size: usize, #[serde(default = "default_timeout")] + #[serde_as(as = "DurationSeconds")] /// Duration(in seconds) that bridge collector waits from /// receiving first element, before the stream gets flushed. - pub flush_period: u64, + pub flush_period: Duration, #[serde(default)] pub compression: Compression, #[serde(default)] @@ -149,19 +152,23 @@ pub struct InstallerConfig { pub uplink_port: u16, } +#[serde_as] #[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct StreamMetricsConfig { pub enabled: bool, pub topic: String, pub blacklist: Vec, - pub timeout: u64, + #[serde_as(as = "DurationSeconds")] + pub timeout: Duration, } +#[serde_as] #[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct SerializerMetricsConfig { pub enabled: bool, pub topic: String, - pub timeout: u64, + #[serde_as(as = "DurationSeconds")] + pub timeout: Duration, } #[derive(Debug, Clone, Deserialize, Serialize, Default)] @@ -191,11 +198,13 @@ pub struct MqttConfig { pub network_timeout: u64, } +#[serde_as] #[derive(Debug, Clone, Deserialize, Default)] pub struct ActionRoute { pub name: String, #[serde(default = "default_timeout")] - pub timeout: u64, + #[serde_as(as = "DurationSeconds")] + pub timeout: Duration, } impl From<&ActionRoute> for ActionRoute { diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index a9fc8008e..95b2c78da 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -452,7 +452,10 @@ mod test { let mut path = PathBuf::from(DOWNLOAD_DIR); path.push("uplink-test"); let downloader_cfg = DownloaderConfig { - actions: vec![ActionRoute { name: "firmware_update".to_owned(), timeout: 10 }], + actions: vec![ActionRoute { + name: "firmware_update".to_owned(), + timeout: Duration::from_secs(10), + }], path, }; let config = config(downloader_cfg.clone()); diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index dc72a793a..52f4acd01 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -43,6 +43,7 @@ //! [`name`]: Action#structfield.name use std::sync::Arc; use std::thread; +use std::time::Duration; use anyhow::Error; @@ -408,7 +409,8 @@ impl Uplink { pub fn spawn_builtins(&mut self, bridge: &mut Bridge) -> Result<(), Error> { let bridge_tx = bridge.tx(); - let route = ActionRoute { name: "launch_shell".to_owned(), timeout: 10 }; + let route = + ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) }; let (actions_tx, actions_rx) = bounded(1); bridge.register_action_route(route, actions_tx)?; let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone()); @@ -435,7 +437,10 @@ impl Uplink { #[cfg(target_os = "linux")] if let Some(config) = self.config.logging.clone() { - let route = ActionRoute { name: "journalctl_config".to_string(), timeout: 10 }; + let route = ActionRoute { + name: "journalctl_config".to_string(), + timeout: Duration::from_secs(10), + }; let (actions_tx, actions_rx) = bounded(1); bridge.register_action_route(route, actions_tx)?; let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone()); @@ -448,7 +453,10 @@ impl Uplink { #[cfg(target_os = "android")] if let Some(config) = self.config.logging.clone() { - let route = ActionRoute { name: "journalctl_config".to_string(), timeout: 10 }; + let route = ActionRoute { + name: "journalctl_config".to_string(), + timeout: Duration::from_secs(10), + }; let (actions_tx, actions_rx) = bounded(1); bridge.register_action_route(route, actions_tx)?; let logger = Logcat::new(config, actions_rx, bridge_tx.clone());