Skip to content

Commit

Permalink
fix: ActionResponse.action_id serialization (#302)
Browse files Browse the repository at this point in the history
* fix: `ActionResponse.action_id` serialization

* fix: forwarding `action_status`

* style: `Point` impl traits

* fix: remove unused import
  • Loading branch information
de-sh authored Nov 3, 2023
1 parent 6da56bc commit 4a12479
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 39 deletions.
23 changes: 3 additions & 20 deletions uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;

use crate::{Payload, Point};
Expand Down Expand Up @@ -98,33 +97,17 @@ impl ActionResponse {
self
}

pub fn as_payload(&self) -> Payload {
Payload::from(self)
}

pub fn from_payload(payload: &Payload) -> Result<Self, serde_json::Error> {
let intermediate = serde_json::to_value(payload)?;
serde_json::from_value(intermediate)
}
}

impl From<&ActionResponse> for Payload {
fn from(resp: &ActionResponse) -> Self {
Self {
stream: "action_status".to_owned(),
sequence: resp.sequence,
timestamp: resp.timestamp,
payload: json!({
"id": resp.action_id,
"state": resp.state,
"progress": resp.progress,
"errors": resp.errors
}),
}
impl Point for ActionResponse {
fn stream_name(&self) -> &str {
"action_status"
}
}

impl Point for ActionResponse {
fn sequence(&self) -> u32 {
self.sequence
}
Expand Down
16 changes: 8 additions & 8 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct ActionsBridge {
/// Actions incoming from backend
actions_rx: Receiver<Action>,
/// Contains stream to send ActionResponses on
streams: Streams,
streams: Streams<ActionResponse>,
/// Apps registered with the bridge
/// NOTE: Sometimes action_routes could overlap, the latest route
/// to be registered will be used in such a circumstance.
Expand Down Expand Up @@ -276,7 +276,7 @@ impl ActionsBridge {
Ok(())
}

async fn forward_action_response(&mut self, response: ActionResponse) {
async fn forward_action_response(&mut self, mut response: ActionResponse) {
if self.parallel_actions.contains(&response.action_id) {
self.forward_parallel_action_response(response).await;

Expand All @@ -297,7 +297,7 @@ impl ActionsBridge {
}

info!("Action response = {:?}", response);
self.streams.forward(response.as_payload()).await;
self.streams.forward(response.clone()).await;

if response.is_completed() || response.is_failed() {
self.clear_current_action();
Expand All @@ -309,15 +309,15 @@ impl ActionsBridge {
if response.is_done() {
let mut action = inflight_action.action.clone();

if let Some(a) = response.done_response {
if let Some(a) = response.done_response.take() {
action = a;
}

if let Err(RedirectionError(action)) = self.redirect_action(action).await {
// NOTE: send success reponse for actions that don't have redirections configured
warn!("Action redirection is not configured for: {:?}", action);
let response = ActionResponse::success(&action.action_id);
self.streams.forward(response.as_payload()).await;
self.streams.forward(response).await;

self.clear_current_action();
}
Expand Down Expand Up @@ -350,17 +350,17 @@ impl ActionsBridge {

async fn forward_parallel_action_response(&mut self, response: ActionResponse) {
info!("Action response = {:?}", response);
self.streams.forward(response.as_payload()).await;

if response.is_completed() || response.is_failed() {
self.parallel_actions.remove(&response.action_id);
}

self.streams.forward(response).await;
}

async fn forward_action_error(&mut self, action: Action, error: Error) {
let response = ActionResponse::failure(&action.action_id, error.to_string());

self.streams.forward(response.as_payload()).await;
self.streams.forward(response).await;
}
}

Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct DataBridge {
/// Rx to receive data from apps
data_rx: Receiver<Payload>,
/// Handle to send data over streams
streams: Streams,
streams: Streams<Payload>,
ctrl_rx: Receiver<DataBridgeShutdown>,
ctrl_tx: Sender<DataBridgeShutdown>,
}
Expand Down
7 changes: 6 additions & 1 deletion uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pub use self::{
use super::Compression;
pub use metrics::StreamMetrics;

pub trait Point: Send + Debug {
pub trait Point: Send + Debug + Serialize + 'static {
fn stream_name(&self) -> &str;
fn sequence(&self) -> u32;
fn timestamp(&self) -> u64;
}
Expand Down Expand Up @@ -56,6 +57,10 @@ pub struct Payload {
}

impl Point for Payload {
fn stream_name(&self) -> &str {
&self.stream
}

fn sequence(&self) -> u32 {
self.sequence
}
Expand Down
4 changes: 2 additions & 2 deletions uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Stream<T> {

impl<T> Stream<T>
where
T: Point + Debug + Send + 'static,
T: Point,
Buffer<T>: Package,
{
pub fn new(
Expand Down Expand Up @@ -265,7 +265,7 @@ impl<T> Buffer<T> {

impl<T> Package for Buffer<T>
where
T: Debug + Send + Point,
T: Point,
Vec<T>: Serialize,
{
fn topic(&self) -> Arc<String> {
Expand Down
14 changes: 7 additions & 7 deletions uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ use flume::Sender;
use log::{error, info, trace};

use super::stream::{self, StreamStatus, MAX_BUFFER_SIZE};
use super::StreamMetrics;
use super::{Point, StreamMetrics};
use crate::base::StreamConfig;
use crate::{Config, Package, Payload, Stream};
use crate::{Config, Package, Stream};

use super::delaymap::DelayMap;

pub struct Streams {
pub struct Streams<T> {
config: Arc<Config>,
data_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
map: HashMap<String, Stream<Payload>>,
map: HashMap<String, Stream<T>>,
pub stream_timeouts: DelayMap<String>,
}

impl Streams {
impl<T: Point> Streams<T> {
pub fn new(
config: Arc<Config>,
data_tx: Sender<Box<dyn Package>>,
Expand All @@ -35,8 +35,8 @@ impl Streams {
}
}

pub async fn forward(&mut self, data: Payload) {
let stream_name = data.stream.to_owned();
pub async fn forward(&mut self, data: T) {
let stream_name = data.stream_name().to_string();

let stream = match self.map.get_mut(&stream_name) {
Some(partition) => partition,
Expand Down

0 comments on commit 4a12479

Please sign in to comment.