From 3ca64597e012dd783682d7d7ffba2a214ae1a0d9 Mon Sep 17 00:00:00 2001
From: Quigley Malcolm
Date: Wed, 2 Oct 2024 17:07:58 -0500
Subject: [PATCH] Add microbatch strategy

This is essentially a wholesale duplicate of the work done by MichelleArk
in https://github.com/dbt-labs/dbt-snowflake/pull/1179, adapted for
Redshift. It may still need follow-up edits before it works end to end.
---
 dbt/adapters/redshift/impl.py                 |  2 +-
 .../materializations/incremental_merge.sql    | 31 +++++++++++++++++++
 .../test_incremental_microbatch.py            | 24 ++++++++++++++
 3 files changed, 56 insertions(+), 1 deletion(-)
 create mode 100644 tests/functional/adapter/incremental/test_incremental_microbatch.py

diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py
index e0cefb989..6c40155d2 100644
--- a/dbt/adapters/redshift/impl.py
+++ b/dbt/adapters/redshift/impl.py
@@ -155,7 +155,7 @@ def valid_incremental_strategies(self):
         """The set of standard builtin strategies which this adapter supports out-of-the-box.
         Not used to validate custom strategies defined by end users.
         """
-        return ["append", "delete+insert", "merge"]
+        return ["append", "delete+insert", "merge", "microbatch"]
 
     def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
         return f"{add_to} + interval '{number} {interval}'"
diff --git a/dbt/include/redshift/macros/materializations/incremental_merge.sql b/dbt/include/redshift/macros/materializations/incremental_merge.sql
index 59a3391e3..79bc10ce1 100644
--- a/dbt/include/redshift/macros/materializations/incremental_merge.sql
+++ b/dbt/include/redshift/macros/materializations/incremental_merge.sql
@@ -65,3 +65,34 @@
     )
 
 {% endmacro %}
+
+{% macro redshift__get_incremental_microbatch_sql(arg_dict) %}
+    {%- set target = arg_dict["target_relation"] -%}
+    {%- set source = arg_dict["temp_relation"] -%}
+    {%- set dest_columns = arg_dict["dest_columns"] -%}
+    {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
+
+    {#-- Add additional incremental_predicates to filter for batch --#}
+    {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
+        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
+    {% endif %}
+    {% if model.config.__dbt_internal_microbatch_event_time_end -%}
+        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
+    {% endif %}
+    {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
+
+    delete from {{ target }} DBT_INTERNAL_TARGET
+    using {{ source }}
+    where (
+    {% for predicate in incremental_predicates %}
+        {%- if not loop.first %}and {% endif -%} {{ predicate }}
+    {% endfor %}
+    );
+
+    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+    insert into {{ target }} ({{ dest_cols_csv }})
+    (
+        select {{ dest_cols_csv }}
+        from {{ source }}
+    )
+{% endmacro %}
diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py
new file mode 100644
index 000000000..d04316d5d
--- /dev/null
+++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py
@@ -0,0 +1,24 @@
+import pytest
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+
+
+# No requirement for a unique_id for Redshift microbatch!
+_microbatch_model_no_unique_id_sql = """
+{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
+select * from {{ ref('input_model') }}
+"""
+
+
+class TestRedshiftMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return _microbatch_model_no_unique_id_sql
+
+    @pytest.fixture(scope="class")
+    def insert_two_rows_sql(self, project) -> str:
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"
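
For illustration, here is roughly the delete+insert pair the new
redshift__get_incremental_microbatch_sql macro renders for one day-sized
batch. This is a sketch only: the relation and column names below are
hypothetical, and the timestamps stand in for the batch bounds dbt injects
via __dbt_internal_microbatch_event_time_start/end.

    -- clear out this batch's event-time window in the target; because of
    -- the USING clause, this deletes nothing when the temp relation is empty
    delete from "analytics"."sessions" DBT_INTERNAL_TARGET
    using "analytics"."sessions__dbt_tmp"
    where (
        DBT_INTERNAL_TARGET.event_time >= TIMESTAMP '2020-01-04 00:00:00+00:00'
        and DBT_INTERNAL_TARGET.event_time < TIMESTAMP '2020-01-05 00:00:00+00:00'
    );

    -- then insert the batch's rows from the temp relation
    insert into "analytics"."sessions" ("id", "event_time")
    (
        select "id", "event_time"
        from "analytics"."sessions__dbt_tmp"
    )

Since each batch only ever deletes and re-inserts its own event-time window,
a failed batch can be retried without duplicating rows, which is why the
test model above sets no unique_id.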