Skip to content

Commit

Permalink
Expose detailed sync stats through Database::get_sync_usage_stats
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Jastrzebski <[email protected]>
  • Loading branch information
haaawk committed Oct 23, 2024
1 parent 7889902 commit 1eaf5c2
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 1 deletion.
15 changes: 15 additions & 0 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cfg_replication!(
use crate::replication::remote_client::RemoteClient;
use crate::replication::EmbeddedReplicator;
pub use crate::replication::Frames;
pub use crate::replication::SyncUsageStats;

pub struct ReplicationContext {
pub(crate) replicator: EmbeddedReplicator,
Expand Down Expand Up @@ -277,6 +278,20 @@ impl Database {
Ok(self.sync_oneshot().await?)
}

#[cfg(feature = "replication")]
/// Return detailed logs about bytes synced with primary
pub async fn get_sync_usage_stats(&self) -> Result<SyncUsageStats> {
if let Some(ctx) = &self.replication_ctx {
let sync_stats = ctx.replicator.get_sync_usage_stats().await?;
Ok(sync_stats)
} else {
Err(crate::errors::Error::Misuse(
"No replicator available. Use Database::with_replicator() to enable replication"
.to_string(),
))
}
}

#[cfg(feature = "replication")]
/// Sync with primary at least to a given replication index
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
Expand Down
75 changes: 75 additions & 0 deletions libsql/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,52 @@ pub enum Frames {
Snapshot(SnapshotFile),
}

/// Detailed logs about bytes synced with primary
pub struct SyncUsageStats {
prefetched_bytes: u64,
prefetched_bytes_discarded_due_to_new_session: u64,
prefetched_bytes_discarded_due_to_consecutive_handshake: u64,
prefetched_bytes_discarded_due_to_invalid_frame_header: u64,
synced_bytes_discarded_due_to_invalid_frame_header: u64,
prefetched_bytes_used: u64,
synced_bytes_used: u64,
snapshot_bytes: u64,
}

impl SyncUsageStats {
pub fn prefetched_bytes(&self) -> u64 {
self.prefetched_bytes
}

pub fn prefetched_bytes_discarded_due_to_new_session(&self) -> u64 {
self.prefetched_bytes_discarded_due_to_new_session
}

pub fn prefetched_bytes_discarded_due_to_consecutive_handshake(&self) -> u64 {
self.prefetched_bytes_discarded_due_to_consecutive_handshake
}

pub fn prefetched_bytes_discarded_due_to_invalid_frame_header(&self) -> u64 {
self.prefetched_bytes_discarded_due_to_invalid_frame_header
}

pub fn synced_bytes_discarded_due_to_invalid_frame_header(&self) -> u64 {
self.synced_bytes_discarded_due_to_invalid_frame_header
}

pub fn prefetched_bytes_used(&self) -> u64 {
self.prefetched_bytes_used
}

pub fn synced_bytes_used(&self) -> u64 {
self.synced_bytes_used
}

pub fn snapshot_bytes(&self) -> u64 {
self.snapshot_bytes
}
}

#[derive(Clone)]
pub(crate) struct Writer {
pub(crate) client: client::Client,
Expand Down Expand Up @@ -210,6 +256,35 @@ impl EmbeddedReplicator {
})
}

pub async fn get_sync_usage_stats(&self) -> Result<SyncUsageStats> {
let mut replicator = self.replicator.lock().await;
match replicator.client_mut() {
Either::Right(_) => {
Err(crate::errors::Error::Misuse(
"Trying to get sync usage stats, but this is a local replicator".into(),
))
}
Either::Left(c) => {
let stats = c.sync_stats();
Ok(SyncUsageStats {
prefetched_bytes: stats.prefetched_bytes.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_discarded_due_to_new_session: stats
.prefetched_bytes_discarded_due_to_new_session.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_discarded_due_to_consecutive_handshake: stats
.prefetched_bytes_discarded_due_to_consecutive_handshake.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_discarded_due_to_invalid_frame_header: stats
.prefetched_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst),
synced_bytes_discarded_due_to_invalid_frame_header: stats
.synced_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_used: stats.prefetched_bytes_used.load(std::sync::atomic::Ordering::SeqCst),
synced_bytes_used: stats.synced_bytes_used.load(std::sync::atomic::Ordering::SeqCst),
snapshot_bytes: stats.snapshot_bytes.load(std::sync::atomic::Ordering::SeqCst),
})
}
}

}

pub async fn sync_oneshot(&self) -> Result<Replicated> {
use libsql_replication::replicator::ReplicatorClient;

Expand Down
6 changes: 5 additions & 1 deletion libsql/src/replication/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
(out, before.elapsed())
}

struct SyncStats {
pub(crate) struct SyncStats {
pub prefetched_bytes: AtomicU64,
pub prefetched_bytes_discarded_due_to_new_session: AtomicU64,
pub prefetched_bytes_discarded_due_to_consecutive_handshake: AtomicU64,
Expand Down Expand Up @@ -121,6 +121,10 @@ impl RemoteClient {
})
}

pub(crate) fn sync_stats(&self) -> Arc<SyncStats> {
self.sync_stats.clone()
}

fn next_offset(&self) -> FrameNo {
match self.last_received {
Some(fno) => fno + 1,
Expand Down

0 comments on commit 1eaf5c2

Please sign in to comment.