doc: documents segmented_log::segment::Segment
arindas committed May 24, 2024
1 parent 09f67d2 commit e4900cd
Showing 1 changed file with 61 additions and 0 deletions.
src/storage/commit_log/segmented_log/segment.rs
@@ -27,13 +27,25 @@ use std::{
time::{Duration, Instant},
};

/// [`Store`] and [`Index`] size configuration for a [`Segment`].
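///
/// A minimal sketch of building a `Config` (the `u64` size type and the values below are
/// illustrative assumptions, not prescribed defaults):
///
/// ```ignore
/// let config = Config::<u64> {
///     max_store_size: 1024 * 1024,   // cap the store at ~1 MiB
///     max_store_overflow: 64 * 1024, // allow up to 64 KiB of overflow past the cap
///     max_index_size: 16 * 1024,     // cap the index at 16 KiB
/// };
/// ```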
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
pub struct Config<Size> {
/// Maximum size of the [`Store`] backing a [`Segment`].
pub max_store_size: Size,
/// Maximum amount by which the [`Store`] may overflow past `max_store_size`.
pub max_store_overflow: Size,
/// Maximum size of the [`Index`] backing a [`Segment`].
pub max_index_size: Size,
}

/// A segment unit in a [`SegmentedLog`](super::SegmentedLog).
///
/// <p align="center">
/// <img src="https://raw.githubusercontent.com/arindas/laminarmq/assets/assets/diagrams/laminarmq-indexed-segmented-log-segment.drawio.png" alt="segmented_log_segment" />
/// </p>
/// <p align="center">
/// <b>Fig:</b> <code>Segment</code> diagram showing an <code>Index</code>, which maps logical
/// indices to <code>Store</code> positions, and a <code>Store</code>, which persists record
/// bytes at the demarcated positions.
/// </p>
pub struct Segment<S, M, H, Idx, Size, SERP> {
index: Index<S, Idx>,
store: Store<S, H>,
@@ -80,17 +92,39 @@ where
}
}

/// Error type associated with operations on [`Segment`].
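///
/// A hedged sketch of matching on one of these variants (the `segment.append(record)` call
/// shape is assumed for illustration):
///
/// ```ignore
/// match segment.append(record).await {
///     Ok(write_index) => { /* record persisted at write_index */ }
///     Err(SegmentError::SegmentMaxed) => {
///         // this segment is full; rotate to a new write segment
///     }
///     Err(other) => { /* propagate or log the remaining error cases */ }
/// }
/// ```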
#[derive(Debug)]
pub enum SegmentError<StorageError, SerDeError> {
/// Used to denote errors from the backing [`Storage`] implementation.
StorageError(StorageError),

/// Used to denote errors from the underlying [`Store`].
StoreError(StoreError<StorageError>),

/// Used to denote errors from the underlying [`Index`].
IndexError(IndexError<StorageError>),

/// Used when the type used for representing positions is incompatible with [`u64`].
IncompatiblePositionType,

/// Used to denote errors when serializing or deserializing data (for instance, [`Record`]
/// metadata).
SerializationError(SerDeError),

/// Used when the metadata associated with a [`Record`] is not found.
RecordMetadataNotFound,

/// Used when the provided append index is not the highest index of the [`Segment`].
InvalidAppendIdx,

/// Used when the [`Segment`] is unable to regenerate an [`IndexRecord`] from the position and
/// [`RecordHeader`](super::store::common::RecordHeader).
InvalidIndexRecordGenerated,

/// Used when `usize` cannot be coerced to `u32` and vice versa.
UsizeU32Inconvertible,

/// Used when a given [`Segment`] maxes out its capacity when we append to it.
SegmentMaxed,
}

@@ -111,6 +145,7 @@ where
{
}

#[doc(hidden)]
pub type SegmentOpError<S, SERP> =
SegmentError<<S as Storage>::Error, <SERP as SerializationProvider>::Error>;

@@ -220,6 +255,15 @@ where
Ok(write_index)
}

/// Appends a new [`Record`] to this [`Segment`].
///
/// Serializes the record metadata and bytes and writes them to the backing [`Store`]. Also
/// makes an [`IndexRecord`] entry in the underlying [`Index`] to keep track of the [`Record`].
///
/// Returns the index at which the [`Record`] was written.
///
/// Errors out with a [`SegmentError`] on failure. Refer to [`SegmentError`] for more info
/// about the possible error cases.
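///
/// A minimal usage sketch (the `segment` and `record` values are assumed to have been
/// prepared elsewhere; the exact call shape may differ):
///
/// ```ignore
/// // append the record and obtain the logical index it was written at
/// let write_index = segment.append(record).await?;
/// ```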
pub async fn append<XBuf, X, XE>(
&mut self,
record: Record<M, Idx, X>,
@@ -291,6 +335,8 @@ where
Idx: Serialize,
SERP: SerializationProvider,
{
/// Like [`Segment::append`] but the [`Record`] contains a contiguous slice of bytes, as
/// opposed to a stream.
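///
/// A hedged usage sketch (the `segment` and `record` values are assumed to exist, with the
/// record value already held as contiguous bytes):
///
/// ```ignore
/// let write_index = segment.append_record_with_contiguous_bytes(&record).await?;
/// ```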
pub async fn append_record_with_contiguous_bytes<X>(
&mut self,
record: &Record<M, Idx, X>,
@@ -399,31 +445,46 @@ where
SERP: SerializationProvider,
Idx: Unsigned + FromPrimitive + Copy + Eq,
{
/// Caches the [`Index`] contents, i.e. the [`IndexRecord`] instances, in memory for fast lookups.
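///
/// A minimal sketch of caching and then inspecting the cached records (the `segment` value
/// is assumed to have been obtained elsewhere):
///
/// ```ignore
/// segment.cache_index().await?;
/// if let Some(index_records) = segment.cached_index_records() {
///     // index records are now available in memory for fast lookups
///     println!("cached {} index records", index_records.len());
/// }
/// ```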
pub async fn cache_index(&mut self) -> Result<(), SegmentError<S::Error, SERP::Error>> {
self.index.cache().await.map_err(SegmentError::IndexError)
}

/// Takes the cached [`IndexRecord`] instances from this [`Segment`], leaving [`None`] in their
/// place.
pub fn take_cached_index_records(&mut self) -> Option<Vec<IndexRecord>> {
self.index.take_cached_index_records()
}

/// Returns a reference to the cached [`IndexRecord`] instances.
pub fn cached_index_records(&self) -> Option<&Vec<IndexRecord>> {
self.index.cached_index_records()
}
}

/// Backing storage for an [`Index`] and [`Store`] within a [`Segment`].
pub struct SegmentStorage<S> {
/// Storage handle for the [`Store`] of the [`Segment`].
pub store: S,
/// Storage handle for the [`Index`] of the [`Segment`].
pub index: S,
}

/// Provides backing storage for [`Segment`] instances.
///
/// Used to abstract the mechanism of acquiring storage handles from the underlying persistent
/// media.
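///
/// A hedged sketch of how a provider might be driven (the `provider` value and the error
/// handling are assumptions for illustration):
///
/// ```ignore
/// // discover the segments already persisted on the storage media
/// let base_indices = provider.obtain_base_indices_of_stored_segments().await?;
///
/// for idx in &base_indices {
///     // acquire the store and index storage handles for each persisted segment
///     let SegmentStorage { store, index } = provider.obtain(idx).await?;
/// }
/// ```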
#[async_trait(?Send)]
pub trait SegmentStorageProvider<S, Idx>
where
S: Storage,
{
/// Returns the base indices of all the [`Segment`] instances persisted on this storage media.
async fn obtain_base_indices_of_stored_segments(&mut self) -> Result<Vec<Idx>, S::Error>;

/// Obtains a [`SegmentStorage`] instance for a [`Segment`] with the given `idx` as its base
/// index.
///
/// Implementations are required to allocate/arrange new storage handles if a [`Segment`] with
/// the given base index is not already persisted on the underlying storage media.
async fn obtain(&mut self, idx: &Idx) -> Result<SegmentStorage<S>, S::Error>;
}
