diff --git a/data/vss-core/README.md b/data/vss-core/README.md index dbba2cff..73289905 100644 --- a/data/vss-core/README.md +++ b/data/vss-core/README.md @@ -60,7 +60,7 @@ use the full name. When official release is created replace the copied *.json-fi Build and run kuksa_databroker using the new VSS file according to [documentation](../../README.md), e.g. ```sh -$cargo run --bin databroker -- --metadata ../data/vss-core/vss_release_4.0.json +$cargo run --bin databroker -- --metadata ./data/vss-core/vss_release_4.0.json ``` Use the client to verify that changes in VSS are reflected, by doing e.g. set/get on some new or renamed signals. diff --git a/databroker-cli/src/kuksa_cli.rs b/databroker-cli/src/kuksa_cli.rs index 5dd91099..1b86f054 100644 --- a/databroker-cli/src/kuksa_cli.rs +++ b/databroker-cli/src/kuksa_cli.rs @@ -312,7 +312,7 @@ pub async fn kuksa_main(_cli: Cli) -> Result<(), Box> { } } Err(err) => { - cli::print_error(cmd, &format!("Malformed token: {err}"))? + cli::print_error(cmd, format!("Malformed token: {err}"))? } } } @@ -350,12 +350,12 @@ pub async fn kuksa_main(_cli: Cli) -> Result<(), Box> { } } Err(err) => { - cli::print_error(cmd, &format!("Malformed token: {err}"))? + cli::print_error(cmd, format!("Malformed token: {err}"))? } }, Err(err) => cli::print_error( cmd, - &format!( + format!( "Failed to open token file \"{token_filename}\": {err}" ), )?, diff --git a/databroker-cli/src/sdv_cli.rs b/databroker-cli/src/sdv_cli.rs index e1913e29..0bf91b0d 100644 --- a/databroker-cli/src/sdv_cli.rs +++ b/databroker-cli/src/sdv_cli.rs @@ -256,7 +256,7 @@ pub async fn sdv_main(_cli: Cli) -> Result<(), Box> { } } Err(err) => { - cli::print_error(cmd, &format!("Malformed token: {err}"))? + cli::print_error(cmd, format!("Malformed token: {err}"))? } } } @@ -295,12 +295,12 @@ pub async fn sdv_main(_cli: Cli) -> Result<(), Box> { } } Err(err) => { - cli::print_error(cmd, &format!("Malformed token: {err}"))? + cli::print_error(cmd, format!("Malformed token: {err}"))? } }, Err(err) => cli::print_error( cmd, - &format!( + format!( "Failed to open token file \"{token_filename}\": {err}" ), )?, diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs index 44a990cc..874f7181 100644 --- a/databroker/src/grpc/kuksa_val_v2/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -46,7 +46,12 @@ impl From<&proto::Datapoint> for broker::Datapoint { impl From for Option { fn from(from: broker::Datapoint) -> Self { match from.value { - broker::DataValue::NotAvailable => None, + broker::DataValue::NotAvailable => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Failure( + proto::ValueFailure::NotProvided.into(), + )), + timestamp: None, + }), broker::DataValue::Bool(value) => Some(proto::Datapoint { value_state: Some(proto::datapoint::ValueState::Value(proto::Value { typed_value: Some(proto::value::TypedValue::Bool(value)), @@ -196,10 +201,7 @@ impl From<&i32> for broker::ValueFailure { fn from_i32(value: i32) -> proto::ValueFailure { // Use a match statement to convert the i32 to the corresponding enum variant match value { - 1 => proto::ValueFailure::InvalidValue, 2 => proto::ValueFailure::NotProvided, - 3 => proto::ValueFailure::UnknownSignal, - 4 => proto::ValueFailure::AccessDenied, 5 => proto::ValueFailure::InternalError, _ => proto::ValueFailure::Unspecified, } @@ -209,10 +211,7 @@ impl From<&proto::ValueFailure> for broker::ValueFailure { fn from(value_failure: &proto::ValueFailure) -> Self { match value_failure { proto::ValueFailure::Unspecified => broker::ValueFailure::Unspecified, - proto::ValueFailure::InvalidValue => broker::ValueFailure::InvalidValue, proto::ValueFailure::NotProvided => broker::ValueFailure::NotProvided, - proto::ValueFailure::UnknownSignal => broker::ValueFailure::UnknownSignal, - proto::ValueFailure::AccessDenied => broker::ValueFailure::AccessDenied, proto::ValueFailure::InternalError => broker::ValueFailure::InternalError, } } diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index 95f2061b..5037ccf1 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -14,7 +14,7 @@ use std::{collections::HashMap, pin::Pin}; use crate::{ - broker::{self, AuthorizedAccess, SubscriptionError}, + broker::{self, AuthorizedAccess, ReadError, SubscriptionError}, glob::Matcher, permissions::Permissions, }; @@ -39,12 +39,42 @@ const MAX_REQUEST_PATH_LENGTH: usize = 1000; impl proto::val_server::Val for broker::DataBroker { async fn get_value( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let signal_id = match get_signal(request.signal_id, &broker).await { + Ok(signal_id) => signal_id, + Err(err) => return Err(err), + }; + + let datapoint = match broker.get_datapoint(signal_id).await { + Ok(datapoint) => datapoint, + Err(ReadError::NotFound) => { + return Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")) + } + Err(ReadError::PermissionDenied | ReadError::PermissionExpired) => { + return Err(tonic::Status::new( + tonic::Code::PermissionDenied, + "Permission Denied", + )) + } + }; + + Ok(tonic::Response::new(proto::GetValueResponse { + data_point: datapoint.into(), + })) } async fn get_values( @@ -632,7 +662,14 @@ fn convert_to_proto_stream( let mut entries: HashMap = HashMap::with_capacity(size); for update in item.updates { let update_datapoint: Option = match update.update.datapoint { - Some(datapoint) => datapoint.into(), + Some(datapoint) => { + // For subscribe streams we do not want to return NotAvailable + // even if the values is not available when subscribe starts + match datapoint.value { + broker::DataValue::NotAvailable => None, + _ => datapoint.into(), + } + } None => None, }; if let Some(dp) = update_datapoint { @@ -697,6 +734,291 @@ mod tests { } } + // Helper for adding an int32 signal and adding value + async fn helper_add_int32( + broker: &DataBroker, + name: &str, + value: i32, + timestamp: std::time::SystemTime, + ) -> i32 { + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let entry_id = authorized_access + .add_entry( + name.to_owned(), + broker::DataType::Int32, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Some Description hat Does Not Matter".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let _ = authorized_access + .update_entries([( + entry_id, + broker::EntryUpdate { + path: None, + datapoint: Some(broker::Datapoint { + //ts: std::time::SystemTime::now(), + ts: timestamp, + source_ts: None, + value: broker::types::DataValue::Int32(value), + }), + actuator_target: None, + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + )]) + .await; + + entry_id + } + + #[tokio::test] + async fn test_get_value_id_ok() { + let broker = DataBroker::default(); + + let timestamp = std::time::SystemTime::now(); + + let entry_id = helper_add_int32(&broker, "test.datapoint1", -64, timestamp).await; + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(response) => { + // Handle the successful response + let get_response = response.into_inner(); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(-64)), + }; + assert_eq!( + get_response, + proto::GetValueResponse { + data_point: { + Some(proto::Datapoint { + timestamp: Some(timestamp.into()), + value_state: Some(proto::datapoint::ValueState::Value(value)), + }) + }, + } + ); + } + Err(status) => { + panic!("Get failed with status: {:?}", status); + } + } + } + + #[tokio::test] + async fn test_get_value_name_ok() { + let broker = DataBroker::default(); + + let timestamp = std::time::SystemTime::now(); + + let _entry_id = helper_add_int32(&broker, "test.datapoint1", -64, timestamp).await; + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path( + "test.datapoint1".to_string(), + )), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(response) => { + // Handle the successful response + let get_response = response.into_inner(); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(-64)), + }; + assert_eq!( + get_response, + proto::GetValueResponse { + data_point: { + Some(proto::Datapoint { + timestamp: Some(timestamp.into()), + value_state: Some(proto::datapoint::ValueState::Value(value)), + }) + }, + } + ); + } + Err(status) => { + panic!("Get failed with status: {:?}", status); + } + } + } + + #[tokio::test] + async fn test_get_value_id_not_authorized() { + let broker = DataBroker::default(); + + let timestamp = std::time::SystemTime::now(); + + let entry_id = helper_add_int32(&broker, "test.datapoint1", -64, timestamp).await; + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Do not insert permissions + let get_value_request = tonic::Request::new(request); + + match broker.get_value(get_value_request).await { + Ok(_response) => { + panic!("Did not expect success"); + } + Err(status) => { + assert_eq!(status.code(), tonic::Code::Unauthenticated) + } + } + } + + #[tokio::test] + async fn test_get_value_id_no_value() { + // Define signal but do not assign any value + + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_string(), + broker::DataType::Int32, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Some Description hat Does Not Matter".to_owned(), + None, + None, + ) + .await + .unwrap(); + + // Now try to get it + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(response) => { + // Handle the successful response + let get_response = response.into_inner(); + + assert_eq!( + get_response, + proto::GetValueResponse { + data_point: { + Some(proto::Datapoint { + timestamp: None, + value_state: Some(proto::datapoint::ValueState::Failure( + proto::ValueFailure::NotProvided.into(), + )), + }) + }, + } + ); + } + Err(status) => { + // Handle the error from the publish_value function + panic!("Get failed with status: {:?}", status); + } + } + } + + #[tokio::test] + async fn test_get_value_id_not_defined() { + let broker = DataBroker::default(); + // Just use some arbitrary number + let entry_id: i32 = 12345; + + // Now try to get it + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(_response) => { + panic!("Did not expect success"); + } + Err(status) => { + assert_eq!(status.code(), tonic::Code::NotFound) + } + } + } + + #[tokio::test] + async fn test_get_value_name_not_defined() { + let broker = DataBroker::default(); + + // Now try to get it + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path( + "test.datapoint1".to_string(), + )), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(_response) => { + panic!("Did not expect success"); + } + Err(status) => { + assert_eq!(status.code(), tonic::Code::NotFound) + } + } + } + #[tokio::test] async fn test_publish_value() { let broker = DataBroker::default(); @@ -839,6 +1161,8 @@ mod tests { .extensions_mut() .insert(permissions::ALLOW_ALL.clone()); + // Note: We subscribe before the signal test.datapoint1 has any value + // but we do not expect to get a NOT_AVAILABLE message back! let result = tokio::task::block_in_place(|| { // Blocking operation here // Since broker.subscribe is async, you need to run it in an executor diff --git a/proto/kuksa/val/v2/types.proto b/proto/kuksa/val/v2/types.proto index 9adc6053..1258aed8 100644 --- a/proto/kuksa/val/v2/types.proto +++ b/proto/kuksa/val/v2/types.proto @@ -22,7 +22,12 @@ message Datapoint { google.protobuf.Timestamp timestamp = 1; oneof value_state { + // When reading - returned if the signal exists but has no value, + // or some other error prevents a value from being provided + // When writing - could theoretically be used to "delete" a value ValueFailure failure = 2; + // When reading - this is returned if Databroker has an actual value for the signal + // When writing - this shall be used to provide a value to databroker Value value = 3; } } @@ -50,7 +55,14 @@ message Value { message SignalID { oneof signal { + // Numeric identifier to the signal + // As of today Databroker assigns arbitrary unique numbers to each registered signal + // at startup, meaning that identifiers may change after restarting Databroker. + // A mechanism for static identifiers may be introduced in the future. int32 id = 1; + // Full VSS-style path to a specific signal, like "Vehicle.Speed" + // Wildcards and paths to branches are not supported. + // The given path must be known by the Databroker. string path = 2; } } @@ -147,18 +159,20 @@ message ValueRestrictionString { repeated string allowed_values = 1; } +// Used to indicate status of a signal in Databroker +// This is given as an alternative to Value, so if there is a valid value +// ValueFailure shall not be specified. +// +// Scenarios where a signal does not exist or a user does not have access to a signal +// shall typically not be represented as a ValueFailure, as those secnarios +// should result in a GRPC error code being returned to the caller. enum ValueFailure { // Unspecified value failure, reserved for gRPC backwards compatibility // (see https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum) UNSPECIFIED = 0; - // The signal is known and provided, but doesn't have a valid value - INVALID_VALUE = 1; // The signal is known, but no value is provided currently + // Can theoretically also be used in write request to "delete" a value NOT_PROVIDED = 2; - // The referred signal is unknown on the system - UNKNOWN_SIGNAL = 3; - // The client does not have the necessary access rights to the signal - ACCESS_DENIED = 4; // Unexpected internal error INTERNAL_ERROR = 5; } diff --git a/proto/kuksa/val/v2/val.proto b/proto/kuksa/val/v2/val.proto index 67d1a86f..4e603e28 100644 --- a/proto/kuksa/val/v2/val.proto +++ b/proto/kuksa/val/v2/val.proto @@ -25,6 +25,10 @@ service VAL { // Returns (GRPC error code): // NOT_FOUND if the requested signal doesn't exist // PERMISSION_DENIED if access is denied + // + // If the signal exist but does not have a valid value + // a ValueFailure datapoint will be returned. + // rpc GetValue(GetValueRequest) returns (GetValueResponse); // Get the latest values of a set of signals. @@ -41,13 +45,19 @@ service VAL { // is allowed to read are included (everything else is ignored). // // Returns (GRPC error code): - // NOT_FOUND if the specified root branch does not exist. + // NOT_FOUND if any of the requested signals doesn't exist. rpc ListValues(ListValuesRequest) returns (ListValuesResponse); // Subscribe to a set of signals using string path parameters // Returns (GRPC error code): // NOT_FOUND if any of the signals are non-existant. // PERMISSION_DENIED if access is denied for any of the signals. + // + // When subscribing the Broker shall immediately return the value for all + // subscribed entries. If no value is available when subscribing a value shall be returned + // first when a value is available. + // TODO - Verify logic above? + // If we have the availability to "delete" values then getting a NotAvailable message sort of makes sense rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse); // Subscribe to a set of signals using i32 id parameters