Real-time Sync: Add push logic and run method #568

Status: Open. Wants to merge 3 commits into base branch yse-rt-merge.
139 changes: 138 additions & 1 deletion lib/core/src/sync/mod.rs
@@ -1,17 +1,21 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Result};
use futures_util::TryFutureExt;

use crate::sync::model::sync::{Record, SetRecordRequest, SetRecordStatus};
use crate::utils;
use crate::{persist::Persister, prelude::Signer};

use self::client::SyncerClient;
use self::model::DecryptedRecord;
use self::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData},
sync::ListChangesRequest,
RecordType, SyncState,
};
use self::model::{DecryptedRecord, SyncOutgoingDetails};

pub(crate) mod client;
pub(crate) mod model;
@@ -50,6 +54,21 @@ impl SyncService {
}
}

async fn run(self: Arc<Self>) -> Result<()> {
let cloned = self.clone();

tokio::spawn(async move {
loop {
if let Err(err) = cloned.pull().and_then(|_| cloned.push()).await {
log::debug!("Could not run sync event loop: {err:?}");
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
});

Ok(())
}
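The run method above spawns a background task that retries pull-then-push on a fixed 30-second interval and only logs failures, so one bad iteration never kills the loop. A dependency-free sketch of that loop shape, using std::thread in place of tokio::spawn and tokio::time::sleep (the interval and iteration cap are illustrative, and the counter stands in for the pull/push calls):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Spawn a background loop that attempts a sync pass, then sleeps.
// Errors from the pass would be logged, never propagated, so the
// loop keeps running (mirroring the `log::debug!` branch above).
fn run_sync_loop(
    attempts: Arc<AtomicU32>,
    interval: Duration,
    max_iters: u32,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..max_iters {
            // pull().and_then(|_| push()) would run here; a failure
            // is only logged and the loop proceeds to the next tick.
            attempts.fetch_add(1, Ordering::SeqCst);
            thread::sleep(interval);
        }
    })
}

fn main() {
    let attempts = Arc::new(AtomicU32::new(0));
    let handle = run_sync_loop(attempts.clone(), Duration::from_millis(5), 3);
    handle.join().unwrap();
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
```

The real implementation detaches the task and returns immediately, which is why run takes Arc<Self> and clones it into the spawned future.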

fn commit_record(
&self,
decrypted_record: &DecryptedRecord,
@@ -208,6 +227,124 @@ impl SyncService {

Ok(())
}

async fn push(&self) -> Result<()> {
let outgoing_details = self.persister.get_sync_outgoing_details()?;
Review comment (Member): Perhaps we could have a single method, get_records_to_push, that returns all records with the revisions we need to push?


for SyncOutgoingDetails {
record_id,
record_type,
..
} in outgoing_details
{
// Step 1: Get the sync state, ensure it exists
let Some(sync_state) = unwrap_or_continue!(
Review comment (Member): Since we iterate over each record here, I don't think we need the repetitive unwrap_or_continue macro calls. We could have a single method, for example process_push, which we call here and log once if it returns an error. I think that would simplify the code.

self.persister.get_sync_state_by_record_id(&record_id),
"Could not retrieve sync state: {}"
) else {
log::debug!("Cannot sync record without any sync state. Skipping.");
Review comment (Member): When clients upgrade the SDK, how would we push their existing swaps (first revision)? One option is a migration step that adds a sync state for every existing swap. The other is to have get_sync_outgoing_details also include swaps that have no sync details yet. I prefer the second option because:

  1. It doesn't need a migration step.
  2. We don't need to add a sync_state every time we add a swap, which also helps separate the sync logic from the business logic.

continue;
};

// Step 2: Fetch the sync data
let sync_data = unwrap_or_continue!(
self.load_sync_data(&sync_state.data_id, record_type),
"Could not load sync data: {}"
);

// Step 3: Create the record to push outwards
let record = unwrap_or_continue!(
Record::new(
sync_state.record_id.clone(),
sync_data,
sync_state.record_revision,
self.signer.clone()
),
"Could not create new record: {}"
);

// Step 4: Push the record
let req = unwrap_or_continue!(
SetRecordRequest::new(record, utils::now(), self.signer.clone()),
"Could not create SetRecordRequest: {}"
);
let reply =
unwrap_or_continue!(self.client.push(req).await, "Could not push record: {}");

// Step 5: Check for conflict. If present, skip and retry on the next call
if reply.status() == SetRecordStatus::Conflict {
continue;
}

// Step 6: Update state revision
unwrap_or_continue!(

Review comment (Member): Where do we clear the updated_fields?


self.persister.set_sync_state(SyncState {
data_id: sync_state.data_id,
record_id: sync_state.record_id,
record_revision: reply.new_revision,
is_local: sync_state.is_local,
}),
"Could not set final sync state after push: {}"
);
}

Ok(())
}
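On conflict, the push method above deliberately skips updating the local revision, so the same record is retried on the next loop iteration. A toy model of that behavior (FakeServer and its accept-only-the-current-revision rule are assumptions for illustration, not the real server protocol):

```rust
#[derive(Debug, PartialEq)]
enum SetRecordStatus {
    Success(u64), // carries the new revision
    Conflict,
}

struct FakeServer {
    current_revision: u64,
}

impl FakeServer {
    // Accept the record only if the client pushed on top of the
    // revision the server currently holds; otherwise report a conflict.
    fn push(&mut self, base_revision: u64) -> SetRecordStatus {
        if base_revision == self.current_revision {
            self.current_revision += 1;
            SetRecordStatus::Success(self.current_revision)
        } else {
            SetRecordStatus::Conflict
        }
    }
}

// Returns true if the push was accepted and the local revision advanced.
fn push_record(local_revision: &mut u64, server: &mut FakeServer) -> bool {
    match server.push(*local_revision) {
        SetRecordStatus::Success(new_revision) => {
            // Mirrors the final set_sync_state call: persist new revision.
            *local_revision = new_revision;
            true
        }
        // Conflict: leave local state untouched; retry on the next sync.
        SetRecordStatus::Conflict => false,
    }
}

fn main() {
    let mut server = FakeServer { current_revision: 5 };
    let mut stale = 3; // another device already advanced this record
    assert!(!push_record(&mut stale, &mut server));
    assert_eq!(stale, 3); // unchanged, will retry later
    let mut fresh = 5;
    assert!(push_record(&mut fresh, &mut server));
    assert_eq!(fresh, 6);
}
```

Because the loop in run fires again after the sleep interval, skipping on conflict is enough to get eventual retry without any extra bookkeeping.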

pub(crate) async fn enqueue_record(
Review comment (Member): This method is racy. The updated_fields should be updated in the same transaction that updates the swap.

&self,
data_id: &str,
record_type: RecordType,
new_updated_fields: Option<Vec<String>>,
) -> Result<()> {
let maybe_previous_sync_state = self.persister.get_sync_state_by_data_id(data_id)?;

let record_revision = maybe_previous_sync_state
.as_ref()
.map(|sync_state| sync_state.record_revision)
.unwrap_or(0);
let is_local = maybe_previous_sync_state
.as_ref()
.map(|sync_state| sync_state.is_local)
.unwrap_or(true);
let record_id = maybe_previous_sync_state
.map(|sync_state| sync_state.record_id)
.unwrap_or(uuid::Uuid::new_v4().to_string());

// Try to merge previous updated fields with new ones, if either is present
let maybe_previous_updated_fields = self
.persister
.get_sync_outgoing_details_by_id(&record_id)?
.map(|details| details.updated_fields)
.flatten();

let updated_fields = match (maybe_previous_updated_fields, new_updated_fields) {
(None, None) => None,
(Some(previous_fields), None) => Some(previous_fields),
(None, Some(new_fields)) => Some(new_fields),
(Some(previous_fields), Some(new_fields)) => {
Some([previous_fields, new_fields].concat())
}
};

let outgoing_details = SyncOutgoingDetails {
record_id: record_id.clone(),
record_type,
updated_fields,
commit_time: utils::now(),
};
let new_state = SyncState {
Review comment (Member): As far as I understand, the enqueue method should update the updated_fields in sync_outgoing_details, so why do we also need to update the sync_state?

data_id: data_id.to_string(),
record_id,
record_revision,
is_local,
};

self.persister
.set_sync_outgoing_details_and_state(outgoing_details, new_state)?;

Ok(())
}
}
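The field-merge step inside enqueue_record can be read in isolation: a missing side passes the other through, and two present sides are concatenated so no previously queued field name is lost. A minimal sketch of that match (the helper name merge_updated_fields is hypothetical):

```rust
// Merge previously queued updated fields with newly reported ones,
// mirroring the four-arm match in `enqueue_record` above.
fn merge_updated_fields(
    previous: Option<Vec<String>>,
    new: Option<Vec<String>>,
) -> Option<Vec<String>> {
    match (previous, new) {
        (None, None) => None,
        (Some(previous_fields), None) => Some(previous_fields),
        (None, Some(new_fields)) => Some(new_fields),
        (Some(previous_fields), Some(new_fields)) => {
            Some([previous_fields, new_fields].concat())
        }
    }
}

fn main() {
    let merged = merge_updated_fields(
        Some(vec!["state".to_string()]),
        Some(vec!["preimage".to_string()]),
    );
    // Earlier queued fields come first, new ones are appended.
    assert_eq!(
        merged,
        Some(vec!["state".to_string(), "preimage".to_string()])
    );
}
```

Note the concatenation keeps duplicates; if the same field is updated twice before a push, it appears twice in the queued list.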

#[cfg(test)]
28 changes: 27 additions & 1 deletion lib/core/src/sync/model/client.rs
@@ -1,4 +1,5 @@
use anyhow::Result;
use lwk_wollet::hashes::hex::DisplayHex as _;
use openssl::sha::sha256;
use std::sync::Arc;

@@ -7,7 +8,10 @@ use crate::{
utils,
};

use super::{sync::ListChangesRequest, MESSAGE_PREFIX};
use super::{
sync::{ListChangesRequest, Record, SetRecordRequest},
CURRENT_SCHEMA_VERSION, MESSAGE_PREFIX,
};

fn sign_message(msg: &[u8], signer: Arc<Box<dyn Signer>>) -> Result<String, SignerError> {
let msg = [MESSAGE_PREFIX, msg].concat();
@@ -29,3 +33,25 @@ impl ListChangesRequest {
})
}
}
impl SetRecordRequest {
pub(crate) fn new(
record: Record,
request_time: u32,
signer: Arc<Box<dyn Signer>>,
) -> Result<Self, SignerError> {
let msg = format!(
"{}-{}-{}-{}-{}",
record.id,
record.data.to_lower_hex_string(),
record.revision,
CURRENT_SCHEMA_VERSION.to_string(),
request_time,
);
let signature = sign_message(msg.as_bytes(), signer)?;
Ok(Self {
record: Some(record),
request_time,
signature,
})
}
}
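SetRecordRequest::new above signs a dash-joined message built from the record id, the hex-encoded record data, the revision, the schema version, and the request time. A dependency-free sketch of that message layout (the hex helper replaces lwk_wollet's DisplayHex, and representing the schema version as a string is an assumption):

```rust
// Hand-rolled lowercase hex so the sketch needs no external crates;
// the real code uses lwk_wollet's DisplayHex trait.
fn to_lower_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

// Build the message that gets signed, mirroring the format! call
// in `SetRecordRequest::new` above: id-data-revision-version-time.
fn signed_message(
    id: &str,
    data: &[u8],
    revision: u64,
    schema_version: &str,
    request_time: u32,
) -> String {
    format!(
        "{}-{}-{}-{}-{}",
        id,
        to_lower_hex(data),
        revision,
        schema_version,
        request_time,
    )
}

fn main() {
    let msg = signed_message("rec-1", &[0xde, 0xad], 3, "0.0.1", 1_700_000_000);
    assert_eq!(msg, "rec-1-dead-3-0.0.1-1700000000");
}
```

Because both sides must reproduce this exact string to verify the signature, any change to the field order or separator is a breaking protocol change.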