From 1470ca577f90d1e9ea79693ff3df4d40837eb97d Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 22 Nov 2023 03:30:07 +0800 Subject: [PATCH 1/3] feat: tweaks broadway pipeline batch size, batch timeout --- lib/logflare/source/bigquery/pipeline.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/logflare/source/bigquery/pipeline.ex b/lib/logflare/source/bigquery/pipeline.ex index 9cfadc448..0b6e81b91 100644 --- a/lib/logflare/source/bigquery/pipeline.ex +++ b/lib/logflare/source/bigquery/pipeline.ex @@ -27,15 +27,16 @@ defmodule Logflare.Source.BigQuery.Pipeline do Broadway.start_link(__MODULE__, name: name(source.token), + max_restarts: 10, producer: [ module: {BufferProducer, rls}, hibernate_after: 30_000 ], processors: [ - default: [concurrency: 1] + default: [concurrency: max_batchers] ], batchers: [ - bq: [concurrency: max_batchers, batch_size: 250, batch_timeout: 1000] + bq: [concurrency: max_batchers, batch_size: 500, batch_timeout: 800] ], context: rls ) From cc6746a37ae30a90ba9515c14c10d9072a5c4b09 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 22 Nov 2023 10:05:13 +0800 Subject: [PATCH 2/3] feat: increase batch timeout --- lib/logflare/source/bigquery/pipeline.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logflare/source/bigquery/pipeline.ex b/lib/logflare/source/bigquery/pipeline.ex index 0b6e81b91..5cb6391c8 100644 --- a/lib/logflare/source/bigquery/pipeline.ex +++ b/lib/logflare/source/bigquery/pipeline.ex @@ -36,7 +36,7 @@ defmodule Logflare.Source.BigQuery.Pipeline do default: [concurrency: max_batchers] ], batchers: [ - bq: [concurrency: max_batchers, batch_size: 500, batch_timeout: 800] + bq: [concurrency: max_batchers, batch_size: 500, batch_timeout: 1_500] ], context: rls ) From 44725731fe526bc9f145ef02e98afd70a15ffe89 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 22 Nov 2023 10:33:38 +0800 Subject: [PATCH 3/3] chore: fix flaky test --- test/logflare/logs/logs_test.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/logflare/logs/logs_test.exs b/test/logflare/logs/logs_test.exs index 73c648ec8..a59a6e52b 100644 --- a/test/logflare/logs/logs_test.exs +++ b/test/logflare/logs/logs_test.exs @@ -124,7 +124,8 @@ defmodule Logflare.LogsTest do ] assert :ok = Logs.ingest_logs(batch, source) - :timer.sleep(1_500) + # batcher timneout is 1_500 + :timer.sleep(2_000) end end