From 2c030f24b90757522757de1174e077803842e82e Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Fri, 11 Oct 2024 16:06:31 -0700 Subject: [PATCH] Refactor table name/entity type conversion in AnVIL plugin --- .../plugins/repository/tdr_anvil/__init__.py | 119 +++++++++--------- test/indexer/test_anvil.py | 10 +- test/integration_test.py | 18 ++- 3 files changed, 73 insertions(+), 74 deletions(-) diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index c7811d3d4..97300d495 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -72,7 +72,7 @@ KeyLinks = set[KeyLink] -class BundleEntityType(Enum): +class BundleType(Enum): """ AnVIL snapshots have no inherent notion of a "bundle". When indexing these snapshots, we dynamically construct bundles by selecting individual entities @@ -110,22 +110,22 @@ class BundleEntityType(Enum): dataset fields during aggregation. This bundle contains only a single dataset entity with only the `description` field populated. """ - primary: EntityType = 'biosample' - supplementary: EntityType = 'file' - duos: EntityType = 'dataset' + primary = 'anvil_biosample' + supplementary = 'anvil_file' + duos = 'anvil_dataset' class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON): - entity_type: str + table_name: str @attrs.frozen(kw_only=True) class TDRAnvilBundleFQID(TDRBundleFQID): - entity_type: BundleEntityType = attrs.field(converter=BundleEntityType) + table_name: str def to_json(self) -> TDRAnvilBundleFQIDJSON: return dict(super().to_json(), - entity_type=self.entity_type.value) + table_name=self.table_name) class TDRAnvilBundle(AnvilBundle[TDRAnvilBundleFQID], TDRBundle): @@ -172,10 +172,10 @@ def _version(self): def _count_subgraphs(self, source: TDRSourceSpec) -> int: rows = self._run_sql(f''' SELECT COUNT(*) AS count - FROM {backtick(self._full_table_name(source, BundleEntityType.primary.value))} + FROM {backtick(self._full_table_name(source, BundleType.primary.value))} UNION ALL SELECT COUNT(*) AS count - FROM {backtick(self._full_table_name(source, BundleEntityType.supplementary.value))} + FROM {backtick(self._full_table_name(source, BundleType.supplementary.value))} WHERE is_supplementary ''') return sum(row['count'] for row in rows) @@ -185,15 +185,15 @@ def _list_bundles(self, prefix: str ) -> list[TDRAnvilBundleFQID]: spec = source.spec - primary = BundleEntityType.primary.value - supplementary = BundleEntityType.supplementary.value - duos = BundleEntityType.duos.value + primary = BundleType.primary.value + supplementary = BundleType.supplementary.value + duos = BundleType.duos.value rows = list(self._run_sql(f''' - SELECT datarepo_row_id, {primary!r} AS entity_type + SELECT datarepo_row_id, {primary!r} AS table_name FROM {backtick(self._full_table_name(spec, primary))} WHERE STARTS_WITH(datarepo_row_id, '{prefix}') UNION ALL - SELECT datarepo_row_id, {supplementary!r} AS entity_type + SELECT datarepo_row_id, {supplementary!r} AS table_name FROM {backtick(self._full_table_name(spec, supplementary))} AS supp WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{prefix}') ''' + ( @@ -201,7 +201,7 @@ def _list_bundles(self, if config.duos_service_url is None else f''' UNION ALL - SELECT datarepo_row_id, {duos!r} AS entity_type + SELECT datarepo_row_id, {duos!r} AS table_name FROM {backtick(self._full_table_name(spec, duos))} ''' ))) @@ -218,7 +218,7 @@ def _list_bundles(self, # single dataset. 
This verification is performed independently and # concurrently for every partition, but only one partition actually # emits the bundle. - if row['entity_type'] == duos: + if row['table_name'] == duos: require(0 == duos_count) duos_count += 1 # Ensure that one partition will always contain the DUOS bundle @@ -229,43 +229,43 @@ def _list_bundles(self, source=source, uuid=bundle_uuid, version=self._version, - entity_type=BundleEntityType(row['entity_type']) + table_name=row['table_name'] )) return bundles def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> TDRAnvilBundleFQID: - if 'entity_type' not in fqid: - # Resolution of bundles without entity type is expensive, so we only - # support it during canning. - assert not config.is_in_lambda, ('Bundle FQID lacks entity type', fqid) + if 'table_name' not in fqid: + # Resolution of bundles without the table name is expensive, so we + # only support it during canning. + assert not config.is_in_lambda, ('Bundle FQID lacks table name', fqid) source = self.source_from_json(fqid['source']) entity_id = uuids.change_version(fqid['uuid'], self.bundle_uuid_version, self.datarepo_row_uuid_version) rows = self._run_sql(' UNION ALL '.join(( f''' - SELECT {entity_type.value!r} AS entity_type - FROM {backtick(self._full_table_name(source.spec, entity_type.value))} + SELECT {bundle_type.value!r} AS table_name + FROM {backtick(self._full_table_name(source.spec, bundle_type.value))} WHERE datarepo_row_id = {entity_id!r} ''' - for entity_type in BundleEntityType + for bundle_type in BundleType ))) fqid = {**fqid, **one(rows)} return super().resolve_bundle(fqid) def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: - if bundle_fqid.entity_type is BundleEntityType.primary: + if bundle_fqid.table_name == BundleType.primary.value: log.info('Bundle %r is a primary bundle', bundle_fqid.uuid) return self._primary_bundle(bundle_fqid) - elif bundle_fqid.entity_type is BundleEntityType.supplementary: + elif bundle_fqid.table_name == BundleType.supplementary.value: log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid) return self._supplementary_bundle(bundle_fqid) - elif bundle_fqid.entity_type is BundleEntityType.duos: + elif bundle_fqid.table_name == BundleType.duos.value: assert config.duos_service_url is not None, bundle_fqid log.info('Bundle %r is a DUOS bundle', bundle_fqid.uuid) return self._duos_bundle(bundle_fqid) else: - assert False, bundle_fqid.entity_type + assert False, bundle_fqid.table_name def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: source = bundle_fqid.source @@ -321,24 +321,25 @@ def _supplementary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBund self.bundle_uuid_version, self.datarepo_row_uuid_version) source = bundle_fqid.source.spec - bundle_entity_type = bundle_fqid.entity_type.value + table_name = bundle_fqid.table_name result = TDRAnvilBundle(fqid=bundle_fqid) - columns = self._columns(bundle_entity_type) + columns = self._columns(table_name) bundle_entity = dict(one(self._run_sql(f''' SELECT {', '.join(sorted(columns))} - FROM {backtick(self._full_table_name(source, bundle_entity_type))} + FROM {backtick(self._full_table_name(source, table_name))} WHERE datarepo_row_id = '{entity_id}' '''))) - linked_entity_type = 'dataset' - columns = self._columns(linked_entity_type) + dataset = 'dataset' + dataset_table = f'anvil_{dataset}' + columns = self._columns(dataset_table) linked_entity = dict(one(self._run_sql(f''' SELECT {', '.join(sorted(columns))} - FROM 
{backtick(self._full_table_name(source, linked_entity_type))} + FROM {backtick(self._full_table_name(source, dataset_table))} '''))) link_args = {} for entity_type, row, arg in [ - (bundle_entity_type, bundle_entity, 'outputs'), - (linked_entity_type, linked_entity, 'inputs') + ('file', bundle_entity, 'outputs'), + (dataset, linked_entity, 'inputs') ]: entity_ref = EntityReference(entity_type=entity_type, entity_id=row['datarepo_row_id']) result.add_entity(entity_ref, self._version, row) @@ -352,7 +353,7 @@ def _duos_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: entity_id = change_version(bundle_fqid.uuid, self.bundle_uuid_version, self.datarepo_row_uuid_version) - entity = EntityReference(entity_type=bundle_fqid.entity_type.value, + entity = EntityReference(entity_type='dataset', entity_id=entity_id) bundle = TDRAnvilBundle(fqid=bundle_fqid) bundle.add_entity(entity=entity, @@ -366,16 +367,17 @@ def _bundle_entity(self, bundle_fqid: TDRAnvilBundleFQID) -> KeyReference: entity_id = uuids.change_version(bundle_uuid, self.bundle_uuid_version, self.datarepo_row_uuid_version) - entity_type = bundle_fqid.entity_type.value + table_name = bundle_fqid.table_name + entity_type = table_name.removeprefix('anvil_') pk_column = entity_type + '_id' bundle_entity = one(self._run_sql(f''' SELECT {pk_column} - FROM {backtick(self._full_table_name(source.spec, entity_type))} + FROM {backtick(self._full_table_name(source.spec, table_name))} WHERE datarepo_row_id = '{entity_id}' '''))[pk_column] bundle_entity = KeyReference(key=bundle_entity, entity_type=entity_type) log.info('Bundle UUID %r resolved to primary key %r in table %r', - bundle_uuid, bundle_entity.key, entity_type) + bundle_uuid, bundle_entity.key, table_name) return bundle_entity def _full_table_name(self, source: TDRSourceSpec, table_name: str) -> str: @@ -445,7 +447,7 @@ def _upstream_from_biosamples(self, if biosample_ids: rows = self._run_sql(f''' SELECT b.biosample_id, b.donor_id, b.part_of_dataset_id - FROM {backtick(self._full_table_name(source, 'biosample'))} AS b + FROM {backtick(self._full_table_name(source, 'anvil_biosample'))} AS b WHERE b.biosample_id IN ({', '.join(map(repr, biosample_ids))}) ''') result: KeyLinks = set() @@ -470,7 +472,7 @@ def _upstream_from_files(self, if file_ids: rows = self._run_sql(f''' WITH file AS ( - SELECT f.file_id FROM {backtick(self._full_table_name(source, 'file'))} AS f + SELECT f.file_id FROM {backtick(self._full_table_name(source, 'anvil_file'))} AS f WHERE f.file_id IN ({', '.join(map(repr, file_ids))}) ) SELECT @@ -480,7 +482,7 @@ def _upstream_from_files(self, ama.used_file_id AS uses_file_id, [] AS uses_biosample_id, FROM file AS f - JOIN {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ama + JOIN {backtick(self._full_table_name(source, 'anvil_alignmentactivity'))} AS ama ON f.file_id IN UNNEST(ama.generated_file_id) UNION ALL SELECT f.file_id, @@ -489,7 +491,7 @@ def _upstream_from_files(self, [], aya.used_biosample_id, FROM file AS f - JOIN {backtick(self._full_table_name(source, 'assayactivity'))} AS aya + JOIN {backtick(self._full_table_name(source, 'anvil_assayactivity'))} AS aya ON f.file_id IN UNNEST(aya.generated_file_id) UNION ALL SELECT f.file_id, @@ -498,7 +500,7 @@ def _upstream_from_files(self, [], sqa.used_biosample_id, FROM file AS f - JOIN {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa + JOIN {backtick(self._full_table_name(source, 'anvil_sequencingactivity'))} AS sqa ON f.file_id IN UNNEST(sqa.generated_file_id) UNION 
ALL SELECT f.file_id, @@ -507,7 +509,7 @@ def _upstream_from_files(self, vca.used_file_id, [] FROM file AS f - JOIN {backtick(self._full_table_name(source, 'variantcallingactivity'))} AS vca + JOIN {backtick(self._full_table_name(source, 'anvil_variantcallingactivity'))} AS vca ON f.file_id IN UNNEST(vca.generated_file_id) UNION ALL SELECT f.file_id, @@ -516,7 +518,7 @@ def _upstream_from_files(self, a.used_file_id, a.used_biosample_id, FROM file AS f - JOIN {backtick(self._full_table_name(source, 'activity'))} AS a + JOIN {backtick(self._full_table_name(source, 'anvil_activity'))} AS a ON f.file_id IN UNNEST(a.generated_file_id) ''') return { @@ -545,7 +547,7 @@ def _diagnoses_from_donors(self, if donor_ids: rows = self._run_sql(f''' SELECT dgn.donor_id, dgn.diagnosis_id - FROM {backtick(self._full_table_name(source, 'diagnosis'))} as dgn + FROM {backtick(self._full_table_name(source, 'anvil_diagnosis'))} as dgn WHERE dgn.donor_id IN ({', '.join(map(repr, donor_ids))}) ''') return { @@ -569,21 +571,21 @@ def _downstream_from_biosamples(self, 'sequencingactivity' as activity_table, sqa.used_biosample_id, sqa.generated_file_id - FROM {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa + FROM {backtick(self._full_table_name(source, 'anvil_sequencingactivity'))} AS sqa UNION ALL SELECT aya.assayactivity_id, 'assayactivity', aya.used_biosample_id, aya.generated_file_id, - FROM {backtick(self._full_table_name(source, 'assayactivity'))} AS aya + FROM {backtick(self._full_table_name(source, 'anvil_assayactivity'))} AS aya UNION ALL SELECT a.activity_id, 'activity', a.used_biosample_id, a.generated_file_id, - FROM {backtick(self._full_table_name(source, 'activity'))} AS a + FROM {backtick(self._full_table_name(source, 'anvil_activity'))} AS a ) SELECT biosample_id, @@ -617,19 +619,19 @@ def _downstream_from_files(self, 'alignmentactivity' AS activity_table, ala.used_file_id, ala.generated_file_id - FROM {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ala + FROM {backtick(self._full_table_name(source, 'anvil_alignmentactivity'))} AS ala UNION ALL SELECT vca.variantcallingactivity_id, 'variantcallingactivity', vca.used_file_id, vca.generated_file_id - FROM {backtick(self._full_table_name(source, 'variantcallingactivity'))} AS vca + FROM {backtick(self._full_table_name(source, 'anvil_variantcallingactivity'))} AS vca UNION ALL SELECT a.activity_id, 'activity', a.used_file_id, a.generated_file_id - FROM {backtick(self._full_table_name(source, 'activity'))} AS a + FROM {backtick(self._full_table_name(source, 'anvil_activity'))} AS a ) SELECT used_file_id, @@ -657,8 +659,9 @@ def _retrieve_entities(self, keys: AbstractSet[Key], ) -> MutableJSONs: if keys: - table_name = self._full_table_name(source, entity_type) - columns = self._columns(entity_type) + table_name = f'anvil_{entity_type}' + columns = self._columns(table_name) + table_name = self._full_table_name(source, table_name) pk_column = entity_type + '_id' assert pk_column in columns, entity_type log.debug('Retrieving %i entities of type %r ...', len(keys), entity_type) @@ -693,7 +696,7 @@ def convert_column(value): for table in anvil_schema['tables'] } - def _columns(self, entity_type: EntityType) -> set[str]: - columns = set(self._schema_columns[f'anvil_{entity_type}']) + def _columns(self, table_name: str) -> set[str]: + columns = set(self._schema_columns[table_name]) columns.add('datarepo_row_id') return columns diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py index 3ad0d3b71..49c9626f4 
100644 --- a/test/indexer/test_anvil.py +++ b/test/indexer/test_anvil.py @@ -42,7 +42,7 @@ tdr_anvil, ) from azul.plugins.repository.tdr_anvil import ( - BundleEntityType, + BundleType, TDRAnvilBundle, TDRAnvilBundleFQID, ) @@ -105,13 +105,13 @@ def bundle_fqid(cls, *, uuid, version=None, - entity_type=BundleEntityType.primary + table_name=BundleType.primary.value ) -> TDRAnvilBundleFQID: assert version is None, 'All AnVIL bundles should use the same version' return TDRAnvilBundleFQID(source=cls.source, uuid=uuid, version=cls.version, - entity_type=entity_type) + table_name=table_name) @classmethod def primary_bundle(cls) -> TDRAnvilBundleFQID: @@ -120,12 +120,12 @@ def primary_bundle(cls) -> TDRAnvilBundleFQID: @classmethod def supplementary_bundle(cls) -> TDRAnvilBundleFQID: return cls.bundle_fqid(uuid='6b0f6c0f-5d80-a242-accb-840921351cd5', - entity_type=BundleEntityType.supplementary) + table_name=BundleType.supplementary.value) @classmethod def duos_bundle(cls) -> TDRAnvilBundleFQID: return cls.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf', - entity_type=BundleEntityType.duos) + table_name=BundleType.duos.value) class TestAnvilIndexer(AnvilIndexerTestCase, diff --git a/test/integration_test.py b/test/integration_test.py index ee66f595c..75849698d 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -152,7 +152,7 @@ Link, ) from azul.plugins.repository.tdr_anvil import ( - BundleEntityType, + BundleType, TDRAnvilBundleFQID, TDRAnvilBundleFQIDJSON, ) @@ -166,9 +166,6 @@ ManifestFormat, ManifestGenerator, ) -from azul.strings import ( - pluralize, -) from azul.terra import ( ServiceAccountCredentialsProvider, TDRClient, @@ -350,7 +347,7 @@ def _list_managed_access_bundles(self, if not ( # DUOS bundles are too sparse to fulfill the managed access tests config.is_anvil_enabled(catalog) - and cast(TDRAnvilBundleFQID, bundle_fqid).entity_type is BundleEntityType.duos + and cast(TDRAnvilBundleFQID, bundle_fqid).table_name == BundleType.duos.value ) ) bundle_fqid = self.random.choice(bundle_fqids) @@ -503,8 +500,7 @@ def _test_other_endpoints(self): if config.is_hca_enabled(catalog): bundle_index, project_index = 'bundles', 'projects' elif config.is_anvil_enabled(catalog): - bundle_index = pluralize(BundleEntityType.primary.value) - project_index = 'datasets' + bundle_index, project_index = 'biosamples', 'datasets' else: assert False, catalog service_paths = { @@ -1276,17 +1272,17 @@ def _get_indexed_bundles(self, # and 0 or more other entities. Biosamples only occur in primary # bundles. if len(hit['biosamples']) > 0: - entity_type = BundleEntityType.primary + bundle_type = BundleType.primary # Supplementary bundles contain only 1 file and 1 dataset. elif len(hit['files']) > 0: - entity_type = BundleEntityType.supplementary + bundle_type = BundleType.supplementary # DUOS bundles contain only 1 dataset. elif len(hit['datasets']) > 0: - entity_type = BundleEntityType.duos + bundle_type = BundleType.duos else: assert False, hit bundle_fqid = cast(TDRAnvilBundleFQIDJSON, bundle_fqid) - bundle_fqid['entity_type'] = entity_type.value + bundle_fqid['table_name'] = bundle_type.value bundle_fqid = self.repository_plugin(catalog).resolve_bundle(bundle_fqid) indexed_fqids.add(bundle_fqid) return indexed_fqids
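
Note on the convention this patch relies on: every AnVIL BigQuery table is named after its entity type with an `anvil_` prefix, and bundle FQIDs now carry the table name rather than the bare entity type. Below is a minimal sketch of that convention, assuming only the standard library; the helper names `table_name_to_entity_type` and `entity_type_to_table_name` are illustrative and do not appear in the patch itself.

    from enum import Enum


    class BundleType(Enum):
        # Mirrors the enum in the patch: the values are fully prefixed
        # BigQuery table names, not bare entity types.
        primary = 'anvil_biosample'
        supplementary = 'anvil_file'
        duos = 'anvil_dataset'


    def table_name_to_entity_type(table_name: str) -> str:
        # e.g. 'anvil_file' -> 'file', the direction used by
        # _bundle_entity() via str.removeprefix()
        assert table_name.startswith('anvil_'), table_name
        return table_name.removeprefix('anvil_')


    def entity_type_to_table_name(entity_type: str) -> str:
        # e.g. 'dataset' -> 'anvil_dataset', the direction used by
        # _retrieve_entities() and by the hard-coded table names in the SQL
        return 'anvil_' + entity_type


    assert table_name_to_entity_type(BundleType.primary.value) == 'biosample'
    assert entity_type_to_table_name('file') == BundleType.supplementary.value

Keeping the prefix in the enum values lets `_count_subgraphs`, `_list_bundles` and the other query builders pass `BundleType.<member>.value` straight to `_full_table_name`, instead of re-applying the prefix at every call site.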
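
On the `resolve_bundle` change: an FQID JSON that lacks `table_name` is resolved by probing each table named by `BundleType` for the data repo row, one SELECT per member, which is why the plugin only permits this outside of Lambda (during canning). The following is a rough, BigQuery-free sketch of that lookup under stated assumptions: in-memory sets of row IDs stand in for TDR tables, the row IDs are placeholders, and the bundle-UUID-to-row-UUID conversion is elided.

    from enum import Enum


    class BundleType(Enum):
        primary = 'anvil_biosample'
        supplementary = 'anvil_file'
        duos = 'anvil_dataset'


    def resolve_table_name(fqid_json: dict[str, str],
                           tables: dict[str, set[str]],
                           ) -> dict[str, str]:
        # Stand-in for resolve_bundle(): a missing 'table_name' is filled
        # in by checking every table named by the enum for the row. The
        # real plugin runs one SELECT per member against BigQuery and
        # first converts the bundle UUID into a datarepo row UUID; both
        # steps are elided here.
        if 'table_name' not in fqid_json:
            hits = [
                bundle_type.value
                for bundle_type in BundleType
                if fqid_json['uuid'] in tables[bundle_type.value]
            ]
            assert len(hits) == 1, (fqid_json, hits)
            fqid_json = {**fqid_json, 'table_name': hits[0]}
        return fqid_json


    # Placeholder row IDs, not real datarepo rows
    tables = {
        'anvil_biosample': {'biosample-row'},
        'anvil_file': {'file-row'},
        'anvil_dataset': {'dataset-row'},
    }
    fqid_json = {'uuid': 'dataset-row'}
    assert resolve_table_name(fqid_json, tables)['table_name'] == BundleType.duos.value

When the caller already records the table name, as the integration test now does with `bundle_fqid['table_name'] = bundle_type.value`, this probing is skipped entirely.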