Skip to content

Commit

Permalink
Merge branch 'main' into deserialize
Browse files Browse the repository at this point in the history
  • Loading branch information
sjunepark authored Oct 11, 2024
2 parents 7b9ec4e + 7521bc0 commit 9b5cc27
Show file tree
Hide file tree
Showing 36 changed files with 988 additions and 516 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 46 additions & 26 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct Replicator {
upload_progress: Arc<Mutex<CompletionProgress>>,
last_uploaded_frame_no: Receiver<u32>,
skip_snapshot: bool,
skip_shutdown_upload: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -122,6 +123,8 @@ pub struct Options {
pub s3_max_retries: u32,
/// Skip snapshot upload per checkpoint.
pub skip_snapshot: bool,
/// Skip uploading snapshots on shutdown
pub skip_shutdown_upload: bool,
}

impl Options {
Expand Down Expand Up @@ -238,6 +241,10 @@ impl Options {
Some(key) => Some(EncryptionConfig::new(cipher, key)),
None => None,
};

let skip_shutdown_upload =
env_var_or("LIBSQL_BOTTOMLESS_SKIP_SHUTDOWN_UPLOAD", false).parse::<bool>()?;

Ok(Options {
db_id,
create_bucket_if_not_exists: true,
Expand All @@ -255,6 +262,7 @@ impl Options {
bucket_name,
s3_max_retries,
skip_snapshot,
skip_shutdown_upload,
})
}
}
Expand Down Expand Up @@ -343,6 +351,12 @@ impl Replicator {
};
tracing::debug!("Database path: '{}', name: '{}'", db_path, db_name);

let skip_shutdown_upload = options.skip_shutdown_upload;

if skip_shutdown_upload {
tracing::warn!("skipping upload on shutdown");
}

let (flush_trigger, mut flush_trigger_rx) = channel(());
let (last_committed_frame_no_sender, last_committed_frame_no) = channel(Ok(0));

Expand Down Expand Up @@ -498,6 +512,7 @@ impl Replicator {
join_set,
upload_progress,
last_uploaded_frame_no,
skip_shutdown_upload,
})
}

Expand Down Expand Up @@ -529,33 +544,38 @@ impl Replicator {
}

