From 1d37ce9d2ab28d5adf79b089635bb7ed1337fbd4 Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Fri, 19 Jul 2024 16:13:12 -0700 Subject: [PATCH] create a procedure for each parquet datatype --- .../recover/schema/parquet/deploy.sql | 24 ++++++++ .../procedure/copy_into_table_from_stage.sql | 11 ++-- .../schema/parquet/procedure/deploy.sql | 60 ++++++++++++++++++- 3 files changed, 87 insertions(+), 8 deletions(-) diff --git a/snowflake/objects/database/recover/schema/parquet/deploy.sql b/snowflake/objects/database/recover/schema/parquet/deploy.sql index dd6c72f..0da976a 100644 --- a/snowflake/objects/database/recover/schema/parquet/deploy.sql +++ b/snowflake/objects/database/recover/schema/parquet/deploy.sql @@ -1,5 +1,8 @@ /* Create a parquet schema (if it doesn't yet exist) and deploy all child objects. + + Jinja templating variables: + git_branch - The name of the git branch from which we are deploying. */ CREATE SCHEMA IF NOT EXISTS parquet; USE SCHEMA parquet; @@ -22,3 +25,24 @@ EXECUTE IMMEDIATE ); EXECUTE IMMEDIATE FROM './table/deploy.sql'; +EXECUTE IMMEDIATE +$$ +BEGIN + IF ('{{ git_branch }}' = 'main') THEN + -- Our procedures will reference the prod stage + EXECUTE IMMEDIATE + FROM './procedure/deploy.sql' + USING ( + stage_name => $parquet_prod_stage_name, + file_format => $parquet_file_format_name + ); + ELSE + EXECUTE IMMEDIATE + FROM './procedure/deploy.sql' + USING ( + stage_name => $parquet_dev_stage_name, + file_format => $parquet_file_format_name + ); + END IF; +END; +$$; diff --git a/snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql b/snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql index 01329a7..062a069 100644 --- a/snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql +++ b/snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql @@ -1,5 +1,5 @@ /* - A stored procedure which copies data from a 
named stage into a table. + A stored procedure which copies Parquet data from a named stage into a table. Because of limitations in how we can pass variables to stage names, this procedure is specific to a stage location. That is, we cannot @@ -8,11 +8,12 @@ to use a specific stage location. Jinja templating variables: - stage_name - The name of the stage where we copy data from - stage_path - The location within the stage where our data is - file_format - The name of the file format object used during copy + datatype - The datatype which our stage location refers to. + stage_name - The name of the stage where our data exists. + stage_path - The location within the stage where our data exists. + file_format - The name of the file format object used during copy. */ -CREATE OR REPLACE PROCEDURE copy_into_table_from_stage( +CREATE OR REPLACE PROCEDURE copy_into_table_from_{{ datatype }}_parquet_stage( target_table VARCHAR ) RETURNS TABLE () diff --git a/snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql b/snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql index 9ab1585..8353867 100644 --- a/snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql +++ b/snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql @@ -1,7 +1,61 @@ /* Deploy all PROCEDURE objects - This script does nothing (yet) because all of our procuedures are stage-specific. - See limitations of specifying variables in stage names in - `copy_into_table_from_stage.sql`. + Jinja templating variables: + stage_name - The name of the stage where our data exists. + file_format - The name of the file format object used by the + `copy_into_table_from_stage.sql` procedure. 
*/
+-- Anonymous procedure (defined and called in one WITH ... CALL statement): renders copy_into_table_from_stage.sql once per Parquet datatype.
+WITH create_procedure_for_each_parquet_table AS PROCEDURE ()  -- this name must match the CALL that ends the statement
+  RETURNS VARCHAR
+  LANGUAGE SQL
+AS
+$$
+DECLARE
+  parquet_datatypes ARRAY := [  -- each entry is passed as `datatype`; its stage path is 'dataset_' || entry
+    'enrolledparticipants_customfields_symptoms',
+    'enrolledparticipants_customfields_treatments',
+    'enrolledparticipants',
+    'fitbitactivitylogs',
+    'fitbitdailydata',
+    'fitbitdevices',
+    'fitbitecg',
+    'fitbitecg_waveformsamples',
+    'fitbitintradaycombined',
+    'fitbitrestingheartrates',
+    'fitbitsleeplogs',
+    'fitbitsleeplogs_sleeplogdetails',
+    'googlefitsamples',
+    'healthkitv2activitysummaries',
+    'healthkitv2electrocardiogram',
+    'healthkitv2electrocardiogram_subsamples',
+    'healthkitv2heartbeat',
+    'healthkitv2heartbeat_subsamples',
+    'healthkitv2samples',
+    'healthkitv2statistics',
+    'healthkitv2workouts_events',
+    'healthkitv2workouts',
+    'symptomlog',
+    'symptomlog_value_symptoms',
+    'symptomlog_value_treatments'
+  ];
+  datatype VARCHAR;
+  dataset_name VARCHAR;
+BEGIN
+  FOR i IN 0 TO ARRAY_SIZE(:parquet_datatypes) - 1 DO  -- GET() is 0-indexed, so the last valid index is size - 1
+    datatype := GET(:parquet_datatypes, :i);
+    dataset_name := CONCAT('dataset_', :datatype);
+    -- Create a stored procedure which uses this data type's stage location
+    EXECUTE IMMEDIATE
+      FROM './copy_into_table_from_stage.sql'
+      USING (
+        datatype => :datatype,
+        stage_name => '{{ stage_name }}',
+        stage_path => :dataset_name,
+        file_format => '{{ file_format }}'
+      );
+  END FOR;
+END;
+$$
+CALL create_procedure_for_each_parquet_table();