From 71bda1c9209bdbd3fc4dfc1415a044d5092928f1 Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Fri, 11 Oct 2024 19:07:11 -0700 Subject: [PATCH] [r] Index orphaned replicas (#6626) --- .../plugins/repository/tdr_anvil/__init__.py | 427 ++++++++++++------ src/azul/terra.py | 9 + src/azul/uuids.py | 25 + ...0000-a000-0000-000000000000.tdr.anvil.json | 80 ++++ ...09d-46a4-b845-7584df39263b.tables.tdr.json | 55 +++ test/indexer/test_anvil.py | 35 +- test/indexer/test_tdr.py | 4 +- 7 files changed, 489 insertions(+), 146 deletions(-) create mode 100644 test/indexer/data/00000000-0000-a000-0000-000000000000.tdr.anvil.json diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index 8b2fd0295..a0840d2cb 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -2,6 +2,7 @@ from enum import ( Enum, ) +import itertools import logging from operator import ( itemgetter, @@ -10,6 +11,7 @@ AbstractSet, Callable, Iterable, + Self, ) import attrs @@ -24,12 +26,14 @@ uuids, ) from azul.bigquery import ( + BigQueryRow, backtick, ) from azul.drs import ( DRSURI, ) from azul.indexer import ( + Prefix, SourcedBundleFQIDJSON, ) from azul.indexer.document import ( @@ -61,6 +65,7 @@ ) from azul.uuids import ( change_version, + zero_pad, ) log = logging.getLogger(__name__) @@ -74,49 +79,66 @@ class BundleType(Enum): """ - AnVIL snapshots have no inherent notion of a "bundle". When indexing these - snapshots, we dynamically construct bundles by selecting individual entities - and following their foreign keys to discover associated entities. The - initial entity from which this graph traversal begins is termed the - "bundle entity", and its FQID serves as the basis for the bundle FQID. Each - member of this enumeration represents a strategy for selecting bundle - entities. - - Our primary such strategy is to use every biosample in a given snapshot as a - bundle entity. Biosamples were chosen for this role based on a desirable - balance between the size and number of the resulting bundles as well as the - degree of overlap between them. The implementation of the graph traversal is - tightly coupled to this choice, and switching to a different entity type - would require re-implementing much of the Plugin code. Primary bundles - consist of at least one biosample (the bundle entity), exactly one dataset, - and zero or more other entities of assorted types. - - Some snapshots include file entities that lack any foreign keys that - associate the file with any other entity. To ensure that these "orphaned" - files are indexed, they are also used as bundle entities. As with primary - bundles, the creation of these supplementary bundles depends on a - specifically tailored traversal implementation. Supplementary bundles always - consist of exactly two entities: one file (the bundle entity) and one - dataset. - - The `dataset.description` field is unusual in that it is not stored in - BigQuery and must be retrieved via Terra's DUOS API. There is only one - dataset per snapshot, which is referenced in all primary and supplementary - bundles. Therefore, only one request to DUOS per *snapshot* is necessary, - but if `description` is retrieved at the same time as the other dataset - fields, we will make one request per *bundle* instead, potentially - overloading the DUOS service. 
Our solution is to retrieve `description` only
-    in a dedicated bundle format, once per snapshot, and merge it with the other
-    dataset fields during aggregation. This bundle contains only a single
-    dataset entity with only the `description` field populated.
+    AnVIL snapshots have no inherent notion of a "bundle". During indexing, we
+    dynamically construct bundles by querying each table in the snapshot. This
+    class enumerates the tables that require special strategies for listing and
+    fetching their bundles.
+
+    Primary bundles are defined by a biosample entity, termed the bundle entity.
+    Each primary bundle includes all entities derived from the bundle entity
+    and all the ancestors of those entities, which are discovered by iteratively
+    following foreign keys. Biosamples were chosen for this role based on a
+    desirable balance between the size and number of the resulting bundles as
+    well as the degree of overlap between them. The implementation of the graph
+    traversal is tightly coupled to this choice, and switching to a different
+    entity type would require re-implementing much of the Plugin code. Primary
+    bundles consist of at least one biosample (the bundle entity), exactly one
+    dataset, and zero or more other entities of assorted types. Primary bundles
+    never contain orphans because they are bijective to rows in the biosample
+    table.
+
+    Supplementary bundles consist of batches of file entities, some of which
+    are supplementary files: files that lack any foreign keys associating them
+    with any other entity. Files in the batch that are not supplementary are
+    classified as orphans. The bundle also includes a dataset entity that is
+    linked to the supplementary files in the bundle.
+
+    DUOS bundles consist of a single dataset entity. The "entity" includes only
+    the dataset description retrieved from DUOS, while a copy of the BigQuery
+    row for this dataset is also included as an orphan. We chose this design
+    because there is only one dataset per snapshot, which is referenced in all
+    primary and supplementary bundles. Therefore, only one request to DUOS per
+    *snapshot* is necessary, but if `description` is retrieved at the same time
+    as the other dataset fields, we will make one request per *bundle* instead,
+    potentially overloading the DUOS service. Our solution is to retrieve
+    `description` only in a dedicated bundle format, once per snapshot, and
+    merge it with the other dataset fields during aggregation.
+
+    All other tables are batched like supplementary bundles, but their bundles
+    contain only orphans and have no links.
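+
+    For batched tables, each bundle covers the rows of one table whose
+    `datarepo_row_id` starts with a particular hexadecimal prefix. The bundle
+    FQID encodes that prefix by zero-padding it into the bundle UUID (see
+    `zero_pad`) and by recording its length in `batch_prefix_length`. The
+    prefix length is chosen per table so that batches average roughly 256 rows
+    (see `_batch_table`). As an illustrative sketch (the prefix 'ab' is
+    hypothetical), one such batch of `anvil_activity` rows would be listed as
+
+        TDRAnvilBundleFQID(uuid=zero_pad('ab', 10),
+                           version='anvil_activity',
+                           batch_prefix_length=2,
+                           source=...)
+
+    and fetched by selecting every `anvil_activity` row whose `datarepo_row_id`
+    starts with 'ab'.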
""" primary = 'anvil_biosample' supplementary = 'anvil_file' duos = 'anvil_dataset' + def is_batched(self: Self | str) -> bool: + """ + >>> BundleType.primary.is_batched() + False + + >>> BundleType.is_batched('anvil_activity') + True + """ + if isinstance(self, str): + try: + self = BundleType(self) + except ValueError: + return True + return self not in (BundleType.primary, BundleType.duos) + class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON): - pass + batch_prefix_length: int | None @attrs.frozen(kw_only=True) @@ -125,6 +147,26 @@ class TDRAnvilBundleFQID(TDRBundleFQID): def to_json(self) -> TDRAnvilBundleFQIDJSON: return super().to_json() + batch_prefix_length: int | None + + def __attrs_post_init__(self): + should_be_batched = BundleType.is_batched(self.version) + is_batched = self.is_batched + assert is_batched == should_be_batched, self + if is_batched: + assert 0 <= self.batch_prefix_length <= 8, self + + @property + def is_batched(self) -> bool: + return self.batch_prefix_length is not None + + @property + def batch_prefix(self) -> str | None: + if self.batch_prefix_length is None: + return None + else: + return self.uuid[:self.batch_prefix_length] + class TDRAnvilBundle(AnvilBundle[TDRAnvilBundleFQID], TDRBundle): @@ -174,72 +216,79 @@ def _version(self): bundle_uuid_version = 10 def _count_subgraphs(self, source: TDRSourceSpec) -> int: - rows = self._run_sql(f''' - SELECT COUNT(*) AS count - FROM {backtick(self._full_table_name(source, BundleType.primary.value))} - UNION ALL - SELECT COUNT(*) AS count - FROM {backtick(self._full_table_name(source, BundleType.supplementary.value))} - WHERE is_supplementary - ''') - return sum(row['count'] for row in rows) + total = 0 + for table_name in self._list_tables(source): + if BundleType.is_batched(table_name): + _, bundle_count = self._batch_table(source, table_name, prefix='') + else: + rows = self._run_sql(f''' + SELECT COUNT(*) AS count + FROM {backtick(self._full_table_name(source, table_name))} + ''') + bundle_count = one(rows)['count'] + total += bundle_count + return total def _list_bundles(self, source: TDRSourceRef, prefix: str ) -> list[TDRAnvilBundleFQID]: - spec = source.spec - primary = BundleType.primary.value - supplementary = BundleType.supplementary.value - duos = BundleType.duos.value - rows = list(self._run_sql(f''' - SELECT datarepo_row_id, {primary!r} AS table_name - FROM {backtick(self._full_table_name(spec, primary))} - WHERE STARTS_WITH(datarepo_row_id, '{prefix}') - UNION ALL - SELECT datarepo_row_id, {supplementary!r} AS table_name - FROM {backtick(self._full_table_name(spec, supplementary))} AS supp - WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{prefix}') - ''' + ( - '' - if config.duos_service_url is None else - f''' - UNION ALL - SELECT datarepo_row_id, {duos!r} AS table_name - FROM {backtick(self._full_table_name(spec, duos))} - ''' - ))) bundles = [] - duos_count = 0 - for row in rows: - # Reversibly tweak the entity UUID to prevent - # collisions between entity IDs and bundle IDs - bundle_uuid = uuids.change_version(row['datarepo_row_id'], - self.datarepo_row_uuid_version, - self.bundle_uuid_version) - # We intentionally omit the WHERE clause for datasets so that we can - # verify our assumption that each snapshot only contains rows for a - # single dataset. This verification is performed independently and - # concurrently for every partition, but only one partition actually - # emits the bundle. 
- if row['table_name'] == duos: - require(0 == duos_count) - duos_count += 1 - # Ensure that one partition will always contain the DUOS bundle - # regardless of the choice of common prefix - if not bundle_uuid.startswith(prefix): - continue - bundles.append(TDRAnvilBundleFQID( - source=source, - uuid=bundle_uuid, - version=row['table_name'] - )) + spec = source.spec + for table_name in self._list_tables(source.spec): + if table_name == BundleType.duos.value: + batch_prefix_length = None + row = one(self._run_sql(f''' + SELECT datarepo_row_id + FROM {backtick(self._full_table_name(spec, table_name))} + ''')) + dataset_row_id = row['datarepo_row_id'] + # We intentionally omit the WHERE clause for datasets in order + # to verify our assumption that each snapshot only contains rows + # for a single dataset. This verification is performed + # independently and concurrently for every partition, but only + # one partition actually emits the bundle. + bundle_uuids = [ + change_version(dataset_row_id, + self.datarepo_row_uuid_version, + self.bundle_uuid_version) + ] if dataset_row_id.startswith(prefix) else [] + elif table_name == BundleType.primary.value: + batch_prefix_length = None + bundle_uuids = ( + change_version(row['datarepo_row_id'], + self.datarepo_row_uuid_version, + self.bundle_uuid_version) + for row in self._run_sql(f''' + SELECT datarepo_row_id + FROM {backtick(self._full_table_name(spec, table_name))} + WHERE STARTS_WITH(datarepo_row_id, {prefix!r}) + ''') + ) + else: + batch_prefix_length, _ = self._batch_table(spec, table_name, prefix) + if batch_prefix_length is None: + bundle_uuids = [] + else: + batch_prefix = Prefix(common=prefix, + partition=batch_prefix_length - len(prefix)) + bundle_uuids = ( + zero_pad(batch, self.bundle_uuid_version) + for batch in batch_prefix.partition_prefixes() + ) + for bundle_uuid in bundle_uuids: + bundles.append(TDRAnvilBundleFQID( + source=source, + uuid=bundle_uuid, + version=table_name, + batch_prefix_length=batch_prefix_length, + )) return bundles def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> TDRAnvilBundleFQID: if fqid['version'] is None: - # Resolution of bundles without the table name is expensive, so we - # only support it during canning. + # Resolution of bundles without the table name and batch prefix + # length is expensive, so we only support it during canning. 
assert not config.is_in_lambda, ('Bundle FQID lacks table name', fqid) source = self.source_from_json(fqid['source']) entity_id = uuids.change_version(fqid['uuid'], @@ -247,13 +296,20 @@ def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> TDRAnvilBundleFQID: self.datarepo_row_uuid_version) rows = self._run_sql(' UNION ALL '.join(( f''' - SELECT {bundle_type.value!r} AS table_name - FROM {backtick(self._full_table_name(source.spec, bundle_type.value))} + SELECT {table_name!r} AS table_name + FROM {backtick(self._full_table_name(source.spec, table_name))} WHERE datarepo_row_id = {entity_id!r} ''' - for bundle_type in BundleType + for table_name in self._list_tables(source.spec) ))) - fqid = {**fqid, **one(rows)} + table_name = one(rows)['table_name'] + if BundleType.is_batched(table_name): + batch_prefix_length, _ = self._batch_table(source.spec, table_name, '') + else: + batch_prefix_length = None + fqid = TDRAnvilBundleFQIDJSON(**fqid, + version=table_name, + batch_prefix_length=batch_prefix_length) return super().resolve_bundle(fqid) def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: @@ -268,9 +324,78 @@ def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: log.info('Bundle %r is a DUOS bundle', bundle_fqid.uuid) return self._duos_bundle(bundle_fqid) else: - assert False, bundle_fqid.version + log.info('Bundle %r is a replica bundle', bundle_fqid.uuid) + return self._replica_bundle(bundle_fqid) + + def _batch_table(self, + source: TDRSourceSpec, + table_name: str, + prefix: str, + ) -> tuple[int | None, int]: + """ + Find a batch prefix length that yields as close to 256 rows per batch + as possible within the specified table partition. The first element is + the prefix length (*including* the partition prefix), or None if the + table contains no rows, and the second element is the resulting number + of batches. + + Because the partitions of a table do not contain exactly the same number + of bundles, calculating the batch size statistics for the entire table + at once produces a different result than performing the same calculation + for any individual partition. We expect the inconsistencies to average + out across partitions so that `_count_subgraphs` and `_list_bundles` + give consistent results. + + This method relies on BigQuery's `AVG` function, which is + nondeterministic for floating-point return values. The probability that + this affects this method's return value is very small, but nonzero. 
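+
+        For example (hypothetical row counts): for a partition containing
+        4,000 evenly distributed rows, a prefix length of 1 yields 16 batches
+        averaging 250 rows, which is closer to the target of 256 than a single
+        batch of 4,000 rows or 256 batches averaging ~16 rows, so 1 is
+        selected. If two candidate lengths were nearly equidistant from the
+        target, the floating-point jitter in `AVG` could flip which one wins.
+        BigQuery's documentation of `AVG` is at: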
+ https://cloud.google.com/bigquery/docs/reference/standard-sql/aggregate_functions#avg + """ + max_length = 4 + + def repeat(fmt): + return ', '.join(fmt.format(i=i) for i in range(1, max_length + 1)) + + prefix_len = len(prefix) + log.info('Calculating batch prefix length for partition %r of table %r ' + 'in source %s', prefix, table_name, source) + query = f''' + WITH counts AS ( + SELECT + {repeat(f'SUBSTR(datarepo_row_id, {prefix_len} + {{i}}, 1) AS p{{i}}')}, + COUNT(*) AS num_rows + FROM {backtick(self._full_table_name(source, table_name))} + WHERE STARTS_WITH(datarepo_row_id, {prefix!r}) + GROUP BY ROLLUP ({repeat('p{i}')}) + ) + SELECT + {prefix_len} + LENGTH(CONCAT( + {repeat('IFNULL(p{i}, "")')} + )) AS batch_prefix_length, + AVG(num_rows) AS average_batch_size, + COUNT(*) AS num_batches + FROM counts + GROUP BY batch_prefix_length + ''' + rows = list(self._run_sql(query)) + target_size = 256 + # `average_batch_size` is nondeterministic here + if len(rows) > 0: + row = min(rows, key=lambda row: abs(target_size - row['average_batch_size'])) + prefix_length = row['batch_prefix_length'] + average_size = row['average_batch_size'] + count = row['num_batches'] + log.info('Selected batch prefix length %d (average batch size %.1f, ' + 'num batches %d)', prefix_length, average_size, count) + else: + log.info('Table is empty, emitting no batches') + prefix_length = None + count = 0 + + return prefix_length, count def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: + assert not bundle_fqid.is_batched, bundle_fqid source = bundle_fqid.source bundle_entity = self._bundle_entity(bundle_fqid) @@ -320,50 +445,74 @@ def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: return result def _supplementary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: - entity_id = uuids.change_version(bundle_fqid.uuid, - self.bundle_uuid_version, - self.datarepo_row_uuid_version) + assert bundle_fqid.is_batched, bundle_fqid source = bundle_fqid.source.spec - table_name = bundle_fqid.version result = TDRAnvilBundle(fqid=bundle_fqid) - columns = self._columns(table_name) - bundle_entity = dict(one(self._run_sql(f''' - SELECT {', '.join(sorted(columns))} - FROM {backtick(self._full_table_name(source, table_name))} - WHERE datarepo_row_id = '{entity_id}' - '''))) - dataset = 'dataset' - dataset_table = f'anvil_{dataset}' - columns = self._columns(dataset_table) - linked_entity = dict(one(self._run_sql(f''' - SELECT {', '.join(sorted(columns))} - FROM {backtick(self._full_table_name(source, dataset_table))} - '''))) - link_args = {} - for entity_type, row, arg in [ - ('file', bundle_entity, 'outputs'), - (dataset, linked_entity, 'inputs') - ]: - entity_ref = EntityReference(entity_type=entity_type, entity_id=row['datarepo_row_id']) - result.add_entity(entity_ref, self._version, row) - link_args[arg] = {entity_ref} - result.add_links({EntityLink(**link_args)}) + linked_file_refs = set() + for file_ref, file_row in self._get_batch(bundle_fqid): + is_supplementary = file_row['is_supplementary'] + result.add_entity(file_ref, + self._version, + dict(file_row), + is_orphan=not is_supplementary) + if is_supplementary: + linked_file_refs.add(file_ref) + dataset_ref, dataset_row = self._get_dataset(source) + result.add_entity(dataset_ref, self._version, dict(dataset_row)) + result.add_links([EntityLink(inputs={dataset_ref}, outputs=linked_file_refs)]) return result def _duos_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: + assert not 
bundle_fqid.is_batched, bundle_fqid duos_info = self.tdr.get_duos(bundle_fqid.source) description = None if duos_info is None else duos_info.get('studyDescription') - entity_id = change_version(bundle_fqid.uuid, - self.bundle_uuid_version, - self.datarepo_row_uuid_version) - entity = EntityReference(entity_type='dataset', - entity_id=entity_id) + ref, row = self._get_dataset(bundle_fqid.source.spec) + expected_entity_id = change_version(bundle_fqid.uuid, + self.bundle_uuid_version, + self.datarepo_row_uuid_version) + assert ref.entity_id == expected_entity_id, (ref, bundle_fqid) bundle = TDRAnvilBundle(fqid=bundle_fqid) - bundle.add_entity(entity=entity, - version=self._version, - row={'description': description}) + bundle.add_entity(ref, self._version, {'description': description}) + # Classify as orphan to suppress the emission of a contribution + bundle.add_entity(ref, self._version, dict(row), is_orphan=True) return bundle + def _replica_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: + assert bundle_fqid.is_batched, bundle_fqid + source = bundle_fqid.source.spec + result = TDRAnvilBundle(fqid=bundle_fqid) + batch = self._get_batch(bundle_fqid) + dataset = self._get_dataset(source) + for (ref, row) in itertools.chain([dataset], batch): + result.add_entity(ref, self._version, dict(row), is_orphan=True) + return result + + def _get_dataset(self, source: TDRSourceSpec) -> tuple[EntityReference, BigQueryRow]: + table_name = 'anvil_dataset' + columns = self._columns(table_name) + row = one(self._run_sql(f''' + SELECT {', '.join(sorted(columns))} + FROM {backtick(self._full_table_name(source, table_name))} + ''')) + ref = EntityReference(entity_type='dataset', entity_id=row['datarepo_row_id']) + return ref, row + + def _get_batch(self, + bundle_fqid: TDRAnvilBundleFQID + ) -> Iterable[tuple[EntityReference, BigQueryRow]]: + source = bundle_fqid.source.spec + batch_prefix = bundle_fqid.batch_prefix + table_name = bundle_fqid.version + columns = self._columns(table_name) + for row in self._run_sql(f''' + SELECT {', '.join(sorted(columns))} + FROM {backtick(self._full_table_name(source, table_name))} + WHERE STARTS_WITH(datarepo_row_id, {batch_prefix!r}) + '''): + ref = EntityReference(entity_type=table_name.removeprefix('anvil_'), + entity_id=row['datarepo_row_id']) + yield ref, row + def _bundle_entity(self, bundle_fqid: TDRAnvilBundleFQID) -> KeyReference: source = bundle_fqid.source bundle_uuid = bundle_fqid.uuid @@ -694,6 +843,16 @@ def convert_column(value): } def _columns(self, table_name: str) -> set[str]: - columns = set(self._schema_columns[table_name]) - columns.add('datarepo_row_id') - return columns + try: + columns = self._schema_columns[table_name] + except KeyError: + return {'*'} + else: + columns = set(columns) + columns.add('datarepo_row_id') + return columns + + def _list_tables(self, source: TDRSourceSpec) -> AbstractSet[str]: + return set(self.tdr.list_tables(source)) - { + 'datarepo_row_ids' + } diff --git a/src/azul/terra.py b/src/azul/terra.py index a468c382e..ba4febd32 100644 --- a/src/azul/terra.py +++ b/src/azul/terra.py @@ -39,6 +39,7 @@ bigquery, ) from google.cloud.bigquery import ( + DatasetReference, QueryJob, QueryJobConfig, QueryPriority, @@ -507,6 +508,14 @@ def run_sql(self, query: str) -> BigQueryRows: log.debug('Job info: %s', json.dumps(self._job_info(job))) return result + def list_tables(self, source: TDRSourceSpec) -> set[str]: + bigquery = self._bigquery(self.credentials.project_id) + ref = DatasetReference(project=source.subdomain, 
dataset_id=source.name) + return { + table.to_api_repr()['tableReference']['tableId'] + for table in bigquery.list_tables(ref) + } + def _trunc_query(self, query: str) -> str: return trunc_ellipses(query, 2048) diff --git a/src/azul/uuids.py b/src/azul/uuids.py index ad1017ad1..9e2610f64 100644 --- a/src/azul/uuids.py +++ b/src/azul/uuids.py @@ -131,6 +131,31 @@ def change_version(uuid: str, old_version: int, new_version: int) -> str: return uuid +def zero_pad(prefix: str, version: int) -> str: + """ + Extend a prefix with zeros to produce a valid UUID. + + >>> zero_pad('', 1) + '00000000-0000-1000-8000-000000000000' + + >>> zero_pad('abcd', 4) + 'abcd0000-0000-4000-8000-000000000000' + + >>> zero_pad('f' * 32, 1) + 'ffffffff-ffff-1fff-bfff-ffffffffffff' + + >>> zero_pad('f' * 33, 1) + Traceback (most recent call last): + ... + ValueError: badly formed hexadecimal UUID string + """ + # The intermediary representation is necessary to support non-standard + # versions + temp_version = 1 + u = str(UUID(prefix.ljust(32, '0'), version=temp_version)) + return change_version(u, temp_version, version) + + UUID_PARTITION = TypeVar('UUID_PARTITION', bound='UUIDPartition') diff --git a/test/indexer/data/00000000-0000-a000-0000-000000000000.tdr.anvil.json b/test/indexer/data/00000000-0000-a000-0000-000000000000.tdr.anvil.json new file mode 100644 index 000000000..3c32fe667 --- /dev/null +++ b/test/indexer/data/00000000-0000-a000-0000-000000000000.tdr.anvil.json @@ -0,0 +1,80 @@ +{ + "entities": {}, + "links": [], + "orphans": { + "dataset/2370f948-2783-4eb6-afea-e022897f4dcf": { + "consent_group": [ + "DS-BDIS" + ], + "data_modality": [], + "data_use_permission": [ + "DS-BDIS" + ], + "datarepo_row_id": "2370f948-2783-4eb6-afea-e022897f4dcf", + "dataset_id": "52ee7665-7033-63f2-a8d9-ce8e32666739", + "owner": [ + "Debbie Nickerson" + ], + "principal_investigator": [], + "registered_identifier": [ + "phs000693" + ], + "source_datarepo_row_ids": [ + "workspace_attributes:7a22b629-9d81-4e4d-9297-f9e44ed760bc" + ], + "title": "ANVIL_CMG_UWASH_DS_BDIS", + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/7c0b04d4-09a3-4e00-a142-3a4c5fc4c3fb": { + "datarepo_row_id": "7c0b04d4-09a3-4e00-a142-3a4c5fc4c3fb", + "activity_id": "71f12c9c-5115-374b-cd96-9414b6f1935a", + "activity_type": "Unknown", + "used_file_id": [ + "ad7932a7-55d3-36af-ba7d-cb22b627f10d" + ], + "generated_file_id": [ + "00195da1-8e9f-3d95-957b-41607759de38" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:239769e6-1ea0-4066-8289-4c1a07d180ee", + "file_inventory:0945564c-b45b-482e-94db-7506118fc50d" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/7fce9cf8-5d2b-4a27-83fc-bc22214116e3": { + "datarepo_row_id": "7fce9cf8-5d2b-4a27-83fc-bc22214116e3", + "activity_id": "8d502145-9cd0-0a30-a6fe-8c4f529086c4", + "activity_type": "Unknown", + "used_file_id": [ + "af3ca37e-76a8-3568-94aa-b9f0f256bcf2" + ], + "generated_file_id": [ + "0027abb3-5038-3849-aeaf-adccdc692259" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:c4d7b383-cab6-46e3-8d8f-37b0c8e42f25", + "file_inventory:c332b008-d443-4fa2-878a-3af2b3ebd893" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/fc734969-1c7f-4806-b0f8-299584a6158d": { + "datarepo_row_id": "fc734969-1c7f-4806-b0f8-299584a6158d", + "activity_id": "07361e60-5446-4a38-b4ae-c765c6f4cfbd", + "activity_type": "Unknown", + "used_file_id": [ + "1c086a00-2869-3f58-9ed8-87e5f4d59e33" + ], + "generated_file_id": [ + 
"002eddb4-d277-36a4-bf0e-8c8318d10531" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:b856a7f2-c4ed-4cd1-a2fb-cd9dc7a810f5", + "file_inventory:53f3388d-314e-407c-8548-85797a56b735" + ], + "version": "2022-06-01T00:00:00.000000Z" + } + } +} \ No newline at end of file diff --git a/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json b/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json index c1783920e..f31a92201 100644 --- a/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json +++ b/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json @@ -10,6 +10,54 @@ "source_datarepo_row_ids": [], "used_biosample_id": [], "used_file_id": [] + }, + { + "datarepo_row_id": "7c0b04d4-09a3-4e00-a142-3a4c5fc4c3fb", + "activity_id": "71f12c9c-5115-374b-cd96-9414b6f1935a", + "activity_type": "Unknown", + "used_file_id": [ + "ad7932a7-55d3-36af-ba7d-cb22b627f10d" + ], + "generated_file_id": [ + "00195da1-8e9f-3d95-957b-41607759de38" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:239769e6-1ea0-4066-8289-4c1a07d180ee", + "file_inventory:0945564c-b45b-482e-94db-7506118fc50d" + ] + }, + { + "datarepo_row_id": "7fce9cf8-5d2b-4a27-83fc-bc22214116e3", + "activity_id": "8d502145-9cd0-0a30-a6fe-8c4f529086c4", + "activity_type": "Unknown", + "used_file_id": [ + "af3ca37e-76a8-3568-94aa-b9f0f256bcf2" + ], + "generated_file_id": [ + "0027abb3-5038-3849-aeaf-adccdc692259" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:c4d7b383-cab6-46e3-8d8f-37b0c8e42f25", + "file_inventory:c332b008-d443-4fa2-878a-3af2b3ebd893" + ] + }, + { + "datarepo_row_id": "fc734969-1c7f-4806-b0f8-299584a6158d", + "activity_id": "07361e60-5446-4a38-b4ae-c765c6f4cfbd", + "activity_type": "Unknown", + "used_file_id": [ + "1c086a00-2869-3f58-9ed8-87e5f4d59e33" + ], + "generated_file_id": [ + "002eddb4-d277-36a4-bf0e-8c8318d10531" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:b856a7f2-c4ed-4cd1-a2fb-cd9dc7a810f5", + "file_inventory:53f3388d-314e-407c-8548-85797a56b735" + ] } ] }, @@ -254,6 +302,13 @@ "variantcallingactivity_id": "8db89e5e-1c8a-487e-b0c1-b26218ac6b7b" } ] + }, + "non_schema_orphan_table": { + "rows": [ + { + "datarepo_row_id": "" + } + ] } } } \ No newline at end of file diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py index 7e50815b8..2bf631c5a 100644 --- a/test/indexer/test_anvil.py +++ b/test/indexer/test_anvil.py @@ -49,6 +49,9 @@ from azul.terra import ( TDRClient, ) +from azul.uuids import ( + zero_pad, +) from azul_test_case import ( TDRTestCase, ) @@ -105,12 +108,14 @@ def bundle_fqid(cls, *, uuid, version=None, - table_name=BundleType.primary.value + table_name=BundleType.primary.value, + batch_prefix_length=None ) -> TDRAnvilBundleFQID: assert version is None, 'All AnVIL bundles should use the same version' return TDRAnvilBundleFQID(source=cls.source, uuid=uuid, - version=table_name) + version=table_name, + batch_prefix_length=batch_prefix_length) @classmethod def primary_bundle(cls) -> TDRAnvilBundleFQID: @@ -119,7 +124,8 @@ def primary_bundle(cls) -> TDRAnvilBundleFQID: @classmethod def supplementary_bundle(cls) -> TDRAnvilBundleFQID: return cls.bundle_fqid(uuid='6b0f6c0f-5d80-a242-accb-840921351cd5', - table_name=BundleType.supplementary.value) + table_name=BundleType.supplementary.value, + batch_prefix_length=0) @classmethod def duos_bundle(cls) -> TDRAnvilBundleFQID: @@ -128,8 +134,9 @@ def 
duos_bundle(cls) -> TDRAnvilBundleFQID: @classmethod def replica_bundle(cls) -> TDRAnvilBundleFQID: - return cls.bundle_fqid(uuid='abc00000-0000-a000-0000-000000000000', - table_name='anvil_activity') + return cls.bundle_fqid(uuid='00000000-0000-a000-0000-000000000000', + table_name='anvil_activity', + batch_prefix_length=0) class TestAnvilIndexer(AnvilIndexerTestCase, @@ -169,13 +176,21 @@ def test_indexing(self): def test_list_and_fetch_bundles(self): source_ref = self.source - self._make_mock_tdr_tables(source_ref) - expected_bundle_fqids = sorted([ + tables = self._make_mock_tdr_tables(source_ref) + plugin = self.plugin_for_source_spec(source_ref.spec) + unbatched_bundles = [ self.primary_bundle(), - self.supplementary_bundle(), self.duos_bundle() - ]) - plugin = self.plugin_for_source_spec(source_ref.spec) + ] + batched_bundles = [ + self.bundle_fqid(uuid=zero_pad('', plugin.bundle_uuid_version), + table_name=table_name, + batch_prefix_length=self.source.spec.prefix.partition) + for table_name in tables.keys() + if table_name not in (BundleType.primary.value, BundleType.duos.value) + ] + self.assertIn(self.supplementary_bundle(), batched_bundles) + expected_bundle_fqids = sorted(unbatched_bundles + batched_bundles) bundle_fqids = sorted(plugin.list_bundles(source_ref, '')) self.assertEqual(expected_bundle_fqids, bundle_fqids) for bundle_fqid in bundle_fqids: diff --git a/test/indexer/test_tdr.py b/test/indexer/test_tdr.py index 62aab9dfe..3e59985cb 100644 --- a/test/indexer/test_tdr.py +++ b/test/indexer/test_tdr.py @@ -197,8 +197,7 @@ def setUpClass(cls): '--dataset=' + cls.source.spec.name ]) - def _make_mock_tdr_tables(self, - source: TDRSourceRef) -> None: + def _make_mock_tdr_tables(self, source: TDRSourceRef) -> JSON: tables = self._load_canned_file_version(uuid=source.id, version=None, extension='tables.tdr')['tables'] @@ -206,6 +205,7 @@ def _make_mock_tdr_tables(self, self._make_mock_entity_table(source.spec, table_name, table_rows['rows']) + return tables def _make_mock_entity_table(self, source: TDRSourceSpec,