diff --git a/linkage_dag.py b/linkage_dag.py index 2d05466..4711aa8 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -409,8 +409,10 @@ # we're about to copy tables from staging to production, so do checks to make sure we haven't broken anything # along the way check_queries = [] - production_tables = ["sources", "references", "all_metadata_with_cld2_lid"] - for table_name in production_tables: + all_metadata_table = "all_metadata_with_cld2_lid" + staging_tables = ["sources", "references", all_metadata_table] + production_tables = ["sources", "references"] + for table_name in staging_tables: check_queries.append(BigQueryCheckOperator( task_id="check_monotonic_increase_"+table_name.lower(), sql=(f"select (select count(0) from {staging_dataset}.{table_name}) >= " @@ -489,6 +491,25 @@ ) start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert + # We don't show the "all metadata" table in the production dataset, but we do need to + # be able to diff the current data from the data used in the last run in simhash_input + copy_cld2 = BigQueryToBigQueryOperator( + task_id=f"copy_{all_metadata_table}", + source_project_dataset_tables=[f"{staging_dataset}.{all_metadata_table}"], + destination_project_dataset_table=f"{staging_dataset}.{all_metadata_table}_last_run", + create_disposition="CREATE_IF_NEEDED", + write_disposition="WRITE_TRUNCATE" + ) + + snapshot_cld2 = BigQueryToBigQueryOperator( + task_id=f"snapshot_{all_metadata_table}", + source_project_dataset_tables=[f"{staging_dataset}.{all_metadata_table}"], + destination_project_dataset_table=f"{backup_dataset}.{all_metadata_table}_{curr_date}", + create_disposition="CREATE_IF_NEEDED", + write_disposition="WRITE_TRUNCATE" + ) + start_production_cp >> copy_cld2 >> snapshot_cld2 >> success_alert + # task structure clear_tmp_dir >> metadata_sequences_start (metadata_sequences_end >> union_ids >> check_unique_input_ids >> union_metadata >> export_metadata >> diff --git a/sql/simhash_input.sql b/sql/simhash_input.sql index a8d395a..9310842 100644 --- a/sql/simhash_input.sql +++ b/sql/simhash_input.sql @@ -11,7 +11,7 @@ where id not in ( select a.id from {{staging_dataset}}.all_metadata_norm_filt a left join - {{production_dataset}}.all_metadata_with_cld2_lid b + {{staging_dataset}}.all_metadata_with_cld2_lid_last_run b on a.id = b.id where (a.title = b.title) and (a.abstract = b.abstract) and (a.year = b.year) and (a.title != "") and (a.title is not null) and (a.abstract != "") and (a.abstract is not null) and (a.year is not null)