diff --git a/.github/workflows/backwards_compatibility.yml b/.github/workflows/backwards_compatibility.yml index d8e3d73a8..075f2613c 100644 --- a/.github/workflows/backwards_compatibility.yml +++ b/.github/workflows/backwards_compatibility.yml @@ -74,6 +74,8 @@ jobs: runs-on: ubuntu-latest needs: check_commit_message if: ${{ needs.check_commit_message.outputs.should_skip != 'true' }} + env: + BRANCH_NAME: ${{ github.head_ref || github.ref_name }} steps: - run: echo "${{ needs.check_commit_message.outputs.should_skip == 'true' }}" @@ -87,63 +89,66 @@ jobs: with: key: "v2" - - name: Build (PR) - run: cargo build + - name: Reset to origin/master + run: git fetch && git reset --hard origin/master + + - name: Build iggy-server (origin/master) + run: IGGY_CI_BUILD=true cargo build - uses: JarvusInnovations/background-action@v1 - name: Run iggy-server in background (PR) + name: Run iggy-server in background (origin/master) with: run: | target/debug/iggy-server & wait-on: tcp:localhost:8090 wait-for: 1m log-output: true - log-output-if: true + log-output-if: timeout tail: true - - name: Run send bench (PR) + - name: Run send bench (origin/master) timeout-minutes: 1 - run: target/debug/iggy-bench --warmup-time 0 --verbose send --message-batches 25 --messages-per-batch 25 tcp + run: target/debug/iggy-bench --warmup-time 0 --verbose send --message-batches 50 --messages-per-batch 100 tcp - - name: Stop iggy-server + - name: Run poll bench (origin/master) timeout-minutes: 1 - run: pkill iggy-server && while pgrep -l iggy-server; do sleep 1; done; + run: target/debug/iggy-bench --warmup-time 0 --verbose poll --message-batches 50 --messages-per-batch 100 tcp + + - name: Stop iggy-server (origin/master) + timeout-minutes: 1 + run: pkill -15 iggy-server && while pgrep -l iggy-server; do sleep 2; done; - - name: Print iggy-server logs (PR) + - name: Print iggy-server logs (origin/master) run: cat local_data/logs/iggy* - - name: Remove iggy-server logs (PR) + - name: Remove iggy-server logs (origin/master) run: rm local_data/logs/iggy* - - name: Reset to origin/master - run: git reset --hard origin/master + - name: Reset to pull request branch (PR) + run: git reset --hard origin/$BRANCH_NAME - - name: Build (origin/master) - run: cargo build + - name: Build iggy-server (PR) + run: IGGY_CI_BUILD=true cargo build - uses: JarvusInnovations/background-action@v1 - name: Run iggy-server in background (origin/master) + name: Run iggy-server in background (PR) with: run: | target/debug/iggy-server & wait-on: tcp:localhost:8090 wait-for: 1m log-output: true - log-output-if: true + log-output-if: timeout tail: true - - name: Run poll bench (origin/master) - timeout-minutes: 1 - run: target/debug/iggy-bench --warmup-time 0 --verbose poll --message-batches 25 --messages-per-batch 25 tcp - - - name: Run send bench (origin/master) + - name: Run poll bench (PR) timeout-minutes: 1 - run: target/debug/iggy-bench --warmup-time 0 --verbose send --message-batches 25 --messages-per-batch 25 tcp + run: target/debug/iggy-bench --warmup-time 0 --verbose poll --message-batches 50 --messages-per-batch 100 tcp - - name: Stop iggy-server + - name: Stop iggy-server (PR) timeout-minutes: 1 run: pkill iggy-server && while pgrep -l iggy-server; do sleep 1; done; - - name: Print server logs (origin/master) + - name: Print server logs (PR) run: cat local_data/logs/iggy* diff --git a/Cargo.lock b/Cargo.lock index ea374f061..09f857b12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3998,11 +3998,10 @@ dependencies = [ [[package]] name = "server" 
-version = "0.4.60" +version = "0.4.70" dependencies = [ "ahash 0.8.11", "anyhow", - "async-stream", "async-trait", "atone", "axum", diff --git a/configs/server.json b/configs/server.json index 548353f85..a7bade19f 100644 --- a/configs/server.json +++ b/configs/server.json @@ -184,7 +184,6 @@ "segment": { "size": "1 GB", "cache_indexes": true, - "cache_time_indexes": true, "message_expiry": "none", "archive_expired": false }, diff --git a/configs/server.toml b/configs/server.toml index d1f600042..5fe2120da 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -437,16 +437,11 @@ message_expiry = "none" # Configures whether expired segments are archived (boolean) or just deleted without archiving. archive_expired = false -# Controls whether to cache indexes for segment access (boolean). +# Controls whether to cache indexes (time and positional) for segment access (boolean). # `true` keeps indexes in memory, speeding up data retrieval. # `false` reads indexes from disk, which can conserve memory at the cost of access speed. cache_indexes = true -# Determines whether to cache time-based indexes for segments (boolean). -# `true` allows faster timestamp-based data retrieval by keeping indexes in memory. -# `false` conserves memory by reading time indexes from disk, which may slow down access. -cache_time_indexes = true - # Message deduplication configuration [system.message_deduplication] # Controls whether message deduplication is enabled (boolean). diff --git a/integration/tests/streaming/partition.rs b/integration/tests/streaming/partition.rs index 1089dad61..59dbe6e49 100644 --- a/integration/tests/streaming/partition.rs +++ b/integration/tests/streaming/partition.rs @@ -5,7 +5,7 @@ use iggy::utils::timestamp::IggyTimestamp; use server::state::system::PartitionState; use server::streaming::batching::appendable_batch_info::AppendableBatchInfo; use server::streaming::partitions::partition::Partition; -use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION}; +use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION}; use std::sync::atomic::{AtomicU32, AtomicU64}; use std::sync::Arc; use tokio::fs; @@ -202,10 +202,8 @@ async fn assert_persisted_partition(partition_path: &str, with_segment: bool) { let segment_path = format!("{}/{:0>20}", partition_path, start_offset); let log_path = format!("{}.{}", segment_path, LOG_EXTENSION); let index_path = format!("{}.{}", segment_path, INDEX_EXTENSION); - let time_index_path = format!("{}.{}", segment_path, TIME_INDEX_EXTENSION); assert!(fs::metadata(&log_path).await.is_ok()); assert!(fs::metadata(&index_path).await.is_ok()); - assert!(fs::metadata(&time_index_path).await.is_ok()); } } diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index 7673340bd..70faa72e8 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -6,7 +6,7 @@ use iggy::utils::expiry::IggyExpiry; use iggy::utils::{checksum, timestamp::IggyTimestamp}; use server::streaming::models::messages::RetainedMessage; use server::streaming::segments::segment; -use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION}; +use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION}; use server::streaming::sizeable::Sizeable; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -111,7 +111,6 @@ async fn should_load_existing_segment_from_disk() { assert_eq!(loaded_segment.is_closed, 
segment.is_closed); assert_eq!(loaded_segment.log_path, segment.log_path); assert_eq!(loaded_segment.index_path, segment.index_path); - assert_eq!(loaded_segment.time_index_path, segment.time_index_path); assert!(loaded_messages.is_empty()); } } @@ -353,10 +352,8 @@ async fn assert_persisted_segment(partition_path: &str, start_offset: u64) { let segment_path = format!("{}/{:0>20}", partition_path, start_offset); let log_path = format!("{}.{}", segment_path, LOG_EXTENSION); let index_path = format!("{}.{}", segment_path, INDEX_EXTENSION); - let time_index_path = format!("{}.{}", segment_path, TIME_INDEX_EXTENSION); assert!(fs::metadata(&log_path).await.is_ok()); assert!(fs::metadata(&index_path).await.is_ok()); - assert!(fs::metadata(&time_index_path).await.is_ok()); } fn create_message(offset: u64, payload: &str, timestamp: IggyTimestamp) -> PolledMessage { diff --git a/sdk/src/utils/duration.rs b/sdk/src/utils/duration.rs index 3b20acada..5ace592e4 100644 --- a/sdk/src/utils/duration.rs +++ b/sdk/src/utils/duration.rs @@ -27,6 +27,12 @@ impl IggyDuration { IggyDuration { duration } } + pub fn new_from_secs(secs: u64) -> IggyDuration { + IggyDuration { + duration: Duration::from_secs(secs), + } + } + pub fn as_human_time_string(&self) -> String { format!("{}", format_duration(self.duration)) } diff --git a/server/Cargo.toml b/server/Cargo.toml index b14db14c8..3ec5beaf8 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.60" +version = "0.4.70" edition = "2021" build = "src/build.rs" @@ -12,7 +12,6 @@ tokio-console = ["dep:console-subscriber", "tokio/tracing"] [dependencies] ahash = { version = "0.8.11" } anyhow = "1.0.86" -async-stream = "0.3.5" async-trait = "0.1.82" atone = "0.3.7" axum = "0.7.5" @@ -36,13 +35,29 @@ moka = { version = "0.12.5", features = ["future"] } openssl = { version = "0.10.66", features = ["vendored"] } opentelemetry = { version = "0.26.0", features = ["trace", "logs"] } opentelemetry-appender-tracing = { version = "0.26.0", features = ["log"] } -opentelemetry-otlp = { version = "0.26.0", features = ["logs", "trace", "grpc-tonic", "http", "http-proto", "reqwest-client", "tokio"] } +opentelemetry-otlp = { version = "0.26.0", features = [ + "logs", + "trace", + "grpc-tonic", + "http", + "http-proto", + "reqwest-client", + "tokio", +] } opentelemetry-semantic-conventions = { version = "0.26.0" } -opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio", "logs", "trace", "tokio"] } +opentelemetry_sdk = { version = "0.26.0", features = [ + "rt-tokio", + "logs", + "trace", + "tokio", +] } prometheus-client = "0.22.2" quinn = { version = "0.11.5" } rcgen = "0.13.1" -reqwest = { version = "0.12.4", features = ["rustls-tls", "rustls-tls-no-provider"] } +reqwest = { version = "0.12.4", features = [ + "rustls-tls", + "rustls-tls-no-provider", +] } ring = "0.17.8" rmp-serde = "1.3.0" rust-s3 = { version = "0.34.0", features = ["default"] } @@ -79,10 +94,11 @@ tikv-jemallocator = { version = "0.6", optional = true } [build-dependencies] figment = { version = "0.10.18", features = ["json", "toml", "env"] } serde_json = "1.0.127" -vergen-git2 = { version = "1.0.0", features = ["build", +vergen-git2 = { version = "1.0.0", features = [ + "build", "cargo", "rustc", - "si" + "si", ] } [[bin]] diff --git a/server/src/channels/commands/maintain_messages.rs b/server/src/channels/commands/maintain_messages.rs index f68630747..3f6b612cd 100644 --- a/server/src/channels/commands/maintain_messages.rs +++ 
b/server/src/channels/commands/maintain_messages.rs @@ -439,11 +439,7 @@ async fn archive_segments( } let segment = segment.unwrap(); - let files = [ - segment.index_path.as_ref(), - segment.time_index_path.as_ref(), - segment.log_path.as_ref(), - ]; + let files = [segment.index_path.as_ref(), segment.log_path.as_ref()]; if let Err(error) = archiver.archive(&files, None).await { error!( "Failed to archive segment with start offset: {} for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}",
diff --git a/server/src/channels/commands/print_sysinfo.rs b/server/src/channels/commands/print_sysinfo.rs index ad6c7d202..f8b1c9585 100644 --- a/server/src/channels/commands/print_sysinfo.rs +++ b/server/src/channels/commands/print_sysinfo.rs @@ -60,7 +60,7 @@ impl ServerCommand for SysInfoPrintExecutor { / stats.total_memory.as_bytes_u64() as f64) * 100f64; - info!("CPU: {:.2}% / {:.2}% (IggyUsage/Total), Mem: {:.2}% / {} / {} / {} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages processed: {}, Read: {}, Written: {}, Run Time: {} s", + info!("CPU: {:.2}% / {:.2}% (IggyUsage/Total), Mem: {:.2}% / {} / {} / {} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages processed: {}, Read: {}, Written: {}, Uptime: {}", stats.cpu_usage, stats.total_cpu_usage, free_memory_percent,
diff --git a/server/src/compat/index_conversion/index_converter.rs b/server/src/compat/index_conversion/index_converter.rs new file mode 100644 index 000000000..9b34fec92 --- /dev/null +++ b/server/src/compat/index_conversion/index_converter.rs @@ -0,0 +1,128 @@ +use std::path::Path; +use std::time::Duration; + +use crate::streaming::utils::file; +use crate::{server_error::ServerError, streaming::segments::storage::INDEX_SIZE}; +use bytes::{BufMut, BytesMut}; +use tokio::fs::OpenOptions; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::time::{sleep, timeout}; +use tracing::{error, info, trace}; + +pub struct IndexConverter { + pub index_path: String, + pub time_index_path: String, +} + +impl IndexConverter { + pub fn new(index_path: String, time_index_path: String) -> Self { + Self { + index_path, + time_index_path, + } + } + + pub async fn migrate(&self) -> Result<(), ServerError> { + let indexes = self.convert_indexes().await?; + self.replace_with_converted(indexes).await?; + + Ok(()) + } + + pub async fn needs_migration(&self) -> Result<bool, ServerError> { + Ok(file::exists(&self.time_index_path).await?) + } + + async fn convert_indexes(&self) -> Result<Vec<u8>, ServerError> { + let index_file = file::open(&self.index_path).await?; + let time_index_file = file::open(&self.time_index_path).await?; + + let mut index_reader = BufReader::new(index_file); + let mut time_index_reader = BufReader::new(time_index_file); + + let new_index = Vec::new(); + let mut new_index_writer = BufWriter::new(new_index); + + loop { + let relative_offset_result = index_reader.read_u32_le().await; + let position_result = index_reader.read_u32_le().await; + + let time_relative_offset_result = time_index_reader.read_u32_le().await; + let timestamp_result = time_index_reader.read_u64_le().await; + + if relative_offset_result.is_err() + || position_result.is_err() + || time_relative_offset_result.is_err() + || timestamp_result.is_err() + { + trace!("Reached EOF for index file: {}", &self.index_path); + break; + } + + let relative_offset = relative_offset_result?; + let position = position_result?; + let time_relative_offset = time_relative_offset_result?; + let timestamp = timestamp_result?; + + if relative_offset != time_relative_offset { + return Err(ServerError::IndexMigrationError(format!( + "Mismatched relative offsets in normal index: {relative_offset} vs time index: {time_relative_offset}" + ))); + } + + let mut new_index_entry = BytesMut::with_capacity(INDEX_SIZE as usize); // u32 + u32 + u64 + new_index_entry.put_u32_le(relative_offset); + new_index_entry.put_u32_le(position); + new_index_entry.put_u64_le(timestamp); + + new_index_writer.write_all(&new_index_entry).await?; + } + new_index_writer.flush().await?; + + Ok(new_index_writer.into_inner()) + } + + async fn replace_with_converted(&self, indexes: Vec<u8>) -> Result<(), ServerError> { + file::remove(&self.index_path).await?; + file::remove(&self.time_index_path).await?; + + let dir_path = + Path::new(&self.index_path) + .parent() + .ok_or(ServerError::IndexMigrationError( + "Failed to get parent directory of index file".to_string(), + ))?; + + let dir = OpenOptions::new().read(true).open(dir_path).await?; + + dir.sync_all().await.map_err(|e| { + ServerError::IndexMigrationError(format!("Failed to sync data for directory: {}", e)) + })?; + + let wait_duration = Duration::from_secs(2); + let sleep_duration = Duration::from_millis(10); + + let wait_future = async { + while file::exists(&self.time_index_path).await.unwrap_or(false) { + sleep(sleep_duration).await; + } + info!("File {} has been removed", &self.time_index_path); + }; + + if (timeout(wait_duration, wait_future).await).is_err() { + error!( + "Timeout waiting for file {} to be removed", + &self.time_index_path + ); + } + + let new_index_file = file::overwrite(&self.index_path).await?; + let mut new_index_writer = BufWriter::new(new_index_file); + new_index_writer.write_all(&indexes).await?; + new_index_writer.flush().await?; + + info!("Replaced old index with new index at {}", &self.index_path); + + Ok(()) + } +}
diff --git a/server/src/compat/index_conversion/mod.rs b/server/src/compat/index_conversion/mod.rs new file mode 100644 index 000000000..ef6bdc058 --- /dev/null +++ b/server/src/compat/index_conversion/mod.rs @@ -0,0 +1 @@ +pub mod index_converter;
diff --git a/server/src/compat/message_conversion/binary_schema.rs b/server/src/compat/message_conversion/binary_schema.rs deleted file mode 100644 index f26328aba..000000000 --- a/server/src/compat/message_conversion/binary_schema.rs +++ /dev/null @@ -1,5 +0,0 @@ -#[derive(Debug, Clone, Copy)] -pub enum BinarySchema { - RetainedMessageSchema, -
RetainedMessageBatchSchema, -} diff --git a/server/src/compat/message_conversion/chunks_error.rs b/server/src/compat/message_conversion/chunks_error.rs deleted file mode 100644 index 531497642..000000000 --- a/server/src/compat/message_conversion/chunks_error.rs +++ /dev/null @@ -1,18 +0,0 @@ -use futures::stream::TryChunksError; -use iggy::error::IggyError; - -pub trait IntoTryChunksError { - fn into_try_chunks_error(self) -> TryChunksError; -} - -impl IntoTryChunksError for IggyError { - fn into_try_chunks_error(self) -> TryChunksError { - TryChunksError(Vec::new(), self) - } -} - -impl IntoTryChunksError for std::io::Error { - fn into_try_chunks_error(self) -> TryChunksError { - TryChunksError(Vec::new(), IggyError::from(self)) - } -} diff --git a/server/src/compat/message_conversion/conversion_writer.rs b/server/src/compat/message_conversion/conversion_writer.rs deleted file mode 100644 index c38fbea37..000000000 --- a/server/src/compat/message_conversion/conversion_writer.rs +++ /dev/null @@ -1,113 +0,0 @@ -use crate::streaming::utils::file; -use iggy::error::IggyError; -use tracing::trace; - -//TODO (numinex) - Make this writer transactional -pub struct ConversionWriter<'w> { - pub log_path: &'w str, - pub index_path: &'w str, - pub time_index_path: &'w str, - - pub alt_log_path: String, - pub alt_index_path: String, - pub alt_time_index_path: String, - - compat_backup_path: &'w str, -} - -impl<'w> ConversionWriter<'w> { - pub fn init( - log_path: &'w str, - index_path: &'w str, - time_index_path: &'w str, - compat_backup_path: &'w str, - ) -> ConversionWriter<'w> { - ConversionWriter { - log_path, - index_path, - time_index_path, - alt_log_path: format!("{}_temp.{}", log_path.split('.').next().unwrap(), "log"), - alt_index_path: format!("{}_temp.{}", index_path.split('.').next().unwrap(), "index"), - alt_time_index_path: format!( - "{}_temp.{}", - time_index_path.split('.').next().unwrap(), - "timeindex" - ), - compat_backup_path, - } - } - - pub async fn create_alt_directories(&self) -> Result<(), IggyError> { - tokio::fs::File::create(&self.alt_log_path).await?; - tokio::fs::File::create(&self.alt_index_path).await?; - tokio::fs::File::create(&self.alt_time_index_path).await?; - - trace!( - "Created temporary files for conversion, log: {}, index: {}, time_index: {}", - &self.alt_log_path, - &self.alt_index_path, - &self.alt_time_index_path - ); - Ok(()) - } - - pub async fn create_old_segment_backup(&self) -> Result<(), IggyError> { - let log_backup_path = &self - .log_path - .split_once('/') - .map(|(_, path)| format!("{}/{}", &self.compat_backup_path, path)) - .unwrap(); - let index_backup_path = &self - .index_path - .split_once('/') - .map(|(_, path)| format!("{}/{}", self.compat_backup_path, path)) - .unwrap(); - let time_index_backup_path = &self - .time_index_path - .split_once('/') - .map(|(_, path)| format!("{}/{}", self.compat_backup_path, path)) - .unwrap(); - - let log_path_last_idx = log_backup_path.rfind('/').unwrap(); - let index_path_last_idx = index_backup_path.rfind('/').unwrap(); - let time_index_path_last_idx = time_index_backup_path.rfind('/').unwrap(); - if tokio::fs::metadata(&log_backup_path[..log_path_last_idx]) - .await - .is_err() - { - tokio::fs::create_dir_all(&log_backup_path[..log_path_last_idx]).await?; - } - if tokio::fs::metadata(&index_backup_path[..index_path_last_idx]) - .await - .is_err() - { - tokio::fs::create_dir_all(&index_backup_path[..index_path_last_idx]).await?; - } - if 
tokio::fs::metadata(&time_index_backup_path[..time_index_path_last_idx]) - .await - .is_err() - { - tokio::fs::create_dir_all(&time_index_backup_path[..time_index_path_last_idx]).await?; - } - file::rename(self.log_path, log_backup_path).await?; - file::rename(self.index_path, index_backup_path).await?; - file::rename(self.time_index_path, time_index_backup_path).await?; - - trace!( - "Created backup of converted segment, log: {}, index: {}, time_index: {}", - &log_backup_path, - &index_backup_path, - &time_index_backup_path - ); - Ok(()) - } - - pub async fn replace_with_converted(&self) -> Result<(), IggyError> { - file::rename(&self.alt_log_path, self.log_path).await?; - file::rename(&self.alt_index_path, self.index_path).await?; - file::rename(&self.alt_time_index_path, self.time_index_path).await?; - - trace!("Replaced old segment with newly converted files"); - Ok(()) - } -} diff --git a/server/src/compat/message_conversion/message_converter.rs b/server/src/compat/message_conversion/message_converter.rs deleted file mode 100644 index 908a7ef72..000000000 --- a/server/src/compat/message_conversion/message_converter.rs +++ /dev/null @@ -1,99 +0,0 @@ -use crate::compat::message_conversion::samplers::message_sampler::MessageSampler; -use crate::compat::message_conversion::samplers::retained_batch_sampler::RetainedMessageBatchSampler; -use crate::compat::message_conversion::schema_sampler::BinarySchemaSampler; -use crate::streaming::sizeable::Sizeable; -use bytes::{BufMut, BytesMut}; -use iggy::error::IggyError; - -use crate::streaming::segments::storage::{INDEX_SIZE, TIME_INDEX_SIZE}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; - -pub trait Extendable { - fn extend(&self, bytes: &mut BytesMut); -} - -pub trait MessageFormatConverterPersister { - async fn persist(&self, writer: &mut W) -> Result<(), IggyError>; - async fn persist_index( - &self, - position: u32, - relative_offset: u32, - writer: &mut W, - ) -> Result<(), IggyError>; - async fn persist_time_index( - &self, - timestamp: u64, - relative_offset: u32, - writer: &mut W, - ) -> Result<(), IggyError>; -} - -impl MessageFormatConverterPersister for T -where - T: Sizeable + Extendable, -{ - async fn persist(&self, writer: &mut W) -> Result<(), IggyError> { - let size = self.get_size_bytes(); - let mut batch_bytes = BytesMut::with_capacity(size as usize); - self.extend(&mut batch_bytes); - - writer.write_all(&batch_bytes).await?; - Ok(()) - } - - async fn persist_index( - &self, - position: u32, - relative_offset: u32, - writer: &mut W, - ) -> Result<(), IggyError> { - let mut index_bytes = BytesMut::with_capacity(INDEX_SIZE as usize); - index_bytes.put_u32_le(relative_offset); - index_bytes.put_u32_le(position); - - writer.write_all(&index_bytes).await?; - Ok(()) - } - - async fn persist_time_index( - &self, - timestamp: u64, - relative_offset: u32, - writer: &mut W, - ) -> Result<(), IggyError> { - let mut time_index_bytes = BytesMut::with_capacity(TIME_INDEX_SIZE as usize); - time_index_bytes.put_u32_le(relative_offset); - time_index_bytes.put_u64_le(timestamp); - - writer.write_all(&time_index_bytes).await?; - Ok(()) - } -} - -pub struct MessageFormatConverter { - pub samplers: Vec>, -} - -impl MessageFormatConverter { - pub fn init( - segment_start_offset: u64, - log_path: String, - index_path: String, - ) -> MessageFormatConverter { - // Always append new schemas to beginning of vec - MessageFormatConverter { - samplers: vec![ - Box::new(RetainedMessageBatchSampler::new( - segment_start_offset, - log_path.clone(), - 
index_path.clone(), - )), - Box::new(MessageSampler::new( - segment_start_offset, - log_path, - index_path, - )), - ], - } - } -} diff --git a/server/src/compat/message_conversion/message_stream.rs b/server/src/compat/message_conversion/message_stream.rs deleted file mode 100644 index f2b922085..000000000 --- a/server/src/compat/message_conversion/message_stream.rs +++ /dev/null @@ -1,6 +0,0 @@ -use futures::Stream; - -pub trait MessageStream { - type Item; - fn into_stream(self) -> impl Stream; -} diff --git a/server/src/compat/message_conversion/mod.rs b/server/src/compat/message_conversion/mod.rs deleted file mode 100644 index 715891011..000000000 --- a/server/src/compat/message_conversion/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub(crate) mod binary_schema; -pub(crate) mod chunks_error; -pub(crate) mod conversion_writer; -pub(crate) mod message_converter; -pub(crate) mod message_stream; -pub(crate) mod samplers; -pub(crate) mod schema_sampler; -pub(crate) mod snapshots; -pub(crate) mod streams; diff --git a/server/src/compat/message_conversion/samplers/message_sampler.rs b/server/src/compat/message_conversion/samplers/message_sampler.rs deleted file mode 100644 index 4e6e98afe..000000000 --- a/server/src/compat/message_conversion/samplers/message_sampler.rs +++ /dev/null @@ -1,53 +0,0 @@ -use crate::compat::message_conversion::binary_schema::BinarySchema; -use crate::compat::message_conversion::schema_sampler::BinarySchemaSampler; -use crate::compat::message_conversion::snapshots::message_snapshot::MessageSnapshot; -use crate::server_error::ServerError; -use crate::streaming::utils::file; -use async_trait::async_trait; -use bytes::{BufMut, BytesMut}; -use tokio::io::AsyncReadExt; - -pub struct MessageSampler { - pub segment_start_offset: u64, - pub log_path: String, - pub index_path: String, -} -impl MessageSampler { - pub fn new(segment_start_offset: u64, log_path: String, index_path: String) -> MessageSampler { - MessageSampler { - segment_start_offset, - log_path, - index_path, - } - } -} - -unsafe impl Send for MessageSampler {} -unsafe impl Sync for MessageSampler {} - -#[async_trait] -impl BinarySchemaSampler for MessageSampler { - async fn try_sample(&self) -> Result { - let mut index_file = file::open(&self.index_path).await?; - let mut log_file = file::open(&self.log_path).await?; - let log_file_size = log_file.metadata().await?.len(); - - if log_file_size == 0 { - return Ok(BinarySchema::RetainedMessageSchema); - } - - let _ = index_file.read_u32_le().await?; - let end_position = index_file.read_u32_le().await?; - - let buffer_size = end_position as usize; - let mut buffer = BytesMut::with_capacity(buffer_size); - buffer.put_bytes(0, buffer_size); - let _ = log_file.read_exact(&mut buffer).await?; - - let message = MessageSnapshot::try_from(buffer.freeze())?; - if message.offset != self.segment_start_offset { - return Err(ServerError::InvalidMessageOffsetFormatConversion); - } - Ok(BinarySchema::RetainedMessageSchema) - } -} diff --git a/server/src/compat/message_conversion/samplers/mod.rs b/server/src/compat/message_conversion/samplers/mod.rs deleted file mode 100644 index d75fb81ff..000000000 --- a/server/src/compat/message_conversion/samplers/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub(crate) mod message_sampler; -pub(crate) mod retained_batch_sampler; diff --git a/server/src/compat/message_conversion/samplers/retained_batch_sampler.rs b/server/src/compat/message_conversion/samplers/retained_batch_sampler.rs deleted file mode 100644 index a05fb191c..000000000 --- 
a/server/src/compat/message_conversion/samplers/retained_batch_sampler.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::compat::message_conversion::binary_schema::BinarySchema; -use crate::compat::message_conversion::schema_sampler::BinarySchemaSampler; -use crate::compat::message_conversion::snapshots::retained_batch_snapshot::RetainedMessageBatchSnapshot; -use crate::server_error::ServerError; -use crate::streaming::utils::file; -use async_trait::async_trait; -use bytes::{BufMut, Bytes}; -use tokio::io::AsyncReadExt; - -pub struct RetainedMessageBatchSampler { - pub segment_start_offset: u64, - pub log_path: String, - pub index_path: String, -} - -impl RetainedMessageBatchSampler { - pub fn new( - segment_start_offset: u64, - log_path: String, - index_path: String, - ) -> RetainedMessageBatchSampler { - RetainedMessageBatchSampler { - segment_start_offset, - log_path, - index_path, - } - } -} - -unsafe impl Send for RetainedMessageBatchSampler {} -unsafe impl Sync for RetainedMessageBatchSampler {} - -#[async_trait] -impl BinarySchemaSampler for RetainedMessageBatchSampler { - async fn try_sample(&self) -> Result { - let mut index_file = file::open(&self.index_path).await?; - let mut log_file = file::open(&self.log_path).await?; - let log_file_size = log_file.metadata().await?.len(); - - if log_file_size == 0 { - return Ok(BinarySchema::RetainedMessageBatchSchema); - } - - let _ = index_file.read_u32_le().await?; - let _ = index_file.read_u32_le().await?; - let second_index_offset = index_file.read_u32_le().await; - let second_end_position = index_file.read_u32_le().await; - - let mut buffer = Vec::new(); - if second_index_offset.is_err() && second_end_position.is_err() { - let _ = log_file.read_to_end(&mut buffer).await?; - } else { - let buffer_size = second_end_position.unwrap() as usize; - buffer.put_bytes(0, buffer_size); - let _ = log_file.read_exact(&mut buffer).await?; - } - let batch = RetainedMessageBatchSnapshot::try_from(Bytes::from(buffer))?; - if batch.base_offset != self.segment_start_offset { - return Err(ServerError::InvalidBatchBaseOffsetFormatConversion); - } - Ok(BinarySchema::RetainedMessageBatchSchema) - } -} diff --git a/server/src/compat/message_conversion/schema_sampler.rs b/server/src/compat/message_conversion/schema_sampler.rs deleted file mode 100644 index 6c1f86f14..000000000 --- a/server/src/compat/message_conversion/schema_sampler.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::compat::message_conversion::binary_schema::BinarySchema; -use crate::server_error::ServerError; -use async_trait::async_trait; - -#[async_trait] -pub trait BinarySchemaSampler: Send + Sync { - async fn try_sample(&self) -> Result; -} diff --git a/server/src/compat/message_conversion/snapshots/message_snapshot.rs b/server/src/compat/message_conversion/snapshots/message_snapshot.rs deleted file mode 100644 index dd425389b..000000000 --- a/server/src/compat/message_conversion/snapshots/message_snapshot.rs +++ /dev/null @@ -1,175 +0,0 @@ -use crate::compat::message_conversion::message_converter::Extendable; -use crate::server_error::ServerError; -use crate::streaming::sizeable::Sizeable; -use bytes::{BufMut, Bytes, BytesMut}; -use iggy::bytes_serializable::BytesSerializable; -use iggy::models::header::{self, HeaderKey, HeaderValue}; -use iggy::models::messages::MessageState; -use std::collections::HashMap; - -#[derive(Debug)] -pub struct MessageSnapshot { - pub offset: u64, - pub state: MessageState, - pub timestamp: u64, - pub id: u128, - pub payload: Bytes, - pub checksum: u32, - pub 
headers: Option>, -} - -impl MessageSnapshot { - pub fn new( - offset: u64, - state: MessageState, - timestamp: u64, - id: u128, - payload: Bytes, - checksum: u32, - headers: Option>, - ) -> MessageSnapshot { - MessageSnapshot { - offset, - state, - timestamp, - id, - payload, - checksum, - headers, - } - } -} - -impl Extendable for MessageSnapshot { - fn extend(&self, bytes: &mut BytesMut) { - let length = self.get_size_bytes() - 4; - let id = self.id; - let offset = self.offset; - let timestamp = self.timestamp; - let payload = self.payload.clone(); - let checksum = self.checksum; - let message_state = self.state; - let headers = &self.headers; - - bytes.put_u32_le(length); - bytes.put_u64_le(offset); - bytes.put_u8(message_state.as_code()); - bytes.put_u64_le(timestamp); - bytes.put_u128_le(id); - bytes.put_u32_le(checksum); - if let Some(headers) = headers { - #[allow(clippy::cast_possible_truncation)] - bytes.put_u32_le(headers.len() as u32); - bytes.put_slice(&headers.to_bytes()); - } else { - bytes.put_u32_le(0u32); - } - bytes.put_slice(&payload); - } -} - -impl Sizeable for MessageSnapshot { - fn get_size_bytes(&self) -> u32 { - let headers_size = header::get_headers_size_bytes(&self.headers); - 41 + headers_size + self.payload.len() as u32 - } -} - -impl TryFrom for MessageSnapshot { - type Error = ServerError; - - fn try_from(value: Bytes) -> Result { - let offset = u64::from_le_bytes( - value - .get(0..8) - .ok_or_else(|| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid offset bytes for message snapshot".to_owned(), - ) - })? - .try_into()?, - ); - let state = MessageState::from_code(*value.get(8).ok_or_else(|| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid state for message snapshot".to_owned(), - ) - })?)?; - let timestamp = u64::from_le_bytes( - value - .get(9..17) - .ok_or_else(|| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid timestamp bytes for message snapshot".to_owned(), - ) - })? - .try_into()?, - ); - let id = u128::from_le_bytes( - value - .get(17..33) - .ok_or_else(|| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid id bytes for message snapshot".to_owned(), - ) - })? - .try_into()?, - ); - let checksum = u32::from_le_bytes( - value - .get(33..37) - .ok_or_else(|| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid checksum bytes for message snapshot".to_owned(), - ) - })? - .try_into()?, - ); - let headers_length = u32::from_le_bytes( - value - .get(37..41) - .ok_or_else(|| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid headers_length bytes for message snapshot".to_owned(), - ) - })? - .try_into()?, - ); - let headers = match headers_length { - 0 => None, - _ => { - let headers_payload = &value[41..(41 + headers_length as usize)]; - let headers = HashMap::from_bytes(Bytes::copy_from_slice(headers_payload)) - .map_err(|_| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid headers bytes for message snapshot".to_owned(), - ) - })?; - Some(headers) - } - }; - - let position = 41 + headers_length as usize; - let payload_length = u32::from_le_bytes( - value - .get(position..(position + 4)) - .ok_or_else(|| { - ServerError::InvalidMessageFieldFormatConversionSampling( - "Invalid payload bytes for message snapshot".to_owned(), - ) - })? 
- .try_into()?, - ); - let payload = - Bytes::copy_from_slice(&value[position + 4..position + 4 + payload_length as usize]); - - Ok(MessageSnapshot { - offset, - state, - timestamp, - id, - payload, - checksum, - headers, - }) - } -} diff --git a/server/src/compat/message_conversion/snapshots/mod.rs b/server/src/compat/message_conversion/snapshots/mod.rs deleted file mode 100644 index e3a3bf69c..000000000 --- a/server/src/compat/message_conversion/snapshots/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub(crate) mod message_snapshot; -pub(crate) mod retained_batch_snapshot; diff --git a/server/src/compat/message_conversion/snapshots/retained_batch_snapshot.rs b/server/src/compat/message_conversion/snapshots/retained_batch_snapshot.rs deleted file mode 100644 index f73d98638..000000000 --- a/server/src/compat/message_conversion/snapshots/retained_batch_snapshot.rs +++ /dev/null @@ -1,140 +0,0 @@ -use super::message_snapshot::MessageSnapshot; -use crate::compat::message_conversion::message_converter::Extendable; -use crate::server_error::ServerError; -use crate::streaming::sizeable::Sizeable; -use bytes::{BufMut, Bytes, BytesMut}; -use iggy::error::IggyError; - -pub struct RetainedMessageBatchSnapshot { - pub base_offset: u64, - pub last_offset_delta: u32, - pub max_timestamp: u64, - pub length: u32, - pub bytes: Bytes, -} - -impl RetainedMessageBatchSnapshot { - pub fn new( - base_offset: u64, - last_offset_delta: u32, - max_timestamp: u64, - length: u32, - bytes: Bytes, - ) -> RetainedMessageBatchSnapshot { - RetainedMessageBatchSnapshot { - base_offset, - last_offset_delta, - max_timestamp, - length, - bytes, - } - } - - pub fn get_last_offset(&self) -> u64 { - self.base_offset + self.last_offset_delta as u64 - } - - pub fn try_from_messages( - messages: Vec, - ) -> Result { - let first_message = messages.first().unwrap(); - let last_message = messages.last().unwrap(); - let base_offset = first_message.offset; - let last_offset_delta = last_message.offset - base_offset; - let max_timestamp = last_message.timestamp; - - let size = messages - .iter() - .map(|msg| msg.get_size_bytes() as usize) - .sum(); - let mut bytes = BytesMut::with_capacity(size); - for message in messages.iter() { - message.extend(&mut bytes); - } - Ok(RetainedMessageBatchSnapshot::new( - base_offset, - last_offset_delta as u32, - max_timestamp, - bytes.len() as u32, - bytes.freeze(), - )) - } -} -impl Sizeable for RetainedMessageBatchSnapshot { - fn get_size_bytes(&self) -> u32 { - 24 + self.bytes.len() as u32 - } -} - -impl Extendable for RetainedMessageBatchSnapshot { - fn extend(&self, bytes: &mut BytesMut) { - bytes.put_u64_le(self.base_offset); - bytes.put_u32_le(self.length); - bytes.put_u32_le(self.last_offset_delta); - bytes.put_u64_le(self.max_timestamp); - bytes.put_slice(&self.bytes); - } -} - -impl TryFrom for RetainedMessageBatchSnapshot { - type Error = ServerError; - fn try_from(value: Bytes) -> Result { - let base_offset = u64::from_le_bytes( - value - .get(0..8) - .ok_or_else(|| { - ServerError::CannotReadMessageBatchFormatConversion( - "Failed to read batch base offset".to_owned(), - ) - })? - .try_into()?, - ); - let length = u32::from_le_bytes( - value - .get(8..12) - .ok_or_else(|| { - ServerError::CannotReadMessageBatchFormatConversion( - "Failed to read batch length".to_owned(), - ) - })? 
- .try_into()?, - ); - let last_offset_delta = u32::from_le_bytes( - value - .get(12..16) - .ok_or_else(|| { - ServerError::CannotReadMessageBatchFormatConversion( - "Failed to read batch last_offset_delta".to_owned(), - ) - })? - .try_into()?, - ); - let max_timestamp = u64::from_le_bytes( - value - .get(16..24) - .ok_or_else(|| { - ServerError::CannotReadMessageBatchFormatConversion( - "Failed to read batch max_timestamp".to_owned(), - ) - })? - .try_into()?, - ); - let bytes = Bytes::from( - value - .get(24..length as usize) - .ok_or_else(|| { - ServerError::CannotReadMessageBatchFormatConversion( - "Failed to read batch payload".to_owned(), - ) - })? - .to_owned(), - ); - Ok(RetainedMessageBatchSnapshot { - base_offset, - last_offset_delta, - max_timestamp, - length, - bytes, - }) - } -} diff --git a/server/src/compat/message_conversion/streams/mod.rs b/server/src/compat/message_conversion/streams/mod.rs deleted file mode 100644 index eb6dc297c..000000000 --- a/server/src/compat/message_conversion/streams/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub(crate) mod retained_batch; -pub(crate) mod retained_message; diff --git a/server/src/compat/message_conversion/streams/retained_batch.rs b/server/src/compat/message_conversion/streams/retained_batch.rs deleted file mode 100644 index a01f52c6c..000000000 --- a/server/src/compat/message_conversion/streams/retained_batch.rs +++ /dev/null @@ -1,20 +0,0 @@ -use tokio::fs::File; -use tokio::io::BufWriter; - -const BUF_WRITER_CAPACITY_BYTES: usize = 512 * 1000; - -pub struct RetainedBatchWriter { - pub log_writer: BufWriter, - pub index_writer: BufWriter, - pub time_index_writer: BufWriter, -} - -impl RetainedBatchWriter { - pub fn init(log_file: File, index_file: File, time_index_file: File) -> Self { - RetainedBatchWriter { - log_writer: BufWriter::with_capacity(BUF_WRITER_CAPACITY_BYTES, log_file), - index_writer: BufWriter::with_capacity(BUF_WRITER_CAPACITY_BYTES, index_file), - time_index_writer: BufWriter::with_capacity(BUF_WRITER_CAPACITY_BYTES, time_index_file), - } - } -} diff --git a/server/src/compat/message_conversion/streams/retained_message.rs b/server/src/compat/message_conversion/streams/retained_message.rs deleted file mode 100644 index e1d4aa5ba..000000000 --- a/server/src/compat/message_conversion/streams/retained_message.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::compat::message_conversion::message_stream::MessageStream; -use crate::compat::message_conversion::snapshots::message_snapshot::MessageSnapshot; - -use async_stream::try_stream; -use bytes::{BufMut, BytesMut}; -use futures::Stream; -use iggy::bytes_serializable::BytesSerializable; -use iggy::error::IggyError; -use iggy::models::messages::MessageState; -use std::collections::HashMap; -use tokio::fs::File; -use tokio::io::{AsyncReadExt, BufReader}; - -const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000; - -pub struct RetainedMessageStream { - pub reader: BufReader, - read_length: u64, - read_bytes: u64, -} -impl RetainedMessageStream { - pub fn new(file: File, read_length: u64) -> RetainedMessageStream { - RetainedMessageStream { - reader: BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file), - read_bytes: 0, - read_length, - } - } -} - -impl MessageStream for RetainedMessageStream { - type Item = Result; - - fn into_stream(mut self) -> impl Stream { - try_stream! 
{ - while self.read_bytes < self.read_length { - let offset = self.reader.read_u64_le().await?; - self.read_bytes += 8; - - let state = self.reader.read_u8().await?; - self.read_bytes += 1; - - let state = MessageState::from_code(state)?; - let timestamp = self.reader.read_u64_le().await?; - self.read_bytes += 8; - - let id = self.reader.read_u128_le().await?; - self.read_bytes += 16; - - let checksum = self.reader.read_u32_le().await?; - self.read_bytes += 4; - - let headers_length = self.reader.read_u32_le().await?; - self.read_bytes += 4; - - let headers = match headers_length { - 0 => None, - _ => { - let mut headers_payload = BytesMut::with_capacity(headers_length as usize); - headers_payload.put_bytes(0, headers_length as usize); - self.reader.read_exact(&mut headers_payload).await?; - - let headers = HashMap::from_bytes(headers_payload.freeze())?; - Some(headers) - } - }; - self.read_bytes += headers_length as u64; - - let payload_len = self.reader.read_u32_le().await?; - - let mut payload = BytesMut::with_capacity(payload_len as usize); - payload.put_bytes(0, payload_len as usize); - self.reader.read_exact(&mut payload).await?; - self.read_bytes += 4 + payload_len as u64; - - let message = - MessageSnapshot::new(offset, state, timestamp, id, payload.freeze(), checksum, headers); - yield message; - } - } - } -}
diff --git a/server/src/compat/mod.rs b/server/src/compat/mod.rs index 9211d2ce9..b8badb778 100644 --- a/server/src/compat/mod.rs +++ b/server/src/compat/mod.rs @@ -1,2 +1,2 @@ -pub mod message_conversion; +pub mod index_conversion; pub mod storage_conversion;
diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index 0663e7058..2a4e8d9dc 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ -10,7 +10,6 @@ use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; -use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::storage::{ PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage, }; @@ -264,26 +263,11 @@ impl SegmentStorage for NoopSegmentStorage { Ok(()) } - async fn try_load_time_index_for_timestamp( + async fn try_load_index_for_timestamp( &self, _segment: &Segment, _timestamp: u64, - ) -> Result<Option<TimeIndex>, IggyError> { + ) -> Result<Option<Index>, IggyError> { Ok(None) } - - async fn load_all_time_indexes(&self, _segment: &Segment) -> Result<Vec<TimeIndex>, IggyError> { - Ok(vec![]) - } - - async fn load_last_time_index( - &self, - _segment: &Segment, - ) -> Result<Option<TimeIndex>, IggyError> { - Ok(None) - } - - async fn save_time_index(&self, _index_path: &str, _index: TimeIndex) -> Result<(), IggyError> { - Ok(()) - } }
diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs index 688bd3d3e..43728435b 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -414,7 +414,6 @@ impl Default for SegmentConfig { SegmentConfig { size: SERVER_CONFIG.system.segment.size.parse().unwrap(), cache_indexes: SERVER_CONFIG.system.segment.cache_indexes, - cache_time_indexes: SERVER_CONFIG.system.segment.cache_time_indexes, message_expiry: SERVER_CONFIG.system.segment.message_expiry.parse().unwrap(), archive_expired: SERVER_CONFIG.system.segment.archive_expired, }
diff --git a/server/src/configs/displays.rs b/server/src/configs/displays.rs
index 18818973d..bc4094705 100644 --- a/server/src/configs/displays.rs +++ b/server/src/configs/displays.rs @@ -267,8 +267,8 @@ impl Display for SegmentConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ size_bytes: {}, cache_indexes: {}, cache_time_indexes: {}, message_expiry: {}, archive_expired: {} }}", - self.size, self.cache_indexes, self.cache_time_indexes, self.message_expiry, self.archive_expired + "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, archive_expired: {} }}", + self.size, self.cache_indexes, self.message_expiry, self.archive_expired ) } } diff --git a/server/src/configs/system.rs b/server/src/configs/system.rs index 7455db3c3..3ebcf3f3c 100644 --- a/server/src/configs/system.rs +++ b/server/src/configs/system.rs @@ -120,7 +120,6 @@ pub struct RecoveryConfig { pub struct SegmentConfig { pub size: IggyByteSize, pub cache_indexes: bool, - pub cache_time_indexes: bool, #[serde_as(as = "DisplayFromStr")] pub message_expiry: IggyExpiry, pub archive_expired: bool, diff --git a/server/src/server_error.rs b/server/src/server_error.rs index e1336eeec..45e0d22a0 100644 --- a/server/src/server_error.rs +++ b/server/src/server_error.rs @@ -33,18 +33,6 @@ pub enum ServerError { CacheConfigValidationFailure(String), #[error("Command length error: {0}")] CommandLengthError(String), - #[error("Cannot read message, when performing format conversion, {0}")] - InvalidMessageFieldFormatConversionSampling(String), - #[error("Invalid message offset, when performing format conversion")] - InvalidMessageOffsetFormatConversion, - #[error("Invalid batch base offset, when performing format conversion")] - InvalidBatchBaseOffsetFormatConversion, - #[error("Cannot read message batch, when performing format conversion, {0}")] - CannotReadMessageBatchFormatConversion(String), - #[error("Cannot remove old segment files")] - CannotRemoveOldSegmentFiles, - #[error("Cannot persist new segment files")] - CannotPersistNewSegmentFiles, #[error("Cannot archive file: {0}")] CannotArchiveFile(String), #[error("Cannot initialize S3 archiver")] @@ -53,4 +41,6 @@ pub enum ServerError { InvalidS3Credentials, #[error("File to archive not found: {0}")] FileToArchiveNotFound(String), + #[error("Index migration error: {0}")] + IndexMigrationError(String), } diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index bb4ff6a01..42c5e011d 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -37,38 +37,35 @@ impl Partition { let mut found_index = None; let mut start_offset = self.segments.last().unwrap().start_offset; // Since index cache is configurable globally, not per segment we can handle it this way. 
- let cache_time_index = self.config.segment.cache_time_indexes; - if !cache_time_index { + if !self.config.segment.cache_indexes { for segment in self.segments.iter() { - let time_index = segment + let index = segment .storage .as_ref() .segment - .try_load_time_index_for_timestamp(segment, timestamp) + .try_load_index_for_timestamp(segment, timestamp) .await?; - if time_index.is_none() { + if index.is_none() { continue; } else { - found_index = time_index; - start_offset = - segment.start_offset + found_index.unwrap().relative_offset as u64; + found_index = index; + start_offset = segment.start_offset + found_index.unwrap().offset as u64; break; } } } else { start_offset = self.segments.first().unwrap().start_offset; for segment in self.segments.iter().rev() { - let time_indexes = segment.time_indexes.as_ref().unwrap(); - let time_index = time_indexes + let indexes = segment.indexes.as_ref().unwrap(); + let index = indexes .iter() .rposition(|time_index| time_index.timestamp <= timestamp) - .map(|idx| time_indexes[idx]); - if time_index.is_none() { + .map(|idx| indexes[idx]); + if index.is_none() { continue; } else { - found_index = time_index; - start_offset = - segment.start_offset + found_index.unwrap().relative_offset as u64; + found_index = index; + start_offset = segment.start_offset + found_index.unwrap().offset as u64; break; } } diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 692a4f299..bfadfb821 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -1,9 +1,9 @@ -use crate::compat::message_conversion::message_converter::MessageFormatConverter; +use crate::compat::index_conversion::index_converter::IndexConverter; use crate::state::system::PartitionState; use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; -use crate::streaming::segments::segment::{Segment, LOG_EXTENSION}; +use crate::streaming::segments::segment::{Segment, INDEX_EXTENSION, LOG_EXTENSION}; use crate::streaming::storage::PartitionStorage; use crate::streaming::utils::file; use anyhow::Context; @@ -53,16 +53,15 @@ impl PartitionStorage for FilePartitionStorage { let mut dir_entries = dir_entries.unwrap(); while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { - let metadata = dir_entry.metadata().await.unwrap(); - if metadata.is_dir() { - continue; - } - let path = dir_entry.path(); let extension = path.extension(); if extension.is_none() || extension.unwrap() != LOG_EXTENSION { continue; } + let metadata = dir_entry.metadata().await.unwrap(); + if metadata.is_dir() { + continue; + } let log_file_name = dir_entry .file_name() @@ -87,37 +86,24 @@ impl PartitionStorage for FilePartitionStorage { partition.messages_count.clone(), ); - let log_path = segment.log_path.to_owned(); let index_path = segment.index_path.to_owned(); - let message_format_converter = - MessageFormatConverter::init(start_offset, log_path, index_path); - - info!("Attempting to detect changes in binary schema for partition with ID: {} and segment with start offset: {}", partition.partition_id, start_offset); - let samplers_count = message_format_converter.samplers.len(); - // Check if partition has any segments - for (idx, sampler) in message_format_converter.samplers.iter().enumerate() { - trace!("Trying to sample the message format for partition with ID: {} and 
segment with start offset: {}", partition.partition_id, start_offset); - match sampler.try_sample().await { - Ok(schema) if idx == 0 => { - // Found message in the newest format, no conversion needed - trace!("Detected up to date binary schema: {:?}, for partition with ID: {} and segment with start offset: {}", schema, partition.partition_id, start_offset); - break; - } - Ok(schema) => { - // Found old format, need to convert it - info!("Detected changes in binary schema for partition with ID: {} and segment with start offset: {}", partition.partition_id, start_offset); - segment.convert_segment_from_schema(schema).await?; + let time_index_path = index_path.replace(INDEX_EXTENSION, "timeindex"); + + let index_converter = IndexConverter::new(index_path, time_index_path); + if let Ok(true) = index_converter.needs_migration().await { + match index_converter.migrate().await { + Ok(_) => { + info!( + "Migrated indexes for partition with ID: {} for stream with ID: {} and topic with ID: {}.", + partition.partition_id, partition.stream_id, partition.topic_id + ); } - Err(err) if idx + 1 == samplers_count => { - // Didn't find any message format, return an error - return Err(IggyError::CannotLoadResource(anyhow::anyhow!(err) - .context(format!( - "Failed to find a valid message format, when trying to perform a conversion for partition with ID: {} and segment with start offset: {}.", - partition.partition_id, - start_offset - )))); + Err(err) => { + error!( + "Failed to migrate indexes for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {}", + partition.partition_id, partition.stream_id, partition.topic_id, err + ); } - _ => {} } } diff --git a/server/src/streaming/segments/index.rs b/server/src/streaming/segments/index.rs index 2c8cd0e20..242ec6e49 100644 --- a/server/src/streaming/segments/index.rs +++ b/server/src/streaming/segments/index.rs @@ -4,13 +4,14 @@ use iggy::error::IggyError::InvalidOffset; #[derive(Debug, Eq, Clone, Copy, Default)] pub struct Index { - pub relative_offset: u32, + pub offset: u32, pub position: u32, + pub timestamp: u64, } impl PartialEq for Index { fn eq(&self, other: &Self) -> bool { - self.relative_offset == other.relative_offset + self.offset == other.offset } } @@ -19,6 +20,7 @@ pub struct IndexRange { pub start: Index, pub end: Index, } + impl Segment { pub fn load_highest_lower_bound_index( &self, @@ -42,8 +44,9 @@ impl Segment { } } } + fn binary_search_index(indices: &[Index], offset: u32) -> Option { - match indices.binary_search_by(|index| index.relative_offset.cmp(&offset)) { + match indices.binary_search_by(|index| index.offset.cmp(&offset)) { Ok(index) => Some(index), Err(index) => { if index < indices.len() { @@ -54,16 +57,19 @@ fn binary_search_index(indices: &[Index], offset: u32) -> Option { } } } + impl IndexRange { pub fn max_range() -> Self { Self { start: Index { - relative_offset: 0, + offset: 0, position: 0, + timestamp: 0, }, end: Index { - relative_offset: u32::MAX - 1, + offset: u32::MAX - 1, position: u32::MAX, + timestamp: u64::MAX, }, } } @@ -112,24 +118,29 @@ mod tests { fn create_test_indices(segment: &mut Segment) { let indexes = vec![ Index { - relative_offset: 5, + offset: 5, position: 0, + timestamp: 1000, }, Index { - relative_offset: 20, + offset: 20, position: 100, + timestamp: 2000, }, Index { - relative_offset: 35, + offset: 35, position: 200, + timestamp: 3000, }, Index { - relative_offset: 50, + offset: 50, position: 300, + timestamp: 4000, }, Index { - relative_offset: 65, + offset: 65, position: 
400, + timestamp: 5000, }, ]; segment.indexes.as_mut().unwrap().extend(indexes); @@ -143,8 +154,8 @@ mod tests { .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 15, 45) .unwrap(); - assert_eq!(result.start.relative_offset, 20); - assert_eq!(result.end.relative_offset, 50); + assert_eq!(result.start.offset, 20); + assert_eq!(result.end.offset, 50); } #[test] @@ -155,14 +166,14 @@ mod tests { .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 65, 100) .unwrap(); - assert_eq!(result_end_range.start.relative_offset, 65); - assert_eq!(result_end_range.end.relative_offset, 65); + assert_eq!(result_end_range.start.offset, 65); + assert_eq!(result_end_range.end.offset, 65); let result_start_range = segment .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 0, 5) .unwrap(); - assert_eq!(result_start_range.start.relative_offset, 5); - assert_eq!(result_start_range.end.relative_offset, 5); + assert_eq!(result_start_range.start.offset, 5); + assert_eq!(result_start_range.end.offset, 5); } #[test] @@ -173,8 +184,8 @@ mod tests { .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 5, 100) .unwrap(); - assert_eq!(result.start.relative_offset, 5); - assert_eq!(result.end.relative_offset, 65); + assert_eq!(result.start.offset, 5); + assert_eq!(result.end.offset, 65); } #[test] diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index 186795430..e519b77c8 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -4,7 +4,6 @@ use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_B use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; -use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::sizeable::Sizeable; use iggy::error::IggyError; use std::sync::atomic::Ordering; @@ -225,31 +224,16 @@ impl Segment { &mut self, batch_last_offset: u64, batch_max_timestamp: u64, - ) -> (Index, TimeIndex) { + ) -> Index { let relative_offset = (batch_last_offset - self.start_offset) as u32; trace!("Storing index for relative_offset: {relative_offset}"); let index = Index { - relative_offset, + offset: relative_offset, position: self.last_index_position, - }; - let time_index = TimeIndex { - relative_offset, timestamp: batch_max_timestamp, }; - match (&mut self.indexes, &mut self.time_indexes) { - (Some(indexes), Some(time_indexes)) => { - indexes.push(index); - time_indexes.push(time_index); - } - (Some(indexes), None) => { - indexes.push(index); - } - (None, Some(time_indexes)) => { - time_indexes.push(time_index); - } - (None, None) => {} - }; - (index, time_index) + self.indexes.as_mut().unwrap().push(index); + index } pub async fn persist_messages(&mut self) -> Result { @@ -264,7 +248,7 @@ impl Segment { } let batch_max_offset = batch_accumulator.batch_max_offset(); let batch_max_timestamp = batch_accumulator.batch_max_timestamp(); - let (index, time_index) = + let index = self.store_offset_and_timestamp_index_for_batch(batch_max_offset, batch_max_timestamp); let unsaved_messages_number = batch_accumulator.unsaved_messages_count(); @@ -282,9 +266,6 @@ impl Segment { } let saved_bytes = storage.save_batches(self, batch).await?; storage.save_index(&self.index_path, index).await?; - storage - .save_time_index(&self.time_index_path, time_index) - .await?; self.last_index_position += batch_size; 
self.size_bytes += RETAINED_BATCH_OVERHEAD; self.size_of_parent_stream diff --git a/server/src/streaming/segments/mod.rs b/server/src/streaming/segments/mod.rs index 15c9cc213..1e8a04b31 100644 --- a/server/src/streaming/segments/mod.rs +++ b/server/src/streaming/segments/mod.rs @@ -3,4 +3,3 @@ pub mod messages; pub mod persistence; pub mod segment; pub mod storage; -pub mod time_index; diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 600240634..bcdddd456 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -1,32 +1,14 @@ -use crate::compat::message_conversion::chunks_error::IntoTryChunksError; -use crate::compat::message_conversion::conversion_writer::ConversionWriter; -use crate::compat::message_conversion::message_converter::MessageFormatConverterPersister; -use crate::compat::message_conversion::message_stream::MessageStream; -use crate::compat::message_conversion::snapshots::retained_batch_snapshot::RetainedMessageBatchSnapshot; -use crate::compat::message_conversion::streams::retained_batch::RetainedBatchWriter; -use crate::compat::message_conversion::streams::retained_message::RetainedMessageStream; use crate::configs::system::SystemConfig; +use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::segments::index::Index; -use crate::streaming::segments::time_index::TimeIndex; -use crate::streaming::sizeable::Sizeable; use crate::streaming::storage::SystemStorage; -use crate::streaming::utils::file; -use crate::{ - compat::message_conversion::binary_schema::BinarySchema, - streaming::batching::batch_accumulator::BatchAccumulator, -}; -use futures::{pin_mut, TryStreamExt}; -use iggy::error::IggyError; use iggy::utils::expiry::IggyExpiry; use iggy::utils::timestamp::IggyTimestamp; use std::sync::atomic::AtomicU64; use std::sync::Arc; -use tokio::io::AsyncWriteExt; -use tracing::{info, trace}; pub const LOG_EXTENSION: &str = "log"; pub const INDEX_EXTENSION: &str = "index"; -pub const TIME_INDEX_EXTENSION: &str = "timeindex"; pub const MAX_SIZE_BYTES: u32 = 1000 * 1000 * 1000; #[derive(Debug)] @@ -39,7 +21,6 @@ pub struct Segment { pub current_offset: u64, pub index_path: String, pub log_path: String, - pub time_index_path: String, pub size_bytes: u32, pub last_index_position: u32, pub max_size_bytes: u32, @@ -54,7 +35,6 @@ pub struct Segment { pub(crate) unsaved_messages: Option, pub(crate) config: Arc, pub(crate) indexes: Option>, - pub(crate) time_indexes: Option>, pub(crate) storage: Arc, } @@ -86,7 +66,6 @@ impl Segment { current_offset: start_offset, log_path: Self::get_log_path(&path), index_path: Self::get_index_path(&path), - time_index_path: Self::get_time_index_path(&path), size_bytes: 0, last_index_position: 0, max_size_bytes: config.segment.size.as_bytes_u64() as u32, @@ -98,10 +77,6 @@ impl Segment { true => Some(Vec::new()), false => None, }, - time_indexes: match config.segment.cache_time_indexes { - true => Some(Vec::new()), - false => None, - }, unsaved_messages: None, is_closed: false, size_of_parent_stream, @@ -156,82 +131,6 @@ impl Segment { fn get_index_path(path: &str) -> String { format!("{}.{}", path, INDEX_EXTENSION) } - - fn get_time_index_path(path: &str) -> String { - format!("{}.{}", path, TIME_INDEX_EXTENSION) - } - - pub async fn convert_segment_from_schema(&self, schema: BinarySchema) -> Result<(), IggyError> { - let log_path = self.log_path.as_str(); - let index_path = self.index_path.as_str(); - let time_index_path 
= self.time_index_path.as_str(); - - match schema { - BinarySchema::RetainedMessageSchema => { - let file = file::open(&self.log_path).await?; - let file_size = file.metadata().await?.len(); - if file_size == 0 { - return Ok(()); - } - - let compat_backup_path = self.config.get_compatibility_backup_path(); - let conversion_writer = ConversionWriter::init( - log_path, - index_path, - time_index_path, - &compat_backup_path, - ); - conversion_writer.create_alt_directories().await?; - let retained_batch_writer = RetainedBatchWriter::init( - file::append(&conversion_writer.alt_log_path).await?, - file::append(&conversion_writer.alt_index_path).await?, - file::append(&conversion_writer.alt_time_index_path).await?, - ); - - let stream = RetainedMessageStream::new(file, file_size).into_stream(); - pin_mut!(stream); - let (_, mut retained_batch_writer) = stream - .try_chunks(1000) - .try_fold((0u32, retained_batch_writer), |(position, mut retained_batch_writer), messages| async move { - let batch = RetainedMessageBatchSnapshot::try_from_messages(messages) - .map_err(|err| err.into_try_chunks_error())?; - let size = batch.get_size_bytes(); - info!("Converted messages with start offset: {} and end offset: {}, with binary schema: {:?} to newest schema", - batch.base_offset, batch.get_last_offset(), schema); - - batch - .persist(&mut retained_batch_writer.log_writer) - .await - .map_err(|err| err.into_try_chunks_error())?; - trace!("Persisted message batch with new format to log file, saved {} bytes", size); - let relative_offset = (batch.get_last_offset() - self.start_offset) as u32; - batch - .persist_index(position, relative_offset, &mut retained_batch_writer.index_writer) - .await - .map_err(|err| err.into_try_chunks_error())?; - trace!("Persisted index with offset: {} and position: {} to index file", relative_offset, position); - batch - .persist_time_index(batch.max_timestamp, relative_offset, &mut retained_batch_writer.time_index_writer) - .await - .map_err(|err| err.into_try_chunks_error())?; - trace!("Persisted time index with offset: {} to time index file", relative_offset); - let position = position + size; - - Ok((position, retained_batch_writer)) - }) - .await - .map_err(|err| err.1)?; // For now - retained_batch_writer.log_writer.flush().await?; - retained_batch_writer.index_writer.flush().await?; - retained_batch_writer.time_index_writer.flush().await?; - - conversion_writer.create_old_segment_backup().await?; - conversion_writer.replace_with_converted().await?; - Ok(()) - } - BinarySchema::RetainedMessageBatchSchema => Ok(()), - } - } } #[cfg(test)] @@ -252,7 +151,6 @@ mod tests { let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset); let log_path = Segment::get_log_path(&path); let index_path = Segment::get_index_path(&path); - let time_index_path = Segment::get_time_index_path(&path); let message_expiry = IggyExpiry::ExpireDuration(IggyDuration::from(10)); let size_of_parent_stream = Arc::new(AtomicU64::new(0)); let size_of_parent_topic = Arc::new(AtomicU64::new(0)); @@ -286,11 +184,9 @@ mod tests { assert_eq!(segment.size_bytes, 0); assert_eq!(segment.log_path, log_path); assert_eq!(segment.index_path, index_path); - assert_eq!(segment.time_index_path, time_index_path); assert_eq!(segment.message_expiry, message_expiry); assert!(segment.unsaved_messages.is_none()); assert!(segment.indexes.is_some()); - assert!(segment.time_indexes.is_some()); assert!(!segment.is_closed); assert!(!segment.is_full().await); } @@ -335,44 +231,4 @@ mod tests { 
assert!(segment.indexes.is_none()); } - - #[test] - fn should_not_initialize_time_indexes_cache_when_disabled() { - let storage = Arc::new(get_test_system_storage()); - let stream_id = 1; - let topic_id = 2; - let partition_id = 3; - let start_offset = 0; - let config = Arc::new(SystemConfig { - segment: SegmentConfig { - cache_time_indexes: false, - ..Default::default() - }, - ..Default::default() - }); - let message_expiry = IggyExpiry::NeverExpire; - let size_of_parent_stream = Arc::new(AtomicU64::new(0)); - let size_of_parent_topic = Arc::new(AtomicU64::new(0)); - let size_of_parent_partition = Arc::new(AtomicU64::new(0)); - let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0)); - let messages_count_of_parent_topic = Arc::new(AtomicU64::new(0)); - let messages_count_of_parent_partition = Arc::new(AtomicU64::new(0)); - - let segment = Segment::create( - stream_id, - topic_id, - partition_id, - start_offset, - config, - storage, - message_expiry, - size_of_parent_stream, - size_of_parent_topic, - size_of_parent_partition, - messages_count_of_parent_stream, - messages_count_of_parent_topic, - messages_count_of_parent_partition, - ); - assert!(segment.time_indexes.is_none()); - } } diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index bb14f1bd5..3cfd44e1b 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -4,7 +4,6 @@ use crate::streaming::models::messages::RetainedMessage; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; -use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::sizeable::Sizeable; use crate::streaming::storage::SegmentStorage; use crate::streaming::utils::file; @@ -23,9 +22,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader}; use tracing::{error, info, trace, warn}; const EMPTY_INDEXES: Vec = vec![]; -const EMPTY_TIME_INDEXES: Vec = vec![]; -pub(crate) const INDEX_SIZE: u32 = 8; -pub(crate) const TIME_INDEX_SIZE: u32 = 12; +pub const INDEX_SIZE: u32 = 16; // offset: 4 bytes, position: 4 bytes, timestamp: 8 bytes const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000; #[derive(Debug)] @@ -56,6 +53,13 @@ impl SegmentStorage for FileSegmentStorage { if segment.config.segment.cache_indexes { segment.indexes = Some(segment.storage.segment.load_all_indexes(segment).await?); + let last_index_offset = if segment.indexes.as_ref().unwrap().is_empty() { + 0_u64 + } else { + segment.indexes.as_ref().unwrap().last().unwrap().offset as u64 + }; + + segment.current_offset = segment.start_offset + last_index_offset; info!( "Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.", segment.indexes.as_ref().unwrap().len(), @@ -66,36 +70,6 @@ impl SegmentStorage for FileSegmentStorage { ); } - if segment.config.segment.cache_time_indexes { - let time_indexes = self.load_all_time_indexes(segment).await?; - if !time_indexes.is_empty() { - let last_index = time_indexes.last().unwrap(); - segment.current_offset = segment.start_offset + last_index.relative_offset as u64; - segment.time_indexes = Some(time_indexes); - } - - info!( - "Loaded {} time indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.", - segment.time_indexes.as_ref().unwrap().len(), - segment.start_offset, - segment.partition_id, - segment.topic_id, - 
segment.stream_id - ); - } else { - let last_time_index = self.load_last_time_index(segment).await?; - if let Some(last_index) = last_time_index { - segment.current_offset = segment.start_offset + last_index.relative_offset as u64; - info!( - "Loaded last time index for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.", - segment.start_offset, - segment.partition_id, - segment.topic_id, - segment.stream_id - ); - } - } - if segment.is_full().await { segment.is_closed = true; } @@ -103,7 +77,7 @@ impl SegmentStorage for FileSegmentStorage { let messages_count = segment.get_messages_count(); info!( - "Loaded segment log file of size {} for start offset {}, current offset: {}, and partition with ID: {} for topic with ID: {} and stream with ID: {}.", + "Loaded segment with log file of size {} ({messages_count} messages) for start offset {}, current offset: {}, and partition with ID: {} for topic with ID: {} and stream with ID: {}.", IggyByteSize::from(file_size), segment.start_offset, segment.current_offset, segment.partition_id, segment.topic_id, segment.stream_id ); @@ -144,18 +118,6 @@ impl SegmentStorage for FileSegmentStorage { )); } - if !Path::new(&segment.time_index_path).exists() - && self - .persister - .overwrite(&segment.time_index_path, &[]) - .await - .is_err() - { - return Err(IggyError::CannotCreateSegmentTimeIndexFile( - segment.time_index_path.clone(), - )); - } - if !Path::new(&segment.index_path).exists() && self .persister @@ -183,7 +145,6 @@ impl SegmentStorage for FileSegmentStorage { ); self.persister.delete(&segment.log_path).await?; self.persister.delete(&segment.index_path).await?; - self.persister.delete(&segment.time_index_path).await?; segment .size_of_parent_stream .fetch_sub(segment.size_bytes as u64, Ordering::SeqCst); @@ -331,9 +292,16 @@ impl SegmentStorage for FileSegmentStorage { offset, &error ) })?; + let timestamp = reader.read_u64_le().await.inspect_err(|error| { + error!( + "Cannot read timestamp from index file for offset: {}. 
Error: {}", + offset, &error + ) + })?; indexes.push(Index { - relative_offset: offset, + offset, position, + timestamp, }); } @@ -392,19 +360,21 @@ impl SegmentStorage for FileSegmentStorage { let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); loop { - let relative_offset = reader.read_u32_le().await?; + let offset = reader.read_u32_le().await?; let position = reader.read_u32_le().await?; + let timestamp = reader.read_u64_le().await?; read_bytes += INDEX_SIZE; let idx = Index { - relative_offset, + offset, position, + timestamp, }; idx_pred.push(idx); - if relative_offset >= relative_start_offset { + if offset >= relative_start_offset { index_range.start = idx_pred.tail().unwrap_or_default(); } - if relative_offset >= relative_end_offset { + if offset >= relative_end_offset { index_range.end = idx; break; } @@ -426,8 +396,9 @@ impl SegmentStorage for FileSegmentStorage { async fn save_index(&self, index_path: &str, index: Index) -> Result<(), IggyError> { let mut bytes = BytesMut::with_capacity(INDEX_SIZE as usize); - bytes.put_u32_le(index.relative_offset); + bytes.put_u32_le(index.offset); bytes.put_u32_le(index.position); + bytes.put_u64_le(index.timestamp); if let Err(err) = self .persister .append(index_path, &bytes) @@ -440,17 +411,17 @@ impl SegmentStorage for FileSegmentStorage { Ok(()) } - async fn try_load_time_index_for_timestamp( + async fn try_load_index_for_timestamp( &self, segment: &Segment, timestamp: u64, - ) -> Result, IggyError> { + ) -> Result, IggyError> { trace!("Loading time indexes from file..."); - let file = file::open(&segment.time_index_path).await?; + let file = file::open(&segment.index_path).await?; let file_size = file.metadata().await?.len() as usize; if file_size == 0 { trace!("Time index file is empty."); - return Ok(Some(TimeIndex::default())); + return Ok(Some(Index::default())); } let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); @@ -458,106 +429,23 @@ impl SegmentStorage for FileSegmentStorage { let mut idx_pred = HeadTailBuffer::new(); loop { let offset = reader.read_u32_le().await?; + let position = reader.read_u32_le().await?; let time = reader.read_u64_le().await?; - let idx = TimeIndex { - relative_offset: offset, + let idx = Index { + offset, + position, timestamp: time, }; idx_pred.push(idx); if time >= timestamp { return Ok(idx_pred.tail()); } - read_bytes += TIME_INDEX_SIZE as usize; + read_bytes += INDEX_SIZE as usize; if read_bytes == file_size { return Ok(None); } } } - - async fn load_all_time_indexes(&self, segment: &Segment) -> Result, IggyError> { - trace!("Loading time indexes from file..."); - let file = file::open(&segment.time_index_path).await?; - let file_size = file.metadata().await?.len() as usize; - if file_size == 0 { - trace!("Time index file is empty."); - return Ok(EMPTY_TIME_INDEXES); - } - - let indexes_count = file_size / TIME_INDEX_SIZE as usize; - let mut indexes = Vec::with_capacity(indexes_count); - let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); - for idx_num in 0..indexes_count { - let offset = reader.read_u32_le().await.inspect_err(|error| { - error!( - "Cannot read offset from index file for offset: {}. Error: {}", - idx_num, &error - ) - })?; - let timestamp = reader.read_u64().await.inspect_err(|error| { - error!( - "Cannot read timestamp from index file for offset: {}. 
Error: {}", - offset, &error - ) - })?; - indexes.push(TimeIndex { - relative_offset: offset, - timestamp, - }); - } - if indexes.len() != indexes_count { - error!( - "Loaded {} time indexes from disk, expected {}.", - indexes.len(), - indexes_count - ); - } - - trace!("Loaded {} time indexes from file.", indexes_count); - - Ok(indexes) - } - - async fn load_last_time_index( - &self, - segment: &Segment, - ) -> Result, IggyError> { - trace!("Loading last time index from file..."); - let mut file = file::open(&segment.time_index_path).await?; - let file_size = file.metadata().await?.len() as usize; - if file_size == 0 { - trace!("Time index file is empty."); - return Ok(None); - } - - let last_index_position = file_size - TIME_INDEX_SIZE as usize; - file.seek(SeekFrom::Start(last_index_position as u64)) - .await?; - let index_offset = file.read_u32_le().await?; - let timestamp = file.read_u64_le().await?; - let index = TimeIndex { - relative_offset: index_offset, - timestamp, - }; - - trace!("Loaded last time index from file: {:?}", index); - Ok(Some(index)) - } - - async fn save_time_index(&self, index_path: &str, index: TimeIndex) -> Result<(), IggyError> { - let mut bytes = BytesMut::with_capacity(TIME_INDEX_SIZE as usize); - bytes.put_u32_le(index.relative_offset); - bytes.put_u64_le(index.timestamp); - if let Err(err) = self - .persister - .append(index_path, &bytes) - .await - .with_context(|| format!("Failed to save TimeIndex to segment: {}", index_path)) - { - return Err(IggyError::CannotSaveTimeIndexToSegment(err)); - } - - Ok(()) - } } async fn load_batches_by_range( @@ -574,9 +462,9 @@ async fn load_batches_by_range( trace!( "Loading message batches by index range: {} [{}] - {} [{}], file size: {file_size}", index_range.start.position, - index_range.start.relative_offset, + index_range.start.offset, index_range.end.position, - index_range.end.relative_offset + index_range.end.offset ); let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); @@ -604,7 +492,7 @@ async fn load_batches_by_range( .map_err(|_| IggyError::CannotReadMaxTimestamp)?; let last_offset = batch_base_offset + (last_offset_delta as u64); - let index_last_offset = index_range.end.relative_offset as u64 + segment.start_offset; + let index_last_offset = index_range.end.offset as u64 + segment.start_offset; let payload_len = batch_length as usize; let mut payload = BytesMut::with_capacity(payload_len); diff --git a/server/src/streaming/segments/time_index.rs b/server/src/streaming/segments/time_index.rs deleted file mode 100644 index 6c43a6439..000000000 --- a/server/src/streaming/segments/time_index.rs +++ /dev/null @@ -1,11 +0,0 @@ -#[derive(Debug, Default, Eq, Clone, Copy)] -pub struct TimeIndex { - pub relative_offset: u32, - pub timestamp: u64, -} - -impl PartialEq for TimeIndex { - fn eq(&self, other: &Self) -> bool { - self.relative_offset == other.relative_offset && self.timestamp == other.timestamp - } -} diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index 1acb71c2f..b8a39330e 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -7,7 +7,6 @@ use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::segments::storage::FileSegmentStorage; -use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::streams::storage::FileStreamStorage; use 
crate::streaming::streams::stream::Stream; use crate::streaming::systems::info::SystemInfo; @@ -86,15 +85,11 @@ pub trait SegmentStorage: Send + Sync { index_end_offset: u64, ) -> Result, IggyError>; async fn save_index(&self, index_path: &str, index: Index) -> Result<(), IggyError>; - async fn try_load_time_index_for_timestamp( + async fn try_load_index_for_timestamp( &self, segment: &Segment, timestamp: u64, - ) -> Result, IggyError>; - async fn load_all_time_indexes(&self, segment: &Segment) -> Result, IggyError>; - async fn load_last_time_index(&self, segment: &Segment) - -> Result, IggyError>; - async fn save_time_index(&self, index_path: &str, index: TimeIndex) -> Result<(), IggyError>; + ) -> Result, IggyError>; } #[derive(Debug)] @@ -158,7 +153,6 @@ pub(crate) mod tests { use crate::streaming::partitions::partition::Partition; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; - use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::storage::*; use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; @@ -330,35 +324,13 @@ pub(crate) mod tests { Ok(()) } - async fn try_load_time_index_for_timestamp( + async fn try_load_index_for_timestamp( &self, _segment: &Segment, _timestamp: u64, - ) -> Result, IggyError> { + ) -> Result, IggyError> { Ok(None) } - - async fn load_all_time_indexes( - &self, - _segment: &Segment, - ) -> Result, IggyError> { - Ok(vec![]) - } - - async fn load_last_time_index( - &self, - _segment: &Segment, - ) -> Result, IggyError> { - Ok(None) - } - - async fn save_time_index( - &self, - _index_path: &str, - _index: TimeIndex, - ) -> Result<(), IggyError> { - Ok(()) - } } pub fn get_test_system_storage() -> SystemStorage { diff --git a/server/src/streaming/systems/stats.rs b/server/src/streaming/systems/stats.rs index 20475f7e7..ca12423c8 100644 --- a/server/src/streaming/systems/stats.rs +++ b/server/src/streaming/systems/stats.rs @@ -1,8 +1,8 @@ use crate::streaming::session::Session; use crate::streaming::systems::system::System; -use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; use iggy::models::stats::Stats; +use iggy::{error::IggyError, utils::duration::IggyDuration}; use std::sync::OnceLock; use sysinfo::{Pid, ProcessesToUpdate, System as SysinfoSystem}; use tokio::sync::Mutex; @@ -62,7 +62,7 @@ impl System { stats.process_id = process.pid().as_u32(); stats.cpu_usage = process.cpu_usage(); stats.memory_usage = process.memory().into(); - stats.run_time = process.run_time().into(); + stats.run_time = IggyDuration::new_from_secs(process.run_time()); stats.start_time = process.start_time().into(); let disk_usage = process.disk_usage(); diff --git a/server/src/streaming/utils/file.rs b/server/src/streaming/utils/file.rs index 57e993ef2..2bc4286fd 100644 --- a/server/src/streaming/utils/file.rs +++ b/server/src/streaming/utils/file.rs @@ -26,6 +26,10 @@ pub async fn rename(old_path: &str, new_path: &str) -> Result<(), std::io::Error tokio::fs::rename(Path::new(old_path), Path::new(new_path)).await } +pub async fn exists(path: &str) -> Result { + tokio::fs::try_exists(path).await +} + pub async fn folder_size
(path: P) -> std::io::Result where P: Into + AsRef,
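
Reviewer note on the new index layout: this change collapses the separate .index (8-byte entries: relative offset + position) and .timeindex (12-byte entries: relative offset + timestamp) files into a single .index file with 16-byte entries (offset: u32, position: u32, timestamp: u64, all little-endian, matching the new INDEX_SIZE = 16), and the partition loader calls IndexConverter to migrate existing segments. The sketch below is a minimal, std-only illustration of that byte layout and of the merge idea, under the assumption that the two legacy files pair up one entry per persisted batch in the same order; write_index_entry and merge_legacy_indexes are hypothetical names, not the IndexConverter API, and the real code writes through the async persister with BytesMut rather than a plain Vec<u8>.

// Hypothetical helpers for illustration only; the actual migration is performed
// by the server's IndexConverter.

/// Unified on-disk entry layout (INDEX_SIZE = 16 in the diff):
/// offset: u32 LE | position: u32 LE | timestamp: u64 LE.
const NEW_ENTRY_SIZE: usize = 16;
/// Legacy layouts: .index held offset + position (8 bytes),
/// .timeindex held offset + timestamp (12 bytes).
const OLD_INDEX_ENTRY_SIZE: usize = 8;
const OLD_TIME_INDEX_ENTRY_SIZE: usize = 12;

fn write_index_entry(buf: &mut Vec<u8>, offset: u32, position: u32, timestamp: u64) {
    buf.extend_from_slice(&offset.to_le_bytes());
    buf.extend_from_slice(&position.to_le_bytes());
    buf.extend_from_slice(&timestamp.to_le_bytes());
}

/// Merges the raw bytes of a legacy .index / .timeindex pair into the unified
/// format, assuming both files hold one entry per batch in the same order.
fn merge_legacy_indexes(old_index: &[u8], old_time_index: &[u8]) -> Option<Vec<u8>> {
    let count = old_index.len() / OLD_INDEX_ENTRY_SIZE;
    if count != old_time_index.len() / OLD_TIME_INDEX_ENTRY_SIZE {
        return None; // Files out of sync; the real converter reports an error instead.
    }
    let mut merged = Vec::with_capacity(count * NEW_ENTRY_SIZE);
    for i in 0..count {
        let idx = &old_index[i * OLD_INDEX_ENTRY_SIZE..(i + 1) * OLD_INDEX_ENTRY_SIZE];
        let tidx = &old_time_index[i * OLD_TIME_INDEX_ENTRY_SIZE..(i + 1) * OLD_TIME_INDEX_ENTRY_SIZE];
        let offset = u32::from_le_bytes(idx[0..4].try_into().unwrap());
        let position = u32::from_le_bytes(idx[4..8].try_into().unwrap());
        let timestamp = u64::from_le_bytes(tidx[4..12].try_into().unwrap());
        write_index_entry(&mut merged, offset, position, timestamp);
    }
    Some(merged)
}

fn main() {
    // One legacy entry pair: relative offset 5 at byte position 0, timestamp 1000.
    let old_index = [5u32.to_le_bytes(), 0u32.to_le_bytes()].concat();
    let old_time_index = [&5u32.to_le_bytes()[..], &1000u64.to_le_bytes()[..]].concat();
    let merged = merge_legacy_indexes(&old_index, &old_time_index).unwrap();
    assert_eq!(merged.len(), NEW_ENTRY_SIZE);
}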
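
The SegmentStorage trait change renames try_load_time_index_for_timestamp to try_load_index_for_timestamp and reads the unified entries instead of the removed .timeindex file. Below is a small in-memory sketch of the selection rule, assuming entries arrive in offset order with non-decreasing timestamps; IndexEntry and find_entry_for_timestamp are illustrative names, and the on-disk implementation in the diff streams entries through a HeadTailBuffer (so it can also hand back the immediately preceding entry) rather than collecting a slice.

#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct IndexEntry {
    offset: u32,
    position: u32,
    timestamp: u64,
}

/// Simplified rule for serving a poll-by-timestamp request: return the first
/// entry whose timestamp has reached the requested one.
fn find_entry_for_timestamp(entries: &[IndexEntry], timestamp: u64) -> Option<IndexEntry> {
    entries.iter().copied().find(|entry| entry.timestamp >= timestamp)
}

fn main() {
    // Mirrors the fixtures used by the index.rs tests in this diff.
    let entries = [
        IndexEntry { offset: 5, position: 0, timestamp: 1000 },
        IndexEntry { offset: 20, position: 100, timestamp: 2000 },
        IndexEntry { offset: 35, position: 200, timestamp: 3000 },
    ];
    assert_eq!(find_entry_for_timestamp(&entries, 1500).map(|e| e.offset), Some(20));
    assert_eq!(find_entry_for_timestamp(&entries, 9000), None);
}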