Merge pull request #1782 from tursodatabase/tiered-compaction
tiered compaction
MarinPostma authored Oct 8, 2024
2 parents 21ae561 + f806789 commit ce0804a
Showing 11 changed files with 359 additions and 14 deletions.
20 changes: 12 additions & 8 deletions libsql-server/src/wal_toolkit.rs
@@ -11,17 +11,12 @@ use libsql_wal::io::StdIO;
use libsql_wal::storage::backend::s3::S3Backend;
use libsql_wal::storage::backend::Backend;
use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy;
use libsql_wal::storage::compaction::strategy::levels::LevelsStrategy;
use libsql_wal::storage::compaction::strategy::log_strategy::LogReductionStrategy;
use libsql_wal::storage::compaction::strategy::PartitionStrategy;
use libsql_wal::storage::compaction::strategy::CompactionStrategy;
use libsql_wal::storage::compaction::Compactor;
use rusqlite::OpenFlags;

#[derive(Clone, Debug, clap::ValueEnum, Copy)]
pub enum CompactStrategy {
Logarithmic,
CompactAll,
}

#[derive(Debug, clap::Subcommand)]
pub enum WalToolkitCommand {
Monitor(MonitorCommand),
@@ -119,6 +114,13 @@ impl SyncCommand {
}
}

#[derive(Clone, Debug, clap::ValueEnum, Copy)]
pub enum CompactStrategy {
Logarithmic,
CompactAll,
Levels,
}

#[derive(Debug, clap::Args)]
/// Compact segments into bigger segments
pub struct CompactCommand {
@@ -168,10 +170,12 @@ impl CompactCommand {
namespace: &NamespaceName,
) -> anyhow::Result<()> {
let analysis = compactor.analyze(&namespace)?;
let strat: Box<dyn PartitionStrategy> = match self.strategy {
let strat: Box<dyn CompactionStrategy> = match self.strategy {
CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
CompactStrategy::CompactAll => Box::new(IdentityStrategy),
CompactStrategy::Levels => Box::new(LevelsStrategy::new(self.threshold)),
};

let set = analysis.shortest_restore_path();
if set.len() <= self.threshold {
println!(
12 changes: 11 additions & 1 deletion libsql-wal/src/storage/compaction/mod.rs
@@ -591,13 +591,23 @@ impl AnalyzedSegments {

/// A set of segments, with the guarantee that segments are non-overlapping and increasing in
/// frameno
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct SegmentSet {
namespace: NamespaceName,
segments: Vec<SegmentKey>,
}

impl SegmentSet {
/// Returns the span of the set: the last segment's end frame number minus the first segment's start frame number
pub fn span(&self) -> u64 {
if self.is_empty() {
0
} else {
self.segments.last().unwrap().end_frame_no
- self.segments.first().unwrap().start_frame_no
}
}

pub fn range(&self) -> Option<(u64, u64)> {
self.segments
.first()
4 changes: 2 additions & 2 deletions libsql-wal/src/storage/compaction/strategy/identity.rs
@@ -1,11 +1,11 @@
use crate::storage::compaction::SegmentSet;

use super::PartitionStrategy;
use super::CompactionStrategy;

/// partition strategy that doesn't split the passed set
pub struct IdentityStrategy;

impl PartitionStrategy for IdentityStrategy {
impl CompactionStrategy for IdentityStrategy {
fn partition(&self, segments: &SegmentSet) -> Vec<SegmentSet> {
let mut out = Vec::with_capacity(1);
out.push(segments.clone());
180 changes: 180 additions & 0 deletions libsql-wal/src/storage/compaction/strategy/levels.rs
@@ -0,0 +1,180 @@
//! The `LevelsStrategy` is a partial compaction strategy that compacts segments into increasingly
//! larger ones. It squashes segments starting from the most recent and working towards the
//! oldest. Segments are added to the result set as follows:
//! - if the number of segments in the set is less than the strategy's threshold, no compaction
//! occurs
//! - else, add all segments above the threshold to the result set
//! - while the span of the result set is greater than the next considered segment, add that
//! segment to the set too.
//!
//! In the following example, we identify segments by their span for clarity:
//!
//! initial : input_set: [10, 7, 21, 14] result_set: [], threshold: 3
//! step 1 => input_set: [10, 7] result_set: [21, 14] | span of the result set: 21 + 14 = 35
//! step 2 => input_set: [10] result_set: [7, 21, 14] | 35 > 7, new span: 35 + 7 = 42
//! step 3 => input_set: [] result_set: [10, 7, 21, 14] | 42 > 10, new span: 42 + 10 = 52
//! resulting segment (after compaction): [52]
//! after more segments are added to the set:
//! [52, 5, 6, 4] -> [52, 15] (15 is less than 52)
//!
//! remarks:
//! - this compaction will always shrink the input set, as long as its length is greater than
//! the strategy threshold: the input set will at worst be threshold - 1 segments long
//! - segments grow toward the size of the first segment in the set. The goal is to delay having
//! to merge large segments, so that we don't need to fetch them too often or write large
//! segments back to storage. The idea is that we maximize the 'useful' information contained
//! in the smaller segments toward the head of the set.
use crate::storage::{compaction::SegmentSet, SegmentKey};

use super::CompactionStrategy;

pub struct LevelsStrategy {
threshold: usize,
}

impl LevelsStrategy {
pub fn new(threshold: usize) -> Self {
Self { threshold }
}
}

impl CompactionStrategy for LevelsStrategy {
fn partition(&self, segments: &SegmentSet) -> Vec<SegmentSet> {
// no need to compact, not enough segments
if segments.len() < self.threshold {
return Vec::new();
}

let overflow = segments.len() - self.threshold + 1;

let mut out = segments
.iter()
.rev()
.cloned()
.take(overflow)
.collect::<Vec<_>>();

let remaining_segs = &segments[..segments.len() - overflow];

for seg in remaining_segs.iter().rev() {
if span(&out) >= seg.span() {
out.push(seg.clone());
}
}

// segments are in inverted order, so we need to reverse the array
out.reverse();

vec![SegmentSet {
namespace: segments.namespace.clone(),
segments: out,
}]
}
}

// Returns the number of frames spanned by the passed set of segments.
//
// The passed set is expected to be non-empty, in reverse segment order.
fn span(segs: &[SegmentKey]) -> u64 {
debug_assert!(!segs.is_empty());
debug_assert!(segs.first().unwrap().start_frame_no >= segs.last().unwrap().start_frame_no);

segs.first().unwrap().end_frame_no - segs.last().unwrap().start_frame_no
}

#[cfg(test)]
mod test {
use insta::assert_debug_snapshot;
use libsql_sys::name::NamespaceName;

use super::*;

#[test]
fn partition_tiered() {
let ns = NamespaceName::from_string("test".into());
let s = LevelsStrategy { threshold: 5 };
let mut set = SegmentSet {
namespace: ns.clone(),
segments: vec![
SegmentKey {
start_frame_no: 1,
end_frame_no: 20,
timestamp: 0,
},
SegmentKey {
start_frame_no: 21,
end_frame_no: 27,
timestamp: 0,
},
SegmentKey {
start_frame_no: 28,
end_frame_no: 41,
timestamp: 0,
},
],
};

assert!(s.partition(&set).is_empty());

set.segments.push(SegmentKey {
start_frame_no: 42,
end_frame_no: 70,
timestamp: 0,
});
set.segments.push(SegmentKey {
start_frame_no: 71,
end_frame_no: 81,
timestamp: 0,
});
set.segments.push(SegmentKey {
start_frame_no: 82,
end_frame_no: 100,
timestamp: 0,
});

let partition = s.partition(&set);
assert_eq!(partition.len(), 1);
// we should compact all segments into one
assert_debug_snapshot!(partition.first().unwrap());

let set = SegmentSet {
namespace: ns.clone(),
segments: vec![
SegmentKey {
start_frame_no: 1,
end_frame_no: 100,
timestamp: 0,
},
SegmentKey {
start_frame_no: 101,
end_frame_no: 105,
timestamp: 0,
},
SegmentKey {
start_frame_no: 106,
end_frame_no: 110,
timestamp: 0,
},
SegmentKey {
start_frame_no: 111,
end_frame_no: 115,
timestamp: 0,
},
SegmentKey {
start_frame_no: 116,
end_frame_no: 120,
timestamp: 0,
},
SegmentKey {
start_frame_no: 121,
end_frame_no: 122,
timestamp: 0,
},
],
};

let partition = s.partition(&set);
assert_eq!(partition.len(), 1);
assert_debug_snapshot!(partition.first().unwrap());
}
}
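
The worked example in the `LevelsStrategy` module docs can be reproduced with a small standalone sketch. This is not part of the commit: it represents segments only by their span (a plain frame count) instead of `SegmentKey`, accumulates the span as a simple sum (matching the arithmetic in the docs rather than the frame-number subtraction in the real code), and the name `select_for_compaction` is made up for the illustration.

// Standalone sketch of the level-selection step described in the module docs
// above (not part of the commit). Spans are given oldest first.
fn select_for_compaction(spans: &[u64], threshold: usize) -> Vec<u64> {
    // Not enough segments: no compaction.
    if spans.len() < threshold {
        return Vec::new();
    }

    // Everything above the threshold is selected unconditionally,
    // starting from the most recent segment.
    let overflow = spans.len() - threshold + 1;
    let mut picked: Vec<u64> = spans.iter().rev().take(overflow).copied().collect();

    // Keep absorbing older segments while the accumulated span covers them.
    for &span in spans[..spans.len() - overflow].iter().rev() {
        if picked.iter().sum::<u64>() >= span {
            picked.push(span);
        }
    }

    // Restore oldest-first order.
    picked.reverse();
    picked
}

fn main() {
    // Worked example from the docs: with threshold 3, all four segments are
    // selected and would be compacted into a single 52-frame segment.
    assert_eq!(select_for_compaction(&[10, 7, 21, 14], 3), vec![10, 7, 21, 14]);
    // Follow-up example: the 52-frame segment is left alone; the tail is
    // compacted into a 15-frame segment, giving [52, 15].
    assert_eq!(select_for_compaction(&[52, 5, 6, 4], 3), vec![5, 6, 4]);
}

The real `partition` above works on `SegmentKey` ranges and derives the accumulated span from frame numbers, but the shape of the selection loop is the same.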
4 changes: 2 additions & 2 deletions libsql-wal/src/storage/compaction/strategy/log_strategy.rs
@@ -2,12 +2,12 @@ use std::ops::Deref as _;

use crate::storage::compaction::SegmentSet;

use super::PartitionStrategy;
use super::CompactionStrategy;

/// partition the SegmentSet in logarithmically reducing sets
pub struct LogReductionStrategy;

impl PartitionStrategy for LogReductionStrategy {
impl CompactionStrategy for LogReductionStrategy {
fn partition(&self, segments: &SegmentSet) -> Vec<SegmentSet> {
let mut segs = segments.deref();
let mut out = Vec::new();
3 changes: 2 additions & 1 deletion libsql-wal/src/storage/compaction/strategy/mod.rs
@@ -1,8 +1,9 @@
use super::SegmentSet;

pub mod identity;
pub mod levels;
pub mod log_strategy;

pub trait PartitionStrategy {
pub trait CompactionStrategy {
fn partition(&self, segments: &SegmentSet) -> Vec<SegmentSet>;
}
@@ -0,0 +1,34 @@
---
source: libsql-wal/src/storage/compaction/strategy/levels.rs
expression: partition.first().unwrap()
---
SegmentSet {
namespace: test,
segments: [
SegmentKey {
start_frame_no: 101,
end_frame_no: 105,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 106,
end_frame_no: 110,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 111,
end_frame_no: 115,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 116,
end_frame_no: 120,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 121,
end_frame_no: 122,
timestamp: 1970-01-01T00:00:00Z,
},
],
}
@@ -0,0 +1,39 @@
---
source: libsql-wal/src/storage/compaction/strategy/levels.rs
expression: partition.first().unwrap()
---
SegmentSet {
namespace: test,
segments: [
SegmentKey {
start_frame_no: 1,
end_frame_no: 20,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 21,
end_frame_no: 27,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 28,
end_frame_no: 41,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 42,
end_frame_no: 70,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 71,
end_frame_no: 81,
timestamp: 1970-01-01T00:00:00Z,
},
SegmentKey {
start_frame_no: 82,
end_frame_no: 100,
timestamp: 1970-01-01T00:00:00Z,
},
],
}