diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index ab9571afd..dd1fcab75 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -212,7 +212,8 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None: for contributions, replicas in transforms: tallies.update(self.contribute(catalog, contributions)) self.replicate(catalog, replicas) - self.aggregate(tallies) + if tallies: + self.aggregate(tallies) def delete(self, catalog: CatalogName, bundle: Bundle) -> None: """ diff --git a/src/azul/plugins/metadata/anvil/bundle.py b/src/azul/plugins/metadata/anvil/bundle.py index e28801857..a3357f1dc 100644 --- a/src/azul/plugins/metadata/anvil/bundle.py +++ b/src/azul/plugins/metadata/anvil/bundle.py @@ -118,6 +118,7 @@ def to_entity_link(self, class AnvilBundle(Bundle[BUNDLE_FQID], ABC): entities: dict[EntityReference, MutableJSON] = attrs.field(factory=dict) links: set[EntityLink] = attrs.field(factory=set) + orphans: dict[EntityReference, MutableJSON] = attrs.field(factory=dict) def reject_joiner(self, catalog: CatalogName): # FIXME: Optimize joiner rejection and re-enable it for AnVIL @@ -125,21 +126,29 @@ def reject_joiner(self, catalog: CatalogName): pass def to_json(self) -> MutableJSON: - return { - 'entities': { + def serialize_entities(entities): + return { str(entity_ref): entity - for entity_ref, entity in sorted(self.entities.items()) - }, + for entity_ref, entity in sorted(entities.items()) + } + + return { + 'entities': serialize_entities(self.entities), + 'orphans': serialize_entities(self.orphans), 'links': [link.to_json() for link in sorted(self.links)] } @classmethod def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self: + def deserialize_entities(json_entities): + return { + EntityReference.parse(entity_ref): entity + for entity_ref, entity in json_entities.items() + } + return cls( fqid=fqid, - entities={ - EntityReference.parse(entity_ref): entity - for entity_ref, entity 
in json_['entities'].items() - }, - links=set(map(EntityLink.from_json, json_['links'])) + entities=deserialize_entities(json_['entities']), + links=set(map(EntityLink.from_json, json_['links'])), + orphans=deserialize_entities(json_['orphans']) ) diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py index dab724291..3ee0fc394 100644 --- a/src/azul/plugins/metadata/anvil/indexer/transform.py +++ b/src/azul/plugins/metadata/anvil/indexer/transform.py @@ -169,6 +169,9 @@ def aggregator(cls, entity_type) -> EntityAggregator: assert False, entity_type def estimate(self, partition: BundlePartition) -> int: + # Orphans are not considered when deciding whether to partition the + # bundle, but if the bundle is partitioned then each orphan will be + # replicated in a single partition return sum(map(partial(self._contains, partition), self.bundle.entities)) def transform(self, @@ -188,7 +191,9 @@ def _transform(self, raise NotImplementedError def _replicate(self, entity: EntityReference) -> tuple[str, JSON]: - return f'anvil_{entity.entity_type}', self.bundle.entities[entity] + replica_type = f'anvil_{entity.entity_type}' + content = ChainMap(self.bundle.entities, self.bundle.orphans)[entity] + return replica_type, content def _pluralize(self, entity_type: str) -> str: if entity_type == 'diagnosis': @@ -400,7 +405,10 @@ def _file(self, file: EntityReference) -> MutableJSON: uuid=file.entity_id) def _only_dataset(self) -> EntityReference: - return one(self._entities_by_type['dataset']) + try: + return one(self._entities_by_type['dataset']) + except ValueError: + return one(o for o in self.bundle.orphans if o.entity_type == 'dataset') _activity_polymorphic_types = { 'activity', @@ -495,7 +503,9 @@ def _dataset(self, dataset: EntityReference) -> MutableJSON: return super()._dataset(dataset) def _list_entities(self) -> Iterable[EntityReference]: - yield self._singleton() + # Suppress contributions for bundles 
that only contain orphans + if self.bundle.entities: + yield self._singleton() @abstractmethod def _singleton(self) -> EntityReference: @@ -553,6 +563,14 @@ def _singleton(self) -> EntityReference: return EntityReference(entity_type='bundle', entity_id=self.bundle.uuid) + def transform(self, + partition: BundlePartition + ) -> Iterable[Contribution | Replica]: + yield from super().transform(partition) + for orphan in self.bundle.orphans: + if partition.contains(UUID(orphan.entity_id)): + yield self._replica(orphan, file_hub=None) + class DatasetTransformer(SingletonTransformer): diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index 134d9fb91..12a2a0666 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -136,9 +136,14 @@ def canning_qualifier(cls) -> str: def add_entity(self, entity: EntityReference, version: str, - row: MutableJSON + row: MutableJSON, + *, + is_orphan: bool = False ) -> None: - assert entity not in self.entities, entity + dst = self.orphans if is_orphan else self.entities + # In DUOS bundles, the dataset is represented as both an entity and an + # orphan + assert entity not in dst, entity metadata = dict(row, version=version) if entity.entity_type == 'file': @@ -148,7 +153,7 @@ def add_entity(self, metadata.update(drs_uri=drs_uri, sha256='', crc32='') - self.entities[entity] = metadata + dst[entity] = metadata def add_links(self, links: Iterable[EntityLink]): self.links.update(links) diff --git a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json index 89a1b7b8e..c9467c07e 100644 --- a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json +++ b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json @@ -5,5 +5,6 @@ "version": "2022-06-01T00:00:00.000000Z" } }, - "links": [] + "links": [], + 
"orphans": {} } \ No newline at end of file diff --git a/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json b/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json index df83b4bbd..068070133 100644 --- a/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json +++ b/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json @@ -53,5 +53,6 @@ "file/6b0f6c0f-5d80-4242-accb-840921351cd5" ] } - ] + ], + "orphans": {} } \ No newline at end of file diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json index 96d468b36..9c0da1923 100644 --- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json +++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json @@ -231,5 +231,6 @@ "biosample/826dea02-e274-4ffe-aabc-eb3db63ad068" ] } - ] + ], + "orphans": {} } diff --git a/test/indexer/data/abc00000-0000-a000-0000-000000000000.tdr.anvil.json b/test/indexer/data/abc00000-0000-a000-0000-000000000000.tdr.anvil.json new file mode 100644 index 000000000..ad1688c7a --- /dev/null +++ b/test/indexer/data/abc00000-0000-a000-0000-000000000000.tdr.anvil.json @@ -0,0 +1,200 @@ +{ + "entities": {}, + "links": [], + "orphans": { + "dataset/2370f948-2783-4eb6-afea-e022897f4dcf": { + "consent_group": [ + "DS-BDIS" + ], + "data_modality": [], + "data_use_permission": [ + "DS-BDIS" + ], + "datarepo_row_id": "2370f948-2783-4eb6-afea-e022897f4dcf", + "dataset_id": "52ee7665-7033-63f2-a8d9-ce8e32666739", + "owner": [ + "Debbie Nickerson" + ], + "principal_investigator": [], + "registered_identifier": [ + "phs000693" + ], + "source_datarepo_row_ids": [ + "workspace_attributes:7a22b629-9d81-4e4d-9297-f9e44ed760bc" + ], + "title": "ANVIL_CMG_UWASH_DS_BDIS", + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/7c0b04d4-09a3-4e00-a142-3a4c5fc4c3fb": { + "datarepo_row_id": "7c0b04d4-09a3-4e00-a142-3a4c5fc4c3fb", 
+ "activity_id": "71f12c9c-5115-374b-cd96-9414b6f1935a", + "activity_type": "Unknown", + "used_file_id": [ + "ad7932a7-55d3-36af-ba7d-cb22b627f10d" + ], + "generated_file_id": [ + "00195da1-8e9f-3d95-957b-41607759de38" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:239769e6-1ea0-4066-8289-4c1a07d180ee", + "file_inventory:0945564c-b45b-482e-94db-7506118fc50d" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/7fce9cf8-5d2b-4a27-83fc-bc22214116e3": { + "datarepo_row_id": "7fce9cf8-5d2b-4a27-83fc-bc22214116e3", + "activity_id": "8d502145-9cd0-0a30-a6fe-8c4f529086c4", + "activity_type": "Unknown", + "used_file_id": [ + "af3ca37e-76a8-3568-94aa-b9f0f256bcf2" + ], + "generated_file_id": [ + "0027abb3-5038-3849-aeaf-adccdc692259" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:c4d7b383-cab6-46e3-8d8f-37b0c8e42f25", + "file_inventory:c332b008-d443-4fa2-878a-3af2b3ebd893" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/fc734969-1c7f-4806-b0f8-299584a6158d": { + "datarepo_row_id": "fc734969-1c7f-4806-b0f8-299584a6158d", + "activity_id": "07361e60-5446-4a38-b4ae-c765c6f4cfbd", + "activity_type": "Unknown", + "used_file_id": [ + "1c086a00-2869-3f58-9ed8-87e5f4d59e33" + ], + "generated_file_id": [ + "002eddb4-d277-36a4-bf0e-8c8318d10531" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:b856a7f2-c4ed-4cd1-a2fb-cd9dc7a810f5", + "file_inventory:53f3388d-314e-407c-8548-85797a56b735" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/71a0dc30-4531-45cd-9c5f-2f65a562f391": { + "datarepo_row_id": "71a0dc30-4531-45cd-9c5f-2f65a562f391", + "activity_id": "4dde5557-daee-fadf-649f-f611d9f093ea", + "activity_type": "Unknown", + "used_file_id": [ + "7f8d77be-3113-3f57-b152-403f43e9b15c" + ], + "generated_file_id": [ + "003369a9-c81c-37ca-8484-6fde5a59baf3" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + 
"file_inventory:50da565e-fa26-447e-956e-809a11fe22ac", + "file_inventory:6a578163-a160-4a70-a674-6aaca57d2241" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/1d7639a4-f451-4b5a-96f6-ed8a3b18da93": { + "datarepo_row_id": "1d7639a4-f451-4b5a-96f6-ed8a3b18da93", + "activity_id": "25668d4c-5b40-4688-ae37-5a641d322922", + "activity_type": "Unknown", + "used_file_id": [ + "c80dea89-e141-3687-811e-e910dae867d5" + ], + "generated_file_id": [ + "00369305-b094-363f-b81d-73717da52260" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:59fb470a-800b-44d2-85ab-e1f60b8b8254", + "file_inventory:c93e30f7-2122-4705-a843-a4753398dbbb" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/3def8066-0af9-42f8-aa96-9fa972bd089d": { + "datarepo_row_id": "3def8066-0af9-42f8-aa96-9fa972bd089d", + "activity_id": "d6070656-586d-66e9-39b6-c940f50216c0", + "activity_type": "Unknown", + "used_file_id": [ + "63d35f20-5985-3f25-836e-7dabf14d06c8" + ], + "generated_file_id": [ + "00600d8b-a9b2-3aa0-9908-004631916cd3" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:9c86966c-e067-44aa-95e2-812a975ea35f", + "file_inventory:fd680963-281c-40b8-84eb-204964fbaff3" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/d9ffcbc8-fc43-43bc-8743-c6932fe5867a": { + "datarepo_row_id": "d9ffcbc8-fc43-43bc-8743-c6932fe5867a", + "activity_id": "0cc7ffe4-7302-adb8-eebe-9223c8dc2ee1", + "activity_type": "Unknown", + "used_file_id": [ + "adf28eac-afc1-30ed-ac47-4c9577840d28" + ], + "generated_file_id": [ + "0068ed87-2357-3cde-960a-7ce720cff759" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:8de0c9fd-3cd0-406a-b074-27fcea961cb3", + "file_inventory:3f446e83-6780-44d9-9cf6-d457a9074408" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/c1a81b5a-70dc-4d19-b071-af6559e5ac3b": { + "datarepo_row_id": "c1a81b5a-70dc-4d19-b071-af6559e5ac3b", + "activity_id": 
"7ce79724-d533-16fd-172b-ab8e416e3a8e", + "activity_type": "Unknown", + "used_file_id": [], + "generated_file_id": [ + "0070bc37-bd1e-32f5-a5fa-e083d3f3ff2e", + "19d62a88-6459-3e38-a3ba-9ce3ecd60cba", + "1cd69bb6-4c1d-3e83-8459-e68a5b6aed84" + ], + "used_biosample_id": [ + "1e607e37-34fd-a21e-dfa3-94f4a1e29391" + ], + "source_datarepo_row_ids": [ + "sample:57423aa3-6b17-4684-ae7f-bd86790630cd" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/e9cf06ed-092c-4867-8009-979d0fb1c05f": { + "datarepo_row_id": "e9cf06ed-092c-4867-8009-979d0fb1c05f", + "activity_id": "db9b9867-bf6c-942f-8f79-0d49a573faf7", + "activity_type": "Unknown", + "used_file_id": [ + "824a9b0c-04bf-35b1-bea1-7b3a900bf880" + ], + "generated_file_id": [ + "00b0567a-932c-378f-ab7c-d8b42bd58e3e" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:e8752934-998e-4a25-adb2-748d0913d699", + "file_inventory:8cb2cb49-0499-4a0a-87f2-de0b70cc2f4a" + ], + "version": "2022-06-01T00:00:00.000000Z" + }, + "activity/b240b3ac-e7e4-47b2-aae4-c47d65e84ccd": { + "datarepo_row_id": "b240b3ac-e7e4-47b2-aae4-c47d65e84ccd", + "activity_id": "363a0bed-be4d-8f10-208f-705857d8518f", + "activity_type": "Unknown", + "used_file_id": [ + "32073db0-ef1a-3889-a470-077c09b1b741" + ], + "generated_file_id": [ + "00b6d7db-c79d-3aa6-8c5d-5dd0e7618988" + ], + "used_biosample_id": [], + "source_datarepo_row_ids": [ + "file_inventory:01f5a54b-d1b1-4296-a368-ef76034452d4", + "file_inventory:3d9dfc08-457d-49be-a1d8-2db165fcd4bc" + ], + "version": "2022-06-01T00:00:00.000000Z" + } + } +} \ No newline at end of file diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py index 4273972c1..193e292fa 100644 --- a/test/indexer/test_anvil.py +++ b/test/indexer/test_anvil.py @@ -124,6 +124,11 @@ def duos_bundle(cls) -> TDRAnvilBundleFQID: return cls.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf', version=BundleType.duos.value) + @classmethod + def replica_bundle(cls) -> 
TDRAnvilBundleFQID: + return cls.bundle_fqid(uuid='abc00000-0000-a000-0000-000000000000', + table_name='anvil_activity') + class TestAnvilIndexer(AnvilIndexerTestCase, TDRPluginTestCase[tdr_anvil.Plugin], @@ -180,6 +185,7 @@ def test_list_and_fetch_bundles(self): self.assertEqual(canned_bundle.fqid, bundle.fqid) self.assertEqual(canned_bundle.entities, bundle.entities) self.assertEqual(canned_bundle.links, bundle.links) + self.assertEqual(canned_bundle.orphans, bundle.orphans) class TestAnvilIndexerWithIndexesSetUp(AnvilIndexerTestCase): @@ -234,3 +240,19 @@ def test_dataset_description(self): # the files (hubs) from its bundle above **({DocumentType.replica: 1} if config.enable_replicas else {}) }) + + def test_orphans(self): + bundle = self._load_canned_bundle(self.replica_bundle()) + self._index_bundle(bundle) + expected = bundle.orphans if config.enable_replicas else {} + actual = {} + hits = self._get_all_hits() + for hit in hits: + qualifier, doc_type = self._parse_index_name(hit) + self.assertEqual(DocumentType.replica, doc_type) + source = hit['_source'] + self.assertEqual(source['hub_ids'], []) + ref = EntityReference(entity_type=source['replica_type'].removeprefix('anvil_'), + entity_id=source['entity_id']) + actual[ref] = source['contents'] + self.assertEqual(expected, actual) diff --git a/test/integration_test.py b/test/integration_test.py index e87c61d7c..7fa548412 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -1889,7 +1889,7 @@ def _test_catalog(self, catalog: config.Catalog): } self.assertIsSubset(set(stitched), metadata_ids) elif metadata_plugin_name == 'anvil': - self.assertEqual({'entities', 'links'}, bundle_json.keys()) + self.assertEqual({'entities', 'links', 'orphans'}, bundle_json.keys()) entities, links = bundle_json['entities'], bundle_json['links'] self.assertIsInstance(entities, dict) self.assertIsInstance(links, list)