Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds second kuksa client in zenoh-kuksa-provider #10

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion zenoh-kuksa-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# SPDX-License-Identifier: Apache-2.0
########################################################################


[package]
name = "zenoh-kuksa-provider"
version = "0.1.0"
Expand Down
28 changes: 11 additions & 17 deletions zenoh-kuksa-provider/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use provider_config::ProviderConfig;
use std::collections::HashMap;
use std::sync::Arc;
use std::{error::Error, fs};
use tokio::sync::Mutex;

mod provider_config;
mod utils;
Expand Down Expand Up @@ -51,12 +50,10 @@ async fn handling_zenoh_subscribtion(
provider_config: Arc<ProviderConfig>,
session: Arc<Session>,
metadata_store: MetadataStore,
kuksa_client: Arc<Mutex<kuksa::Client>>,
mut kuksa_client: kuksa::Client,
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use reference for kuksa_client

info!("Listening on selector: {:?}", provider_config.zenoh.key_exp);

let mut sub_client = kuksa_client.lock().await;

let provider_config_clone = Arc::clone(&provider_config);
let subscriber = session
.declare_subscriber(provider_config_clone.zenoh.key_exp.clone())
Expand All @@ -80,7 +77,7 @@ async fn handling_zenoh_subscribtion(
let datapoint_update = new_datapoint_for_update(&vss_path, &sample, &store);

debug!("Forwarding: {:#?}", datapoint_update);
sub_client
kuksa_client
.set_current_values(datapoint_update)
.await
.unwrap();
Expand All @@ -91,10 +88,8 @@ async fn handling_zenoh_subscribtion(
async fn publish_to_zenoh(
provider_config: Arc<ProviderConfig>,
session: Arc<Session>,
kuksa_client: Arc<Mutex<kuksa::Client>>,
mut kuksa_client: kuksa::Client,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use reference. mut not needed since you're not editing the client

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is necessary to have a mutable client because the implementation for the functions requires a mutable reference for self.

    pub async fn subscribe_target_values(
        &mut self,
        paths: Vec<&str>,

Correct me if I'm wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will check if mut is necessary there, but yes you're right. Nice catch!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've checked it is necessary so all good here

) {
let mut actuation_client = kuksa_client.lock().await;

let attachment = Some(String::from("type=targetValue"));

let vss_paths = Vec::from_iter(provider_config.signals.iter().map(String::as_str));
Expand All @@ -114,7 +109,7 @@ async fn publish_to_zenoh(
vss_paths
);

match actuation_client.subscribe_target_values(vss_paths).await {
match kuksa_client.subscribe_target_values(vss_paths).await {
Ok(mut stream) => {
while let Some(response) = stream.message().await.unwrap() {
for update in &response.updates {
Expand All @@ -123,7 +118,7 @@ async fn publish_to_zenoh(
let vss_path = &entry.path;

if let Some(publisher) = publishers.get(vss_path.as_str()) {
let buf = match datapoint_to_string(&datapoint) {
let buf = match datapoint_to_string(datapoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please keep using the reference

Some(v) => v,
None => "null".to_string(),
};
Expand Down Expand Up @@ -182,10 +177,11 @@ async fn main() {

let uri = kuksa::Uri::try_from(provider_config.kuksa.databroker_url.as_str())
.expect("Invalid URI for Kuksa Databroker connection.");
let client = Arc::new(Mutex::new(kuksa::Client::new(uri)));
let mut client = kuksa::Client::new(uri.clone());
let actuation_client = kuksa::Client::new(uri);

fetch_metadata(
client.clone(),
client = fetch_metadata(
client,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should work if you give a reference or clone the client

provider_config.signals.iter().map(|s| s as &str).collect(),
&metadata_store,
)
Expand All @@ -195,15 +191,13 @@ async fn main() {
let session = Arc::clone(&zenoh_session);
let provider_config = Arc::clone(&provider_config);
let metadata_store = Arc::clone(&metadata_store);
let kuksa_client = Arc::clone(&client);
handling_zenoh_subscribtion(provider_config, session, metadata_store, kuksa_client)
handling_zenoh_subscribtion(provider_config, session, metadata_store, client)
});

let publisher_handle = tokio::spawn({
let session = Arc::clone(&zenoh_session);
let provider_config = Arc::clone(&provider_config);
let kuksa_client = Arc::clone(&client);
publish_to_zenoh(provider_config, session, kuksa_client)
publish_to_zenoh(provider_config, session, actuation_client)
});

let _ = subscriber_handle.await;
Expand Down
2 changes: 1 addition & 1 deletion zenoh-kuksa-provider/src/provider_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl ProviderConfig {
};
config.set_mode(Some(mode)).unwrap();

if self.zenoh.scouting.multicast.enabled == true {
if self.zenoh.scouting.multicast.enabled {
config.scouting.multicast.set_enabled(Some(true)).unwrap();

config
Expand Down
17 changes: 9 additions & 8 deletions zenoh-kuksa-provider/src/utils/kuksa_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,23 @@
********************************************************************************/

use kuksa::proto::v1::{datapoint::Value, DataType, Datapoint};
use kuksa::Client;
use log::warn;
use prost_types::Timestamp;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use std::collections::HashMap;
use zenoh::{buffers::ZBuf, sample::Sample};

use crate::utils::{metadata_store::MetadataInfo, zenoh_utils::zbuf_to_string};

pub async fn fetch_metadata(
kuksa_client: Arc<Mutex<kuksa::Client>>,
mut kuksa_client: Client,
paths: Vec<&str>,
metadata_store: &super::metadata_store::MetadataStore,
) {
let mut client = kuksa_client.lock().await;
) -> Client {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should not return anything here right?

let mut store = metadata_store.lock().await;

let data_entries: Vec<kuksa::DataEntry> = client.get_metadata(paths).await.unwrap();
let data_entries: Vec<kuksa::DataEntry> = kuksa_client.get_metadata(paths).await.unwrap();

for entry in data_entries {
store.insert(
Expand All @@ -40,6 +39,8 @@ pub async fn fetch_metadata(
},
);
}

kuksa_client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary to return this

}

pub fn new_datapoint(data_type: &DataType, payload: &ZBuf) -> Datapoint {
Expand Down Expand Up @@ -68,10 +69,10 @@ pub fn new_datapoint(data_type: &DataType, payload: &ZBuf) -> Datapoint {
nanos: duration_since_epoch.subsec_nanos() as i32,
};

return Datapoint {
Datapoint {
timestamp: Some(timestamp), // TODO: get timestamp right
value: Some(value),
};
}
}

pub fn new_datapoint_for_update(
Expand Down
2 changes: 1 addition & 1 deletion zenoh-kuksa-provider/src/utils/zenoh_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn zbuf_to_string(zbuf: &ZBuf) -> Result<String, std::str::Utf8Error> {
for zslice in zbuf.zslices() {
bytes.extend_from_slice(zslice.as_slice());
}
String::from_utf8(bytes).map_err(|e| std::str::Utf8Error::from(e.utf8_error()))
String::from_utf8(bytes).map_err(|e| e.utf8_error())
}

pub fn extract_attachment_as_string(sample: &Sample) -> String {
Expand Down