Skip to content

Commit

Permalink
move open logic to SharedWal
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 9, 2024
1 parent 1fd9118 commit cb2cced
Show file tree
Hide file tree
Showing 15 changed files with 390 additions and 368 deletions.
13 changes: 7 additions & 6 deletions libsql-replication/src/injector/libsql_injector.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
use std::mem::size_of;

use libsql_wal::io::StdIO;
use libsql_wal::replication::injector::Injector;
use libsql_wal::segment::sealed::SealedSegment;
use libsql_wal::segment::Frame as WalFrame;
use libsql_wal::{io::StdIO, storage::Storage};
use zerocopy::{AsBytes, FromZeroes};

use crate::frame::FrameNo;
use crate::rpc::replication::Frame as RpcFrame;

use super::error::{Error, Result};

pub struct LibsqlInjector {
injector: Injector<StdIO>,
pub struct LibsqlInjector<S> {
injector: Injector<StdIO, S>,
}

impl LibsqlInjector {
pub fn new(injector: Injector<StdIO>) -> Self {
impl<S> LibsqlInjector<S> {
pub fn new(injector: Injector<StdIO, S>) -> Self {
Self { injector }
}
}

impl super::Injector for LibsqlInjector {
impl<S: Storage<Segment = SealedSegment<std::fs::File>>> super::Injector for LibsqlInjector<S> {
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
// must copy it...
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/connection/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub type InnerWal = Either3<Sqlite3Wal, LibsqlWal<StdIO>, DurableWal>;
pub type InnerWalManager = Either<Sqlite3WalManager, LibsqlWalManager<StdIO, SqldStorage>>;

#[cfg(not(feature = "durable-wal"))]
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO>>;
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO, SqldStorage>>;
pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, InnerWal>;

#[derive(Copy, Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl MakeConnection for MakeLibsqlConnection {

#[derive(Clone)]
pub struct LibsqlConnection {
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO>>>>,
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO, SqldStorage>>>>,
}

impl LibsqlConnection {
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/namespace/configurator/libsql_fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub(crate) async fn libsql_wal_fork(
}

async fn try_inject(
to_shared: Arc<SharedWal<StdIO>>,
to_shared: Arc<SharedWal<StdIO, SqldStorage>>,
stream: &mut Pin<
Box<dyn Stream<Item = Result<Box<Frame>, libsql_wal::replication::Error>> + Send + '_>,
>,
Expand Down
5 changes: 3 additions & 2 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ use crate::metrics::{
use crate::namespace::meta_store::MetaStoreHandle;
use crate::namespace::{NamespaceName, NamespaceStore};
use crate::replication::FrameNo;
use crate::SqldStorage;

pub enum WalImpl {
LibsqlWal {
shared: Arc<SharedWal<StdIO>>,
shared: Arc<SharedWal<StdIO, SqldStorage>>,
},
SqliteWal {
meta: WalIndexMeta,
Expand All @@ -52,7 +53,7 @@ impl WalImpl {
})
}

pub fn new_libsql(shared: Arc<SharedWal<StdIO>>) -> Self {
pub fn new_libsql(shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
Self::LibsqlWal { shared }
}

Expand Down
4 changes: 2 additions & 2 deletions libsql-server/src/rpc/replication/libsql_replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ pin_project_lite::pin_project! {
#[pin]
inner: S,
flavor: WalFlavor,
shared: Arc<SharedWal<StdIO>>,
shared: Arc<SharedWal<StdIO, SqldStorage>>,
}
}

impl<S> FrameStreamAdapter<S> {
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
Self {
inner,
flavor,
Expand Down
2 changes: 1 addition & 1 deletion libsql-wal/benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn prepare_for_random_reads<W: Wal>(conn: &mut Connection<W>) {
}
}

fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO>>)) {
fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO, NoStorage>>)) {
let tmp = tempdir().unwrap();
let resolver = |_: &Path| NamespaceName::from_string("test".into());

Expand Down
5 changes: 4 additions & 1 deletion libsql-wal/src/checkpointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use tokio::task::JoinSet;

use crate::io::Io;
use crate::registry::WalRegistry;
use crate::segment::sealed::SealedSegment;
use crate::storage::Storage;

pub(crate) type NotifyCheckpointer = mpsc::Sender<NamespaceName>;

Expand All @@ -29,7 +31,7 @@ pub type LibsqlCheckpointer<IO, S> = Checkpointer<WalRegistry<IO, S>>;
impl<IO, S> LibsqlCheckpointer<IO, S>
where
IO: Io,
S: Sync + Send + 'static,
S: Storage<Segment = SealedSegment<IO::File>>,
{
pub fn new(
registry: Arc<WalRegistry<IO, S>>,
Expand All @@ -51,6 +53,7 @@ impl<IO, S> PerformCheckpoint for WalRegistry<IO, S>
where
IO: Io,
S: Sync + Send + 'static,
S: Storage<Segment = SealedSegment<IO::File>>,
{
#[tracing::instrument(skip(self))]
fn checkpoint(
Expand Down
11 changes: 7 additions & 4 deletions libsql-wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub mod test {
Self { tmp, registry, wal }
}

pub fn shared(&self, namespace: &str) -> Arc<SharedWal<IO>> {
pub fn shared(&self, namespace: &str) -> Arc<SharedWal<IO, TestStorage<IO>>> {
let path = self.tmp.path().join(namespace).join("data");
let registry = self.registry.clone();
let namespace = NamespaceName::from_string(namespace.into());
Expand All @@ -135,7 +135,10 @@ pub mod test {
self.tmp.path().join(namespace)
}

pub fn open_conn(&self, namespace: &'static str) -> libsql_sys::Connection<LibsqlWal<IO>> {
pub fn open_conn(
&self,
namespace: &'static str,
) -> libsql_sys::Connection<LibsqlWal<IO, TestStorage<IO>>> {
let path = self.db_path(namespace);
let wal = self.wal.clone();
std::fs::create_dir_all(&path).unwrap();
Expand All @@ -159,7 +162,7 @@ pub mod test {
}
}

pub fn seal_current_segment<IO: Io>(shared: &SharedWal<IO>) {
pub fn seal_current_segment<IO: Io>(shared: &SharedWal<IO, TestStorage<IO>>) {
let mut tx = shared.begin_read(99999).into();
shared.upgrade(&mut tx).unwrap();
{
Expand All @@ -170,7 +173,7 @@ pub mod test {
tx.end();
}

pub async fn wait_current_durable<IO: Io>(shared: &SharedWal<IO>) {
pub async fn wait_current_durable<IO: Io>(shared: &SharedWal<IO, TestStorage>) {
let current = shared.current.load().next_frame_no().get() - 1;
loop {
{
Expand Down
Loading

0 comments on commit cb2cced

Please sign in to comment.