pub async fn shutdown_gracefully(&mut self) -> Result<()> {
tracing::info!("bottomless replicator: shutting down...");
// 1. wait for all committed WAL frames to be committed locally
let last_frame_no = self.last_known_frame();
// force flush in order to not wait for periodic wake up of local back up process
if let Some(tx) = &self.flush_trigger {
let _ = tx.send(());
}
self.wait_until_committed(last_frame_no).await?;
tracing::info!(
"bottomless replicator: local backup replicated frames until {}",
last_frame_no
);
// 2. wait for snapshot upload to S3 to finish
self.wait_until_snapshotted().await?;
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
// close the channel used by wait_until_committed, it must happen after wait_until_committed
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
self.flush_trigger.take();
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
// and finish upload process
self.shutdown_trigger.take();
while let Some(t) = self.join_set.join_next().await {
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
// this should ensure that all WAL frames are one S3
t?;
if !self.skip_shutdown_upload {
tracing::info!("bottomless replicator: shutting down...");
// 1. wait for all committed WAL frames to be committed locally
let last_frame_no = self.last_known_frame();
// force flush in order to not wait for periodic wake up of local back up process
if let Some(tx) = &self.flush_trigger {
let _ = tx.send(());
}
self.wait_until_committed(last_frame_no).await?;
tracing::info!(
"bottomless replicator: local backup replicated frames until {}",
last_frame_no
);
// 2. wait for snapshot upload to S3 to finish
self.wait_until_snapshotted().await?;
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
// close the channel used by wait_until_committed, it must happen after wait_until_committed
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
self.flush_trigger.take();
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
// and finish upload process
self.shutdown_trigger.take();
while let Some(t) = self.join_set.join_next().await {
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
// this should ensure that all WAL frames are one S3
t?;
}
} else {
tracing::warn!("skipping snapshot upload during shutdown");
}

tracing::info!("bottomless replicator: shutdown complete");
Ok(())
}
Expand Down
13 changes: 7 additions & 6 deletions libsql-replication/src/injector/libsql_injector.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
use std::mem::size_of;

use libsql_wal::io::StdIO;
use libsql_wal::replication::injector::Injector;
use libsql_wal::segment::sealed::SealedSegment;
use libsql_wal::segment::Frame as WalFrame;
use libsql_wal::{io::StdIO, storage::Storage};
use zerocopy::{AsBytes, FromZeroes};

use crate::frame::FrameNo;
use crate::rpc::replication::Frame as RpcFrame;

use super::error::{Error, Result};

pub struct LibsqlInjector {
injector: Injector<StdIO>,
pub struct LibsqlInjector<S> {
injector: Injector<StdIO, S>,
}

impl LibsqlInjector {
pub fn new(injector: Injector<StdIO>) -> Self {
impl<S> LibsqlInjector<S> {
pub fn new(injector: Injector<StdIO, S>) -> Self {
Self { injector }
}
}

impl super::Injector for LibsqlInjector {
impl<S: Storage<Segment = SealedSegment<std::fs::File>>> super::Injector for LibsqlInjector<S> {
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
// must copy it...
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "libsql-server"
version = "0.24.26"
version = "0.24.27"
edition = "2021"
default-run = "sqld"
repository = "https://github.com/tursodatabase/libsql"
Expand Down
4 changes: 2 additions & 2 deletions libsql-server/src/connection/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ pub type ConnId = u64;
pub type InnerWalManager =
Either3<Sqlite3WalManager, LibsqlWalManager<StdIO, SqldStorage>, DurableWalManager>;
#[cfg(feature = "durable-wal")]
pub type InnerWal = Either3<Sqlite3Wal, LibsqlWal<StdIO>, DurableWal>;
pub type InnerWal = Either3<Sqlite3Wal, LibsqlWal<StdIO, SqldStorage>, DurableWal>;

#[cfg(not(feature = "durable-wal"))]
pub type InnerWalManager = Either<Sqlite3WalManager, LibsqlWalManager<StdIO, SqldStorage>>;

#[cfg(not(feature = "durable-wal"))]
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO>>;
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO, SqldStorage>>;
pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, InnerWal>;

#[derive(Copy, Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl MakeConnection for MakeLibsqlConnection {

#[derive(Clone)]
pub struct LibsqlConnection {
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO>>>>,
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO, SqldStorage>>>>,
}

impl LibsqlConnection {
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/namespace/configurator/libsql_fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub(crate) async fn libsql_wal_fork(
}

async fn try_inject(
to_shared: Arc<SharedWal<StdIO>>,
to_shared: Arc<SharedWal<StdIO, SqldStorage>>,
stream: &mut Pin<
Box<dyn Stream<Item = Result<Box<Frame>, libsql_wal::replication::Error>> + Send + '_>,
>,
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub async fn metastore_connection_maker(
s3_max_parallelism: 32,
s3_max_retries: 10,
skip_snapshot: false,
skip_shutdown_upload: false,
};
let mut replicator = bottomless::replicator::Replicator::with_options(
db_path.join("data").to_str().unwrap(),
Expand Down
5 changes: 3 additions & 2 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ use crate::metrics::{
use crate::namespace::meta_store::MetaStoreHandle;
use crate::namespace::{NamespaceName, NamespaceStore};
use crate::replication::FrameNo;
use crate::SqldStorage;

pub enum WalImpl {
LibsqlWal {
shared: Arc<SharedWal<StdIO>>,
shared: Arc<SharedWal<StdIO, SqldStorage>>,
},
SqliteWal {
meta: WalIndexMeta,
Expand All @@ -52,7 +53,7 @@ impl WalImpl {
})
}

pub fn new_libsql(shared: Arc<SharedWal<StdIO>>) -> Self {
pub fn new_libsql(shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
Self::LibsqlWal { shared }
}

Expand Down
4 changes: 2 additions & 2 deletions libsql-server/src/rpc/replication/libsql_replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ pin_project_lite::pin_project! {
#[pin]
inner: S,
flavor: WalFlavor,
shared: Arc<SharedWal<StdIO>>,
shared: Arc<SharedWal<StdIO, SqldStorage>>,
}
}

impl<S> FrameStreamAdapter<S> {
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
Self {
inner,
flavor,
Expand Down
20 changes: 12 additions & 8 deletions libsql-server/src/wal_toolkit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,11 @@ use libsql_wal::storage::backend::s3::S3Backend;
use libsql_wal::storage::backend::Backend;
use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy;
use libsql_wal::storage::compaction::strategy::log_strategy::LogReductionStrategy;
use libsql_wal::storage::compaction::strategy::PartitionStrategy;
use libsql_wal::storage::compaction::strategy::tiered::LevelsStrategy;
use libsql_wal::storage::compaction::strategy::CompactionStrategy;
use libsql_wal::storage::compaction::Compactor;
use rusqlite::OpenFlags;

#[derive(Clone, Debug, clap::ValueEnum, Copy)]
pub enum CompactStrategy {
Logarithmic,
CompactAll,
}

#[derive(Debug, clap::Subcommand)]
pub enum WalToolkitCommand {
Monitor(MonitorCommand),
Expand Down Expand Up @@ -119,6 +114,13 @@ impl SyncCommand {
}
}

#[derive(Clone, Debug, clap::ValueEnum, Copy)]
pub enum CompactStrategy {
Logarithmic,
CompactAll,
Tiered,
}

#[derive(Debug, clap::Args)]
/// Compact segments into bigger segments
pub struct CompactCommand {
Expand Down Expand Up @@ -168,10 +170,12 @@ impl CompactCommand {
namespace: &NamespaceName,
) -> anyhow::Result<()> {
let analysis = compactor.analyze(&namespace)?;
let strat: Box<dyn PartitionStrategy> = match self.strategy {
let strat: Box<dyn CompactionStrategy> = match self.strategy {
CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
CompactStrategy::CompactAll => Box::new(IdentityStrategy),
CompactStrategy::Tiered => Box::new(LevelsStrategy::new(self.threshold)),
};

let set = analysis.shortest_restore_path();
if set.len() <= self.threshold {
println!(
Expand Down
3 changes: 3 additions & 0 deletions libsql-server/tests/cluster/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ fn replica_interactive_transaction() {
.unwrap();
tx.commit().await.unwrap();

// libsql-client doesn't support read your writes yet
tokio::time::sleep(Duration::from_secs(1)).await;

let count = conn
.query("select count(0) from test", ())
.await
Expand Down
2 changes: 1 addition & 1 deletion libsql-wal/benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn prepare_for_random_reads<W: Wal>(conn: &mut Connection<W>) {
}
}

fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO>>)) {
fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO, NoStorage>>)) {
let tmp = tempdir().unwrap();
let resolver = |_: &Path| NamespaceName::from_string("test".into());

Expand Down
5 changes: 4 additions & 1 deletion libsql-wal/src/checkpointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use tokio::task::JoinSet;

use crate::io::Io;
use crate::registry::WalRegistry;
use crate::segment::sealed::SealedSegment;
use crate::storage::Storage;

pub(crate) type NotifyCheckpointer = mpsc::Sender<NamespaceName>;

Expand All @@ -29,7 +31,7 @@ pub type LibsqlCheckpointer<IO, S> = Checkpointer<WalRegistry<IO, S>>;
impl<IO, S> LibsqlCheckpointer<IO, S>
where
IO: Io,
S: Sync + Send + 'static,
S: Storage<Segment = SealedSegment<IO::File>>,
{
pub fn new(
registry: Arc<WalRegistry<IO, S>>,
Expand All @@ -51,6 +53,7 @@ impl<IO, S> PerformCheckpoint for WalRegistry<IO, S>
where
IO: Io,
S: Sync + Send + 'static,
S: Storage<Segment = SealedSegment<IO::File>>,
{
#[tracing::instrument(skip(self))]
fn checkpoint(
Expand Down
11 changes: 11 additions & 0 deletions libsql-wal/src/io/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ impl<T> ZeroCopyBoxIoBuf<T> {
Self { init: 0, inner }
}

/// same as new_uninit, but partially fills the buffer starting at offset
///
/// # Safety: The caller must ensure that the remaining bytes are initialized
pub unsafe fn new_uninit_partial(inner: Box<T>, offset: usize) -> Self {
assert!(offset < size_of::<T>());
Self {
inner,
init: offset,
}
}

fn is_init(&self) -> bool {
self.init == size_of::<T>()
}
Expand Down
Loading

0 comments on commit 9b5cc27

Please sign in to comment.