Skip to content

Commit

Permalink
WIP - Partitioning support
Browse files Browse the repository at this point in the history
  • Loading branch information
ggam committed May 1, 2024
1 parent d379e9e commit 135d274
Showing 1 changed file with 118 additions and 19 deletions.
137 changes: 118 additions & 19 deletions dbt/include/postgres/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,128 @@

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% if config.get('partition_by') != None %}
{# Use partitioning #}

{%- set partition_by_field = config.get('partition_by')['field'] -%}
{%- set partition_by_granularity = config.get('partition_by')['granularity'] -%}

-- We cannot create a partitioned table with "create as select".
-- Create a dummy temporary table just to be used as a template for the real one
create temporary table "{{ this.identifier }}__tt" as
select *
from ({{ sql }}) model_subq
where 1 = 2;

-- Create partitioned table as a copy of the template
create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }} (like "{{ this.identifier }}__tt")
partition by range ({{ partition_by_field }});

{% set required_partitions_query %}
-- Partitions need to be manually created before inserting.
-- We execute the model SQL to get the first and last partitions and use the granularity to define what others need to be created.
--
-- Note that we this executes the SQL model a first time just to get the partitions and it will be done a second time to actually get the data.
-- That might be very inefficient depending on the selected volume of data.
-- There's a alternative:
-- - Create as select a temporary non-partitioned table.
-- - Get the min/max partitions from that table.
-- - Move the data to the final table once it's partitioned.
--
-- That option would mean the data is stored twice during model execution.
with partitions as (
-- Generate a sequence with one row per required partition, based on the min and max dates
select
generate_series(
date_trunc('{{ partition_by_granularity }}', min({{ partition_by_field }})),
date_trunc('{{ partition_by_granularity }}', max({{ partition_by_field }})),
'1 {{ partition_by_granularity }}'::interval
) as begin_date,
(select floor(random() * 100 + 1)::int) as rand -- Random number to avoid name colissions
from (
{{ sql }}
) model_subq
)
select
-- Generate the begin-end date and name suffix for each partition
begin_date,
begin_date + '1 {{ partition_by_granularity }}'::interval as end_date,
to_char(begin_date, '_yyyymmdd') as partition_suffix
from partitions;
{% endset %}
{% set required_partitions_results = run_query(required_partitions_query) %}

-- Create the required partitions
{% for required_partition in required_partitions_results.rows %}
create
{% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %}
table
{{ make_intermediate_relation(relation, required_partition['partition_suffix']) }}
partition of {{ relation }} for values from ('{{ required_partition["begin_date"] }}'::timestamp) to ('{{ required_partition["end_date"] }}'::timestamp);
{% endfor %}

-- Insert into the parent table
insert into {{ relation }}
{{ sql }};
{% else %}
as
create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% else %}
as
{% endif %}
(
{{ sql }}
);
{% endif %}
(
{{ sql }}
);
{%- endmacro %}

{% macro postgres__rename_relation(from_relation, to_relation) %}
{% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %}
{% call statement('rename_relation') -%}
alter table {{ from_relation }} rename to {{ target_name }};

{# If the relation is partitioned, rename the subtables #}
{% set existing_partitions_query %}
select
inhrelid::regclass::text as from_table_name,
replace(
inhrelid::regclass::text,
'{{ from_relation.schema }}.{{ from_relation.identifier }}', -- Current partition name
'{{ to_relation.identifier }}' -- New partition name
) as to_table_name
from pg_catalog.pg_inherits
where inhparent = '{{ from_relation.schema }}.{{ from_relation.identifier }}'::regclass;
{% endset %}
{% set existing_partitions_results = run_query(existing_partitions_query) %}

-- Create the required partitions
{% for existing_partition in existing_partitions_results.rows %}
alter table {{ existing_partition["from_table_name"] }} rename to {{ existing_partition["to_table_name"] }};
{% endfor %}
{%- endcall %}
{% endmacro %}

{% macro postgres__get_create_index_sql(relation, index_dict) -%}
{%- set index_config = adapter.parse_index(index_dict) -%}
{%- set comma_separated_columns = ", ".join(index_config.columns) -%}
Expand Down

0 comments on commit 135d274

Please sign in to comment.