From f7ec548c6865c509b4dcad03ca501f74ec971a4e Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 11:57:46 -0500 Subject: [PATCH 01/11] Switch from environment variable to behavior flag for gating microbatch functionality --- .../adapter/incremental/test_incremental_microbatch.py | 9 ++++++++- dbt/adapters/base/impl.py | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index 5bbabbe1..f3a3177b 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -63,7 +63,14 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}" - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def test_run_with_event_time(self, project, insert_two_rows_sql): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index f3788fe3..964c173f 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -312,7 +312,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]: """ This method should be overwritten by adapter maintainers to provide platform-specific flags """ - return [] + return [{"name": "require_builtin_microbatch_strategy", "default": False}] ### # Methods that pass through to the connection manager @@ -1570,7 +1570,7 @@ def valid_incremental_strategies(self): def builtin_incremental_strategies(self): builtin_strategies = ["append", "delete+insert", "merge", "insert_overwrite"] - if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + if self.behavior.require_builtin_microbatch_strategy: builtin_strategies.append("microbatch") return builtin_strategies From 344a54a4739c2b1374f7c58b91570f358f2b1303 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 12:20:04 -0500 Subject: [PATCH 02/11] Remove unused imports from test_incremental_microbatch --- .../adapter/incremental/test_incremental_microbatch.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index f3a3177b..55610fb5 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -1,6 +1,4 @@ -import os from pprint import pformat -from unittest import mock import pytest @@ -69,8 +67,8 @@ def project_config_update(self): "flags": { "require_builtin_microbatch_strategy": True, } - } - + } + def test_run_with_event_time(self, project, insert_two_rows_sql): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): From 977b3dd86ba79535d00268f01e3288f8ce6929d0 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 14:44:14 -0500 Subject: [PATCH 03/11] Add description to `require_builtin_microbatch_strategy` behavior flag --- dbt/adapters/base/impl.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 964c173f..7a02318a 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -312,7 +312,13 @@ def _behavior_flags(self) -> List[BehaviorFlag]: """ This method should be overwritten by adapter maintainers to provide platform-specific flags """ - return [{"name": "require_builtin_microbatch_strategy", "default": False}] + return [ + { + "name": "require_builtin_microbatch_strategy", + "default": False, + "description": "If True, then a builtin `microbatch` materialization strategy can be expected to exist." + } + ] ### # Methods that pass through to the connection manager From 0db38d5529ef56c69eeec4e72b6b5496280c265d Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 16:37:38 -0500 Subject: [PATCH 04/11] removed unused 'os' import in 'impl.py' --- dbt/adapters/base/impl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 7a02318a..1eaae811 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -22,7 +22,6 @@ Union, TYPE_CHECKING, ) -import os import pytz from dbt_common.behavior_flags import Behavior, BehaviorFlag from dbt_common.clients.jinja import CallableMacroGenerator From 3e54575d29a9df03c4bcbc10bff3bc4245147521 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 16:49:29 -0500 Subject: [PATCH 05/11] Fix lint of 'impl.py' --- dbt/adapters/base/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 1eaae811..b7e6272b 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -315,7 +315,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]: { "name": "require_builtin_microbatch_strategy", "default": False, - "description": "If True, then a builtin `microbatch` materialization strategy can be expected to exist." + "description": "If True, then a builtin `microbatch` materialization strategy can be expected to exist.", } ] From aa2d7df79c5639bf73fa88fb17f5e4519b4b47aa Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 16:54:17 -0500 Subject: [PATCH 06/11] Add changie doc --- .changes/unreleased/Features-20241001-165406.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changes/unreleased/Features-20241001-165406.yaml diff --git a/.changes/unreleased/Features-20241001-165406.yaml b/.changes/unreleased/Features-20241001-165406.yaml new file mode 100644 index 00000000..609684d4 --- /dev/null +++ b/.changes/unreleased/Features-20241001-165406.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Use a behavior flag to gate microbatch functionality (instead of an environment + variable) +time: 2024-10-01T16:54:06.121016-05:00 +custom: + Author: QMalcolm + Issue: "327" From 16986be3921559b86f440c834a9d71c980489677 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Fri, 11 Oct 2024 14:02:16 -0500 Subject: [PATCH 07/11] Link to microbatch docs page in microbatch behavior flag --- dbt/adapters/base/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index b7e6272b..28d50d08 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -315,7 +315,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]: { "name": "require_builtin_microbatch_strategy", "default": False, - "description": "If True, then a builtin `microbatch` materialization strategy can be expected to exist.", + "docs_url": "https://docs.getdbt.com/docs/build/incremental-microbatch", } ] From f950e9d00748bc1c57972bc02ecb56c37e7dc657 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Fri, 11 Oct 2024 14:11:23 -0500 Subject: [PATCH 08/11] Add test case for when microbatch behavior flag is off --- .../incremental/test_incremental_microbatch.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index 55610fb5..dfd329ac 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -61,6 +61,7 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}" +class TestMicrobatchOn(BaseMicrobatch): @pytest.fixture(scope="class") def project_config_update(self): return { @@ -99,3 +100,17 @@ def test_run_with_event_time(self, project, insert_two_rows_sql): with patch_microbatch_end_time("2020-01-05 14:57:00"): run_dbt(["run", "--select", "microbatch_model"]) self.assert_row_count(project, "microbatch_model", 5) + +class TestMicrobatchOff(BaseMicrobatch): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": False, + } + } + + def test_run_with_event_time(self, project): + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run"], expect_pass=False) + From 88db9c86154eb83b4bd9219e1695ba04fec3dd4c Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 14 Oct 2024 10:46:37 -0500 Subject: [PATCH 09/11] Revert/drop: Temporarily bump versions --- dbt-tests-adapter/dbt/tests/__about__.py | 2 +- dbt/adapters/__about__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/__about__.py b/dbt-tests-adapter/dbt/tests/__about__.py index 8c657eec..977620c3 100644 --- a/dbt-tests-adapter/dbt/tests/__about__.py +++ b/dbt-tests-adapter/dbt/tests/__about__.py @@ -1 +1 @@ -version = "1.10.2" +version = "1.10.3" diff --git a/dbt/adapters/__about__.py b/dbt/adapters/__about__.py index a55413d1..116d5667 100644 --- a/dbt/adapters/__about__.py +++ b/dbt/adapters/__about__.py @@ -1 +1 @@ -version = "1.7.0" +version = "1.7.1" From da6e6ab9468721ed00d5179011e3c3f88e7c7973 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 14 Oct 2024 14:47:55 -0500 Subject: [PATCH 10/11] Update microbatch functional tests for proper use in downstream adapters --- .../tests/adapter/incremental/test_incremental_microbatch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index dfd329ac..2408b1a5 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -61,7 +61,7 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}" -class TestMicrobatchOn(BaseMicrobatch): +class BaseTestMicrobatchOn(BaseMicrobatch): @pytest.fixture(scope="class") def project_config_update(self): return { @@ -101,7 +101,7 @@ def test_run_with_event_time(self, project, insert_two_rows_sql): run_dbt(["run", "--select", "microbatch_model"]) self.assert_row_count(project, "microbatch_model", 5) -class TestMicrobatchOff(BaseMicrobatch): +class BaseTestMicrobatchOff(BaseMicrobatch): @pytest.fixture(scope="class") def project_config_update(self): return { From a20bf5158318026190a8159eada66d51c8fbbc5f Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 21 Oct 2024 15:02:53 -0700 Subject: [PATCH 11/11] Fix linting --- .../tests/adapter/incremental/test_incremental_microbatch.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index 2408b1a5..d35dee8c 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -61,6 +61,7 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}" + class BaseTestMicrobatchOn(BaseMicrobatch): @pytest.fixture(scope="class") def project_config_update(self): @@ -101,6 +102,7 @@ def test_run_with_event_time(self, project, insert_two_rows_sql): run_dbt(["run", "--select", "microbatch_model"]) self.assert_row_count(project, "microbatch_model", 5) + class BaseTestMicrobatchOff(BaseMicrobatch): @pytest.fixture(scope="class") def project_config_update(self): @@ -109,8 +111,7 @@ def project_config_update(self): "require_builtin_microbatch_strategy": False, } } - + def test_run_with_event_time(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): run_dbt(["run"], expect_pass=False) -