From 34cefd0434995a25700e52291b7c9da2a5c34b7c Mon Sep 17 00:00:00 2001 From: Sven Erik Jeroschewski Date: Tue, 15 Oct 2024 19:20:49 +0200 Subject: [PATCH 1/3] adds second kuksa client in zenoh-kuksa-provider To avoid a deadlock between waiting for the kuksa subscription and sending messages on the same client, a second client gets initialized. In addition, some clippy warning get fixed. --- zenoh-kuksa-provider/src/main.rs | 14 ++++++-------- zenoh-kuksa-provider/src/provider_config.rs | 2 +- zenoh-kuksa-provider/src/utils/kuksa_utils.rs | 4 ++-- zenoh-kuksa-provider/src/utils/zenoh_utils.rs | 2 +- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/zenoh-kuksa-provider/src/main.rs b/zenoh-kuksa-provider/src/main.rs index dee4f22..84fcc67 100644 --- a/zenoh-kuksa-provider/src/main.rs +++ b/zenoh-kuksa-provider/src/main.rs @@ -91,10 +91,8 @@ async fn handling_zenoh_subscribtion( async fn publish_to_zenoh( provider_config: Arc, session: Arc, - kuksa_client: Arc>, + mut kuksa_client: kuksa::Client, ) { - let mut actuation_client = kuksa_client.lock().await; - let attachment = Some(String::from("type=targetValue")); let vss_paths = Vec::from_iter(provider_config.signals.iter().map(String::as_str)); @@ -114,7 +112,7 @@ async fn publish_to_zenoh( vss_paths ); - match actuation_client.subscribe_target_values(vss_paths).await { + match kuksa_client.subscribe_target_values(vss_paths).await { Ok(mut stream) => { while let Some(response) = stream.message().await.unwrap() { for update in &response.updates { @@ -123,7 +121,7 @@ async fn publish_to_zenoh( let vss_path = &entry.path; if let Some(publisher) = publishers.get(vss_path.as_str()) { - let buf = match datapoint_to_string(&datapoint) { + let buf = match datapoint_to_string(datapoint) { Some(v) => v, None => "null".to_string(), }; @@ -182,7 +180,8 @@ async fn main() { let uri = kuksa::Uri::try_from(provider_config.kuksa.databroker_url.as_str()) .expect("Invalid URI for Kuksa Databroker connection."); - let client = Arc::new(Mutex::new(kuksa::Client::new(uri))); + let client = Arc::new(Mutex::new(kuksa::Client::new(uri.clone()))); + let actuation_client = kuksa::Client::new(uri); fetch_metadata( client.clone(), @@ -202,8 +201,7 @@ async fn main() { let publisher_handle = tokio::spawn({ let session = Arc::clone(&zenoh_session); let provider_config = Arc::clone(&provider_config); - let kuksa_client = Arc::clone(&client); - publish_to_zenoh(provider_config, session, kuksa_client) + publish_to_zenoh(provider_config, session, actuation_client) }); let _ = subscriber_handle.await; diff --git a/zenoh-kuksa-provider/src/provider_config.rs b/zenoh-kuksa-provider/src/provider_config.rs index 25c669c..8c21d9e 100644 --- a/zenoh-kuksa-provider/src/provider_config.rs +++ b/zenoh-kuksa-provider/src/provider_config.rs @@ -65,7 +65,7 @@ impl ProviderConfig { }; config.set_mode(Some(mode)).unwrap(); - if self.zenoh.scouting.multicast.enabled == true { + if self.zenoh.scouting.multicast.enabled { config.scouting.multicast.set_enabled(Some(true)).unwrap(); config diff --git a/zenoh-kuksa-provider/src/utils/kuksa_utils.rs b/zenoh-kuksa-provider/src/utils/kuksa_utils.rs index 98ad0fa..b6b75d0 100644 --- a/zenoh-kuksa-provider/src/utils/kuksa_utils.rs +++ b/zenoh-kuksa-provider/src/utils/kuksa_utils.rs @@ -68,10 +68,10 @@ pub fn new_datapoint(data_type: &DataType, payload: &ZBuf) -> Datapoint { nanos: duration_since_epoch.subsec_nanos() as i32, }; - return Datapoint { + Datapoint { timestamp: Some(timestamp), // TODO: get timestamp right value: Some(value), - }; + } } pub fn new_datapoint_for_update( diff --git a/zenoh-kuksa-provider/src/utils/zenoh_utils.rs b/zenoh-kuksa-provider/src/utils/zenoh_utils.rs index c17993f..5b9f8d2 100644 --- a/zenoh-kuksa-provider/src/utils/zenoh_utils.rs +++ b/zenoh-kuksa-provider/src/utils/zenoh_utils.rs @@ -30,7 +30,7 @@ pub fn zbuf_to_string(zbuf: &ZBuf) -> Result { for zslice in zbuf.zslices() { bytes.extend_from_slice(zslice.as_slice()); } - String::from_utf8(bytes).map_err(|e| std::str::Utf8Error::from(e.utf8_error())) + String::from_utf8(bytes).map_err(|e| e.utf8_error()) } pub fn extract_attachment_as_string(sample: &Sample) -> String { From 501d510770cdae05ec1fe33227a3687f685c0876 Mon Sep 17 00:00:00 2001 From: Sven Erik Jeroschewski Date: Fri, 18 Oct 2024 11:40:58 +0200 Subject: [PATCH 2/3] decreases the part for when the kuksa client is locked in the zenoh-kuksa-provider --- zenoh-kuksa-provider/Cargo.toml | 2 ++ zenoh-kuksa-provider/src/main.rs | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/zenoh-kuksa-provider/Cargo.toml b/zenoh-kuksa-provider/Cargo.toml index 6be94b1..c75adff 100644 --- a/zenoh-kuksa-provider/Cargo.toml +++ b/zenoh-kuksa-provider/Cargo.toml @@ -12,6 +12,8 @@ ######################################################################## +[workspace] + [package] name = "zenoh-kuksa-provider" version = "0.1.0" diff --git a/zenoh-kuksa-provider/src/main.rs b/zenoh-kuksa-provider/src/main.rs index 84fcc67..4f31248 100644 --- a/zenoh-kuksa-provider/src/main.rs +++ b/zenoh-kuksa-provider/src/main.rs @@ -55,8 +55,6 @@ async fn handling_zenoh_subscribtion( ) { info!("Listening on selector: {:?}", provider_config.zenoh.key_exp); - let mut sub_client = kuksa_client.lock().await; - let provider_config_clone = Arc::clone(&provider_config); let subscriber = session .declare_subscriber(provider_config_clone.zenoh.key_exp.clone()) @@ -79,11 +77,13 @@ async fn handling_zenoh_subscribtion( if field_type == "currentValue" { let datapoint_update = new_datapoint_for_update(&vss_path, &sample, &store); + let mut sub_client = kuksa_client.lock().await; debug!("Forwarding: {:#?}", datapoint_update); sub_client .set_current_values(datapoint_update) .await .unwrap(); + drop(sub_client); } } } From 62f4012e2901c689e7634757be950264f8a44349 Mon Sep 17 00:00:00 2001 From: Sven Erik Jeroschewski Date: Fri, 25 Oct 2024 22:00:37 +0200 Subject: [PATCH 3/3] removes mutexes from zenoh-kuksa-client --- zenoh-kuksa-provider/Cargo.toml | 3 --- zenoh-kuksa-provider/src/main.rs | 16 ++++++---------- zenoh-kuksa-provider/src/utils/kuksa_utils.rs | 13 +++++++------ 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/zenoh-kuksa-provider/Cargo.toml b/zenoh-kuksa-provider/Cargo.toml index c75adff..cac7683 100644 --- a/zenoh-kuksa-provider/Cargo.toml +++ b/zenoh-kuksa-provider/Cargo.toml @@ -11,9 +11,6 @@ # SPDX-License-Identifier: Apache-2.0 ######################################################################## - -[workspace] - [package] name = "zenoh-kuksa-provider" version = "0.1.0" diff --git a/zenoh-kuksa-provider/src/main.rs b/zenoh-kuksa-provider/src/main.rs index 4f31248..a51ccd7 100644 --- a/zenoh-kuksa-provider/src/main.rs +++ b/zenoh-kuksa-provider/src/main.rs @@ -17,7 +17,6 @@ use provider_config::ProviderConfig; use std::collections::HashMap; use std::sync::Arc; use std::{error::Error, fs}; -use tokio::sync::Mutex; mod provider_config; mod utils; @@ -51,7 +50,7 @@ async fn handling_zenoh_subscribtion( provider_config: Arc, session: Arc, metadata_store: MetadataStore, - kuksa_client: Arc>, + mut kuksa_client: kuksa::Client, ) { info!("Listening on selector: {:?}", provider_config.zenoh.key_exp); @@ -77,13 +76,11 @@ async fn handling_zenoh_subscribtion( if field_type == "currentValue" { let datapoint_update = new_datapoint_for_update(&vss_path, &sample, &store); - let mut sub_client = kuksa_client.lock().await; debug!("Forwarding: {:#?}", datapoint_update); - sub_client + kuksa_client .set_current_values(datapoint_update) .await .unwrap(); - drop(sub_client); } } } @@ -180,11 +177,11 @@ async fn main() { let uri = kuksa::Uri::try_from(provider_config.kuksa.databroker_url.as_str()) .expect("Invalid URI for Kuksa Databroker connection."); - let client = Arc::new(Mutex::new(kuksa::Client::new(uri.clone()))); + let mut client = kuksa::Client::new(uri.clone()); let actuation_client = kuksa::Client::new(uri); - fetch_metadata( - client.clone(), + client = fetch_metadata( + client, provider_config.signals.iter().map(|s| s as &str).collect(), &metadata_store, ) @@ -194,8 +191,7 @@ async fn main() { let session = Arc::clone(&zenoh_session); let provider_config = Arc::clone(&provider_config); let metadata_store = Arc::clone(&metadata_store); - let kuksa_client = Arc::clone(&client); - handling_zenoh_subscribtion(provider_config, session, metadata_store, kuksa_client) + handling_zenoh_subscribtion(provider_config, session, metadata_store, client) }); let publisher_handle = tokio::spawn({ diff --git a/zenoh-kuksa-provider/src/utils/kuksa_utils.rs b/zenoh-kuksa-provider/src/utils/kuksa_utils.rs index b6b75d0..a3d29c9 100644 --- a/zenoh-kuksa-provider/src/utils/kuksa_utils.rs +++ b/zenoh-kuksa-provider/src/utils/kuksa_utils.rs @@ -12,24 +12,23 @@ ********************************************************************************/ use kuksa::proto::v1::{datapoint::Value, DataType, Datapoint}; +use kuksa::Client; use log::warn; use prost_types::Timestamp; use std::time::{SystemTime, UNIX_EPOCH}; -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::Mutex; +use std::collections::HashMap; use zenoh::{buffers::ZBuf, sample::Sample}; use crate::utils::{metadata_store::MetadataInfo, zenoh_utils::zbuf_to_string}; pub async fn fetch_metadata( - kuksa_client: Arc>, + mut kuksa_client: Client, paths: Vec<&str>, metadata_store: &super::metadata_store::MetadataStore, -) { - let mut client = kuksa_client.lock().await; +) -> Client { let mut store = metadata_store.lock().await; - let data_entries: Vec = client.get_metadata(paths).await.unwrap(); + let data_entries: Vec = kuksa_client.get_metadata(paths).await.unwrap(); for entry in data_entries { store.insert( @@ -40,6 +39,8 @@ pub async fn fetch_metadata( }, ); } + + kuksa_client } pub fn new_datapoint(data_type: &DataType, payload: &ZBuf) -> Datapoint {