From 726c40c1d5869e267128f37ea4600b5caefcfe17 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 30 Sep 2024 12:25:49 +0200 Subject: [PATCH 1/3] cleanup wal toolkit --- Cargo.lock | 42 +++ libsql-server/Cargo.toml | 1 + libsql-server/src/main.rs | 6 +- libsql-server/src/wal_toolkit.rs | 552 +++++++++++++++---------------- 4 files changed, 321 insertions(+), 280 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b93a4127d5..f819d8d329 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2797,6 +2797,19 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "indicatif" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" +dependencies = [ + "console", + "instant", + "number_prefix", + "portable-atomic", + "unicode-width", +] + [[package]] name = "inferno" version = "0.11.19" @@ -2855,6 +2868,15 @@ dependencies = [ "similar", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "io-extras" version = "0.17.4" @@ -3194,6 +3216,7 @@ dependencies = [ "hyper 0.14.30", "hyper-rustls 0.24.1", "hyper-tungstenite", + "indicatif", "insta", "itertools 0.10.5", "jsonwebtoken", @@ -3347,6 +3370,7 @@ dependencies = [ "crossbeam-skiplist", "dashmap", "fst", + "futures", "hashbrown 0.14.5", "hex", "http-body 1.0.0", @@ -3354,10 +3378,12 @@ dependencies = [ "inquire", "insta", "libsql-sys", + "memmap", "nix 0.28.0", "once_cell", "parking_lot", "petgraph", + "pin-project-lite", "priority-queue 2.0.3", "rand", "rand_chacha", @@ -3534,6 +3560,16 @@ dependencies = [ "rustix 0.38.34", ] +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" 
+dependencies = [ + "libc", + "winapi", +] + [[package]] name = "memmap2" version = "0.9.4" @@ -3828,6 +3864,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "numeric_cast" version = "0.2.1" diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index 26dd1dc3d3..4223e1939a 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -96,6 +96,7 @@ aws-config = "1" aws-sdk-s3 = "1" aws-smithy-runtime = "1.6.2" dialoguer = { version = "0.11.0", features = ["history"] } +indicatif = "0.17.8" [dev-dependencies] arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] } diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index d08376d1cc..8f7f7ab98f 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -9,7 +9,7 @@ use bytesize::ByteSize; use clap::Parser; use hyper::client::HttpConnector; use libsql_server::auth::{parse_http_basic_auth_arg, parse_jwt_keys, user_auth_strategies, Auth}; -use libsql_server::wal_toolkit::{S3Args, WalToolkit}; +use libsql_server::wal_toolkit::{S3Args, WalToolkitCommand}; use tokio::sync::Notify; use tokio::time::Duration; use tracing_subscriber::util::SubscriberInitExt; @@ -328,7 +328,7 @@ enum UtilsSubcommands { #[clap(flatten)] s3_args: S3Args, #[clap(subcommand)] - command: WalToolkit, + command: WalToolkitCommand, }, } @@ -754,7 +754,7 @@ async fn main() -> Result<()> { path, s3_args, } => { - command.run(path, s3_args).await?; + command.exec(path, s3_args).await?; } } diff --git a/libsql-server/src/wal_toolkit.rs b/libsql-server/src/wal_toolkit.rs index 5297b932da..4fa39e5a54 100644 --- a/libsql-server/src/wal_toolkit.rs +++ b/libsql-server/src/wal_toolkit.rs @@ -5,6 +5,7 @@ use aws_config::{retry::RetryConfig, BehaviorVersion, Region}; use 
aws_sdk_s3::config::{Credentials, SharedCredentialsProvider}; use chrono::DateTime; use hashbrown::HashSet; +use indicatif::ProgressStyle; use libsql_sys::name::NamespaceName; use libsql_wal::io::StdIO; use libsql_wal::storage::backend::s3::S3Backend; @@ -22,319 +23,316 @@ pub enum CompactStrategy { } #[derive(Debug, clap::Subcommand)] -pub enum WalToolkit { - /// Register namespaces to monitor - Monitor { - /// list monitored namespaces - #[clap(long, short)] - list: bool, - /// Monitor the passed namespace - #[clap(long, short)] - add: Option, - /// Unmonitor the passed namespace - #[clap(long, short)] - delete: Option, - /// Sync namespaces from a sqld meta-store - #[clap(long)] - from_db: Option, - }, - /// Analyze segments for a namespaces - Analyze { - /// list all segments - #[clap(long)] - list_all: bool, - namespace: String, - }, - /// Compact segments into bigger segments - Compact { - /// compaction strategy - #[clap(long, short)] - strategy: CompactStrategy, - /// prints the compaction plan, but doesn't perform it. - #[clap(long)] - dry_run: bool, - /// only compact if it takes more than `threshold` segments to restore - #[clap(long, short, default_value = "1")] - threshold: usize, - /// namespace to compact, otherwise, all namespaces are compacted - namespace: Option, - }, - /// Sync namespace metadata from remote storage - Sync { - /// When performing a full sync, all the segment space is scanned again. By default, only - /// segments with frame_no greated that the last frame_no are retrieved. 
- #[clap(long)] - full: bool, - /// unless this is specified, all monitored namespaces are synced - namespace: Option, - }, - /// Restore namespace - Restore { - #[clap(long)] - verify: bool, - namespace: String, - out: PathBuf, - }, +pub enum WalToolkitCommand { + Monitor(MonitorCommand), + Analyze(AnalyzeCommand), + Compact(CompactCommand), + Sync(SyncCommand), + Restore(RestoreCommand), } -impl WalToolkit { - pub async fn run(&self, compact_path: &Path, s3_args: &S3Args) -> anyhow::Result<()> { +impl WalToolkitCommand { + pub async fn exec(&self, compact_path: &Path, s3_args: &S3Args) -> anyhow::Result<()> { let backend = setup_storage(s3_args).await?; tokio::fs::create_dir_all(compact_path).await?; let mut compactor = Compactor::new(backend.into(), compact_path)?; match self { - Self::Monitor { - add, - list, - from_db, - delete, - } => { - handle_monitor( - *list, - &mut compactor, - add.as_deref(), - delete.as_deref(), - from_db.as_deref(), - ) - .await?; - } - Self::Analyze { - namespace, - list_all, - } => { - handle_analyze(namespace, &compactor, *list_all)?; - } - Self::Compact { - strategy, - dry_run, - namespace, - threshold, - } => { - handle_compact( - namespace.as_deref(), - &mut compactor, - *threshold, - *strategy, - *dry_run, - ) - .await? - } - Self::Sync { full, namespace } => { - handle_sync(namespace.as_deref(), &mut compactor, full).await? 
- } - Self::Restore { - namespace, - out, - verify, - } => { - handle_restore(namespace, compactor, out, *verify).await?; - } + Self::Monitor(cmd) => cmd.exec(&mut compactor).await?, + Self::Analyze(cmd) => cmd.exec(&compactor).await?, + Self::Compact(cmd) => cmd.exec(&mut compactor).await?, + Self::Sync(cmd) => cmd.exec(&mut compactor).await?, + Self::Restore(cmd) => cmd.exec(&compactor).await?, } Ok(()) } } -async fn handle_restore( - namespace: &str, - compactor: Compactor>, - out: &Path, - verify: bool, -) -> Result<(), anyhow::Error> { - let namespace = NamespaceName::from_string(namespace.to_string()); - let analysis = compactor.analyze(&namespace)?; - let set = analysis.shortest_restore_path(); - compactor.restore(set, &out).await?; - Ok(if verify { - let conn = libsql_sys::rusqlite::Connection::open(&out)?; - conn.pragma_query(None, "integrity_check", |r| { - println!("{r:?}"); - Ok(()) - })?; - }) +#[derive(Debug, clap::Args)] +/// Restore namespace +pub struct RestoreCommand { + #[clap(long)] + pub verify: bool, + pub namespace: String, + pub out: PathBuf, } -async fn handle_sync( - namespace: Option<&str>, - compactor: &mut Compactor>, - full: &bool, -) -> Result<(), anyhow::Error> { - Ok(match namespace { - Some(ns) => { - let namespace = NamespaceName::from_string(ns.to_string()); - compactor.sync_one(&namespace, *full).await?; - println!("`{namespace}` fully up to date."); - } - None => { - compactor.sync_all(*full).await?; - println!("all monitored namespace fully up to date."); - } - }) +fn make_progress_fn() -> impl FnMut(u32, u32) { + let bar = indicatif::ProgressBar::new(0); + bar.set_style( + ProgressStyle::with_template("[{elapsed_precise}] {bar:30} {percent_precise}% eta: {eta}") + .unwrap() + .progress_chars("##-"), + ); + + move |current, total| { + bar.set_length(total as u64); + bar.set_position(current as u64); + } } -async fn handle_compact( - namespace: Option<&str>, - compactor: &mut Compactor>, - threshold: usize, - strategy: 
CompactStrategy, - dry_run: bool, -) -> Result<(), anyhow::Error> { - Ok(match namespace { - Some(namespace) => { - let namespace = NamespaceName::from_string(namespace.to_string()); - compact_namespace(compactor, &namespace, threshold, strategy, dry_run).await?; - } - None => { - let mut out = Vec::new(); - compactor.list_monitored_namespaces(|ns| { - out.push(ns); +impl RestoreCommand { + async fn exec(&self, compactor: &Compactor>) -> Result<(), anyhow::Error> { + let namespace = NamespaceName::from_string(self.namespace.to_string()); + let analysis = compactor.analyze(&namespace)?; + let set = analysis.shortest_restore_path(); + compactor + .restore(set, &self.out, make_progress_fn()) + .await?; + if self.verify { + let conn = libsql_sys::rusqlite::Connection::open(&self.out)?; + conn.pragma_query(None, "integrity_check", |r| { + println!("{r:?}"); + Ok(()) })?; + } + Ok(()) + } +} - for ns in &out { - compact_namespace(compactor, ns, threshold, strategy, dry_run).await?; +#[derive(Debug, clap::Args)] +/// Sync namespace metadata from remote storage +pub struct SyncCommand { + /// When performing a full sync, all the segment space is scanned again. By default, only + /// segments with frame_no greated that the last frame_no are retrieved. 
+ #[clap(long)] + full: bool, + /// unless this is specified, all monitored namespaces are synced + namespace: Option, +} + +impl SyncCommand { + async fn exec(&self, compactor: &mut Compactor>) -> Result<(), anyhow::Error> { + match self.namespace { + Some(ref ns) => { + let namespace = NamespaceName::from_string(ns.to_string()); + compactor.sync_one(&namespace, self.full).await?; + println!("`{namespace}` fully up to date."); + } + None => { + compactor.sync_all(self.full).await?; + println!("all monitored namespace fully up to date."); } } - }) + + Ok(()) + } +} + +#[derive(Debug, clap::Args)] +/// Compact segments into bigger segments +pub struct CompactCommand { + /// compaction strategy + #[clap(long, short)] + pub strategy: CompactStrategy, + /// prints the compaction plan, but doesn't perform it. + #[clap(long)] + pub dry_run: bool, + /// only compact if it takes more than `threshold` segments to restore + #[clap(long, short, default_value = "1")] + pub threshold: usize, + /// whether to display a progress bar + #[clap(long, short)] + pub progress: bool, + /// namespace to compact, otherwise, all namespaces are compacted + pub namespace: Option, + #[clap(requires = "namespace")] + /// compact to given path instead of sending to backend + pub out: Option, } -fn handle_analyze( - namespace: &str, - compactor: &Compactor>, - list_all: bool, -) -> Result<(), anyhow::Error> { - let namespace = NamespaceName::from_string(namespace.to_string()); - let analysis = compactor.analyze(&namespace)?; - println!("stats for {namespace}:"); - println!("- segment count: {}", analysis.segment_count()); - println!("- last frame_no: {}", analysis.last_frame_no()); - let set = analysis.shortest_restore_path(); - println!("- shortest restore path len: {}", set.len()); - if let Some((first, last)) = compactor.get_segment_range(&namespace)? 
{ - println!( - "- oldest segment: {}-{} ({})", - first.key.start_frame_no, - first.key.end_frame_no, - DateTime::from_timestamp_millis(first.key.timestamp as _).unwrap() - ); - println!( - "- most recent segment: {}-{} ({})", - last.key.start_frame_no, - last.key.end_frame_no, - DateTime::from_timestamp_millis(last.key.timestamp as _).unwrap() - ); +impl CompactCommand { + async fn exec(&self, compactor: &mut Compactor>) -> Result<(), anyhow::Error> { + match self.namespace { + Some(ref namespace) => { + let namespace = NamespaceName::from_string(namespace.to_string()); + self.compact_namespace(compactor, &namespace).await?; + } + None => { + let mut out = Vec::new(); + compactor.list_monitored_namespaces(|ns| { + out.push(ns); + })?; + + for ns in &out { + self.compact_namespace(compactor, ns).await?; + } + } + } + Ok(()) } - Ok(if list_all { - println!("segments:"); - compactor.list_all_segments(&namespace, |info| { + + async fn compact_namespace( + &self, + compactor: &mut Compactor, + namespace: &NamespaceName, + ) -> anyhow::Result<()> { + let analysis = compactor.analyze(&namespace)?; + let strat: Box = match self.strategy { + CompactStrategy::Logarithmic => Box::new(LogReductionStrategy), + CompactStrategy::CompactAll => Box::new(IdentityStrategy), + }; + let set = analysis.shortest_restore_path(); + if set.len() <= self.threshold { println!( - "- {}-{} ({})", - info.key.start_frame_no, - info.key.end_frame_no, - DateTime::from_timestamp_millis(info.key.timestamp as _).unwrap() + "skipping {namespace}: shortest restore path is {}, and threshold is {}", + set.len(), + self.threshold, ); - })?; - }) -} + return Ok(()); + } + let partition = strat.partition(&set); -async fn handle_monitor( - list: bool, - compactor: &mut Compactor>, - add: Option<&str>, - delete: Option<&str>, - from_db: Option<&Path>, -) -> Result<(), anyhow::Error> { - if list { - compactor.list_monitored_namespaces(|ns| { - println!("{ns}"); - })?; - } else if let Some(namespace) = add { - 
let namespace = NamespaceName::from_string(namespace.to_string()); - compactor.monitor(&namespace).await?; - println!("monitoring {namespace}"); - } - Ok(if let Some(namespace) = delete { - let namespace = NamespaceName::from_string(namespace.to_string()); - compactor.unmonitor(&namespace)?; - println!("{namespace} is unmonitored"); - } else if let Some(path) = from_db { - let metastore_path = path.join("metastore").join("data"); - let conn = rusqlite::Connection::open_with_flags( - metastore_path, - OpenFlags::SQLITE_OPEN_READ_ONLY, - )?; - let mut stmt = conn.prepare("SELECT namespace FROM namespace_configs")?; - let metastore_namespaces = stmt - .query(())? - .mapped(|r| Ok(NamespaceName::from_string(r.get(0)?))) - .collect::, _>>()?; - - let mut monitored_namespace = HashSet::new(); - compactor.list_monitored_namespaces(|n| { - monitored_namespace.insert(n); - })?; - - let to_remove = monitored_namespace.difference(&metastore_namespaces); - for ns in to_remove { - println!("- {ns}"); - compactor.unmonitor(ns)?; + println!("compacting {namespace}:"); + println!("-> initial shortest restore path len: {}", set.len()); + println!("-> compacting into {} segments", partition.len()); + for set in partition.iter() { + println!("\t- {:?}", set.range().unwrap()); } - let to_add = metastore_namespaces.difference(&monitored_namespace); - for ns in to_add { - println!("+ {ns}"); - compactor.monitor(&ns).await?; + if self.dry_run { + println!("dry run: stopping"); + } else { + println!("performing compaction"); + let part_len = partition.len(); + for (idx, set) in partition.into_iter().enumerate() { + let Some((start, end)) = set.range() else { + continue; + }; + println!("compacting {start}-{end} ({}/{})", idx + 1, part_len); + if self.progress { + compactor + .compact(set, self.out.as_deref(), make_progress_fn()) + .await?; + } else { + compactor + .compact(set, self.out.as_deref(), |_, _| ()) + .await?; + } + + // sync back the new segments + 
compactor.sync_one(&namespace, false).await?; + } } - }) -} -async fn compact_namespace( - compactor: &mut Compactor, - namespace: &NamespaceName, - threshold: usize, - strategy: CompactStrategy, - dry_run: bool, -) -> anyhow::Result<()> { - let analysis = compactor.analyze(&namespace)?; - let strat: Box = match strategy { - CompactStrategy::Logarithmic => Box::new(LogReductionStrategy), - CompactStrategy::CompactAll => Box::new(IdentityStrategy), - }; - let set = analysis.shortest_restore_path(); - if set.len() <= threshold { - println!( - "skipping {namespace}: shortest restore path is {}, and threshold is {threshold}", - set.len() - ); - return Ok(()); + Ok(()) } - let partition = strat.partition(&set); +} - println!("compacting {namespace}:"); - println!("-> initial shortest restore path len: {}", set.len()); - println!("-> compacting into {} segments", partition.len()); - for set in partition.iter() { - println!("\t- {:?}", set.range().unwrap()); - } +#[derive(Debug, clap::Args)] +/// Analyze segments for a namespaces +pub struct AnalyzeCommand { + /// list all segments + #[clap(long)] + pub list_all: bool, + pub namespace: String, +} - if dry_run { - println!("dry run: stopping"); - } else { - println!("performing compaction"); - let part_len = partition.len(); - for (idx, set) in partition.into_iter().enumerate() { - let Some((start, end)) = set.range() else { - continue; - }; - println!("compacting {start}-{end} ({}/{})", idx + 1, part_len); - // TODO: we can compact in conccurently - compactor.compact(set).await?; - - // sync back the new segments - compactor.sync_one(&namespace, false).await?; +impl AnalyzeCommand { + async fn exec(&self, compactor: &Compactor>) -> Result<(), anyhow::Error> { + let namespace = NamespaceName::from_string(self.namespace.to_string()); + let analysis = compactor.analyze(&namespace)?; + println!("stats for {namespace}:"); + println!("- segment count: {}", analysis.segment_count()); + println!("- last frame_no: {}", 
analysis.last_frame_no()); + let set = analysis.shortest_restore_path(); + println!("- shortest restore path len: {}", set.len()); + if let Some((first, last)) = compactor.get_segment_range(&namespace)? { + println!( + "- oldest segment: {}-{} ({})", + first.key.start_frame_no, + first.key.end_frame_no, + DateTime::from_timestamp_millis(first.key.timestamp as _).unwrap() + ); + println!( + "- most recent segment: {}-{} ({})", + last.key.start_frame_no, + last.key.end_frame_no, + DateTime::from_timestamp_millis(last.key.timestamp as _).unwrap() + ); + } + + if self.list_all { + println!("segments:"); + compactor.list_all_segments(&namespace, |info| { + println!( + "- {}-{} ({})", + info.key.start_frame_no, + info.key.end_frame_no, + DateTime::from_timestamp_millis(info.key.timestamp as _).unwrap() + ); + })?; } + + Ok(()) } +} - Ok(()) +#[derive(Debug, clap::Args)] +/// Register namespaces to monitor +pub struct MonitorCommand { + /// list monitored namespaces + #[clap(long, short)] + pub list: bool, + /// Monitor the passed namespace + #[clap(long, short)] + pub add: Option, + /// Unmonitor the passed namespace + #[clap(long, short)] + pub delete: Option, + /// Sync namespaces from a sqld meta-store + #[clap(long)] + pub from_db: Option, +} + +impl MonitorCommand { + async fn exec(&self, compactor: &mut Compactor>) -> Result<(), anyhow::Error> { + if self.list { + compactor.list_monitored_namespaces(|ns| { + println!("{ns}"); + })?; + } else if let Some(ref namespace) = self.add { + let namespace = NamespaceName::from_string(namespace.to_string()); + compactor.monitor(&namespace).await?; + println!("monitoring {namespace}"); + } + + if let Some(ref namespace) = self.delete { + let namespace = NamespaceName::from_string(namespace.to_string()); + compactor.unmonitor(&namespace)?; + println!("{namespace} is unmonitored"); + } else if let Some(ref path) = self.from_db { + let metastore_path = path.join("metastore").join("data"); + let conn = 
rusqlite::Connection::open_with_flags( + metastore_path, + OpenFlags::SQLITE_OPEN_READ_ONLY, + )?; + let mut stmt = conn.prepare("SELECT namespace FROM namespace_configs")?; + let metastore_namespaces = stmt + .query(())? + .mapped(|r| Ok(NamespaceName::from_string(r.get(0)?))) + .collect::, _>>()?; + + let mut monitored_namespace = HashSet::new(); + compactor.list_monitored_namespaces(|n| { + monitored_namespace.insert(n); + })?; + + let to_remove = monitored_namespace.difference(&metastore_namespaces); + for ns in to_remove { + println!("- {ns}"); + compactor.unmonitor(ns)?; + } + + let to_add = metastore_namespaces.difference(&monitored_namespace); + for ns in to_add { + println!("+ {ns}"); + compactor.monitor(&ns).await?; + } + } + + Ok(()) + } } async fn setup_storage(opt: &S3Args) -> anyhow::Result> { From 912aaf3e37c93236bf7a374323d201df0aa20f55 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 30 Sep 2024 12:26:37 +0200 Subject: [PATCH 2/3] make MapSlice more general --- libsql-wal/src/io/buf.rs | 56 ++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/libsql-wal/src/io/buf.rs b/libsql-wal/src/io/buf.rs index 34ce446409..0a06be4cef 100644 --- a/libsql-wal/src/io/buf.rs +++ b/libsql-wal/src/io/buf.rs @@ -1,8 +1,12 @@ // from tokio uring -use std::mem::{size_of, MaybeUninit}; +use std::{ + borrow::Borrow, + marker::PhantomData, + mem::{size_of, MaybeUninit}, +}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use zerocopy::{AsBytes, FromBytes}; pub unsafe trait IoBuf: Unpin + 'static { @@ -90,6 +94,20 @@ unsafe impl IoBuf for BytesMut { } } +unsafe impl IoBuf for Bytes { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len() + } + + fn bytes_total(&self) -> usize { + self.len() + } +} + unsafe impl IoBufMut for BytesMut { fn stable_mut_ptr(&mut self) -> *mut u8 { self.as_mut_ptr() @@ -185,11 +203,11 @@ impl ZeroCopyBuf { } } - pub fn map_slice(self, f: 
F) -> MapSlice + pub fn map_slice(self, f: F) -> MapSlice where for<'a> F: Fn(&'a Self) -> &'a [u8] + Unpin + 'static, { - MapSlice { inner: self, f } + MapSlice::new(self, f) } #[inline] @@ -220,32 +238,42 @@ impl ZeroCopyBuf { } } -pub struct MapSlice { - inner: ZeroCopyBuf, +pub struct MapSlice { + inner: T, f: F, + _p: PhantomData, } -impl MapSlice { - pub(crate) fn into_inner(self) -> ZeroCopyBuf { +impl MapSlice { + pub(crate) fn into_inner(self) -> T { self.inner } + + pub(crate) fn new(inner: T, f: F) -> Self { + Self { + inner, + f, + _p: PhantomData, + } + } } -unsafe impl IoBuf for MapSlice +unsafe impl IoBuf for MapSlice where - for<'a> F: Fn(&'a ZeroCopyBuf) -> &'a [u8] + Unpin + 'static, - T: Unpin + 'static + AsBytes, + for<'a> F: Fn(&'a ZeroCopyBuf) -> &'a [u8] + Unpin + 'static, + T: Borrow> + Unpin + 'static, + U: AsBytes + Unpin + 'static, { fn stable_ptr(&self) -> *const u8 { - (self.f)(&self.inner).as_ptr() + (self.f)(&self.inner.borrow()).as_ptr() } fn bytes_init(&self) -> usize { - (self.f)(&self.inner).len() + (self.f)(&self.inner.borrow()).len() } fn bytes_total(&self) -> usize { - (self.f)(&self.inner).len() + (self.f)(&self.inner.borrow()).len() } } From 06e177fb9803de90951ad6c4735e53a3d07d7a6a Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 30 Sep 2024 12:42:44 +0200 Subject: [PATCH 3/3] streaming compaction --- libsql-wal/Cargo.toml | 3 + libsql-wal/src/replication/storage.rs | 16 +- libsql-wal/src/segment/compacted.rs | 140 ++++++- libsql-wal/src/segment/mod.rs | 55 +-- libsql-wal/src/segment/sealed.rs | 84 ++-- libsql-wal/src/shared_wal.rs | 8 +- libsql-wal/src/storage/async_storage.rs | 15 +- libsql-wal/src/storage/backend/mod.rs | 97 +++-- libsql-wal/src/storage/backend/s3.rs | 361 ++++++++++------ libsql-wal/src/storage/compaction/mod.rs | 504 +++++++++++++++-------- libsql-wal/src/storage/job.rs | 200 +-------- libsql-wal/src/storage/mod.rs | 46 +-- libsql-wal/src/transaction.rs | 32 +- 13 files changed, 860 insertions(+), 701 
deletions(-) diff --git a/libsql-wal/Cargo.toml b/libsql-wal/Cargo.toml index 5a356d50f4..c5e39cb853 100644 --- a/libsql-wal/Cargo.toml +++ b/libsql-wal/Cargo.toml @@ -48,6 +48,9 @@ rand = "0.8.5" aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] } petgraph = "0.6.5" anyhow = { version = "1.0.86", optional = true } +futures = "0.3.30" +memmap = "0.7.0" +pin-project-lite = "0.2.14" [dev-dependencies] criterion = "0.5.1" diff --git a/libsql-wal/src/replication/storage.rs b/libsql-wal/src/replication/storage.rs index 062f7a6b6e..d239bad300 100644 --- a/libsql-wal/src/replication/storage.rs +++ b/libsql-wal/src/replication/storage.rs @@ -8,6 +8,7 @@ use tokio_stream::Stream; use zerocopy::FromZeroes; use crate::io::buf::ZeroCopyBoxIoBuf; +use crate::segment::compacted::CompactedFrame; use crate::segment::Frame; use crate::storage::backend::FindSegmentReq; use crate::storage::Storage; @@ -65,10 +66,19 @@ where }, }; - let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await; + // TODO: The copy here is inefficient. This is OK for now, until we rewrite + // this code to read from the main db file instead of storage. 
+ let (compacted_frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(CompactedFrame::new_box_zeroed()), offset as u32).await; ret?; - let frame = frame.into_inner(); - debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0"); + let compacted_frame = compacted_frame.into_inner(); + let mut frame = Frame::new_box_zeroed(); + frame.data_mut().copy_from_slice(&compacted_frame.data); + + let header = frame.header_mut(); + header.frame_no = compacted_frame.header().frame_no; + header.size_after = 0.into(); + header.page_no = compacted_frame.header().page_no; + if frame.header().frame_no() >= until { yield frame; } diff --git a/libsql-wal/src/segment/compacted.rs b/libsql-wal/src/segment/compacted.rs index 4259114425..5c123cf26e 100644 --- a/libsql-wal/src/segment/compacted.rs +++ b/libsql-wal/src/segment/compacted.rs @@ -1,23 +1,23 @@ use std::io; -use std::mem::size_of; +use std::mem::{offset_of, size_of}; +use chrono::{DateTime, Utc}; +use uuid::Uuid; use zerocopy::little_endian::{U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; use crate::io::buf::{IoBufMut, ZeroCopyBuf}; use crate::io::FileExt; -use crate::segment::FrameHeader; use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; -use super::{Frame, Result}; +use super::Result; #[derive(Debug, AsBytes, FromZeroes, FromBytes)] #[repr(C)] -pub struct CompactedSegmentDataHeader { +pub struct CompactedSegmentHeader { pub(crate) magic: lu64, pub(crate) version: lu16, - pub(crate) frame_count: lu32, - pub(crate) segment_id: lu128, + pub(crate) log_id: lu128, pub(crate) start_frame_no: lu64, pub(crate) end_frame_no: lu64, pub(crate) size_after: lu32, @@ -25,7 +25,107 @@ pub struct CompactedSegmentDataHeader { pub(crate) page_size: lu16, pub(crate) timestamp: lu64, } -impl CompactedSegmentDataHeader { + +bitflags::bitflags! 
{ + pub struct CompactedFrameFlags: u32 { + /// This flag is set for the last frame in the segment + const LAST = 1 << 0; + } +} + +#[derive(Debug, AsBytes, FromZeroes, FromBytes)] +#[repr(C)] +pub struct CompactedFrameHeader { + pub flags: lu32, + pub page_no: lu32, + pub frame_no: lu64, + /// running checksum from this frame + /// this is the crc32 of the checksum of the previous frame and all the frame data, including + /// all the fields before checksum in the header. THIS FIELD MUST ALWAYS BE THE last FIELD IN + /// THE STRUCT + pub checksum: lu32, +} + +impl CompactedFrameHeader { + pub fn flags(&self) -> CompactedFrameFlags { + CompactedFrameFlags::from_bits(self.flags.get()).unwrap() + } + + pub(crate) fn is_last(&self) -> bool { + self.flags().contains(CompactedFrameFlags::LAST) + } + + pub(crate) fn set_last(&mut self) { + let mut flags = self.flags(); + flags.insert(CompactedFrameFlags::LAST); + self.flags = flags.bits().into(); + } + + pub(crate) fn reset_flags(&mut self) { + self.flags = 0.into(); + } + + pub(crate) fn compute_checksum(&self, previous: u32, data: &[u8]) -> u32 { + assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize); + let mut h = crc32fast::Hasher::new_with_initial(previous); + h.update(&self.as_bytes()[..offset_of!(Self, checksum)]); + h.update(data); + h.finalize() + } + + /// updates the checksum with the previous frame checksum and the frame data + pub(crate) fn update_checksum(&mut self, previous: u32, data: &[u8]) -> u32 { + let checksum = self.compute_checksum(previous, data); + self.checksum = checksum.into(); + checksum + } + + pub fn checksum(&self) -> u32 { + self.checksum.get() + } + + pub fn page_no(&self) -> u32 { + self.page_no.get() + } +} + +#[derive(Debug, AsBytes, FromZeroes, FromBytes)] +#[repr(C)] +pub struct CompactedFrame { + pub header: CompactedFrameHeader, + pub data: [u8; LIBSQL_PAGE_SIZE as usize], +} + +impl CompactedFrame { + pub fn header(&self) -> &CompactedFrameHeader { + &self.header + } + + pub(crate) 
fn header_mut(&mut self) -> &mut CompactedFrameHeader { + &mut self.header + } +} + +impl CompactedSegmentHeader { + pub fn new( + start_frame_no: u64, + end_frame_no: u64, + size_after: u32, + timestamp: DateTime, + log_id: Uuid, + ) -> Self { + Self { + magic: LIBSQL_MAGIC.into(), + version: LIBSQL_WAL_VERSION.into(), + start_frame_no: start_frame_no.into(), + end_frame_no: end_frame_no.into(), + size_after: size_after.into(), + page_size: LIBSQL_PAGE_SIZE.into(), + timestamp: (timestamp.timestamp_millis() as u64).into(), + log_id: log_id.as_u128().into(), + } + } + fn check(&self) -> Result<()> { if self.magic.get() != LIBSQL_MAGIC { return Err(super::Error::InvalidHeaderMagic); @@ -47,14 +147,8 @@ impl CompactedSegmentDataHeader { } } -#[derive(Debug, AsBytes, FromZeroes, FromBytes)] -#[repr(C)] -pub struct CompactedSegmentDataFooter { - pub(crate) checksum: lu32, -} - pub struct CompactedSegment { - header: CompactedSegmentDataHeader, + header: CompactedSegmentHeader, file: F, } @@ -69,7 +163,7 @@ impl CompactedSegment { } } - pub fn header(&self) -> &CompactedSegmentDataHeader { + pub fn header(&self) -> &CompactedSegmentHeader { &self.header } } @@ -79,23 +173,25 @@ impl CompactedSegment { let buf = ZeroCopyBuf::new_uninit(); let (buf, ret) = file.read_exact_at_async(buf, 0).await; ret?; - let header: CompactedSegmentDataHeader = buf.into_inner(); + let header: CompactedSegmentHeader = buf.into_inner(); header.check()?; Ok(Self { file, header }) } - pub(crate) fn from_parts(file: F, header: CompactedSegmentDataHeader) -> Self { + pub(crate) fn from_parts(file: F, header: CompactedSegmentHeader) -> Self { Self { header, file } } + /// read a CompactedFrame from the segment pub(crate) async fn read_frame( &self, buf: B, offset: u32, ) -> (B, io::Result<()>) { assert_eq!(buf.bytes_init(), 0); - assert_eq!(buf.bytes_total(), size_of::()); - let offset = size_of::() + size_of::() * offset as usize; + assert_eq!(buf.bytes_total(), size_of::()); + let offset = + 
size_of::() + size_of::() * offset as usize; let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await; (buf, ret) } @@ -107,9 +203,9 @@ impl CompactedSegment { ) -> (B, io::Result<()>) { assert_eq!(buf.bytes_init(), 0); assert_eq!(buf.bytes_total(), LIBSQL_PAGE_SIZE as usize); - let offset = size_of::() - + size_of::() * offset as usize - + size_of::(); + let offset = size_of::() + + size_of::() * offset as usize + + size_of::(); let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await; (buf, ret) } diff --git a/libsql-wal/src/segment/mod.rs b/libsql-wal/src/segment/mod.rs index 6a0447609f..1b250bd997 100644 --- a/libsql-wal/src/segment/mod.rs +++ b/libsql-wal/src/segment/mod.rs @@ -155,11 +155,7 @@ impl SegmentHeader { } pub trait Segment: Send + Sync + 'static { - fn compact( - &self, - out_file: &impl FileExt, - id: uuid::Uuid, - ) -> impl Future>> + Send; + fn compact(&self, out_file: &impl FileExt) -> impl Future>> + Send; fn start_frame_no(&self) -> u64; fn last_committed(&self) -> u64; fn index(&self) -> &fst::Map>; @@ -177,55 +173,6 @@ pub trait Segment: Send + Sync + 'static { fn destroy(&self, io: &IO) -> impl Future; } -impl Segment for Arc { - fn compact( - &self, - out_file: &impl FileExt, - id: uuid::Uuid, - ) -> impl Future>> + Send { - self.as_ref().compact(out_file, id) - } - - fn start_frame_no(&self) -> u64 { - self.as_ref().start_frame_no() - } - - fn last_committed(&self) -> u64 { - self.as_ref().last_committed() - } - - fn index(&self) -> &fst::Map> { - self.as_ref().index() - } - - fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result { - self.as_ref().read_page(page_no, max_frame_no, buf) - } - - fn is_checkpointable(&self) -> bool { - self.as_ref().is_checkpointable() - } - - fn size_after(&self) -> u32 { - self.as_ref().size_after() - } - - async fn read_frame_offset_async(&self, offset: u32, buf: B) -> (B, Result<()>) - where - B: IoBufMut + Send + 'static, - { - 
self.as_ref().read_frame_offset_async(offset, buf).await - } - - fn destroy(&self, io: &IO) -> impl Future { - self.as_ref().destroy(io) - } - - fn timestamp(&self) -> DateTime { - self.as_ref().timestamp() - } -} - #[repr(C)] #[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)] pub struct FrameHeader { diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index 236697be03..d9e480bd44 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -12,13 +12,15 @@ use fst::{Map, MapBuilder, Streamer}; use zerocopy::{AsBytes, FromZeroes}; use crate::error::Result; -use crate::io::buf::{IoBufMut, ZeroCopyBuf}; +use crate::io::buf::{IoBuf, IoBufMut, MapSlice, ZeroCopyBuf}; use crate::io::file::{BufCopy, FileExt}; use crate::io::Inspect; use crate::segment::{checked_frame_offset, CheckedFrame}; use crate::{LIBSQL_MAGIC, LIBSQL_WAL_VERSION}; -use super::compacted::{CompactedSegmentDataFooter, CompactedSegmentDataHeader}; +use super::compacted::{ + CompactedFrame, CompactedFrameFlags, CompactedFrameHeader, CompactedSegmentHeader, +}; use super::{frame_offset, page_offset, Frame, Segment, SegmentFlags, SegmentHeader}; /// an immutable, wal segment @@ -81,12 +83,8 @@ impl Segment for SealedSegment where F: FileExt, { - async fn compact(&self, out_file: &impl FileExt, id: uuid::Uuid) -> Result> { - let mut hasher = crc32fast::Hasher::new(); - - let header = CompactedSegmentDataHeader { - frame_count: (self.index().len() as u32).into(), - segment_id: id.as_u128().into(), + async fn compact(&self, out_file: &impl FileExt) -> Result> { + let header = CompactedSegmentHeader { start_frame_no: self.header().start_frame_no, end_frame_no: self.header().last_commited_frame_no, size_after: self.header.size_after, @@ -94,50 +92,64 @@ where magic: LIBSQL_MAGIC.into(), page_size: self.header().page_size, timestamp: self.header.sealed_at_timestamp, + log_id: self.header().log_id.into(), }; - 
hasher.update(header.as_bytes()); + let mut crc_init = crc32fast::hash(header.as_bytes()); + let (_, ret) = out_file .write_all_at_async(ZeroCopyBuf::new_init(header), 0) .await; ret?; + let mut count_pages = self.index().len(); let mut pages = self.index().stream(); let mut buffer = Box::new(ZeroCopyBuf::::new_uninit()); let mut out_index = fst::MapBuilder::memory(); let mut current_offset = 0; while let Some((page_no_bytes, offset)) = pages.next() { + count_pages -= 1; let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await; ret?; // transaction boundaries in a segment are completely erased. The responsibility is on // the consumer of the segment to place the transaction boundary such that all frames from // the segment are applied within the same transaction. - b.get_mut().header_mut().set_size_after(0); - hasher.update(&b.get_ref().as_bytes()); - let dest_offset = - size_of::() + current_offset * size_of::(); - let (mut b, ret) = out_file.write_all_at_async(b, dest_offset as u64).await; - ret?; + let frame = b.get_mut(); + + let flags = if count_pages == 0 { + CompactedFrameFlags::LAST.bits() + } else { + CompactedFrameFlags::empty().bits() + }; + + let mut compacted_header = CompactedFrameHeader { + checksum: 0.into(), + flags: flags.into(), + page_no: frame.header().page_no, + frame_no: frame.header().frame_no, + }; + + crc_init = compacted_header.update_checksum(crc_init, b.get_ref().data()); + + fn map_data(b: &ZeroCopyBuf) -> &[u8] { + b.get_ref().data() + } + + let data = MapSlice::new(b, map_data); + let mut b = write_compact_frame(out_file, current_offset, compacted_header, data) + .await? 
+ .into_inner(); + out_index .insert(page_no_bytes, current_offset as _) .unwrap(); current_offset += 1; + b.deinit(); buffer = b; } - let footer = CompactedSegmentDataFooter { - checksum: hasher.finalize().into(), - }; - - let footer_offset = - size_of::() + current_offset * size_of::(); - let (_, ret) = out_file - .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _) - .await; - ret?; - Ok(out_index.into_inner().unwrap()) } @@ -209,6 +221,26 @@ where } } +async fn write_compact_frame( + file: &impl FileExt, + offset: usize, + header: CompactedFrameHeader, + data: B, +) -> Result { + let header_offset = size_of::() + offset * size_of::(); + + let fut1 = file.write_all_at_async(ZeroCopyBuf::new_init(header), header_offset as u64); + + let data_offset = header_offset + size_of::(); + let fut2 = file.write_all_at_async(data, data_offset as u64); + let ((_, ret1), (b, ret2)) = futures::join!(fut1, fut2); + + ret1?; + ret2?; + + Ok(b) +} + impl SealedSegment { pub fn open( file: Arc, diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index 2573dfd809..4087a254c9 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; @@ -207,12 +206,7 @@ impl SharedWal { Ok(WriteTransaction { wal_lock: self.wal_lock.clone(), - savepoints: vec![Savepoint { - current_checksum, - next_offset, - next_frame_no, - index: BTreeMap::new(), - }], + savepoints: vec![Savepoint::new(next_offset, next_frame_no, current_checksum)], next_frame_no, next_offset, current_checksum, diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index 3fc62fa56d..438d426257 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -15,7 +15,7 @@ use crate::segment::Segment; use super::backend::{Backend, FindSegmentReq}; use 
super::scheduler::Scheduler; -use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest}; +use super::{OnStoreCallback, Storage, StoreSegmentRequest}; /// Background loop task state. /// @@ -212,19 +212,6 @@ where Ok(meta.max_frame_no) } - async fn restore( - &self, - file: impl crate::io::FileExt, - namespace: &NamespaceName, - restore_options: RestoreOptions, - config_override: Option, - ) -> super::Result<()> { - let config = config_override.unwrap_or_else(|| self.backend.default_config()); - self.backend - .restore(&config, &namespace, restore_options, file) - .await - } - async fn find_segment( &self, namespace: &NamespaceName, diff --git a/libsql-wal/src/storage/backend/mod.rs b/libsql-wal/src/storage/backend/mod.rs index e9ceb743ea..cc90c0db75 100644 --- a/libsql-wal/src/storage/backend/mod.rs +++ b/libsql-wal/src/storage/backend/mod.rs @@ -2,14 +2,14 @@ use std::future::Future; use std::sync::Arc; +use bytes::Bytes; use chrono::{DateTime, Utc}; use fst::Map; use tokio_stream::Stream; -use uuid::Uuid; -use super::{RestoreOptions, Result, SegmentInfo, SegmentKey}; +use super::{Result, SegmentInfo, SegmentKey}; use crate::io::file::FileExt; -use crate::segment::compacted::CompactedSegmentDataHeader; +use crate::segment::compacted::{CompactedFrameHeader, CompactedSegmentHeader}; use libsql_sys::name::NamespaceName; // pub mod fs; @@ -19,7 +19,6 @@ pub mod s3; #[derive(Debug)] pub struct SegmentMeta { pub namespace: NamespaceName, - pub segment_id: Uuid, pub start_frame_no: u64, pub end_frame_no: u64, pub segment_timestamp: DateTime, @@ -52,6 +51,24 @@ pub trait Backend: Send + Sync + 'static { segment_index: Vec, ) -> impl Future> + Send; + /// Store `segment_data` with its associated `meta` + fn store_segment_data( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + segment_data: impl Stream> + Send + Sync + 'static, + ) -> impl Future> + Send; + + /// Store `segment_data` with its associated `meta` + fn 
store_segment_index( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + index: Vec, + ) -> impl Future> + Send; + fn find_segment( &self, config: &Self::Config, @@ -73,7 +90,7 @@ pub trait Backend: Send + Sync + 'static { namespace: &NamespaceName, key: &SegmentKey, file: &impl FileExt, - ) -> Result; + ) -> Result; // this method taking self: Arc is an infortunate consequence of rust type system making // impl FileExt variant with all the arguments, with no escape hatch... @@ -84,20 +101,22 @@ pub trait Backend: Send + Sync + 'static { key: SegmentKey, ) -> impl Future> + Send; - /// Fetch meta for `namespace` - fn meta( + async fn fetch_segment_data_stream( &self, - config: &Self::Config, + config: Self::Config, namespace: &NamespaceName, - ) -> impl Future> + Send; + key: SegmentKey, + ) -> Result<( + CompactedSegmentHeader, + impl Stream>, + )>; - async fn restore( + /// Fetch meta for `namespace` + fn meta( &self, config: &Self::Config, namespace: &NamespaceName, - restore_options: RestoreOptions, - dest: impl FileExt, - ) -> Result<()>; + ) -> impl Future> + Send; fn list_segments<'a>( &'a self, @@ -132,18 +151,6 @@ impl Backend for Arc { self.as_ref().default_config() } - async fn restore( - &self, - config: &Self::Config, - namespace: &NamespaceName, - restore_options: RestoreOptions, - dest: impl FileExt, - ) -> Result<()> { - self.as_ref() - .restore(config, namespace, restore_options, dest) - .await - } - async fn find_segment( &self, config: &Self::Config, @@ -170,7 +177,7 @@ impl Backend for Arc { namespace: &NamespaceName, key: &SegmentKey, file: &impl FileExt, - ) -> Result { + ) -> Result { self.as_ref() .fetch_segment_data_to_file(config, namespace, key, file) .await @@ -197,4 +204,42 @@ impl Backend for Arc { ) -> impl Stream> + 'a { self.as_ref().list_segments(config, namespace, until) } + + async fn fetch_segment_data_stream( + &self, + config: Self::Config, + namespace: &NamespaceName, + key: SegmentKey, + ) -> 
Result<( + CompactedSegmentHeader, + impl Stream>, + )> { + self.as_ref() + .fetch_segment_data_stream(config, namespace, key) + .await + } + + async fn store_segment_data( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + segment_data: impl Stream> + Send + Sync + 'static, + ) -> Result<()> { + self.as_ref() + .store_segment_data(config, namespace, key, segment_data) + .await + } + + async fn store_segment_index( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + index: Vec, + ) -> Result<()> { + self.as_ref() + .store_segment_index(config, namespace, key, index) + .await + } } diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs index 81f7dfcbad..70b1b321b7 100644 --- a/libsql-wal/src/storage/backend/s3.rs +++ b/libsql-wal/src/storage/backend/s3.rs @@ -18,20 +18,19 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use http_body::{Frame as HttpFrame, SizeHint}; use libsql_sys::name::NamespaceName; -use roaring::RoaringBitmap; +use pin_project_lite::pin_project; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; use tokio_stream::Stream; +use tokio_util::codec::Decoder; use tokio_util::sync::ReusableBoxFuture; use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; use super::{Backend, FindSegmentReq, SegmentMeta}; -use crate::io::buf::ZeroCopyBuf; use crate::io::compat::copy_to_file; use crate::io::{FileExt, Io, StdIO}; -use crate::segment::compacted::CompactedSegmentDataHeader; -use crate::segment::Frame; -use crate::storage::{Error, RestoreOptions, Result, SegmentInfo, SegmentKey}; +use crate::segment::compacted::{CompactedFrame, CompactedFrameHeader, CompactedSegmentHeader}; +use crate::storage::{Error, Result, SegmentInfo, SegmentKey}; use crate::LIBSQL_MAGIC; pub struct S3Backend { @@ -39,6 +38,7 @@ pub struct S3Backend { default_config: Arc, io: IO, } + 
impl S3Backend { pub async fn from_sdk_config( aws_config: SdkConfig, @@ -137,19 +137,123 @@ impl S3Backend { folder_key: &FolderKey<'_>, segment_key: &SegmentKey, file: &impl FileExt, - ) -> Result { + ) -> Result { let reader = self .fetch_segment_data_reader(config, folder_key, segment_key) .await?; let mut reader = tokio::io::BufReader::with_capacity(8196, reader); - while reader.fill_buf().await?.len() < size_of::() {} - let header = CompactedSegmentDataHeader::read_from_prefix(reader.buffer()).unwrap(); + while reader.fill_buf().await?.len() < size_of::() {} + let header = CompactedSegmentHeader::read_from_prefix(reader.buffer()).unwrap(); copy_to_file(reader, file).await?; Ok(header) } + /// returns a stream over raw CompactedFrame bytes + async fn fetch_segment_data_stream_inner( + &self, + config: &S3Config, + folder_key: &FolderKey<'_>, + segment_key: &SegmentKey, + ) -> Result<( + CompactedSegmentHeader, + impl Stream>, + )> { + let reader = self + .fetch_segment_data_reader(config, folder_key, segment_key) + .await?; + + let mut reader = BufReader::new(reader); + let mut header = CompactedSegmentHeader::new_zeroed(); + reader.read_exact(header.as_bytes_mut()).await?; + + struct FrameDecoder { + verify: bool, + finished: bool, + current_crc: u32, + } + + impl FrameDecoder { + fn new(verify: bool, header: &CompactedSegmentHeader) -> Self { + let current_crc = if verify { + crc32fast::hash(header.as_bytes()) + } else { + 0 + }; + + Self { + finished: false, + verify, + current_crc, + } + } + } + + impl Decoder for FrameDecoder { + type Item = (CompactedFrameHeader, Bytes); + type Error = Error; + + fn decode( + &mut self, + src: &mut BytesMut, + ) -> std::result::Result, Self::Error> { + if self.finished { + return Ok(None); + } + + if src.len() >= size_of::() { + let mut data = src.split_to(size_of::()); + let header_bytes = data.split_to(size_of::()); + let header = CompactedFrameHeader::read_from(&header_bytes).unwrap(); + + if header.is_last() { + 
self.finished = true; + } + + if self.verify { + let checksum = header.compute_checksum(self.current_crc, &data); + if checksum != header.checksum() { + todo!("invalid frame checksum") + } + self.current_crc = checksum; + } + + return Ok(Some((header, data.freeze()))); + } else { + if src.capacity() < size_of::() { + src.reserve(size_of::() - src.capacity()) + } + Ok(None) + } + } + + fn decode_eof( + &mut self, + buf: &mut BytesMut, + ) -> std::result::Result, Self::Error> { + match self.decode(buf)? { + Some(frame) => Ok(Some(frame)), + None => { + if buf.is_empty() { + Ok(None) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "bytes remaining on stream", + ) + .into()) + } + } + } + } + } + + let stream = tokio_util::codec::FramedRead::new(reader, FrameDecoder::new(true, &header)); + + Ok((header, stream)) + } + async fn s3_get(&self, config: &S3Config, key: impl ToString) -> Result { Ok(self .client @@ -327,71 +431,6 @@ impl S3Backend { } } - // This method could probably be optimized a lot by using indexes and only downloading useful - // segments - async fn restore_latest( - &self, - config: &S3Config, - namespace: &NamespaceName, - dest: impl FileExt, - ) -> Result<()> { - let folder_key = FolderKey { - cluster_id: &config.cluster_id, - namespace, - }; - let Some(latest_key) = self - .find_segment_by_frame_no(config, &folder_key, u64::MAX) - .await? 
- else { - tracing::info!("nothing to restore for {namespace}"); - return Ok(()); - }; - - let reader = self - .fetch_segment_data_reader(config, &folder_key, &latest_key) - .await?; - let mut reader = BufReader::new(reader); - let mut header: CompactedSegmentDataHeader = CompactedSegmentDataHeader::new_zeroed(); - reader.read_exact(header.as_bytes_mut()).await?; - let db_size = header.size_after.get(); - let mut seen = RoaringBitmap::new(); - let mut frame: Frame = Frame::new_zeroed(); - loop { - for _ in 0..header.frame_count.get() { - reader.read_exact(frame.as_bytes_mut()).await?; - let page_no = frame.header().page_no(); - if !seen.contains(page_no) { - seen.insert(page_no); - let offset = (page_no as u64 - 1) * 4096; - let buf = ZeroCopyBuf::new_init(frame).map_slice(|f| f.get_ref().data()); - let (buf, ret) = dest.write_all_at_async(buf, offset).await; - ret?; - frame = buf.into_inner().into_inner(); - } - } - - // db is restored - if seen.len() == db_size as u64 { - break; - } - - let next_frame_no = header.start_frame_no.get() - 1; - let Some(key) = self - .find_segment_by_frame_no(config, &folder_key, next_frame_no) - .await? 
- else { - todo!("there should be a segment!"); - }; - let r = self - .fetch_segment_data_reader(config, &folder_key, &key) - .await?; - reader = BufReader::new(r); - reader.read_exact(header.as_bytes_mut()).await?; - } - - Ok(()) - } - async fn fetch_segment_from_key( &self, config: &S3Config, @@ -451,6 +490,54 @@ impl S3Backend { } } } + + async fn store_segment_data_inner( + &self, + config: &S3Config, + namespace: &NamespaceName, + body: ByteStream, + segment_key: &SegmentKey, + ) -> Result<()> { + let folder_key = FolderKey { + cluster_id: &config.cluster_id, + namespace, + }; + let s3_data_key = s3_segment_data_key(&folder_key, segment_key); + + self.s3_put(config, s3_data_key, body).await + } + + async fn store_segment_index_inner( + &self, + config: &S3Config, + namespace: &NamespaceName, + index: Vec, + segment_key: &SegmentKey, + ) -> Result<()> { + let folder_key = FolderKey { + cluster_id: &config.cluster_id, + namespace, + }; + let s3_index_key = s3_segment_index_key(&folder_key, segment_key); + + let checksum = crc32fast::hash(&index); + let header = SegmentIndexHeader { + version: 1.into(), + len: (index.len() as u64).into(), + checksum: checksum.into(), + magic: LIBSQL_MAGIC.into(), + }; + + let mut bytes = BytesMut::with_capacity(size_of::() + index.len()); + bytes.extend_from_slice(header.as_bytes()); + bytes.extend_from_slice(&index); + + let body = ByteStream::from(bytes.freeze()); + + self.s3_put(config, s3_index_key, body).await?; + + Ok(()) + } } pub struct S3Config { @@ -547,36 +634,12 @@ where segment_data: impl FileExt, segment_index: Vec, ) -> Result<()> { - let folder_key = FolderKey { - cluster_id: &config.cluster_id, - namespace: &meta.namespace, - }; let segment_key = SegmentKey::from(&meta); - let s3_data_key = s3_segment_data_key(&folder_key, &segment_key); - let body = FileStreamBody::new(segment_data).into_byte_stream(); - - self.s3_put(config, s3_data_key, body).await?; - - let s3_index_key = s3_segment_index_key(&folder_key, 
&segment_key); - - let checksum = crc32fast::hash(&segment_index); - let header = SegmentIndexHeader { - version: 1.into(), - len: (segment_index.len() as u64).into(), - checksum: checksum.into(), - magic: LIBSQL_MAGIC.into(), - }; - - let mut bytes = - BytesMut::with_capacity(size_of::() + segment_index.len()); - bytes.extend_from_slice(header.as_bytes()); - bytes.extend_from_slice(&segment_index); - - let body = ByteStream::from(bytes.freeze()); - - self.s3_put(config, s3_index_key, body).await?; - + self.store_segment_data_inner(config, &meta.namespace, body, &segment_key) + .await?; + self.store_segment_index(config, &meta.namespace, &segment_key, segment_index) + .await?; Ok(()) } @@ -604,19 +667,6 @@ where self.default_config.clone() } - async fn restore( - &self, - config: &Self::Config, - namespace: &NamespaceName, - restore_options: RestoreOptions, - dest: impl FileExt, - ) -> Result<()> { - match restore_options { - RestoreOptions::Latest => self.restore_latest(config, &namespace, dest).await, - RestoreOptions::Timestamp(_) => todo!(), - } - } - async fn find_segment( &self, config: &Self::Config, @@ -660,7 +710,7 @@ where namespace: &NamespaceName, key: &SegmentKey, file: &impl FileExt, - ) -> Result { + ) -> Result { let folder_key = FolderKey { cluster_id: &config.cluster_id, namespace: &namespace, @@ -691,6 +741,84 @@ where ) -> impl Stream> + 'a { self.list_segments_inner(config, namespace, until) } + + async fn fetch_segment_data_stream( + &self, + config: Self::Config, + namespace: &NamespaceName, + key: SegmentKey, + ) -> Result<( + CompactedSegmentHeader, + impl Stream>, + )> { + let folder_key = FolderKey { + cluster_id: &config.cluster_id, + namespace: &namespace, + }; + self.fetch_segment_data_stream_inner(&config, &folder_key, &key) + .await + } + + async fn store_segment_data( + &self, + config: &Self::Config, + namespace: &NamespaceName, + segment_key: &SegmentKey, + segment_data: impl Stream> + Send + Sync + 'static, + ) -> Result<()> { + 
let byte_stream = StreamBody::new(segment_data); + let body = ByteStream::from_body_1_x(byte_stream); + self.store_segment_data_inner(config, namespace, body, &segment_key) + .await?; + + Ok(()) + } + + async fn store_segment_index( + &self, + config: &Self::Config, + namespace: &NamespaceName, + segment_key: &SegmentKey, + index: Vec, + ) -> Result<()> { + self.store_segment_index_inner(config, namespace, index, segment_key) + .await?; + + Ok(()) + } +} + +pin_project! { + struct StreamBody { + #[pin] + s: S, + } +} + +impl StreamBody { + fn new(s: S) -> Self { + Self { s } + } +} + +impl http_body::Body for StreamBody +where + S: Stream>, +{ + type Data = bytes::Bytes; + type Error = crate::storage::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll, Self::Error>>> { + let this = self.project(); + match std::task::ready!(this.s.poll_next(cx)) { + Some(Ok(data)) => Poll::Ready(Some(Ok(HttpFrame::data(data)))), + Some(Err(e)) => Poll::Ready(Some(Err(e))), + None => Poll::Ready(None), + } + } } #[derive(Clone, Copy)] @@ -802,7 +930,6 @@ mod tests { use fst::MapBuilder; use s3s::auth::SimpleAuth; use s3s::service::{S3ServiceBuilder, SharedS3Service}; - use uuid::Uuid; use crate::io::StdIO; @@ -870,7 +997,6 @@ mod tests { &s3_config, SegmentMeta { namespace: ns.clone(), - segment_id: Uuid::new_v4(), start_frame_no: 1u64.into(), end_frame_no: 64u64.into(), segment_timestamp: Utc::now(), @@ -892,7 +1018,6 @@ mod tests { &s3_config, SegmentMeta { namespace: ns.clone(), - segment_id: Uuid::new_v4(), start_frame_no: 64u64.into(), end_frame_no: 128u64.into(), segment_timestamp: Utc::now(), diff --git a/libsql-wal/src/storage/compaction/mod.rs b/libsql-wal/src/storage/compaction/mod.rs index d91cf27238..8157f81f98 100644 --- a/libsql-wal/src/storage/compaction/mod.rs +++ b/libsql-wal/src/storage/compaction/mod.rs @@ -1,26 +1,30 @@ +use std::io::Write as _; use std::ops::Deref; use std::path::Path; use std::path::PathBuf; use 
std::sync::Arc; -use chrono::DateTime; -use fst::map::OpBuilder; +use bytes::Bytes; +use fst::raw::IndexedValue; +use fst::MapBuilder; use fst::Streamer; +use futures::FutureExt as _; +use futures::Stream; +use futures::StreamExt as _; +use futures::TryStreamExt; use libsql_sys::name::NamespaceName; use libsql_sys::rusqlite::OptionalExtension; use libsql_sys::rusqlite::{self, TransactionBehavior}; +use roaring::RoaringBitmap; use tempfile::tempdir; -use tokio_stream::StreamExt; -use uuid::Uuid; +use tokio::io::AsyncWriteExt; +use tokio::task::JoinHandle; use zerocopy::AsBytes; -use crate::io::buf::ZeroCopyBuf; -use crate::io::FileExt; -use crate::segment::compacted::CompactedSegment; -use crate::segment::compacted::CompactedSegmentDataFooter; -use crate::segment::compacted::CompactedSegmentDataHeader; -use crate::segment::Frame; -use crate::storage::backend::SegmentMeta; +use crate::io::FileExt as _; +use crate::segment::compacted::CompactedFrameHeader; +use crate::segment::compacted::CompactedSegmentHeader; +use crate::LibsqlFooter; use crate::LIBSQL_MAGIC; use crate::LIBSQL_PAGE_SIZE; use crate::LIBSQL_WAL_VERSION; @@ -44,17 +48,17 @@ pub enum Error { pub struct Compactor { backend: Arc, - meta: rusqlite::Connection, + conn: rusqlite::Connection, path: PathBuf, } impl Compactor { pub fn new(backend: Arc, compactor_path: &Path) -> Result { - let meta = rusqlite::Connection::open(compactor_path.join("meta.db"))?; + let conn = rusqlite::Connection::open(compactor_path.join("meta.db"))?; // todo! 
set pragmas: wal + foreign key check - meta.pragma_update(None, "journal_mode", "wal")?; - meta.execute(r#"CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL, UNIQUE(namespace_name))"#, ()).unwrap(); - meta.execute( + conn.pragma_update(None, "journal_mode", "wal")?; + conn.execute(r#"CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL, UNIQUE(namespace_name))"#, ()).unwrap(); + conn.execute( r#"CREATE TABLE IF NOT EXISTS segments ( start_frame_no INTEGER, end_frame_no INTEGER, @@ -68,7 +72,7 @@ impl Compactor { Ok(Self { backend, - meta, + conn, path: compactor_path.into(), }) } @@ -77,7 +81,7 @@ impl Compactor { where B: Backend, { - let tx = self.meta.transaction()?; + let tx = self.conn.transaction()?; let id = { let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO monitored_namespaces(namespace_name) VALUES (?) RETURNING id")?; stmt.query_row([namespace.as_str()], |r| r.get(0)) @@ -94,7 +98,7 @@ impl Compactor { } pub fn analyze(&self, namespace: &NamespaceName) -> Result { - let mut stmt = self.meta.prepare_cached( + let mut stmt = self.conn.prepare_cached( r#" SELECT start_frame_no, end_frame_no, timestamp FROM segments as s @@ -127,7 +131,7 @@ impl Compactor { &self, namespace: &NamespaceName, ) -> Result> { - segments_range(&self.meta, namespace) + segments_range(&self.conn, namespace) } /// Polls storage for new frames since last sync @@ -148,7 +152,7 @@ impl Compactor { B: Backend, { let tx = self - .meta + .conn .transaction_with_behavior(TransactionBehavior::Immediate)?; { let mut stmt = tx.prepare("SELECT namespace_name, id FROM monitored_namespaces")?; @@ -170,7 +174,7 @@ impl Compactor { B: Backend, { let tx = self - .meta + .conn .transaction_with_behavior(TransactionBehavior::Immediate)?; { let mut stmt = @@ -188,190 +192,296 @@ impl Compactor { Ok(()) } - async fn fetch( - &self, - set: &SegmentSet, - into: &Path, - 
) -> Result<( - Vec>, - Vec>>, - )> + /// `dedup_frames` takes a segment set and returns a stream of deduplicated raw frames, + /// containing the most recent version of every frame in the segments covered by the set. + /// + /// if out_index is passed, the index of the new segement is generated and put there + /// + /// the progress callback is called with (count_pages, total_pages), as new pages are found + /// + /// returns a stream of the most recent deduplicated frames, and the size_after for that new + /// segment + async fn dedup_stream<'a>( + &'a self, + set: SegmentSet, + out_index: Option<&'a mut MapBuilder>>, + mut progress: impl FnMut(u32, u32) + 'a, + ) -> ( + impl Stream> + 'a, + CompactedSegmentHeader, + ) where B: Backend, { - let mut indexes = Vec::with_capacity(set.len()); - let mut segments = Vec::with_capacity(set.len()); - for key in set.iter() { - let file = std::fs::File::options() - .create_new(true) - .write(true) - .read(true) - .open(into.join(&format!("{key}.data"))) - .unwrap(); - let header = self - .backend - .fetch_segment_data_to_file( - &self.backend.default_config(), - &set.namespace, - key, - &file, - ) - .await - .unwrap(); - let index = self - .backend - .fetch_segment_index(&self.backend.default_config(), &set.namespace, key) - .await - .unwrap(); - indexes.push(index); - segments.push(CompactedSegment::from_parts(file, header)); - } + let (snd, rcv) = tokio::sync::oneshot::channel(); + let mut snd = Some(snd); + + let stream = async_stream::try_stream! 
{ + assert!(!set.is_empty()); + let tmp = tempdir()?; + let config = self.backend.default_config(); + // We fetch indexes in reverse order so that the most recent index comes first + let indexes_stream = futures::stream::iter(set.iter().rev()).map(|k| { + self + .backend + .fetch_segment_index(&config, &set.namespace, k) + .map(|i| i.map(|i| (i, *k))) + }) + // we download indexes in the background as we read from their data files to reduce + // latencies + .buffered(4); + + tokio::pin!(indexes_stream); + + let mut size_after = u64::MAX; + let mut seen_pages = RoaringBitmap::new(); + // keep track of the indexes for segments that we took frames from. This is a vec of + // memory mapped segments, sorted by descending segment timestamp. + let mut saved_indexes = Vec::new(); + // this map keeps a mapping from index in the saved indexed to the count of frames + // taken from that segment. It is necessary to rebuild the new index and compute the + // actual position of a frame in the streamed segement. + let mut index_offset_mapping = Vec::new(); + let mut page_count = 0; + let mut current_crc = 0; + + while let Some((index, key)) = indexes_stream.try_next().await? { + let mut s = index.stream(); + // how many frames to take from that segment + let mut frames_to_take = 0; + while let Some((pno, _)) = s.next() { + let pno = u32::from_be_bytes(pno.try_into().unwrap()); + if !seen_pages.contains(pno) { + // this segment contains data that we haven't seen before, download that + // segment + frames_to_take += 1; + } + tokio::task::consume_budget().await; + } + + tracing::debug!(key = ?key, "taking {} frames from segment", frames_to_take); + + // no frames to take + if frames_to_take == 0 { continue } + + // we need to build an index at the end, so we keep the indexes. + // To reduce the amount of RAM needed to handle all the potential indexes, we write + // it to disk, and map the file. 
+ if out_index.is_some() { + let mut index_file = std::fs::File::options() + .create(true) + .read(true) + .write(true) + .open(tmp.path().join(&format!("{key}")))?; + index_file.write_all(index.as_fst().as_bytes())?; + let map = unsafe { memmap::Mmap::map(&index_file)? }; + let index = fst::Map::new(map).unwrap(); + saved_indexes.push(index); + index_offset_mapping.push(page_count); + } + + let (segment_header, frames) = self.backend.fetch_segment_data_stream(config.clone(), &set.namespace, key).await?; + + if size_after == u64::MAX { + size_after = segment_header.size_after() as u64; + let key = set.compact_key().expect("we asserted that the segment wasn't empty"); + let segment_header = CompactedSegmentHeader::new(key.start_frame_no, key.end_frame_no, size_after as u32, key.timestamp(), uuid::Uuid::from_u128(segment_header.log_id.get())); + current_crc = crc32fast::hash(segment_header.as_bytes()); + let _ = snd.take().unwrap().send(segment_header); + } + + tokio::pin!(frames); + + let mut frames_taken = 0; + while let Some((mut frame_header, frame_data)) = frames.try_next().await ? { + // we took all the frames that we needed from that segment, no need to read the + // rest of it + if frames_taken == frames_to_take { + break + } + + + if seen_pages.insert(frame_header.page_no()) { + frames_taken += 1; + page_count += 1; + let is_last = if page_count == size_after { + frame_header.set_last(); + true + } else { + frame_header.reset_flags(); + false + }; + + current_crc = frame_header.update_checksum(current_crc, &frame_data); + progress(page_count as u32, size_after as _); + yield (frame_header, frame_data); + if is_last { + break + } + } + } + } + + // now, we need to construct the index. + if let Some(out_index) = out_index { + let op_builder = saved_indexes.iter().collect::(); + let mut union = op_builder.union(); + while let Some((pno, idxs)) = union.next() { + let &IndexedValue { index, .. 
} = idxs.iter().min_by_key(|idx| idx.index).unwrap(); + let offset = index_offset_mapping[index]; + index_offset_mapping[index] += 1; + out_index.insert(pno, offset as _).unwrap(); + tokio::task::consume_budget().await; + } + } + }.peekable(); - Ok((segments, indexes)) + let mut stream = Box::pin(stream); + let header = { + stream.as_mut().peek().await; + rcv.await.unwrap() + }; + + (stream, header) } - pub async fn compact(&self, set: SegmentSet) -> Result<()> + /// compact the passed segment set to out_path if provided, otherwise, uploads it to the + /// backend + pub async fn compact( + &self, + set: SegmentSet, + out_path: Option<&Path>, + progress: impl FnMut(u32, u32), + ) -> Result<()> where B: Backend, { - assert!(!set.is_empty()); - let tmp = tempdir().unwrap(); - // FIXME: bruteforce: we don't necessarily need to download all the segments to cover all - // the changes. Iterating backward over the set items and filling the gaps in the pages - // range would, in theory, be sufficient - // another alternative is to download all the indexes, and lazily download the segment data - // TODO: fetch conccurently - let (segments, indexes) = self.fetch(&set, tmp.path()).await?; - let last_header = segments.last().expect("non-empty set").header(); - - // It's unfortunate that we need to know the number of frames in the final segment ahead of - // time, but it's necessary for computing the checksum. This seems like the least costly - // methods (over recomputing the whole checksum). 
- let mut union = OpBuilder::from_iter(indexes.iter()).union(); - let mut count = 0; - while let Some(_) = union.next() { - count += 1; - } + let Some(new_key) = set.compact_key() else { + return Ok(()); + }; - let mut hasher = crc32fast::Hasher::new(); + let mut builder = MapBuilder::new(Vec::new()).unwrap(); - let out_file = std::fs::File::options() - .create_new(true) - .write(true) - .read(true) - .open(tmp.path().join("out")) - .unwrap(); - let header = CompactedSegmentDataHeader { - frame_count: (count as u32).into(), - segment_id: Uuid::new_v4().to_u128_le().into(), - start_frame_no: set.range().expect("non-empty set").0.into(), - end_frame_no: set.range().expect("non-empty set").1.into(), - size_after: last_header.size_after, - version: LIBSQL_WAL_VERSION.into(), - magic: LIBSQL_MAGIC.into(), - page_size: last_header.page_size, - // the new compacted segment inherit the last segment timestamp: it contains the same - // logical data. - timestamp: last_header.timestamp, + let (sender, mut receiver) = tokio::sync::mpsc::channel::>(1); + let handle: JoinHandle> = match out_path { + Some(path) => { + let path = path.join(&format!("{new_key}.seg")); + let mut data_file = tokio::fs::File::create(path).await?; + tokio::task::spawn(async move { + while let Some(data) = receiver.recv().await { + let data = data?; + data_file.write_all(&data).await?; + } + + data_file.flush().await?; + + Ok(()) + }) + } + None => { + let backend = self.backend.clone(); + let config = self.backend.default_config(); + let ns = set.namespace.clone(); + let key = new_key.clone(); + tokio::task::spawn(async move { + backend + .store_segment_data( + &config, + &ns, + &key, + tokio_stream::wrappers::ReceiverStream::new(receiver), + ) + .await?; + Ok(()) + }) + } }; - hasher.update(header.as_bytes()); - let (_, ret) = out_file - .write_all_at_async(ZeroCopyBuf::new_init(header), 0) + let (stream, segment_header) = self + .dedup_stream(set.clone(), Some(&mut builder), progress) .await; - ret?; 
- - let mut union = OpBuilder::from_iter(indexes.iter()).union(); - let mut buffer = Box::new(ZeroCopyBuf::::new_uninit()); - let mut out_index = fst::MapBuilder::memory(); - let mut current_offset = 0; - - while let Some((page_no_bytes, indexed_offsets)) = union.next() { - let (index, offset) = indexed_offsets - .iter() - .max_by_key(|v| v.index) - .map(|v| (v.index, v.value)) - .expect("union returned something, must be non-empty"); - let segment = &segments[index]; - let (frame, ret) = segment.read_frame(buffer, offset as u32).await; - ret?; - hasher.update(&frame.get_ref().as_bytes()); - let dest_offset = - size_of::() + current_offset * size_of::(); - let (mut frame, ret) = out_file.write_all_at_async(frame, dest_offset as u64).await; - ret?; - out_index - .insert(page_no_bytes, current_offset as _) - .unwrap(); - current_offset += 1; - frame.deinit(); - buffer = frame; + + sender + .send(Ok(Bytes::copy_from_slice(segment_header.as_bytes()))) + .await + .unwrap(); + + { + tokio::pin!(stream); + loop { + match stream.next().await { + Some(Ok((frame_header, frame_data))) => { + sender + .send(Ok(Bytes::copy_from_slice(frame_header.as_bytes()))) + .await + .unwrap(); + sender.send(Ok(frame_data)).await.unwrap(); + } + Some(Err(_e)) => { + panic!() + // sender.send(Err(e.into())).await.unwrap(); + } + None => break, + } + } + drop(sender); } - let footer = CompactedSegmentDataFooter { - checksum: hasher.finalize().into(), - }; + handle.await.unwrap()?; - let footer_offset = - size_of::() + current_offset * size_of::(); - let (_, ret) = out_file - .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _) - .await; - ret?; - - let (start, end) = set.range().expect("non-empty set"); - let timestamp = DateTime::from_timestamp_millis(set.last().unwrap().timestamp as _) - .unwrap() - .to_utc(); - self.backend - .store( - &self.backend.default_config(), - SegmentMeta { - namespace: set.namespace.clone(), - segment_id: Uuid::new_v4(), - start_frame_no: start, 
- end_frame_no: end, - segment_timestamp: timestamp, - }, - out_file, - out_index.into_inner().unwrap(), - ) - .await?; + let index = builder.into_inner().unwrap(); + match out_path { + Some(path) => { + let mut index_file = + tokio::fs::File::create(path.join(&format!("{new_key}.idx"))).await?; + index_file.write_all(&index).await?; + index_file.flush().await?; + } + None => { + self.backend + .store_segment_index( + &self.backend.default_config(), + &set.namespace, + &new_key, + index, + ) + .await?; + } + } Ok(()) } /// Restore a datatase file from a segment set /// set must start at frame_no 1 - pub async fn restore(&self, set: SegmentSet, to: impl AsRef) -> Result<()> + pub async fn restore( + &self, + set: SegmentSet, + to: impl AsRef, + progress: impl FnMut(u32, u32), + ) -> Result<()> where B: Backend, { - if set.is_empty() { - return Ok(()); - } - assert_eq!(set.range().unwrap().0, 1); - let tmp = tempdir()?; - let (segments, indexes) = self.fetch(&set, tmp.path()).await?; - let mut union = OpBuilder::from_iter(indexes.iter()).union(); - let mut buffer = Vec::with_capacity(LIBSQL_PAGE_SIZE as usize); - let out_file = std::fs::File::create(to)?; - - while let Some((page_no_bytes, indexed_offsets)) = union.next() { - let page_no = u32::from_be_bytes(page_no_bytes.try_into().unwrap()); - let (index, offset) = indexed_offsets - .iter() - .max_by_key(|v| v.index) - .map(|v| (v.index, v.value as u32)) - .expect("union returned something, must be non-empty"); - let segment = &segments[index]; - let (b, ret) = segment.read_page(buffer, offset).await; - ret?; - let offset = (page_no as u64 - 1) * LIBSQL_PAGE_SIZE as u64; - let (mut b, ret) = out_file.write_all_at_async(b, offset).await; + let file = std::fs::File::create(to)?; + let (stream, header) = self.dedup_stream(set.clone(), None, progress).await; + let _footer = LibsqlFooter { + magic: LIBSQL_MAGIC.into(), + version: LIBSQL_WAL_VERSION.into(), + replication_index: set.range().unwrap().1.into(), + log_id: 
header.log_id.get().into(), + }; + + tokio::pin!(stream); + + while let Some((frame_header, frame_data)) = stream.try_next().await? { + let (_, ret) = file + .write_all_at_async( + frame_data, + LIBSQL_PAGE_SIZE as u64 * (frame_header.page_no() as u64 - 1), + ) + .await; ret?; - b.clear(); - buffer = b; } Ok(()) @@ -382,15 +492,19 @@ impl Compactor { namespace: &NamespaceName, f: impl FnMut(SegmentInfo), ) -> Result<()> { - list_segments(&self.meta, namespace, f) + list_segments(&self.conn, namespace, f) } pub fn list_monitored_namespaces(&self, f: impl FnMut(NamespaceName)) -> Result<()> { - list_namespace(&self.meta, f) + list_namespace(&self.conn, f) } pub fn unmonitor(&self, ns: &NamespaceName) -> Result<()> { - unmonitor(&self.meta, ns) + unmonitor(&self.conn, ns) + } + + pub fn segment_info(&self, ns: &NamespaceName, key: SegmentKey) -> Result { + segment_infos(&self.conn, ns, key) } } @@ -471,6 +585,17 @@ impl SegmentSet { .zip(self.segments.last()) .map(|(f, l)| (f.start_frame_no, l.end_frame_no)) } + + pub fn compact_key(&self) -> Option { + match self.segments.first().zip(self.segments.last()) { + Some((f, l)) => Some(SegmentKey { + start_frame_no: f.start_frame_no, + end_frame_no: l.end_frame_no, + timestamp: l.timestamp, + }), + None => None, + } + } } impl Deref for SegmentSet { @@ -653,3 +778,18 @@ fn unmonitor(conn: &rusqlite::Connection, namespace: &NamespaceName) -> Result<( )?; Ok(()) } + +fn segment_infos( + conn: &rusqlite::Connection, + namespace: &NamespaceName, + key: SegmentKey, +) -> Result { + let mut stmt = conn.prepare("SELECT size FROM segments AS s JOIN monitored_namespaces AS ns WHERE s.start_frame_no=? AND s.end_frame_no=? AND ns.namespace_name=? 
LIMIT 1")?; + let mut rows = stmt.query((key.start_frame_no, key.end_frame_no, namespace.as_str()))?; + + let row = rows.next()?.unwrap(); + Ok(SegmentInfo { + key, + size: row.get(0)?, + }) +} diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs index eb173ca0ec..b3351db023 100644 --- a/libsql-wal/src/storage/job.rs +++ b/libsql-wal/src/storage/job.rs @@ -51,7 +51,6 @@ where IO: Io, { let segment = &self.request.segment; - let segment_id = io.uuid(); let tmp = io.tempfile()?; tracing::debug!( @@ -59,13 +58,9 @@ where "sending segment to durable storage" ); - let new_index = segment - .compact(&tmp, segment_id) - .await - .map_err(super::Error::Compact)?; + let new_index = segment.compact(&tmp).await.map_err(super::Error::Compact)?; let meta = SegmentMeta { - segment_id, namespace: self.request.namespace.clone(), start_frame_no: segment.start_frame_no(), end_frame_no: segment.last_committed(), @@ -97,196 +92,3 @@ pub(crate) struct JobResult { /// The outcome of the job: the new durable index, or an error. 
pub(crate) result: Result, } - -#[cfg(test)] -mod test { - use std::future::ready; - use std::str::FromStr; - use std::sync::Arc; - - use chrono::prelude::DateTime; - use chrono::Utc; - use uuid::Uuid; - - use crate::io::file::FileExt; - use crate::io::StdIO; - use crate::segment::compacted::CompactedSegmentDataHeader; - use crate::storage::backend::FindSegmentReq; - use crate::storage::{RestoreOptions, SegmentKey}; - use libsql_sys::name::NamespaceName; - - use super::*; - - #[tokio::test] - async fn perform_job() { - #[derive(Debug)] - struct TestSegment; - - impl Segment for TestSegment { - async fn compact( - &self, - out_file: &impl FileExt, - id: uuid::Uuid, - ) -> crate::error::Result> { - out_file.write_all_at(id.to_string().as_bytes(), 0).unwrap(); - Ok(b"test_index".into()) - } - - fn start_frame_no(&self) -> u64 { - 1 - } - - fn last_committed(&self) -> u64 { - 10 - } - - fn index(&self) -> &fst::Map> { - todo!() - } - - fn read_page( - &self, - _page_no: u32, - _max_frame_no: u64, - _buf: &mut [u8], - ) -> std::io::Result { - todo!() - } - - fn is_checkpointable(&self) -> bool { - todo!() - } - - fn size_after(&self) -> u32 { - todo!() - } - - async fn read_frame_offset_async( - &self, - _offset: u32, - _buf: B, - ) -> (B, crate::error::Result<()>) - where - B: crate::io::buf::IoBufMut + Send + 'static, - { - todo!() - } - - fn destroy(&self, _io: &IO) -> impl std::future::Future { - async move { todo!() } - } - - fn timestamp(&self) -> DateTime { - Utc::now() - } - } - - struct TestBackend; - - impl Backend for TestBackend { - type Config = (); - - async fn store( - &self, - _config: &Self::Config, - meta: SegmentMeta, - segment_data: impl FileExt, - segment_index: Vec, - ) -> Result<()> { - // verify that the stored segment is the same as the one we compacted - assert_eq!(segment_index, b"test_index"); - let mut buf = vec![0; Uuid::new_v4().to_string().len()]; - segment_data.read_exact_at(&mut buf, 0).unwrap(); - let id = 
Uuid::from_str(std::str::from_utf8(&buf).unwrap()).unwrap(); - assert_eq!(meta.segment_id, id); - - Ok(()) - } - - async fn meta( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - ) -> Result { - todo!() - } - - fn default_config(&self) -> Self::Config { - () - } - - async fn restore( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _restore_options: RestoreOptions, - _dest: impl FileExt, - ) -> Result<()> { - todo!() - } - - async fn find_segment( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _frame_no: FindSegmentReq, - ) -> Result { - todo!() - } - - async fn fetch_segment_index( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _key: &SegmentKey, - ) -> Result>> { - todo!() - } - - async fn fetch_segment_data_to_file( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _key: &SegmentKey, - _file: &impl FileExt, - ) -> Result { - todo!() - } - - async fn fetch_segment_data( - self: Arc, - _config: Self::Config, - _namespace: NamespaceName, - _key: SegmentKey, - ) -> Result { - Ok(std::fs::File::open("").unwrap()) - } - - fn list_segments<'a>( - &'a self, - _config: Self::Config, - _namespace: &'a NamespaceName, - _until: u64, - ) -> impl tokio_stream::Stream> + 'a - { - tokio_stream::iter(std::iter::from_fn(|| todo!())) - } - } - - let job = Job { - request: IndexedRequest { - request: StoreSegmentRequest { - namespace: "test".into(), - segment: TestSegment, - created_at: Utc::now(), - storage_config_override: None, - on_store_callback: Box::new(|_| Box::pin(ready(()))), - }, - id: 0, - }, - }; - - let result = job.perform(TestBackend, StdIO(())).await; - assert_eq!(result.result.unwrap(), 10); - } -} diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index 86f09abaae..62d7282a6f 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -196,14 +196,6 @@ pub trait Storage: Send + Sync + 'static { config_override: Option, ) -> 
impl Future> + Send; - async fn restore( - &self, - file: impl FileExt, - namespace: &NamespaceName, - restore_options: RestoreOptions, - config_override: Option, - ) -> Result<()>; - fn find_segment( &self, namespace: &NamespaceName, @@ -290,19 +282,6 @@ where } } - async fn restore( - &self, - file: impl FileExt, - namespace: &NamespaceName, - restore_options: RestoreOptions, - config_override: Option, - ) -> Result<()> { - match zip(self, config_override) { - Either::A((s, c)) => s.restore(file, namespace, restore_options, c).await, - Either::B((s, c)) => s.restore(file, namespace, restore_options, c).await, - } - } - fn find_segment( &self, namespace: &NamespaceName, @@ -402,16 +381,6 @@ impl Storage for NoStorage { Ok(u64::MAX) } - async fn restore( - &self, - _file: impl FileExt, - _namespace: &NamespaceName, - _restore_options: RestoreOptions, - _config_override: Option, - ) -> Result<()> { - panic!("can restore from no storage") - } - async fn find_segment( &self, _namespace: &NamespaceName, @@ -515,11 +484,10 @@ impl Storage for TestStorage { ) { let mut inner = self.inner.lock_blocking(); if inner.store { - let id = uuid::Uuid::new_v4(); - let out_path = inner.dir.path().join(id.to_string()); + let out_path = inner.dir.path().join(uuid::Uuid::new_v4().to_string()); let out_file = inner.io.open(true, true, true, &out_path).unwrap(); let index = tokio::runtime::Handle::current() - .block_on(seg.compact(&out_file, id)) + .block_on(seg.compact(&out_file)) .unwrap(); let end_frame_no = seg.header().last_committed(); let key = SegmentKey { @@ -551,16 +519,6 @@ impl Storage for TestStorage { Ok(u64::MAX) } - async fn restore( - &self, - _file: impl FileExt, - _namespace: &NamespaceName, - _restore_options: RestoreOptions, - _config_override: Option, - ) -> Result<()> { - todo!(); - } - async fn find_segment( &self, namespace: &NamespaceName, diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index a01a7fcff6..519bc6620f 100644 --- 
a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; use std::ops::{Deref, DerefMut}; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; @@ -131,6 +132,26 @@ pub struct Savepoint { pub index: BTreeMap, } +pub static SAVEPOINT_COUNTER: AtomicU64 = AtomicU64::new(0); + +impl Savepoint { + pub fn new(next_offset: u32, next_frame_no: u64, current_checksum: u32) -> Self { + SAVEPOINT_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Self { + next_offset, + next_frame_no, + current_checksum, + index: Default::default(), + } + } +} + +impl Drop for Savepoint { + fn drop(&mut self) { + SAVEPOINT_COUNTER.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } +} + /// The savepoints must be passed from most recent to oldest pub(crate) fn merge_savepoints<'a>( savepoints: impl Iterator>, @@ -224,12 +245,11 @@ impl WriteTransaction { pub fn savepoint(&mut self) -> usize { let savepoint_id = self.savepoints.len(); - self.savepoints.push(Savepoint { - next_offset: self.next_offset, - next_frame_no: self.next_frame_no, - index: BTreeMap::new(), - current_checksum: self.current_checksum, - }); + self.savepoints.push(Savepoint::new( + self.next_offset, + self.next_frame_no, + self.current_checksum, + )); savepoint_id }