diff --git a/application/apps/indexer/sources/benches/mocks/mock_parser.rs b/application/apps/indexer/sources/benches/mocks/mock_parser.rs index bd9c0327d..e065c9463 100644 --- a/application/apps/indexer/sources/benches/mocks/mock_parser.rs +++ b/application/apps/indexer/sources/benches/mocks/mock_parser.rs @@ -1,12 +1,11 @@ use std::fmt::Display; -use parsers::{LogMessage, Parser}; +use criterion::black_box; +use parsers::{Attachment, LogMessage, Parser}; use serde::Serialize; #[derive(Debug, Clone, Copy)] pub struct MockParser { - /// Sets how much bytes each call of [`Parser::parse()`] should consume. - consume: usize, /// Sets how many times the method [`Parser::parse()`] will be called before it'll return None. max_count: usize, /// Internal counter to keep track how many times [`Parser::parse()`] has been called. @@ -15,12 +14,9 @@ pub struct MockParser { impl MockParser { /// Creates new instance of the mock parser with the given settings. - /// - /// * `consume`: Sets how much bytes each call of [`Parser::parse()`] should consume. /// * `max_count`: Sets how many times the method [`Parser::parse()`] will be called before it'll return None. - pub fn new(consume: usize, max_count: usize) -> Self { + pub const fn new(max_count: usize) -> Self { Self { - consume, max_count, counter: 0, } @@ -50,34 +46,84 @@ impl LogMessage for MockMessage { } } -impl MockMessage { - pub fn new(msg: String) -> Self { - Self { content: msg } - } -} - -impl From for MockMessage { - fn from(value: String) -> Self { - Self::new(value) - } -} - +// NOTE: Methods within trait implementation have inner non-async function that should never be +// inline and the trait method should be always inline. This reduces the noise in the benchmarks. impl Parser for MockParser { + /// This will keep returning a valid item result until the counter reaches max count then it + /// will be return [`parsers::Error::Eof`] fn parse<'a>( &mut self, input: &'a [u8], - _timestamp: Option, + timestamp: Option, ) -> Result<(&'a [u8], Option>), parsers::Error> { - if self.counter >= self.max_count { - return Err(parsers::Error::Eof); + #[inline(never)] + fn inner( + counter: usize, + max_count: usize, + input: &[u8], + _timestamp: Option, + ) -> Result<(&'static [u8], Option>), parsers::Error> + { + // Return `Eof` Once the counter reaches max_count. + if counter >= max_count { + const ERR: parsers::Error = parsers::Error::Eof; + + return Err(criterion::black_box(ERR)); + } + + // Unnecessary check to convince the compiler that we are using the input. + if input.is_empty() { + return Err(black_box(parsers::Error::Eof)); + } + + const MSG: &str = "msg"; + + // Unnecessary checks to convince the compiler that all return options are possible. + if black_box(50) > black_box(60) { + Err(parsers::Error::Incomplete) + } else if black_box(50) > black_box(0) { + // Only this value will be always returned if the calls counter still smaller than + // the max value. + Ok(( + black_box(&[]), + Some(parsers::ParseYield::Message(MockMessage { + content: black_box(MSG).into(), + })), + )) + } else if black_box(20) > black_box(30) { + Ok(( + black_box(&[]), + Some(parsers::ParseYield::Attachment(Attachment { + size: black_box(10), + name: String::from(black_box(MSG)), + data: Vec::new(), + messages: Vec::new(), + created_date: None, + modified_date: None, + })), + )) + } else { + Ok(( + black_box(&[]), + Some(parsers::ParseYield::MessageAndAttachment(( + MockMessage { + content: black_box(MSG).into(), + }, + Attachment { + size: black_box(10), + name: String::from(black_box(MSG)), + data: Vec::new(), + messages: Vec::new(), + created_date: None, + modified_date: None, + }, + ))), + )) + } } - self.counter += 1; - let msg = String::from_utf8_lossy(&input[..self.consume]).to_string(); + self.counter += 1; - Ok(( - &input[..self.consume], - Some(parsers::ParseYield::Message(msg.into())), - )) + inner(self.counter, self.max_count, input, timestamp) } } diff --git a/application/apps/indexer/sources/benches/mocks/mock_source.rs b/application/apps/indexer/sources/benches/mocks/mock_source.rs index 85b7f0517..52f88467d 100644 --- a/application/apps/indexer/sources/benches/mocks/mock_source.rs +++ b/application/apps/indexer/sources/benches/mocks/mock_source.rs @@ -1,58 +1,79 @@ -use std::iter; - -use async_trait::async_trait; -use sources::{ByteSource, ReloadInfo}; +use criterion::black_box; +use sources::ByteSource; #[derive(Debug, Clone)] -pub struct MockByteSource { - /// Represent the bytes that will be repeated to fill the internal buffer - data_sample: u8, - /// Sets how many bytes will be loaded into the internal buffer on each - /// [`ByteSource::reload()`] call. - load_amount: usize, - /// The internal buffer - buffer: Vec, -} +pub struct MockByteSource {} impl MockByteSource { - /// Creates a new instant of [`MockByteSource`] - /// - /// * `data_sample`: Represent the bytes that will be repeated to fill the internal buffer - /// * `load_amount`: Sets how many bytes will be loaded into the internal buffer on - /// each [`ByteSource::reload()`] call. - pub fn new(data_sample: u8, load_amount: usize) -> Self { - Self { - data_sample, - load_amount, - buffer: Vec::new(), - } + pub fn new() -> Self { + Self {} } } -#[async_trait] +// NOTE: Methods within trait implementation have inner non-async function that should never be +// inline and the trait method should be always inline. This remove unnecessary `Future::poll()` +// calls from the runtime to reduce its noise. + impl ByteSource for MockByteSource { + #[inline(always)] fn consume(&mut self, offset: usize) { - self.buffer - .truncate(self.buffer.len().checked_sub(offset).unwrap()) + #[inline(never)] + fn inner(offset: usize) { + const ZERO: usize = 0; + + if offset == black_box(ZERO) { + println!("Random message to make sure the compiler won't optimize this"); + } + } + + inner(offset); } + #[inline(always)] fn current_slice(&self) -> &[u8] { - &self.buffer + #[inline(never)] + fn inner(_phantom: &MockByteSource) -> &[u8] { + black_box({ + const BYTES: [u8; 3] = [b'a', b's', b'a']; + const REF: &[u8] = &BYTES; + + REF + }) + } + + inner(self) } + #[inline(always)] fn len(&self) -> usize { - self.buffer.len() + #[inline(never)] + fn inner() -> usize { + const LEN: usize = 3; + + black_box(LEN) + } + + inner() } + #[inline(always)] async fn reload( &mut self, _filter: Option<&sources::SourceFilter>, ) -> Result, sources::Error> { - self.buffer - .extend(iter::repeat(self.data_sample).take(self.load_amount)); + #[inline(never)] + fn inner() -> Result, sources::Error> { + const AA: Result, sources::Error> = + Ok(Some(sources::ReloadInfo { + available_bytes: 5, + newly_loaded_bytes: 5, + skipped_bytes: 0, + last_known_ts: None, + })); - let info = ReloadInfo::new(self.load_amount, self.len(), 0, None); + black_box(AA) + } - Ok(Some(info)) + inner() } } diff --git a/application/apps/indexer/sources/benches/producer_benchmarks.rs b/application/apps/indexer/sources/benches/producer_benchmarks.rs index 2f6db7eac..782722f1d 100644 --- a/application/apps/indexer/sources/benches/producer_benchmarks.rs +++ b/application/apps/indexer/sources/benches/producer_benchmarks.rs @@ -1,21 +1,106 @@ -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use std::time::Duration; -async fn produce(val: usize, val_2: usize) { - //TODO AAZ: - assert_eq!(val, val_2); -} +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use futures::StreamExt; +use mocks::{mock_parser::MockParser, mock_source::MockByteSource}; +use parsers::{LogMessage, MessageStreamItem}; +use sources::producer::MessageProducer; mod mocks; +/// Runs Benchmarks replicating the producer loop within Chipmunk sessions, using mocks for +/// [`parsers::Parser`] and [`sources::ByteSource`] to ensure that the measurements is for the +/// producer loop only. +/// +/// NOTE: This benchmark suffers unfortunately from a lot of noise because we are running it with +/// asynchronous runtime. This test is configured to reduce this amount of noise as possible, +/// However it would be better to run it multiple time for double checking. fn producer_benchmark(c: &mut Criterion) { - let val = 1024; + let max_parse_calls = 50000; + + c.bench_with_input( + BenchmarkId::new("run_producer", max_parse_calls), + &(max_parse_calls), + |bencher, &max| { + bencher + // It's important to spawn a new runtime on each run to ensure to reduce the + // potential noise produced from one runtime created at the start of all benchmarks + // only. + .to_async(tokio::runtime::Runtime::new().unwrap()) + .iter_batched( + || { + // Exclude initiation time from benchmarks. + let parser = MockParser::new(max); + let byte_source = MockByteSource::new(); + let producer = MessageProducer::new(parser, byte_source, black_box(None)); + + producer + }, + |producer| run_producer(producer), + criterion::BatchSize::SmallInput, + ) + }, + ); +} + +/// Creates a message producer from the given arguments, then creates a stream of it and consumes +/// it, replicating the producer loop inside Chipmunk sessions +async fn run_producer(mut producer: MessageProducer) +where + P: parsers::Parser, + B: sources::ByteSource, + T: LogMessage, +{ + let s = producer.as_stream(); + // + // using `tokio::pin!()` provided more stable (and faster) benchmarks than + // `futures::pin_mut!()` + tokio::pin!(s); + + while let Some((_, i)) = s.next().await { + match i { + MessageStreamItem::Item(item) => match item { + parsers::ParseYield::Message(msg) => { + if msg.to_string().is_empty() { + println!("This should never be printed") + } + } + parsers::ParseYield::Attachment(att) => { + if att.size > 10 { + println!("This should never be printed") + } + } + parsers::ParseYield::MessageAndAttachment((msg, att)) => { + if msg.to_string().is_empty() || att.size > 10 { + println!("This should never be printed") + } + } + }, + MessageStreamItem::Skipped => { + println!("This should never be printed") + } + MessageStreamItem::Incomplete => { + println!("This should never be printed") + } + MessageStreamItem::Empty => println!("This should never be printed"), + MessageStreamItem::Done => break, + } + } +} - c.bench_with_input(BenchmarkId::new("producer", val), &val, |bencher, &v| { - bencher - .to_async(tokio::runtime::Runtime::new().unwrap()) - .iter(|| produce(v, v)); - }); +criterion_group! { + name = benches; + config = Criterion::default() + // Warm up time is very important here because multiple async runtimes will be spawn in + // that time which make the next ones to spawn more stable. + .warm_up_time(Duration::from_secs(10)) + // Measurement time and sample sized to role out noise in the measurements as possible. + .measurement_time(Duration::from_secs(20)) + .sample_size(200) + // These two values help to reduce the noise level in the results. + .significance_level(0.01) + .noise_threshold(0.03); + targets = producer_benchmark } -criterion_group!(benches, producer_benchmark); criterion_main!(benches); diff --git a/application/apps/indexer/sources/src/lib.rs b/application/apps/indexer/sources/src/lib.rs index c32098224..bb7eaf845 100644 --- a/application/apps/indexer/sources/src/lib.rs +++ b/application/apps/indexer/sources/src/lib.rs @@ -1,6 +1,5 @@ -// TODO AAZ: Write better comment. -// This can't be used with benchmarks for now until issue "https://github.com/rust-lang/rust/issues/129637" is resolved. -// #![deny(unused_crate_dependencies)] +// Rust can't currently distinguish between dev and none-dev dependencies at the moment. There is +// an open issue for this case: "https://github.com/rust-lang/rust/issues/129637" use thiserror::Error;