Description

Test code for low-latency streaming pipeline updates.
- Clone this repo and switch to the checked out directory
- Designate or create a project to run the tests and create a terraform/terraform.tfvars file with the following content:
  project_id = "<your project id>"
- Create the infrastructure by running:
cd terraform
terraform init
terraform apply
cd ..
Start a pipeline that will generate synthetic events:
./start-event-generation.sh <rate>
A typical rate is tens of thousands of events per second. A Dataflow pipeline named data-generator-<rate>
will be started. You can simulate event load increases by starting additional pipelines. Note that you can't start
several pipelines with exactly the same rate, because the pipeline name needs to be unique.
You can see the current publishing load by summing the rates of all active data generation pipelines.
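For example, two generators started at the rates below would produce a combined load of roughly 50,000 events per second; the gcloud command is one way to list the active generator jobs (the rates and the filter expression are illustrative):

./start-event-generation.sh 30000   # first generator: ~30,000 events/sec
./start-event-generation.sh 20000   # second generator: combined load ~50,000 events/sec
# One way to list the active generator pipelines so their rates can be summed:
gcloud dataflow jobs list --status=active --filter="name:data-generator"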
Start the baseline pipeline. It runs uninterrupted on a dedicated Pub/Sub subscription; its goal is to collect message processing latencies under ideal circumstances, along with the unique message ids, so that they can later be compared with the pipelines being tested:
./start-baseline-pipeline.sh
Start the main pipeline, which is the one that will later be updated:
./start-main-pipeline.sh
We are going to use the same pipeline code to update the existing pipeline, so there is no difference in processing time:
./update-pipeline.sh
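For reference, Dataflow in-place updates are typically triggered by relaunching the same Beam pipeline with the runner's --update flag and the name of the already-running job. The sketch below is hypothetical (the main class, jar, region, and job name are placeholders); the repository's update-pipeline.sh encapsulates the actual invocation:

# Hypothetical illustration of an in-place Dataflow update; update-pipeline.sh wraps the real call.
java -cp pipeline.jar com.example.MainPipeline \
  --runner=DataflowRunner \
  --project="<your project id>" \
  --region=us-central1 \
  --update \
  --jobName=main-pipeline   # must match the name of the running job being replaced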
All queries below have their time ranges defined at the beginning, typically as 20-minute intervals.
Adjust them as needed. For example, if you would like to check a fixed time period, add these lines after the DECLARE
statements:
SET start_ts = TIMESTAMP '2023-06-06 08:32:00.00 America/Los_Angeles';
SET end_ts = TIMESTAMP '2023-06-06 08:45:00.00 America/Los_Angeles';
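For example, the top of a query pinned to a fixed window would look like this (reusing the same DECLARE statements the queries below start with):

DECLARE start_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -40 MINUTE);
DECLARE end_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -20 MINUTE);
-- Override the rolling defaults with a fixed window:
SET start_ts = TIMESTAMP '2023-06-06 08:32:00.00 America/Los_Angeles';
SET end_ts = TIMESTAMP '2023-06-06 08:45:00.00 America/Los_Angeles';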
Use the following query to compare the ingestion latency of the baseline pipeline and the main pipeline:
DECLARE
start_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -40 MINUTE);
DECLARE
end_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -20 MINUTE);
WITH
latency AS (
SELECT
pipeline_type,
ingest_ts,
publish_ts,
TIMESTAMP_DIFF(ingest_ts, publish_ts, SECOND) latency_secs
FROM
pipeline_update.event
WHERE
publish_ts BETWEEN start_ts AND end_ts)
SELECT
pipeline_type,
COUNT(*) total_events,
AVG(latency.latency_secs) average_latency_secs,
MIN(latency.latency_secs) min_latency_secs,
MAX(latency.latency_secs) max_latency_secs,
STDDEV(latency.latency_secs) std_deviation
FROM
latency
GROUP BY
pipeline_type
ORDER BY
pipeline_type;
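If you also want per-pipeline latency percentiles (not part of the queries above), a variant over the same window could look like this; the percentile columns are illustrative:

DECLARE start_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -40 MINUTE);
DECLARE end_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -20 MINUTE);
SELECT
  pipeline_type,
  -- APPROX_QUANTILES with 100 buckets approximates percentiles; offset 50 is the median, 99 is p99.
  APPROX_QUANTILES(TIMESTAMP_DIFF(ingest_ts, publish_ts, SECOND), 100)[OFFSET(50)] p50_latency_secs,
  APPROX_QUANTILES(TIMESTAMP_DIFF(ingest_ts, publish_ts, SECOND), 100)[OFFSET(99)] p99_latency_secs
FROM
  pipeline_update.event
WHERE
  publish_ts BETWEEN start_ts AND end_ts
GROUP BY
  pipeline_type
ORDER BY
  pipeline_type;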
To check if there were missing records:
DECLARE
start_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -40 MINUTE);
DECLARE
end_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -20 MINUTE);
SELECT
COUNT(*) missed_events,
FROM
pipeline_update.event base
WHERE
base.publish_ts BETWEEN start_ts AND end_ts
AND pipeline_type = 'baseline'
AND NOT EXISTS(
SELECT
*
FROM
pipeline_update.event main
WHERE
main.publish_ts BETWEEN start_ts AND end_ts
AND pipeline_type = 'main'
AND base.id = main.id);
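To look at a sample of the missing ids themselves rather than just the count, a small variation of the same query could be:

DECLARE start_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -40 MINUTE);
DECLARE end_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -20 MINUTE);
SELECT
  base.id,
  base.publish_ts
FROM
  pipeline_update.event base
WHERE
  base.publish_ts BETWEEN start_ts AND end_ts
  AND pipeline_type = 'baseline'
  AND NOT EXISTS(
  SELECT
    *
  FROM
    pipeline_update.event main
  WHERE
    main.publish_ts BETWEEN start_ts AND end_ts
    AND pipeline_type = 'main'
    AND base.id = main.id)
LIMIT 10;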
To check for duplicate events:
DECLARE
start_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -40 MINUTE);
DECLARE
end_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -20 MINUTE);
SELECT
id,
pipeline_type,
COUNT(*) event_count,
FROM
pipeline_update.event base
WHERE
base.publish_ts BETWEEN start_ts AND end_ts
GROUP BY id, pipeline_type
HAVING event_count > 1;
To see the total and distinct event counts and the duplicate percentage per pipeline type:
DECLARE
start_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -40 MINUTE);
DECLARE
end_ts DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -20 MINUTE);
WITH
counts AS (
SELECT
COUNT(id) total_event_count,
COUNT(DISTINCT id) event_distinct_count,
pipeline_type,
FROM
pipeline_update.event base
WHERE
base.publish_ts BETWEEN start_ts
AND end_ts
GROUP BY
pipeline_type)
SELECT
event_distinct_count,
counts.total_event_count - counts.event_distinct_count AS dups_count,
(counts.total_event_count - counts.event_distinct_count)*100/counts.event_distinct_count dups_percentage,
pipeline_type
FROM
counts
ORDER BY
pipeline_type DESC;
When you are done, stop the event generation and processing pipelines:
./stop-event-generation.sh
./stop-processing-pipelines.sh
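Optionally, confirm that no Dataflow jobs are still running before tearing the infrastructure down:

gcloud dataflow jobs list --status=active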
Destroy the infrastructure:
terraform -chdir=terraform destroy
Contributions to this repo are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
Apache 2.0 - See LICENSE for more information.