Add microbatch strategy
This work is basically, in its entirety, a duplicate of the work done by
MichelleArk in dbt-labs/dbt-snowflake#1179.
I don't really expect this to work on the first try, but it might. I expect
to need to make some edits, but who knows, maybe I'll get lucky.
QMalcolm committed Oct 2, 2024
1 parent 0b02178 commit 3ca6459
Showing 3 changed files with 56 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dbt/adapters/redshift/impl.py
@@ -155,7 +155,7 @@ def valid_incremental_strategies(self):
        """The set of standard builtin strategies which this adapter supports out-of-the-box.
        Not used to validate custom strategies defined by end users.
        """
-        return ["append", "delete+insert", "merge"]
+        return ["append", "delete+insert", "merge", "microbatch"]

    def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
        return f"{add_to} + interval '{number} {interval}'"
31 changes: 31 additions & 0 deletions dbt/include/redshift/macros/materializations/incremental_merge.sql
@@ -65,3 +65,34 @@
)

{% endmacro %}

{% macro redshift__get_incremental_microbatch_sql(arg_dict) %}
    {%- set target = arg_dict["target_relation"] -%}
    {%- set source = arg_dict["temp_relation"] -%}
    {%- set dest_columns = arg_dict["dest_columns"] -%}
    {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}

    {#-- Add additional incremental_predicates to filter for batch --#}
    {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
    {% endif %}
    {% if model.config.__dbt_internal_microbatch_event_time_end -%}
        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
    {% endif %}
    {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

    {#-- Delete any target rows that fall inside the batch's event-time window --#}
    delete from {{ target }} DBT_INTERNAL_TARGET
    using {{ source }}
    where (
        {% for predicate in incremental_predicates %}
            {%- if not loop.first %}and {% endif -%} {{ predicate }}
        {% endfor %}
    );

    {#-- Insert the batch from the temp relation --#}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    insert into {{ target }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ source }}
    )
{% endmacro %}
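As a rough sketch of what this macro emits, take a hypothetical model whose event_time column is named event_time and a one-day batch covering 2020-01-02; the rendered SQL would look approximately like the following (relation names, the column list, and timestamp formatting are illustrative, since dbt supplies the real target and temp relations at runtime):

delete from "analytics"."public"."sessions" DBT_INTERNAL_TARGET
using "sessions__dbt_tmp"
where (
    DBT_INTERNAL_TARGET.event_time >= TIMESTAMP '2020-01-02 00:00:00+00:00'
    and DBT_INTERNAL_TARGET.event_time < TIMESTAMP '2020-01-03 00:00:00+00:00'
);

insert into "analytics"."public"."sessions" ("id", "event_time")
(
    select "id", "event_time"
    from "sessions__dbt_tmp"
)

The delete touches only rows inside the batch's event-time window (plus any user-supplied incremental_predicates), so re-running a batch replaces that window without affecting the rest of the table.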
@@ -0,0 +1,24 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
    BaseMicrobatch,
)


# No requirement for a unique_id for redshift microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
select * from {{ ref('input_model') }}
"""


class TestRedshiftMicrobatch(BaseMicrobatch):
    @pytest.fixture(scope="class")
    def microbatch_model_sql(self) -> str:
        return _microbatch_model_no_unique_id_sql

    @pytest.fixture(scope="class")
    def insert_two_rows_sql(self, project) -> str:
        test_schema_relation = project.adapter.Relation.create(
            database=project.database, schema=project.test_schema
        )
        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"
