Skip to content

Commit

Permalink
Benchmark for Producer cycle
Browse files Browse the repository at this point in the history
* Implementing benchmark for the producer cycle similar to how it's used
  with the sessions in Chipmunk.
* Benchmarks still suffer from noise from the async runtime, but this
noise is reduced as much as possible with a variety of configurations and tricks
* Remove async-trait from mocks after rebase from master
  • Loading branch information
AmmarAbouZor authored and marcmo committed Sep 18, 2024
1 parent a996912 commit 747b5fe
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 76 deletions.
102 changes: 74 additions & 28 deletions application/apps/indexer/sources/benches/mocks/mock_parser.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::fmt::Display;

use parsers::{LogMessage, Parser};
use criterion::black_box;
use parsers::{Attachment, LogMessage, Parser};
use serde::Serialize;

#[derive(Debug, Clone, Copy)]
pub struct MockParser {
/// Sets how much bytes each call of [`Parser::parse()`] should consume.
consume: usize,
/// Sets how many times the method [`Parser::parse()`] will be called before it'll return None.
max_count: usize,
/// Internal counter to keep track how many times [`Parser::parse()`] has been called.
Expand All @@ -15,12 +14,9 @@ pub struct MockParser {

impl MockParser {
/// Creates new instance of the mock parser with the given settings.
///
/// * `consume`: Sets how much bytes each call of [`Parser::parse()`] should consume.
/// * `max_count`: Sets how many times the method [`Parser::parse()`] will be called before it'll return None.
pub fn new(consume: usize, max_count: usize) -> Self {
pub const fn new(max_count: usize) -> Self {
Self {
consume,
max_count,
counter: 0,
}
Expand Down Expand Up @@ -50,34 +46,84 @@ impl LogMessage for MockMessage {
}
}

impl MockMessage {
pub fn new(msg: String) -> Self {
Self { content: msg }
}
}

impl From<String> for MockMessage {
fn from(value: String) -> Self {
Self::new(value)
}
}

// NOTE: Methods within the trait implementation have an inner non-async function that should never
// be inlined, and the trait method should always be inlined. This reduces the noise in the benchmarks.
impl Parser<MockMessage> for MockParser {
/// This will keep returning a valid item result until the counter reaches max count, then it
/// will return [`parsers::Error::Eof`].
fn parse<'a>(
&mut self,
input: &'a [u8],
_timestamp: Option<u64>,
timestamp: Option<u64>,
) -> Result<(&'a [u8], Option<parsers::ParseYield<MockMessage>>), parsers::Error> {
if self.counter >= self.max_count {
return Err(parsers::Error::Eof);
#[inline(never)]
fn inner(
counter: usize,
max_count: usize,
input: &[u8],
_timestamp: Option<u64>,
) -> Result<(&'static [u8], Option<parsers::ParseYield<MockMessage>>), parsers::Error>
{
// Return `Eof` Once the counter reaches max_count.
if counter >= max_count {
const ERR: parsers::Error = parsers::Error::Eof;

return Err(criterion::black_box(ERR));
}

// Unnecessary check to convince the compiler that we are using the input.
if input.is_empty() {
return Err(black_box(parsers::Error::Eof));
}

const MSG: &str = "msg";

// Unnecessary checks to convince the compiler that all return options are possible.
if black_box(50) > black_box(60) {
Err(parsers::Error::Incomplete)
} else if black_box(50) > black_box(0) {
// This is the only branch that will actually be taken while the call counter is still smaller
// than the max value.
Ok((
black_box(&[]),
Some(parsers::ParseYield::Message(MockMessage {
content: black_box(MSG).into(),
})),
))
} else if black_box(20) > black_box(30) {
Ok((
black_box(&[]),
Some(parsers::ParseYield::Attachment(Attachment {
size: black_box(10),
name: String::from(black_box(MSG)),
data: Vec::new(),
messages: Vec::new(),
created_date: None,
modified_date: None,
})),
))
} else {
Ok((
black_box(&[]),
Some(parsers::ParseYield::MessageAndAttachment((
MockMessage {
content: black_box(MSG).into(),
},
Attachment {
size: black_box(10),
name: String::from(black_box(MSG)),
data: Vec::new(),
messages: Vec::new(),
created_date: None,
modified_date: None,
},
))),
))
}
}
self.counter += 1;

let msg = String::from_utf8_lossy(&input[..self.consume]).to_string();
self.counter += 1;

Ok((
&input[..self.consume],
Some(parsers::ParseYield::Message(msg.into())),
))
inner(self.counter, self.max_count, input, timestamp)
}
}
87 changes: 54 additions & 33 deletions application/apps/indexer/sources/benches/mocks/mock_source.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,79 @@
use std::iter;

use async_trait::async_trait;
use sources::{ByteSource, ReloadInfo};
use criterion::black_box;
use sources::ByteSource;

#[derive(Debug, Clone)]
pub struct MockByteSource {
/// Represent the bytes that will be repeated to fill the internal buffer
data_sample: u8,
/// Sets how many bytes will be loaded into the internal buffer on each
/// [`ByteSource::reload()`] call.
load_amount: usize,
/// The internal buffer
buffer: Vec<u8>,
}
pub struct MockByteSource {}

impl MockByteSource {
/// Creates a new instance of [`MockByteSource`]
///
/// * `data_sample`: Represent the bytes that will be repeated to fill the internal buffer
/// * `load_amount`: Sets how many bytes will be loaded into the internal buffer on
/// each [`ByteSource::reload()`] call.
pub fn new(data_sample: u8, load_amount: usize) -> Self {
Self {
data_sample,
load_amount,
buffer: Vec::new(),
}
pub fn new() -> Self {
Self {}
}
}

#[async_trait]
// NOTE: Methods within the trait implementation have an inner non-async function that should never
// be inlined, and the trait method should always be inlined. This removes unnecessary
// `Future::poll()` calls from the runtime to reduce its noise.

impl ByteSource for MockByteSource {
#[inline(always)]
fn consume(&mut self, offset: usize) {
self.buffer
.truncate(self.buffer.len().checked_sub(offset).unwrap())
#[inline(never)]
fn inner(offset: usize) {
const ZERO: usize = 0;

if offset == black_box(ZERO) {
println!("Random message to make sure the compiler won't optimize this");
}
}

inner(offset);
}

#[inline(always)]
fn current_slice(&self) -> &[u8] {
&self.buffer
#[inline(never)]
fn inner(_phantom: &MockByteSource) -> &[u8] {
black_box({
const BYTES: [u8; 3] = [b'a', b's', b'a'];
const REF: &[u8] = &BYTES;

REF
})
}

inner(self)
}

#[inline(always)]
fn len(&self) -> usize {
self.buffer.len()
#[inline(never)]
fn inner() -> usize {
const LEN: usize = 3;

black_box(LEN)
}

inner()
}

#[inline(always)]
async fn reload(
&mut self,
_filter: Option<&sources::SourceFilter>,
) -> Result<Option<sources::ReloadInfo>, sources::Error> {
self.buffer
.extend(iter::repeat(self.data_sample).take(self.load_amount));
#[inline(never)]
fn inner() -> Result<Option<sources::ReloadInfo>, sources::Error> {
const AA: Result<Option<sources::ReloadInfo>, sources::Error> =
Ok(Some(sources::ReloadInfo {
available_bytes: 5,
newly_loaded_bytes: 5,
skipped_bytes: 0,
last_known_ts: None,
}));

let info = ReloadInfo::new(self.load_amount, self.len(), 0, None);
black_box(AA)
}

Ok(Some(info))
inner()
}
}
109 changes: 97 additions & 12 deletions application/apps/indexer/sources/benches/producer_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,106 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use std::time::Duration;

async fn produce(val: usize, val_2: usize) {
//TODO AAZ:
assert_eq!(val, val_2);
}
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::StreamExt;
use mocks::{mock_parser::MockParser, mock_source::MockByteSource};
use parsers::{LogMessage, MessageStreamItem};
use sources::producer::MessageProducer;

mod mocks;

/// Runs benchmarks replicating the producer loop within Chipmunk sessions, using mocks for
/// [`parsers::Parser`] and [`sources::ByteSource`] to ensure that the measurements are for the
/// producer loop only.
///
/// NOTE: This benchmark unfortunately suffers from a lot of noise because it runs on an
/// asynchronous runtime. It is configured to reduce this noise as much as possible;
/// however, it is better to run it multiple times to double-check the results.
fn producer_benchmark(c: &mut Criterion) {
let val = 1024;
let max_parse_calls = 50000;

c.bench_with_input(
BenchmarkId::new("run_producer", max_parse_calls),
&(max_parse_calls),
|bencher, &max| {
bencher
// It's important to spawn a new runtime on each run to reduce the potential noise
// that comes from using a single runtime created once at the start of all
// benchmarks.
.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
|| {
// Exclude initiation time from benchmarks.
let parser = MockParser::new(max);
let byte_source = MockByteSource::new();
let producer = MessageProducer::new(parser, byte_source, black_box(None));

producer
},
|producer| run_producer(producer),
criterion::BatchSize::SmallInput,
)
},
);
}

/// Creates a message producer from the given arguments, then creates a stream of it and consumes
/// it, replicating the producer loop inside Chipmunk sessions
async fn run_producer<P, B, T>(mut producer: MessageProducer<T, P, B>)
where
P: parsers::Parser<T>,
B: sources::ByteSource,
T: LogMessage,
{
let s = producer.as_stream();
//
// using `tokio::pin!()` provided more stable (and faster) benchmarks than
// `futures::pin_mut!()`
tokio::pin!(s);

while let Some((_, i)) = s.next().await {
match i {
MessageStreamItem::Item(item) => match item {
parsers::ParseYield::Message(msg) => {
if msg.to_string().is_empty() {
println!("This should never be printed")
}
}
parsers::ParseYield::Attachment(att) => {
if att.size > 10 {
println!("This should never be printed")
}
}
parsers::ParseYield::MessageAndAttachment((msg, att)) => {
if msg.to_string().is_empty() || att.size > 10 {
println!("This should never be printed")
}
}
},
MessageStreamItem::Skipped => {
println!("This should never be printed")
}
MessageStreamItem::Incomplete => {
println!("This should never be printed")
}
MessageStreamItem::Empty => println!("This should never be printed"),
MessageStreamItem::Done => break,
}
}
}

c.bench_with_input(BenchmarkId::new("producer", val), &val, |bencher, &v| {
bencher
.to_async(tokio::runtime::Runtime::new().unwrap())
.iter(|| produce(v, v));
});
criterion_group! {
name = benches;
config = Criterion::default()
// Warm-up time is very important here because multiple async runtimes will be spawned in
// that time, which makes the subsequently spawned ones more stable.
.warm_up_time(Duration::from_secs(10))
// Measurement time and sample size are chosen to rule out noise in the measurements as much as possible.
.measurement_time(Duration::from_secs(20))
.sample_size(200)
// These two values help to reduce the noise level in the results.
.significance_level(0.01)
.noise_threshold(0.03);
targets = producer_benchmark
}

criterion_group!(benches, producer_benchmark);
criterion_main!(benches);
5 changes: 2 additions & 3 deletions application/apps/indexer/sources/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// TODO AAZ: Write better comment.
// This can't be used with benchmarks for now until issue "https://github.com/rust-lang/rust/issues/129637" is resolved.
// #![deny(unused_crate_dependencies)]
// Rust currently can't distinguish between dev and non-dev dependencies. There is
// an open issue for this case: "https://github.com/rust-lang/rust/issues/129637"

use thiserror::Error;

Expand Down

0 comments on commit 747b5fe

Please sign in to comment.