From 06e177fb9803de90951ad6c4735e53a3d07d7a6a Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 30 Sep 2024 12:42:44 +0200 Subject: [PATCH] streaming compaction --- libsql-wal/Cargo.toml | 3 + libsql-wal/src/replication/storage.rs | 16 +- libsql-wal/src/segment/compacted.rs | 140 ++++++- libsql-wal/src/segment/mod.rs | 55 +-- libsql-wal/src/segment/sealed.rs | 84 ++-- libsql-wal/src/shared_wal.rs | 8 +- libsql-wal/src/storage/async_storage.rs | 15 +- libsql-wal/src/storage/backend/mod.rs | 97 +++-- libsql-wal/src/storage/backend/s3.rs | 361 ++++++++++------ libsql-wal/src/storage/compaction/mod.rs | 504 +++++++++++++++-------- libsql-wal/src/storage/job.rs | 200 +-------- libsql-wal/src/storage/mod.rs | 46 +-- libsql-wal/src/transaction.rs | 32 +- 13 files changed, 860 insertions(+), 701 deletions(-) diff --git a/libsql-wal/Cargo.toml b/libsql-wal/Cargo.toml index 5a356d50f4..c5e39cb853 100644 --- a/libsql-wal/Cargo.toml +++ b/libsql-wal/Cargo.toml @@ -48,6 +48,9 @@ rand = "0.8.5" aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] } petgraph = "0.6.5" anyhow = { version = "1.0.86", optional = true } +futures = "0.3.30" +memmap = "0.7.0" +pin-project-lite = "0.2.14" [dev-dependencies] criterion = "0.5.1" diff --git a/libsql-wal/src/replication/storage.rs b/libsql-wal/src/replication/storage.rs index 062f7a6b6e..d239bad300 100644 --- a/libsql-wal/src/replication/storage.rs +++ b/libsql-wal/src/replication/storage.rs @@ -8,6 +8,7 @@ use tokio_stream::Stream; use zerocopy::FromZeroes; use crate::io::buf::ZeroCopyBoxIoBuf; +use crate::segment::compacted::CompactedFrame; use crate::segment::Frame; use crate::storage::backend::FindSegmentReq; use crate::storage::Storage; @@ -65,10 +66,19 @@ where }, }; - let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await; + // TODO: The copy here is inneficient. This is OK for now, until we rewrite + // this code to read from the main db file instead of storage. 
+ let (compacted_frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(CompactedFrame::new_box_zeroed()), offset as u32).await; ret?; - let frame = frame.into_inner(); - debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0"); + let compacted_frame = compacted_frame.into_inner(); + let mut frame = Frame::new_box_zeroed(); + frame.data_mut().copy_from_slice(&compacted_frame.data); + + let header = frame.header_mut(); + header.frame_no = compacted_frame.header().frame_no; + header.size_after = 0.into(); + header.page_no = compacted_frame.header().page_no; + if frame.header().frame_no() >= until { yield frame; } diff --git a/libsql-wal/src/segment/compacted.rs b/libsql-wal/src/segment/compacted.rs index 4259114425..5c123cf26e 100644 --- a/libsql-wal/src/segment/compacted.rs +++ b/libsql-wal/src/segment/compacted.rs @@ -1,23 +1,23 @@ use std::io; -use std::mem::size_of; +use std::mem::{offset_of, size_of}; +use chrono::{DateTime, Utc}; +use uuid::Uuid; use zerocopy::little_endian::{U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; use crate::io::buf::{IoBufMut, ZeroCopyBuf}; use crate::io::FileExt; -use crate::segment::FrameHeader; use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; -use super::{Frame, Result}; +use super::Result; #[derive(Debug, AsBytes, FromZeroes, FromBytes)] #[repr(C)] -pub struct CompactedSegmentDataHeader { +pub struct CompactedSegmentHeader { pub(crate) magic: lu64, pub(crate) version: lu16, - pub(crate) frame_count: lu32, - pub(crate) segment_id: lu128, + pub(crate) log_id: lu128, pub(crate) start_frame_no: lu64, pub(crate) end_frame_no: lu64, pub(crate) size_after: lu32, @@ -25,7 +25,107 @@ pub struct CompactedSegmentDataHeader { pub(crate) page_size: lu16, pub(crate) timestamp: lu64, } -impl CompactedSegmentDataHeader { + +bitflags::bitflags! { + pub struct CompactedFrameFlags: u32 { + /// This flag is set for the last frame in the segment + const LAST = 1 << 0; + } +} + +#[derive(Debug, AsBytes, FromZeroes, FromBytes)] +#[repr(C)] +pub struct CompactedFrameHeader { + pub flags: lu32, + pub page_no: lu32, + pub frame_no: lu64, + /// running checksum from this frame + /// this is the crc32 of the checksum of the previous frame and all the frame data, including + /// all the fields before checksum in the header. 
THIS FIELD MUST ALWAYS BE THE last FIELD IN + /// THE STRUCT + pub checksum: lu32, +} + +impl CompactedFrameHeader { + pub fn flags(&self) -> CompactedFrameFlags { + CompactedFrameFlags::from_bits(self.flags.get()).unwrap() + } + + pub(crate) fn is_last(&self) -> bool { + self.flags().contains(CompactedFrameFlags::LAST) + } + + pub(crate) fn set_last(&mut self) { + let mut flags = self.flags(); + flags.insert(CompactedFrameFlags::LAST); + self.flags = flags.bits().into(); + } + + pub(crate) fn reset_flags(&mut self) { + self.flags = 0.into(); + } + + pub(crate) fn compute_checksum(&self, previous: u32, data: &[u8]) -> u32 { + assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize); + let mut h = crc32fast::Hasher::new_with_initial(previous); + h.update(&self.as_bytes()[..offset_of!(Self, checksum)]); + h.update(data); + h.finalize() + } + + /// updates the checksum with the previous frame checksum and the frame data + pub(crate) fn update_checksum(&mut self, previous: u32, data: &[u8]) -> u32 { + let checksum = self.compute_checksum(previous, data); + self.checksum = checksum.into(); + checksum + } + + pub fn checksum(&self) -> u32 { + self.checksum.get() + } + + pub fn page_no(&self) -> u32 { + self.page_no.get() + } +} + +#[derive(Debug, AsBytes, FromZeroes, FromBytes)] +#[repr(C)] +pub struct CompactedFrame { + pub header: CompactedFrameHeader, + pub data: [u8; LIBSQL_PAGE_SIZE as usize], +} + +impl CompactedFrame { + pub fn header(&self) -> &CompactedFrameHeader { + &self.header + } + + pub(crate) fn header_mut(&mut self) -> &mut CompactedFrameHeader { + &mut self.header + } +} + +impl CompactedSegmentHeader { + pub fn new( + start_frame_no: u64, + end_frame_no: u64, + size_after: u32, + timestamp: DateTime, + log_id: Uuid, + ) -> Self { + Self { + magic: LIBSQL_MAGIC.into(), + version: LIBSQL_WAL_VERSION.into(), + start_frame_no: start_frame_no.into(), + end_frame_no: end_frame_no.into(), + size_after: size_after.into(), + page_size: LIBSQL_PAGE_SIZE.into(), + timestamp: (timestamp.timestamp_millis() as u64).into(), + log_id: log_id.as_u128().into(), + } + } + fn check(&self) -> Result<()> { if self.magic.get() != LIBSQL_MAGIC { return Err(super::Error::InvalidHeaderMagic); @@ -47,14 +147,8 @@ impl CompactedSegmentDataHeader { } } -#[derive(Debug, AsBytes, FromZeroes, FromBytes)] -#[repr(C)] -pub struct CompactedSegmentDataFooter { - pub(crate) checksum: lu32, -} - pub struct CompactedSegment { - header: CompactedSegmentDataHeader, + header: CompactedSegmentHeader, file: F, } @@ -69,7 +163,7 @@ impl CompactedSegment { } } - pub fn header(&self) -> &CompactedSegmentDataHeader { + pub fn header(&self) -> &CompactedSegmentHeader { &self.header } } @@ -79,23 +173,25 @@ impl CompactedSegment { let buf = ZeroCopyBuf::new_uninit(); let (buf, ret) = file.read_exact_at_async(buf, 0).await; ret?; - let header: CompactedSegmentDataHeader = buf.into_inner(); + let header: CompactedSegmentHeader = buf.into_inner(); header.check()?; Ok(Self { file, header }) } - pub(crate) fn from_parts(file: F, header: CompactedSegmentDataHeader) -> Self { + pub(crate) fn from_parts(file: F, header: CompactedSegmentHeader) -> Self { Self { header, file } } + /// read a CompactedFrame from the segment pub(crate) async fn read_frame( &self, buf: B, offset: u32, ) -> (B, io::Result<()>) { assert_eq!(buf.bytes_init(), 0); - assert_eq!(buf.bytes_total(), size_of::()); - let offset = size_of::() + size_of::() * offset as usize; + assert_eq!(buf.bytes_total(), size_of::()); + let offset = + size_of::() + size_of::() * offset 
as usize; let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await; (buf, ret) } @@ -107,9 +203,9 @@ impl CompactedSegment { ) -> (B, io::Result<()>) { assert_eq!(buf.bytes_init(), 0); assert_eq!(buf.bytes_total(), LIBSQL_PAGE_SIZE as usize); - let offset = size_of::() - + size_of::() * offset as usize - + size_of::(); + let offset = size_of::() + + size_of::() * offset as usize + + size_of::(); let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await; (buf, ret) } diff --git a/libsql-wal/src/segment/mod.rs b/libsql-wal/src/segment/mod.rs index 6a0447609f..1b250bd997 100644 --- a/libsql-wal/src/segment/mod.rs +++ b/libsql-wal/src/segment/mod.rs @@ -155,11 +155,7 @@ impl SegmentHeader { } pub trait Segment: Send + Sync + 'static { - fn compact( - &self, - out_file: &impl FileExt, - id: uuid::Uuid, - ) -> impl Future>> + Send; + fn compact(&self, out_file: &impl FileExt) -> impl Future>> + Send; fn start_frame_no(&self) -> u64; fn last_committed(&self) -> u64; fn index(&self) -> &fst::Map>; @@ -177,55 +173,6 @@ pub trait Segment: Send + Sync + 'static { fn destroy(&self, io: &IO) -> impl Future; } -impl Segment for Arc { - fn compact( - &self, - out_file: &impl FileExt, - id: uuid::Uuid, - ) -> impl Future>> + Send { - self.as_ref().compact(out_file, id) - } - - fn start_frame_no(&self) -> u64 { - self.as_ref().start_frame_no() - } - - fn last_committed(&self) -> u64 { - self.as_ref().last_committed() - } - - fn index(&self) -> &fst::Map> { - self.as_ref().index() - } - - fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result { - self.as_ref().read_page(page_no, max_frame_no, buf) - } - - fn is_checkpointable(&self) -> bool { - self.as_ref().is_checkpointable() - } - - fn size_after(&self) -> u32 { - self.as_ref().size_after() - } - - async fn read_frame_offset_async(&self, offset: u32, buf: B) -> (B, Result<()>) - where - B: IoBufMut + Send + 'static, - { - self.as_ref().read_frame_offset_async(offset, buf).await - } - - fn destroy(&self, io: &IO) -> impl Future { - self.as_ref().destroy(io) - } - - fn timestamp(&self) -> DateTime { - self.as_ref().timestamp() - } -} - #[repr(C)] #[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)] pub struct FrameHeader { diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index 236697be03..d9e480bd44 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -12,13 +12,15 @@ use fst::{Map, MapBuilder, Streamer}; use zerocopy::{AsBytes, FromZeroes}; use crate::error::Result; -use crate::io::buf::{IoBufMut, ZeroCopyBuf}; +use crate::io::buf::{IoBuf, IoBufMut, MapSlice, ZeroCopyBuf}; use crate::io::file::{BufCopy, FileExt}; use crate::io::Inspect; use crate::segment::{checked_frame_offset, CheckedFrame}; use crate::{LIBSQL_MAGIC, LIBSQL_WAL_VERSION}; -use super::compacted::{CompactedSegmentDataFooter, CompactedSegmentDataHeader}; +use super::compacted::{ + CompactedFrame, CompactedFrameFlags, CompactedFrameHeader, CompactedSegmentHeader, +}; use super::{frame_offset, page_offset, Frame, Segment, SegmentFlags, SegmentHeader}; /// an immutable, wal segment @@ -81,12 +83,8 @@ impl Segment for SealedSegment where F: FileExt, { - async fn compact(&self, out_file: &impl FileExt, id: uuid::Uuid) -> Result> { - let mut hasher = crc32fast::Hasher::new(); - - let header = CompactedSegmentDataHeader { - frame_count: (self.index().len() as u32).into(), - segment_id: id.as_u128().into(), + async fn compact(&self, out_file: &impl 
FileExt) -> Result> { + let header = CompactedSegmentHeader { start_frame_no: self.header().start_frame_no, end_frame_no: self.header().last_commited_frame_no, size_after: self.header.size_after, @@ -94,50 +92,64 @@ where magic: LIBSQL_MAGIC.into(), page_size: self.header().page_size, timestamp: self.header.sealed_at_timestamp, + log_id: self.header().log_id.into(), }; - hasher.update(header.as_bytes()); + let mut crc_init = crc32fast::hash(header.as_bytes()); + let (_, ret) = out_file .write_all_at_async(ZeroCopyBuf::new_init(header), 0) .await; ret?; + let mut count_pages = self.index().len(); let mut pages = self.index().stream(); let mut buffer = Box::new(ZeroCopyBuf::::new_uninit()); let mut out_index = fst::MapBuilder::memory(); let mut current_offset = 0; while let Some((page_no_bytes, offset)) = pages.next() { + count_pages -= 1; let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await; ret?; // transaction boundaries in a segment are completely erased. The responsibility is on // the consumer of the segment to place the transaction boundary such that all frames from // the segment are applied within the same transaction. - b.get_mut().header_mut().set_size_after(0); - hasher.update(&b.get_ref().as_bytes()); - let dest_offset = - size_of::() + current_offset * size_of::(); - let (mut b, ret) = out_file.write_all_at_async(b, dest_offset as u64).await; - ret?; + let frame = b.get_mut(); + + let flags = if count_pages == 0 { + CompactedFrameFlags::LAST.bits() + } else { + CompactedFrameFlags::empty().bits() + }; + + let mut compacted_header = CompactedFrameHeader { + checksum: 0.into(), + flags: flags.into(), + page_no: frame.header().page_no, + frame_no: frame.header().frame_no, + }; + + crc_init = compacted_header.update_checksum(crc_init, b.get_ref().data()); + + fn map_data(b: &ZeroCopyBuf) -> &[u8] { + b.get_ref().data() + } + + let data = MapSlice::new(b, map_data); + let mut b = write_compact_frame(out_file, current_offset, compacted_header, data) + .await? 
+ .into_inner(); + out_index .insert(page_no_bytes, current_offset as _) .unwrap(); current_offset += 1; + b.deinit(); buffer = b; } - let footer = CompactedSegmentDataFooter { - checksum: hasher.finalize().into(), - }; - - let footer_offset = - size_of::() + current_offset * size_of::(); - let (_, ret) = out_file - .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _) - .await; - ret?; - Ok(out_index.into_inner().unwrap()) } @@ -209,6 +221,26 @@ where } } +async fn write_compact_frame( + file: &impl FileExt, + offset: usize, + header: CompactedFrameHeader, + data: B, +) -> Result { + let header_offset = size_of::() + offset * size_of::(); + + let fut1 = file.write_all_at_async(ZeroCopyBuf::new_init(header), header_offset as u64); + + let data_offset = header_offset + size_of::(); + let fut2 = file.write_all_at_async(data, data_offset as u64); + let ((_, ret1), (b, ret2)) = futures::join!(fut1, fut2); + + ret1?; + ret2?; + + Ok(b) +} + impl SealedSegment { pub fn open( file: Arc, diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index 2573dfd809..4087a254c9 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; @@ -207,12 +206,7 @@ impl SharedWal { Ok(WriteTransaction { wal_lock: self.wal_lock.clone(), - savepoints: vec![Savepoint { - current_checksum, - next_offset, - next_frame_no, - index: BTreeMap::new(), - }], + savepoints: vec![Savepoint::new(next_offset, next_frame_no, current_checksum)], next_frame_no, next_offset, current_checksum, diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index 3fc62fa56d..438d426257 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -15,7 +15,7 @@ use crate::segment::Segment; use super::backend::{Backend, FindSegmentReq}; use super::scheduler::Scheduler; -use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest}; +use super::{OnStoreCallback, Storage, StoreSegmentRequest}; /// Background loop task state. 
/// @@ -212,19 +212,6 @@ where Ok(meta.max_frame_no) } - async fn restore( - &self, - file: impl crate::io::FileExt, - namespace: &NamespaceName, - restore_options: RestoreOptions, - config_override: Option, - ) -> super::Result<()> { - let config = config_override.unwrap_or_else(|| self.backend.default_config()); - self.backend - .restore(&config, &namespace, restore_options, file) - .await - } - async fn find_segment( &self, namespace: &NamespaceName, diff --git a/libsql-wal/src/storage/backend/mod.rs b/libsql-wal/src/storage/backend/mod.rs index e9ceb743ea..cc90c0db75 100644 --- a/libsql-wal/src/storage/backend/mod.rs +++ b/libsql-wal/src/storage/backend/mod.rs @@ -2,14 +2,14 @@ use std::future::Future; use std::sync::Arc; +use bytes::Bytes; use chrono::{DateTime, Utc}; use fst::Map; use tokio_stream::Stream; -use uuid::Uuid; -use super::{RestoreOptions, Result, SegmentInfo, SegmentKey}; +use super::{Result, SegmentInfo, SegmentKey}; use crate::io::file::FileExt; -use crate::segment::compacted::CompactedSegmentDataHeader; +use crate::segment::compacted::{CompactedFrameHeader, CompactedSegmentHeader}; use libsql_sys::name::NamespaceName; // pub mod fs; @@ -19,7 +19,6 @@ pub mod s3; #[derive(Debug)] pub struct SegmentMeta { pub namespace: NamespaceName, - pub segment_id: Uuid, pub start_frame_no: u64, pub end_frame_no: u64, pub segment_timestamp: DateTime, @@ -52,6 +51,24 @@ pub trait Backend: Send + Sync + 'static { segment_index: Vec, ) -> impl Future> + Send; + /// Store `segment_data` with its associated `meta` + fn store_segment_data( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + segment_data: impl Stream> + Send + Sync + 'static, + ) -> impl Future> + Send; + + /// Store `segment_data` with its associated `meta` + fn store_segment_index( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + index: Vec, + ) -> impl Future> + Send; + fn find_segment( &self, config: &Self::Config, @@ -73,7 +90,7 @@ pub trait Backend: Send + Sync + 'static { namespace: &NamespaceName, key: &SegmentKey, file: &impl FileExt, - ) -> Result; + ) -> Result; // this method taking self: Arc is an infortunate consequence of rust type system making // impl FileExt variant with all the arguments, with no escape hatch... 
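For illustration (this sketch is not part of the diff): the new `store_segment_data` / `store_segment_index` split lets a producer upload segment bytes as a stream while the index is stored separately under the same `SegmentKey`. A minimal sketch of how a caller might drive the streaming path, mirroring the channel-based approach the compactor uses further down in this patch; the helper name, the channel capacity, and the use of `Result` as the crate's `storage::Result` alias are illustrative assumptions, not part of the change.

    use bytes::Bytes;
    use tokio_stream::wrappers::ReceiverStream;

    // Hypothetical helper: upload a compacted segment that is produced
    // incrementally, without staging it in a temporary file first.
    async fn store_streaming<B: Backend>(
        backend: &B,
        config: &B::Config,
        namespace: &NamespaceName,
        key: &SegmentKey,
        index: Vec<u8>,
    ) -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes>>(1);

        // A producer task sends the segment header first, then each frame
        // (header bytes followed by page data); dropping `tx` ends the stream.
        tokio::spawn(async move {
            // ... tx.send(Ok(chunk)).await ...
            drop(tx);
        });

        backend
            .store_segment_data(config, namespace, key, ReceiverStream::new(rx))
            .await?;
        backend
            .store_segment_index(config, namespace, key, index)
            .await
    }
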
@@ -84,20 +101,22 @@ pub trait Backend: Send + Sync + 'static { key: SegmentKey, ) -> impl Future> + Send; - /// Fetch meta for `namespace` - fn meta( + async fn fetch_segment_data_stream( &self, - config: &Self::Config, + config: Self::Config, namespace: &NamespaceName, - ) -> impl Future> + Send; + key: SegmentKey, + ) -> Result<( + CompactedSegmentHeader, + impl Stream>, + )>; - async fn restore( + /// Fetch meta for `namespace` + fn meta( &self, config: &Self::Config, namespace: &NamespaceName, - restore_options: RestoreOptions, - dest: impl FileExt, - ) -> Result<()>; + ) -> impl Future> + Send; fn list_segments<'a>( &'a self, @@ -132,18 +151,6 @@ impl Backend for Arc { self.as_ref().default_config() } - async fn restore( - &self, - config: &Self::Config, - namespace: &NamespaceName, - restore_options: RestoreOptions, - dest: impl FileExt, - ) -> Result<()> { - self.as_ref() - .restore(config, namespace, restore_options, dest) - .await - } - async fn find_segment( &self, config: &Self::Config, @@ -170,7 +177,7 @@ impl Backend for Arc { namespace: &NamespaceName, key: &SegmentKey, file: &impl FileExt, - ) -> Result { + ) -> Result { self.as_ref() .fetch_segment_data_to_file(config, namespace, key, file) .await @@ -197,4 +204,42 @@ impl Backend for Arc { ) -> impl Stream> + 'a { self.as_ref().list_segments(config, namespace, until) } + + async fn fetch_segment_data_stream( + &self, + config: Self::Config, + namespace: &NamespaceName, + key: SegmentKey, + ) -> Result<( + CompactedSegmentHeader, + impl Stream>, + )> { + self.as_ref() + .fetch_segment_data_stream(config, namespace, key) + .await + } + + async fn store_segment_data( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + segment_data: impl Stream> + Send + Sync + 'static, + ) -> Result<()> { + self.as_ref() + .store_segment_data(config, namespace, key, segment_data) + .await + } + + async fn store_segment_index( + &self, + config: &Self::Config, + namespace: &NamespaceName, + key: &SegmentKey, + index: Vec, + ) -> Result<()> { + self.as_ref() + .store_segment_index(config, namespace, key, index) + .await + } } diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs index 81f7dfcbad..70b1b321b7 100644 --- a/libsql-wal/src/storage/backend/s3.rs +++ b/libsql-wal/src/storage/backend/s3.rs @@ -18,20 +18,19 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use http_body::{Frame as HttpFrame, SizeHint}; use libsql_sys::name::NamespaceName; -use roaring::RoaringBitmap; +use pin_project_lite::pin_project; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; use tokio_stream::Stream; +use tokio_util::codec::Decoder; use tokio_util::sync::ReusableBoxFuture; use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; use super::{Backend, FindSegmentReq, SegmentMeta}; -use crate::io::buf::ZeroCopyBuf; use crate::io::compat::copy_to_file; use crate::io::{FileExt, Io, StdIO}; -use crate::segment::compacted::CompactedSegmentDataHeader; -use crate::segment::Frame; -use crate::storage::{Error, RestoreOptions, Result, SegmentInfo, SegmentKey}; +use crate::segment::compacted::{CompactedFrame, CompactedFrameHeader, CompactedSegmentHeader}; +use crate::storage::{Error, Result, SegmentInfo, SegmentKey}; use crate::LIBSQL_MAGIC; pub struct S3Backend { @@ -39,6 +38,7 @@ pub struct S3Backend { default_config: Arc, io: IO, } + impl S3Backend { pub async fn from_sdk_config( aws_config: 
SdkConfig, @@ -137,19 +137,123 @@ impl S3Backend { folder_key: &FolderKey<'_>, segment_key: &SegmentKey, file: &impl FileExt, - ) -> Result { + ) -> Result { let reader = self .fetch_segment_data_reader(config, folder_key, segment_key) .await?; let mut reader = tokio::io::BufReader::with_capacity(8196, reader); - while reader.fill_buf().await?.len() < size_of::() {} - let header = CompactedSegmentDataHeader::read_from_prefix(reader.buffer()).unwrap(); + while reader.fill_buf().await?.len() < size_of::() {} + let header = CompactedSegmentHeader::read_from_prefix(reader.buffer()).unwrap(); copy_to_file(reader, file).await?; Ok(header) } + /// returns a stream over raw CompactedFrame bytes + async fn fetch_segment_data_stream_inner( + &self, + config: &S3Config, + folder_key: &FolderKey<'_>, + segment_key: &SegmentKey, + ) -> Result<( + CompactedSegmentHeader, + impl Stream>, + )> { + let reader = self + .fetch_segment_data_reader(config, folder_key, segment_key) + .await?; + + let mut reader = BufReader::new(reader); + let mut header = CompactedSegmentHeader::new_zeroed(); + reader.read_exact(header.as_bytes_mut()).await?; + + struct FrameDecoder { + verify: bool, + finished: bool, + current_crc: u32, + } + + impl FrameDecoder { + fn new(verify: bool, header: &CompactedSegmentHeader) -> Self { + let current_crc = if verify { + crc32fast::hash(header.as_bytes()) + } else { + 0 + }; + + Self { + finished: false, + verify, + current_crc, + } + } + } + + impl Decoder for FrameDecoder { + type Item = (CompactedFrameHeader, Bytes); + type Error = Error; + + fn decode( + &mut self, + src: &mut BytesMut, + ) -> std::result::Result, Self::Error> { + if self.finished { + return Ok(None); + } + + if src.len() >= size_of::() { + let mut data = src.split_to(size_of::()); + let header_bytes = data.split_to(size_of::()); + let header = CompactedFrameHeader::read_from(&header_bytes).unwrap(); + + if header.is_last() { + self.finished = true; + } + + if self.verify { + let checksum = header.compute_checksum(self.current_crc, &data); + if checksum != header.checksum() { + todo!("invalid frame checksum") + } + self.current_crc = checksum; + } + + return Ok(Some((header, data.freeze()))); + } else { + if src.capacity() < size_of::() { + src.reserve(size_of::() - src.capacity()) + } + Ok(None) + } + } + + fn decode_eof( + &mut self, + buf: &mut BytesMut, + ) -> std::result::Result, Self::Error> { + match self.decode(buf)? { + Some(frame) => Ok(Some(frame)), + None => { + if buf.is_empty() { + Ok(None) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "bytes remaining on stream", + ) + .into()) + } + } + } + } + } + + let stream = tokio_util::codec::FramedRead::new(reader, FrameDecoder::new(true, &header)); + + Ok((header, stream)) + } + async fn s3_get(&self, config: &S3Config, key: impl ToString) -> Result { Ok(self .client @@ -327,71 +431,6 @@ impl S3Backend { } } - // This method could probably be optimized a lot by using indexes and only downloading useful - // segments - async fn restore_latest( - &self, - config: &S3Config, - namespace: &NamespaceName, - dest: impl FileExt, - ) -> Result<()> { - let folder_key = FolderKey { - cluster_id: &config.cluster_id, - namespace, - }; - let Some(latest_key) = self - .find_segment_by_frame_no(config, &folder_key, u64::MAX) - .await? 
- else { - tracing::info!("nothing to restore for {namespace}"); - return Ok(()); - }; - - let reader = self - .fetch_segment_data_reader(config, &folder_key, &latest_key) - .await?; - let mut reader = BufReader::new(reader); - let mut header: CompactedSegmentDataHeader = CompactedSegmentDataHeader::new_zeroed(); - reader.read_exact(header.as_bytes_mut()).await?; - let db_size = header.size_after.get(); - let mut seen = RoaringBitmap::new(); - let mut frame: Frame = Frame::new_zeroed(); - loop { - for _ in 0..header.frame_count.get() { - reader.read_exact(frame.as_bytes_mut()).await?; - let page_no = frame.header().page_no(); - if !seen.contains(page_no) { - seen.insert(page_no); - let offset = (page_no as u64 - 1) * 4096; - let buf = ZeroCopyBuf::new_init(frame).map_slice(|f| f.get_ref().data()); - let (buf, ret) = dest.write_all_at_async(buf, offset).await; - ret?; - frame = buf.into_inner().into_inner(); - } - } - - // db is restored - if seen.len() == db_size as u64 { - break; - } - - let next_frame_no = header.start_frame_no.get() - 1; - let Some(key) = self - .find_segment_by_frame_no(config, &folder_key, next_frame_no) - .await? - else { - todo!("there should be a segment!"); - }; - let r = self - .fetch_segment_data_reader(config, &folder_key, &key) - .await?; - reader = BufReader::new(r); - reader.read_exact(header.as_bytes_mut()).await?; - } - - Ok(()) - } - async fn fetch_segment_from_key( &self, config: &S3Config, @@ -451,6 +490,54 @@ impl S3Backend { } } } + + async fn store_segment_data_inner( + &self, + config: &S3Config, + namespace: &NamespaceName, + body: ByteStream, + segment_key: &SegmentKey, + ) -> Result<()> { + let folder_key = FolderKey { + cluster_id: &config.cluster_id, + namespace, + }; + let s3_data_key = s3_segment_data_key(&folder_key, segment_key); + + self.s3_put(config, s3_data_key, body).await + } + + async fn store_segment_index_inner( + &self, + config: &S3Config, + namespace: &NamespaceName, + index: Vec, + segment_key: &SegmentKey, + ) -> Result<()> { + let folder_key = FolderKey { + cluster_id: &config.cluster_id, + namespace, + }; + let s3_index_key = s3_segment_index_key(&folder_key, segment_key); + + let checksum = crc32fast::hash(&index); + let header = SegmentIndexHeader { + version: 1.into(), + len: (index.len() as u64).into(), + checksum: checksum.into(), + magic: LIBSQL_MAGIC.into(), + }; + + let mut bytes = BytesMut::with_capacity(size_of::() + index.len()); + bytes.extend_from_slice(header.as_bytes()); + bytes.extend_from_slice(&index); + + let body = ByteStream::from(bytes.freeze()); + + self.s3_put(config, s3_index_key, body).await?; + + Ok(()) + } } pub struct S3Config { @@ -547,36 +634,12 @@ where segment_data: impl FileExt, segment_index: Vec, ) -> Result<()> { - let folder_key = FolderKey { - cluster_id: &config.cluster_id, - namespace: &meta.namespace, - }; let segment_key = SegmentKey::from(&meta); - let s3_data_key = s3_segment_data_key(&folder_key, &segment_key); - let body = FileStreamBody::new(segment_data).into_byte_stream(); - - self.s3_put(config, s3_data_key, body).await?; - - let s3_index_key = s3_segment_index_key(&folder_key, &segment_key); - - let checksum = crc32fast::hash(&segment_index); - let header = SegmentIndexHeader { - version: 1.into(), - len: (segment_index.len() as u64).into(), - checksum: checksum.into(), - magic: LIBSQL_MAGIC.into(), - }; - - let mut bytes = - BytesMut::with_capacity(size_of::() + segment_index.len()); - bytes.extend_from_slice(header.as_bytes()); - bytes.extend_from_slice(&segment_index); - 
- let body = ByteStream::from(bytes.freeze()); - - self.s3_put(config, s3_index_key, body).await?; - + self.store_segment_data_inner(config, &meta.namespace, body, &segment_key) + .await?; + self.store_segment_index(config, &meta.namespace, &segment_key, segment_index) + .await?; Ok(()) } @@ -604,19 +667,6 @@ where self.default_config.clone() } - async fn restore( - &self, - config: &Self::Config, - namespace: &NamespaceName, - restore_options: RestoreOptions, - dest: impl FileExt, - ) -> Result<()> { - match restore_options { - RestoreOptions::Latest => self.restore_latest(config, &namespace, dest).await, - RestoreOptions::Timestamp(_) => todo!(), - } - } - async fn find_segment( &self, config: &Self::Config, @@ -660,7 +710,7 @@ where namespace: &NamespaceName, key: &SegmentKey, file: &impl FileExt, - ) -> Result { + ) -> Result { let folder_key = FolderKey { cluster_id: &config.cluster_id, namespace: &namespace, @@ -691,6 +741,84 @@ where ) -> impl Stream> + 'a { self.list_segments_inner(config, namespace, until) } + + async fn fetch_segment_data_stream( + &self, + config: Self::Config, + namespace: &NamespaceName, + key: SegmentKey, + ) -> Result<( + CompactedSegmentHeader, + impl Stream>, + )> { + let folder_key = FolderKey { + cluster_id: &config.cluster_id, + namespace: &namespace, + }; + self.fetch_segment_data_stream_inner(&config, &folder_key, &key) + .await + } + + async fn store_segment_data( + &self, + config: &Self::Config, + namespace: &NamespaceName, + segment_key: &SegmentKey, + segment_data: impl Stream> + Send + Sync + 'static, + ) -> Result<()> { + let byte_stream = StreamBody::new(segment_data); + let body = ByteStream::from_body_1_x(byte_stream); + self.store_segment_data_inner(config, namespace, body, &segment_key) + .await?; + + Ok(()) + } + + async fn store_segment_index( + &self, + config: &Self::Config, + namespace: &NamespaceName, + segment_key: &SegmentKey, + index: Vec, + ) -> Result<()> { + self.store_segment_index_inner(config, namespace, index, segment_key) + .await?; + + Ok(()) + } +} + +pin_project! 
{ + struct StreamBody { + #[pin] + s: S, + } +} + +impl StreamBody { + fn new(s: S) -> Self { + Self { s } + } +} + +impl http_body::Body for StreamBody +where + S: Stream>, +{ + type Data = bytes::Bytes; + type Error = crate::storage::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll, Self::Error>>> { + let this = self.project(); + match std::task::ready!(this.s.poll_next(cx)) { + Some(Ok(data)) => Poll::Ready(Some(Ok(HttpFrame::data(data)))), + Some(Err(e)) => Poll::Ready(Some(Err(e))), + None => Poll::Ready(None), + } + } } #[derive(Clone, Copy)] @@ -802,7 +930,6 @@ mod tests { use fst::MapBuilder; use s3s::auth::SimpleAuth; use s3s::service::{S3ServiceBuilder, SharedS3Service}; - use uuid::Uuid; use crate::io::StdIO; @@ -870,7 +997,6 @@ mod tests { &s3_config, SegmentMeta { namespace: ns.clone(), - segment_id: Uuid::new_v4(), start_frame_no: 1u64.into(), end_frame_no: 64u64.into(), segment_timestamp: Utc::now(), @@ -892,7 +1018,6 @@ mod tests { &s3_config, SegmentMeta { namespace: ns.clone(), - segment_id: Uuid::new_v4(), start_frame_no: 64u64.into(), end_frame_no: 128u64.into(), segment_timestamp: Utc::now(), diff --git a/libsql-wal/src/storage/compaction/mod.rs b/libsql-wal/src/storage/compaction/mod.rs index d91cf27238..8157f81f98 100644 --- a/libsql-wal/src/storage/compaction/mod.rs +++ b/libsql-wal/src/storage/compaction/mod.rs @@ -1,26 +1,30 @@ +use std::io::Write as _; use std::ops::Deref; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -use chrono::DateTime; -use fst::map::OpBuilder; +use bytes::Bytes; +use fst::raw::IndexedValue; +use fst::MapBuilder; use fst::Streamer; +use futures::FutureExt as _; +use futures::Stream; +use futures::StreamExt as _; +use futures::TryStreamExt; use libsql_sys::name::NamespaceName; use libsql_sys::rusqlite::OptionalExtension; use libsql_sys::rusqlite::{self, TransactionBehavior}; +use roaring::RoaringBitmap; use tempfile::tempdir; -use tokio_stream::StreamExt; -use uuid::Uuid; +use tokio::io::AsyncWriteExt; +use tokio::task::JoinHandle; use zerocopy::AsBytes; -use crate::io::buf::ZeroCopyBuf; -use crate::io::FileExt; -use crate::segment::compacted::CompactedSegment; -use crate::segment::compacted::CompactedSegmentDataFooter; -use crate::segment::compacted::CompactedSegmentDataHeader; -use crate::segment::Frame; -use crate::storage::backend::SegmentMeta; +use crate::io::FileExt as _; +use crate::segment::compacted::CompactedFrameHeader; +use crate::segment::compacted::CompactedSegmentHeader; +use crate::LibsqlFooter; use crate::LIBSQL_MAGIC; use crate::LIBSQL_PAGE_SIZE; use crate::LIBSQL_WAL_VERSION; @@ -44,17 +48,17 @@ pub enum Error { pub struct Compactor { backend: Arc, - meta: rusqlite::Connection, + conn: rusqlite::Connection, path: PathBuf, } impl Compactor { pub fn new(backend: Arc, compactor_path: &Path) -> Result { - let meta = rusqlite::Connection::open(compactor_path.join("meta.db"))?; + let conn = rusqlite::Connection::open(compactor_path.join("meta.db"))?; // todo! 
set pragmas: wal + foreign key check - meta.pragma_update(None, "journal_mode", "wal")?; - meta.execute(r#"CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL, UNIQUE(namespace_name))"#, ()).unwrap(); - meta.execute( + conn.pragma_update(None, "journal_mode", "wal")?; + conn.execute(r#"CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL, UNIQUE(namespace_name))"#, ()).unwrap(); + conn.execute( r#"CREATE TABLE IF NOT EXISTS segments ( start_frame_no INTEGER, end_frame_no INTEGER, @@ -68,7 +72,7 @@ impl Compactor { Ok(Self { backend, - meta, + conn, path: compactor_path.into(), }) } @@ -77,7 +81,7 @@ impl Compactor { where B: Backend, { - let tx = self.meta.transaction()?; + let tx = self.conn.transaction()?; let id = { let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO monitored_namespaces(namespace_name) VALUES (?) RETURNING id")?; stmt.query_row([namespace.as_str()], |r| r.get(0)) @@ -94,7 +98,7 @@ impl Compactor { } pub fn analyze(&self, namespace: &NamespaceName) -> Result { - let mut stmt = self.meta.prepare_cached( + let mut stmt = self.conn.prepare_cached( r#" SELECT start_frame_no, end_frame_no, timestamp FROM segments as s @@ -127,7 +131,7 @@ impl Compactor { &self, namespace: &NamespaceName, ) -> Result> { - segments_range(&self.meta, namespace) + segments_range(&self.conn, namespace) } /// Polls storage for new frames since last sync @@ -148,7 +152,7 @@ impl Compactor { B: Backend, { let tx = self - .meta + .conn .transaction_with_behavior(TransactionBehavior::Immediate)?; { let mut stmt = tx.prepare("SELECT namespace_name, id FROM monitored_namespaces")?; @@ -170,7 +174,7 @@ impl Compactor { B: Backend, { let tx = self - .meta + .conn .transaction_with_behavior(TransactionBehavior::Immediate)?; { let mut stmt = @@ -188,190 +192,296 @@ impl Compactor { Ok(()) } - async fn fetch( - &self, - set: &SegmentSet, - into: &Path, - ) -> Result<( - Vec>, - Vec>>, - )> + /// `dedup_frames` takes a segment set and returns a stream of deduplicated raw frames, + /// containing the most recent version of every frame in the segments covered by the set. + /// + /// if out_index is passed, the index of the new segement is generated and put there + /// + /// the progress callback is called with (count_pages, total_pages), as new pages are found + /// + /// returns a stream of the most recent deduplicated frames, and the size_after for that new + /// segment + async fn dedup_stream<'a>( + &'a self, + set: SegmentSet, + out_index: Option<&'a mut MapBuilder>>, + mut progress: impl FnMut(u32, u32) + 'a, + ) -> ( + impl Stream> + 'a, + CompactedSegmentHeader, + ) where B: Backend, { - let mut indexes = Vec::with_capacity(set.len()); - let mut segments = Vec::with_capacity(set.len()); - for key in set.iter() { - let file = std::fs::File::options() - .create_new(true) - .write(true) - .read(true) - .open(into.join(&format!("{key}.data"))) - .unwrap(); - let header = self - .backend - .fetch_segment_data_to_file( - &self.backend.default_config(), - &set.namespace, - key, - &file, - ) - .await - .unwrap(); - let index = self - .backend - .fetch_segment_index(&self.backend.default_config(), &set.namespace, key) - .await - .unwrap(); - indexes.push(index); - segments.push(CompactedSegment::from_parts(file, header)); - } + let (snd, rcv) = tokio::sync::oneshot::channel(); + let mut snd = Some(snd); + + let stream = async_stream::try_stream! 
{ + assert!(!set.is_empty()); + let tmp = tempdir()?; + let config = self.backend.default_config(); + // We fetch indexes in reverse order so that the most recent index comes first + let indexes_stream = futures::stream::iter(set.iter().rev()).map(|k| { + self + .backend + .fetch_segment_index(&config, &set.namespace, k) + .map(|i| i.map(|i| (i, *k))) + }) + // we download indexes in the background as we read from their data files to reduce + // latencies + .buffered(4); + + tokio::pin!(indexes_stream); + + let mut size_after = u64::MAX; + let mut seen_pages = RoaringBitmap::new(); + // keep track of the indexes for segments that we took frames from. This is a vec of + // memory mapped segments, sorted by descending segment timestamp. + let mut saved_indexes = Vec::new(); + // this map keeps a mapping from index in the saved indexed to the count of frames + // taken from that segment. It is necessary to rebuild the new index and compute the + // actual position of a frame in the streamed segement. + let mut index_offset_mapping = Vec::new(); + let mut page_count = 0; + let mut current_crc = 0; + + while let Some((index, key)) = indexes_stream.try_next().await? { + let mut s = index.stream(); + // how many frames to take from that segment + let mut frames_to_take = 0; + while let Some((pno, _)) = s.next() { + let pno = u32::from_be_bytes(pno.try_into().unwrap()); + if !seen_pages.contains(pno) { + // this segment contains data that we haven't seen before, download that + // segment + frames_to_take += 1; + } + tokio::task::consume_budget().await; + } + + tracing::debug!(key = ?key, "taking {} frames from segment", frames_to_take); + + // no frames to take + if frames_to_take == 0 { continue } + + // we need to build an index at the end, so we keep the indexes. + // To reduce the amount of RAM needed to handle all the potential indexes, we write + // it to disk, and map the file. + if out_index.is_some() { + let mut index_file = std::fs::File::options() + .create(true) + .read(true) + .write(true) + .open(tmp.path().join(&format!("{key}")))?; + index_file.write_all(index.as_fst().as_bytes())?; + let map = unsafe { memmap::Mmap::map(&index_file)? }; + let index = fst::Map::new(map).unwrap(); + saved_indexes.push(index); + index_offset_mapping.push(page_count); + } + + let (segment_header, frames) = self.backend.fetch_segment_data_stream(config.clone(), &set.namespace, key).await?; + + if size_after == u64::MAX { + size_after = segment_header.size_after() as u64; + let key = set.compact_key().expect("we asserted that the segment wasn't empty"); + let segment_header = CompactedSegmentHeader::new(key.start_frame_no, key.end_frame_no, size_after as u32, key.timestamp(), uuid::Uuid::from_u128(segment_header.log_id.get())); + current_crc = crc32fast::hash(segment_header.as_bytes()); + let _ = snd.take().unwrap().send(segment_header); + } + + tokio::pin!(frames); + + let mut frames_taken = 0; + while let Some((mut frame_header, frame_data)) = frames.try_next().await ? 
{ + // we took all the frames that we needed from that segment, no need to read the + // rest of it + if frames_taken == frames_to_take { + break + } + + + if seen_pages.insert(frame_header.page_no()) { + frames_taken += 1; + page_count += 1; + let is_last = if page_count == size_after { + frame_header.set_last(); + true + } else { + frame_header.reset_flags(); + false + }; + + current_crc = frame_header.update_checksum(current_crc, &frame_data); + progress(page_count as u32, size_after as _); + yield (frame_header, frame_data); + if is_last { + break + } + } + } + } + + // now, we need to construct the index. + if let Some(out_index) = out_index { + let op_builder = saved_indexes.iter().collect::(); + let mut union = op_builder.union(); + while let Some((pno, idxs)) = union.next() { + let &IndexedValue { index, .. } = idxs.iter().min_by_key(|idx| idx.index).unwrap(); + let offset = index_offset_mapping[index]; + index_offset_mapping[index] += 1; + out_index.insert(pno, offset as _).unwrap(); + tokio::task::consume_budget().await; + } + } + }.peekable(); - Ok((segments, indexes)) + let mut stream = Box::pin(stream); + let header = { + stream.as_mut().peek().await; + rcv.await.unwrap() + }; + + (stream, header) } - pub async fn compact(&self, set: SegmentSet) -> Result<()> + /// compact the passed segment set to out_path if provided, otherwise, uploads it to the + /// backend + pub async fn compact( + &self, + set: SegmentSet, + out_path: Option<&Path>, + progress: impl FnMut(u32, u32), + ) -> Result<()> where B: Backend, { - assert!(!set.is_empty()); - let tmp = tempdir().unwrap(); - // FIXME: bruteforce: we don't necessarily need to download all the segments to cover all - // the changes. Iterating backward over the set items and filling the gaps in the pages - // range would, in theory, be sufficient - // another alternative is to download all the indexes, and lazily download the segment data - // TODO: fetch conccurently - let (segments, indexes) = self.fetch(&set, tmp.path()).await?; - let last_header = segments.last().expect("non-empty set").header(); - - // It's unfortunate that we need to know the number of frames in the final segment ahead of - // time, but it's necessary for computing the checksum. This seems like the least costly - // methods (over recomputing the whole checksum). - let mut union = OpBuilder::from_iter(indexes.iter()).union(); - let mut count = 0; - while let Some(_) = union.next() { - count += 1; - } + let Some(new_key) = set.compact_key() else { + return Ok(()); + }; - let mut hasher = crc32fast::Hasher::new(); + let mut builder = MapBuilder::new(Vec::new()).unwrap(); - let out_file = std::fs::File::options() - .create_new(true) - .write(true) - .read(true) - .open(tmp.path().join("out")) - .unwrap(); - let header = CompactedSegmentDataHeader { - frame_count: (count as u32).into(), - segment_id: Uuid::new_v4().to_u128_le().into(), - start_frame_no: set.range().expect("non-empty set").0.into(), - end_frame_no: set.range().expect("non-empty set").1.into(), - size_after: last_header.size_after, - version: LIBSQL_WAL_VERSION.into(), - magic: LIBSQL_MAGIC.into(), - page_size: last_header.page_size, - // the new compacted segment inherit the last segment timestamp: it contains the same - // logical data. 
- timestamp: last_header.timestamp, + let (sender, mut receiver) = tokio::sync::mpsc::channel::>(1); + let handle: JoinHandle> = match out_path { + Some(path) => { + let path = path.join(&format!("{new_key}.seg")); + let mut data_file = tokio::fs::File::create(path).await?; + tokio::task::spawn(async move { + while let Some(data) = receiver.recv().await { + let data = data?; + data_file.write_all(&data).await?; + } + + data_file.flush().await?; + + Ok(()) + }) + } + None => { + let backend = self.backend.clone(); + let config = self.backend.default_config(); + let ns = set.namespace.clone(); + let key = new_key.clone(); + tokio::task::spawn(async move { + backend + .store_segment_data( + &config, + &ns, + &key, + tokio_stream::wrappers::ReceiverStream::new(receiver), + ) + .await?; + Ok(()) + }) + } }; - hasher.update(header.as_bytes()); - let (_, ret) = out_file - .write_all_at_async(ZeroCopyBuf::new_init(header), 0) + let (stream, segment_header) = self + .dedup_stream(set.clone(), Some(&mut builder), progress) .await; - ret?; - - let mut union = OpBuilder::from_iter(indexes.iter()).union(); - let mut buffer = Box::new(ZeroCopyBuf::::new_uninit()); - let mut out_index = fst::MapBuilder::memory(); - let mut current_offset = 0; - - while let Some((page_no_bytes, indexed_offsets)) = union.next() { - let (index, offset) = indexed_offsets - .iter() - .max_by_key(|v| v.index) - .map(|v| (v.index, v.value)) - .expect("union returned something, must be non-empty"); - let segment = &segments[index]; - let (frame, ret) = segment.read_frame(buffer, offset as u32).await; - ret?; - hasher.update(&frame.get_ref().as_bytes()); - let dest_offset = - size_of::() + current_offset * size_of::(); - let (mut frame, ret) = out_file.write_all_at_async(frame, dest_offset as u64).await; - ret?; - out_index - .insert(page_no_bytes, current_offset as _) - .unwrap(); - current_offset += 1; - frame.deinit(); - buffer = frame; + + sender + .send(Ok(Bytes::copy_from_slice(segment_header.as_bytes()))) + .await + .unwrap(); + + { + tokio::pin!(stream); + loop { + match stream.next().await { + Some(Ok((frame_header, frame_data))) => { + sender + .send(Ok(Bytes::copy_from_slice(frame_header.as_bytes()))) + .await + .unwrap(); + sender.send(Ok(frame_data)).await.unwrap(); + } + Some(Err(_e)) => { + panic!() + // sender.send(Err(e.into())).await.unwrap(); + } + None => break, + } + } + drop(sender); } - let footer = CompactedSegmentDataFooter { - checksum: hasher.finalize().into(), - }; + handle.await.unwrap()?; - let footer_offset = - size_of::() + current_offset * size_of::(); - let (_, ret) = out_file - .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _) - .await; - ret?; - - let (start, end) = set.range().expect("non-empty set"); - let timestamp = DateTime::from_timestamp_millis(set.last().unwrap().timestamp as _) - .unwrap() - .to_utc(); - self.backend - .store( - &self.backend.default_config(), - SegmentMeta { - namespace: set.namespace.clone(), - segment_id: Uuid::new_v4(), - start_frame_no: start, - end_frame_no: end, - segment_timestamp: timestamp, - }, - out_file, - out_index.into_inner().unwrap(), - ) - .await?; + let index = builder.into_inner().unwrap(); + match out_path { + Some(path) => { + let mut index_file = + tokio::fs::File::create(path.join(&format!("{new_key}.idx"))).await?; + index_file.write_all(&index).await?; + index_file.flush().await?; + } + None => { + self.backend + .store_segment_index( + &self.backend.default_config(), + &set.namespace, + &new_key, + index, + ) + .await?; + 
} + } Ok(()) } /// Restore a datatase file from a segment set /// set must start at frame_no 1 - pub async fn restore(&self, set: SegmentSet, to: impl AsRef) -> Result<()> + pub async fn restore( + &self, + set: SegmentSet, + to: impl AsRef, + progress: impl FnMut(u32, u32), + ) -> Result<()> where B: Backend, { - if set.is_empty() { - return Ok(()); - } - assert_eq!(set.range().unwrap().0, 1); - let tmp = tempdir()?; - let (segments, indexes) = self.fetch(&set, tmp.path()).await?; - let mut union = OpBuilder::from_iter(indexes.iter()).union(); - let mut buffer = Vec::with_capacity(LIBSQL_PAGE_SIZE as usize); - let out_file = std::fs::File::create(to)?; - - while let Some((page_no_bytes, indexed_offsets)) = union.next() { - let page_no = u32::from_be_bytes(page_no_bytes.try_into().unwrap()); - let (index, offset) = indexed_offsets - .iter() - .max_by_key(|v| v.index) - .map(|v| (v.index, v.value as u32)) - .expect("union returned something, must be non-empty"); - let segment = &segments[index]; - let (b, ret) = segment.read_page(buffer, offset).await; - ret?; - let offset = (page_no as u64 - 1) * LIBSQL_PAGE_SIZE as u64; - let (mut b, ret) = out_file.write_all_at_async(b, offset).await; + let file = std::fs::File::create(to)?; + let (stream, header) = self.dedup_stream(set.clone(), None, progress).await; + let _footer = LibsqlFooter { + magic: LIBSQL_MAGIC.into(), + version: LIBSQL_WAL_VERSION.into(), + replication_index: set.range().unwrap().1.into(), + log_id: header.log_id.get().into(), + }; + + tokio::pin!(stream); + + while let Some((frame_header, frame_data)) = stream.try_next().await? { + let (_, ret) = file + .write_all_at_async( + frame_data, + LIBSQL_PAGE_SIZE as u64 * (frame_header.page_no() as u64 - 1), + ) + .await; ret?; - b.clear(); - buffer = b; } Ok(()) @@ -382,15 +492,19 @@ impl Compactor { namespace: &NamespaceName, f: impl FnMut(SegmentInfo), ) -> Result<()> { - list_segments(&self.meta, namespace, f) + list_segments(&self.conn, namespace, f) } pub fn list_monitored_namespaces(&self, f: impl FnMut(NamespaceName)) -> Result<()> { - list_namespace(&self.meta, f) + list_namespace(&self.conn, f) } pub fn unmonitor(&self, ns: &NamespaceName) -> Result<()> { - unmonitor(&self.meta, ns) + unmonitor(&self.conn, ns) + } + + pub fn segment_info(&self, ns: &NamespaceName, key: SegmentKey) -> Result { + segment_infos(&self.conn, ns, key) } } @@ -471,6 +585,17 @@ impl SegmentSet { .zip(self.segments.last()) .map(|(f, l)| (f.start_frame_no, l.end_frame_no)) } + + pub fn compact_key(&self) -> Option { + match self.segments.first().zip(self.segments.last()) { + Some((f, l)) => Some(SegmentKey { + start_frame_no: f.start_frame_no, + end_frame_no: l.end_frame_no, + timestamp: l.timestamp, + }), + None => None, + } + } } impl Deref for SegmentSet { @@ -653,3 +778,18 @@ fn unmonitor(conn: &rusqlite::Connection, namespace: &NamespaceName) -> Result<( )?; Ok(()) } + +fn segment_infos( + conn: &rusqlite::Connection, + namespace: &NamespaceName, + key: SegmentKey, +) -> Result { + let mut stmt = conn.prepare("SELECT size FROM segments AS s JOIN monitored_namespaces AS ns WHERE s.start_frame_no=? AND s.end_frame_no=? AND ns.namespace_name=? 
LIMIT 1")?; + let mut rows = stmt.query((key.start_frame_no, key.end_frame_no, namespace.as_str()))?; + + let row = rows.next()?.unwrap(); + Ok(SegmentInfo { + key, + size: row.get(0)?, + }) +} diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs index eb173ca0ec..b3351db023 100644 --- a/libsql-wal/src/storage/job.rs +++ b/libsql-wal/src/storage/job.rs @@ -51,7 +51,6 @@ where IO: Io, { let segment = &self.request.segment; - let segment_id = io.uuid(); let tmp = io.tempfile()?; tracing::debug!( @@ -59,13 +58,9 @@ where "sending segment to durable storage" ); - let new_index = segment - .compact(&tmp, segment_id) - .await - .map_err(super::Error::Compact)?; + let new_index = segment.compact(&tmp).await.map_err(super::Error::Compact)?; let meta = SegmentMeta { - segment_id, namespace: self.request.namespace.clone(), start_frame_no: segment.start_frame_no(), end_frame_no: segment.last_committed(), @@ -97,196 +92,3 @@ pub(crate) struct JobResult { /// The outcome of the job: the new durable index, or an error. pub(crate) result: Result, } - -#[cfg(test)] -mod test { - use std::future::ready; - use std::str::FromStr; - use std::sync::Arc; - - use chrono::prelude::DateTime; - use chrono::Utc; - use uuid::Uuid; - - use crate::io::file::FileExt; - use crate::io::StdIO; - use crate::segment::compacted::CompactedSegmentDataHeader; - use crate::storage::backend::FindSegmentReq; - use crate::storage::{RestoreOptions, SegmentKey}; - use libsql_sys::name::NamespaceName; - - use super::*; - - #[tokio::test] - async fn perform_job() { - #[derive(Debug)] - struct TestSegment; - - impl Segment for TestSegment { - async fn compact( - &self, - out_file: &impl FileExt, - id: uuid::Uuid, - ) -> crate::error::Result> { - out_file.write_all_at(id.to_string().as_bytes(), 0).unwrap(); - Ok(b"test_index".into()) - } - - fn start_frame_no(&self) -> u64 { - 1 - } - - fn last_committed(&self) -> u64 { - 10 - } - - fn index(&self) -> &fst::Map> { - todo!() - } - - fn read_page( - &self, - _page_no: u32, - _max_frame_no: u64, - _buf: &mut [u8], - ) -> std::io::Result { - todo!() - } - - fn is_checkpointable(&self) -> bool { - todo!() - } - - fn size_after(&self) -> u32 { - todo!() - } - - async fn read_frame_offset_async( - &self, - _offset: u32, - _buf: B, - ) -> (B, crate::error::Result<()>) - where - B: crate::io::buf::IoBufMut + Send + 'static, - { - todo!() - } - - fn destroy(&self, _io: &IO) -> impl std::future::Future { - async move { todo!() } - } - - fn timestamp(&self) -> DateTime { - Utc::now() - } - } - - struct TestBackend; - - impl Backend for TestBackend { - type Config = (); - - async fn store( - &self, - _config: &Self::Config, - meta: SegmentMeta, - segment_data: impl FileExt, - segment_index: Vec, - ) -> Result<()> { - // verify that the stored segment is the same as the one we compacted - assert_eq!(segment_index, b"test_index"); - let mut buf = vec![0; Uuid::new_v4().to_string().len()]; - segment_data.read_exact_at(&mut buf, 0).unwrap(); - let id = Uuid::from_str(std::str::from_utf8(&buf).unwrap()).unwrap(); - assert_eq!(meta.segment_id, id); - - Ok(()) - } - - async fn meta( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - ) -> Result { - todo!() - } - - fn default_config(&self) -> Self::Config { - () - } - - async fn restore( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _restore_options: RestoreOptions, - _dest: impl FileExt, - ) -> Result<()> { - todo!() - } - - async fn find_segment( - &self, - _config: &Self::Config, - _namespace: 
&NamespaceName, - _frame_no: FindSegmentReq, - ) -> Result { - todo!() - } - - async fn fetch_segment_index( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _key: &SegmentKey, - ) -> Result>> { - todo!() - } - - async fn fetch_segment_data_to_file( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _key: &SegmentKey, - _file: &impl FileExt, - ) -> Result { - todo!() - } - - async fn fetch_segment_data( - self: Arc, - _config: Self::Config, - _namespace: NamespaceName, - _key: SegmentKey, - ) -> Result { - Ok(std::fs::File::open("").unwrap()) - } - - fn list_segments<'a>( - &'a self, - _config: Self::Config, - _namespace: &'a NamespaceName, - _until: u64, - ) -> impl tokio_stream::Stream> + 'a - { - tokio_stream::iter(std::iter::from_fn(|| todo!())) - } - } - - let job = Job { - request: IndexedRequest { - request: StoreSegmentRequest { - namespace: "test".into(), - segment: TestSegment, - created_at: Utc::now(), - storage_config_override: None, - on_store_callback: Box::new(|_| Box::pin(ready(()))), - }, - id: 0, - }, - }; - - let result = job.perform(TestBackend, StdIO(())).await; - assert_eq!(result.result.unwrap(), 10); - } -} diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index 86f09abaae..62d7282a6f 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -196,14 +196,6 @@ pub trait Storage: Send + Sync + 'static { config_override: Option, ) -> impl Future> + Send; - async fn restore( - &self, - file: impl FileExt, - namespace: &NamespaceName, - restore_options: RestoreOptions, - config_override: Option, - ) -> Result<()>; - fn find_segment( &self, namespace: &NamespaceName, @@ -290,19 +282,6 @@ where } } - async fn restore( - &self, - file: impl FileExt, - namespace: &NamespaceName, - restore_options: RestoreOptions, - config_override: Option, - ) -> Result<()> { - match zip(self, config_override) { - Either::A((s, c)) => s.restore(file, namespace, restore_options, c).await, - Either::B((s, c)) => s.restore(file, namespace, restore_options, c).await, - } - } - fn find_segment( &self, namespace: &NamespaceName, @@ -402,16 +381,6 @@ impl Storage for NoStorage { Ok(u64::MAX) } - async fn restore( - &self, - _file: impl FileExt, - _namespace: &NamespaceName, - _restore_options: RestoreOptions, - _config_override: Option, - ) -> Result<()> { - panic!("can restore from no storage") - } - async fn find_segment( &self, _namespace: &NamespaceName, @@ -515,11 +484,10 @@ impl Storage for TestStorage { ) { let mut inner = self.inner.lock_blocking(); if inner.store { - let id = uuid::Uuid::new_v4(); - let out_path = inner.dir.path().join(id.to_string()); + let out_path = inner.dir.path().join(uuid::Uuid::new_v4().to_string()); let out_file = inner.io.open(true, true, true, &out_path).unwrap(); let index = tokio::runtime::Handle::current() - .block_on(seg.compact(&out_file, id)) + .block_on(seg.compact(&out_file)) .unwrap(); let end_frame_no = seg.header().last_committed(); let key = SegmentKey { @@ -551,16 +519,6 @@ impl Storage for TestStorage { Ok(u64::MAX) } - async fn restore( - &self, - _file: impl FileExt, - _namespace: &NamespaceName, - _restore_options: RestoreOptions, - _config_override: Option, - ) -> Result<()> { - todo!(); - } - async fn find_segment( &self, namespace: &NamespaceName, diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index a01a7fcff6..519bc6620f 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -1,5 +1,6 @@ use 
std::collections::BTreeMap; use std::ops::{Deref, DerefMut}; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; @@ -131,6 +132,26 @@ pub struct Savepoint { pub index: BTreeMap, } +pub static SAVEPOINT_COUNTER: AtomicU64 = AtomicU64::new(0); + +impl Savepoint { + pub fn new(next_offset: u32, next_frame_no: u64, current_checksum: u32) -> Self { + SAVEPOINT_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Self { + next_offset, + next_frame_no, + current_checksum, + index: Default::default(), + } + } +} + +impl Drop for Savepoint { + fn drop(&mut self) { + SAVEPOINT_COUNTER.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } +} + /// The savepoints must be passed from most recent to oldest pub(crate) fn merge_savepoints<'a>( savepoints: impl Iterator>, @@ -224,12 +245,11 @@ impl WriteTransaction { pub fn savepoint(&mut self) -> usize { let savepoint_id = self.savepoints.len(); - self.savepoints.push(Savepoint { - next_offset: self.next_offset, - next_frame_no: self.next_frame_no, - index: BTreeMap::new(), - current_checksum: self.current_checksum, - }); + self.savepoints.push(Savepoint::new( + self.next_offset, + self.next_frame_no, + self.current_checksum, + )); savepoint_id }
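
For illustration (not part of the diff): `Savepoint::new` and the new `Drop` impl keep `SAVEPOINT_COUNTER` in sync, which gives a cheap global gauge of live savepoints. A possible debug/metrics hook, assuming a relaxed load is acceptable for monitoring:

    use std::sync::atomic::Ordering;

    // Number of savepoints currently alive across all write transactions.
    fn live_savepoints() -> u64 {
        SAVEPOINT_COUNTER.load(Ordering::Relaxed)
    }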
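
For illustration (not part of the diff): the format change behind streaming compaction drops the per-segment `frame_count` and the footer checksum in favour of a per-frame running CRC32, seeded with the crc32 of the `CompactedSegmentHeader` and chained through each `CompactedFrameHeader` plus its page data, with the final frame carrying the `LAST` flag. A minimal sketch of verifying a fully downloaded segment under that scheme; it assumes crate-internal access, since `compute_checksum` and `is_last` are `pub(crate)`:

    use std::mem::size_of;
    use zerocopy::FromBytes;

    // Walk the frames of a compacted segment and check the running checksum.
    // `bytes` holds the segment header followed by back-to-back CompactedFrames.
    fn verify_segment(bytes: &[u8]) -> std::io::Result<()> {
        let header_len = size_of::<CompactedSegmentHeader>();
        let frame_len = size_of::<CompactedFrame>();
        // the checksum chain is seeded with the crc32 of the segment header
        let mut crc = crc32fast::hash(&bytes[..header_len]);
        let mut off = header_len;
        loop {
            let end = off + frame_len;
            if bytes.len() < end {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "segment truncated before a frame with the LAST flag",
                ));
            }
            let frame = CompactedFrame::read_from(&bytes[off..end]).unwrap();
            // hashes the previous checksum, the header fields before `checksum`,
            // and the page data
            let expected = frame.header().compute_checksum(crc, &frame.data);
            if expected != frame.header().checksum() {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "compacted frame checksum mismatch",
                ));
            }
            crc = expected;
            if frame.header().is_last() {
                return Ok(());
            }
            off = end;
        }
    }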