From 0c7f8bbed0ac354a1348a174afe08e8475e76c40 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 23 Oct 2024 13:53:16 +0200 Subject: [PATCH] Expose detailed sync stats through Database::get_sync_usage_stats Signed-off-by: Piotr Jastrzebski --- libsql/src/local/database.rs | 15 +++++++++ libsql/src/replication/mod.rs | 41 +++++++++++++++++++++++++ libsql/src/replication/remote_client.rs | 6 +++- 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 3453a777c9..141ff54987 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -21,6 +21,7 @@ cfg_replication!( use crate::{database::OpenFlags, local::connection::Connection}; use crate::{Error::ConnectionFailed, Result}; use libsql_sys::ffi; +use crate::replication::SyncUsageStats; // A libSQL database. pub struct Database { @@ -277,6 +278,20 @@ impl Database { Ok(self.sync_oneshot().await?) } + #[cfg(feature = "replication")] + /// Return detailed logs about bytes synced with primary + pub async fn get_sync_usage_stats(&self) -> Result { + if let Some(ctx) = &self.replication_ctx { + let sync_stats = ctx.replicator.get_sync_usage_stats().await?; + Ok(sync_stats) + } else { + Err(crate::errors::Error::Misuse( + "No replicator available. Use Database::with_replicator() to enable replication" + .to_string(), + )) + } + } + #[cfg(feature = "replication")] /// Sync with primary at least to a given replication index pub async fn sync_until(&self, replication_index: FrameNo) -> Result { diff --git a/libsql/src/replication/mod.rs b/libsql/src/replication/mod.rs index 851f8f238f..76d0aaf8af 100644 --- a/libsql/src/replication/mod.rs +++ b/libsql/src/replication/mod.rs @@ -65,6 +65,18 @@ pub enum Frames { Snapshot(SnapshotFile), } +/// Detailed logs about bytes synced with primary +pub struct SyncUsageStats { + pub prefetched_bytes: u64, + pub prefetched_bytes_discarded_due_to_new_session: u64, + pub prefetched_bytes_discarded_due_to_consecutive_handshake: u64, + pub prefetched_bytes_discarded_due_to_invalid_frame_header: u64, + pub synced_bytes_discarded_due_to_invalid_frame_header: u64, + pub prefetched_bytes_used: u64, + pub synced_bytes_used: u64, + pub snapshot_bytes: u64, +} + #[derive(Clone)] pub(crate) struct Writer { pub(crate) client: client::Client, @@ -210,6 +222,35 @@ impl EmbeddedReplicator { }) } + pub async fn get_sync_usage_stats(&self) -> Result { + let mut replicator = self.replicator.lock().await; + match replicator.client_mut() { + Either::Right(_) => { + Err(crate::errors::Error::Misuse( + "Trying to get sync usage stats, but this is a local replicator".into(), + )) + } + Either::Left(c) => { + let stats = c.sync_stats(); + Ok(SyncUsageStats { + prefetched_bytes: stats.prefetched_bytes.load(std::sync::atomic::Ordering::SeqCst), + prefetched_bytes_discarded_due_to_new_session: stats + .prefetched_bytes_discarded_due_to_new_session.load(std::sync::atomic::Ordering::SeqCst), + prefetched_bytes_discarded_due_to_consecutive_handshake: stats + .prefetched_bytes_discarded_due_to_consecutive_handshake.load(std::sync::atomic::Ordering::SeqCst), + prefetched_bytes_discarded_due_to_invalid_frame_header: stats + .prefetched_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst), + synced_bytes_discarded_due_to_invalid_frame_header: stats + .synced_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst), + prefetched_bytes_used: stats.prefetched_bytes_used.load(std::sync::atomic::Ordering::SeqCst), + synced_bytes_used: stats.synced_bytes_used.load(std::sync::atomic::Ordering::SeqCst), + snapshot_bytes: stats.snapshot_bytes.load(std::sync::atomic::Ordering::SeqCst), + }) + } + } + + } + pub async fn sync_oneshot(&self) -> Result { use libsql_replication::replicator::ReplicatorClient; diff --git a/libsql/src/replication/remote_client.rs b/libsql/src/replication/remote_client.rs index 7beaf97f92..ba2b75c40a 100644 --- a/libsql/src/replication/remote_client.rs +++ b/libsql/src/replication/remote_client.rs @@ -24,7 +24,7 @@ async fn time(fut: impl Future) -> (O, Duration) { (out, before.elapsed()) } -struct SyncStats { +pub(crate) struct SyncStats { pub prefetched_bytes: AtomicU64, pub prefetched_bytes_discarded_due_to_new_session: AtomicU64, pub prefetched_bytes_discarded_due_to_consecutive_handshake: AtomicU64, @@ -121,6 +121,10 @@ impl RemoteClient { }) } + pub(crate) fn sync_stats(&self) -> Arc { + self.sync_stats.clone() + } + fn next_offset(&self) -> FrameNo { match self.last_received { Some(fno) => fno + 1,