diff --git a/linkage_dag.py b/linkage_dag.py index 99fd773..1679db9 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -361,8 +361,8 @@ task_id="import_id_mapping", bucket=bucket, source_objects=[f"{tmp_dir}/id_mapping.jsonl"], - schema_object=f"{schema_dir}/sources.json", - destination_project_dataset_table=f"{staging_dataset}.sources", + schema_object=f"{schema_dir}/id_mapping.json", + destination_project_dataset_table=f"{staging_dataset}.id_mapping", source_format="NEWLINE_DELIMITED_JSON", create_disposition="CREATE_IF_NEEDED", write_disposition="WRITE_TRUNCATE" @@ -413,10 +413,12 @@ staging_tables = ["sources", "references", all_metadata_table] production_tables = ["sources", "references"] for table_name in staging_tables: + compare_table_name = table_name if table_name != all_metadata_table else all_metadata_table+"_last_run" + compare_dataset = production_dataset if table_name != all_metadata_table else staging_dataset check_queries.append(BigQueryCheckOperator( task_id="check_monotonic_increase_"+table_name.lower(), sql=(f"select (select count(0) from {staging_dataset}.{table_name}) >= " - f"(select 0.8*count(0) from {production_dataset}.{table_name})"), + f"(select 0.8*count(0) from {compare_dataset}.{compare_table_name})"), use_legacy_sql=False )) @@ -453,7 +455,7 @@ ), BigQueryCheckOperator( task_id="no_null_references", - sql=f"select count(0) = 0 from {staging_dataset}.references where id is null or ref_id is null", + sql=f"select count(0) = 0 from {staging_dataset}.references where merged_id is null or ref_id is null", use_legacy_sql = False ), BigQueryCheckOperator( diff --git a/schemas/id_mapping.json b/schemas/id_mapping.json new file mode 100644 index 0000000..f1e4bef --- /dev/null +++ b/schemas/id_mapping.json @@ -0,0 +1,14 @@ +[ + { + "mode": "REQUIRED", + "name": "merged_id", + "type": "STRING", + "description": "CSET merged id of a set of articles judged to be the same" + }, + { + "mode": "REQUIRED", + "name": "orig_id", + "type": "STRING", + "description": "Originating dataset's id for an article" + } +] \ No newline at end of file diff --git a/schemas/sources.json b/schemas/sources.json index caa4cab..7fb16d8 100644 --- a/schemas/sources.json +++ b/schemas/sources.json @@ -7,7 +7,7 @@ }, { "mode": "REQUIRED", - "name": "orig_ids", + "name": "orig_id", "type": "STRING", "description": "Originating dataset's id for an article" }, diff --git a/sql/sources.sql b/sql/sources.sql index 5287ec1..9a4b1d3 100644 --- a/sql/sources.sql +++ b/sql/sources.sql @@ -3,7 +3,7 @@ select distinct a.merged_id, a.orig_id, b.dataset -from {{staging_dataset}}.sources a -left join +from {{staging_dataset}}.id_mapping a +inner join {{staging_dataset}}.union_metadata b on a.orig_id = b.id