Skip to content

Commit

Permalink
Track orphans in AnVIL bundles (#6626)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Oct 16, 2024
1 parent b2b536c commit c00ca1c
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None:
for contributions, replicas in transforms:
tallies.update(self.contribute(catalog, contributions))
self.replicate(catalog, replicas)
self.aggregate(tallies)
if tallies:
self.aggregate(tallies)

def delete(self, catalog: CatalogName, bundle: Bundle) -> None:
"""
Expand Down
27 changes: 18 additions & 9 deletions src/azul/plugins/metadata/anvil/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,37 @@ def to_entity_link(self,
class AnvilBundle(Bundle[BUNDLE_FQID], ABC):
entities: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)
links: set[EntityLink] = attrs.field(factory=set)
orphans: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)

def reject_joiner(self, catalog: CatalogName):
# FIXME: Optimize joiner rejection and re-enable it for AnVIL
# https://github.com/DataBiosphere/azul/issues/5256
pass

def to_json(self) -> MutableJSON:
return {
'entities': {
def serialize_entities(entities):
return {
str(entity_ref): entity
for entity_ref, entity in sorted(self.entities.items())
},
for entity_ref, entity in sorted(entities.items())
}

return {
'entities': serialize_entities(self.entities),
'orphans': serialize_entities(self.orphans),
'links': [link.to_json() for link in sorted(self.links)]
}

@classmethod
def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self:
def deserialize_entities(json_entities):
return {
EntityReference.parse(entity_ref): entity
for entity_ref, entity in json_entities.items()
}

return cls(
fqid=fqid,
entities={
EntityReference.parse(entity_ref): entity
for entity_ref, entity in json_['entities'].items()
},
links=set(map(EntityLink.from_json, json_['links']))
entities=deserialize_entities(json_['entities']),
links=set(map(EntityLink.from_json, json_['links'])),
orphans=deserialize_entities(json_['orphans'])
)
24 changes: 21 additions & 3 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ def aggregator(cls, entity_type) -> EntityAggregator:
assert False, entity_type

def estimate(self, partition: BundlePartition) -> int:
# Orphans are not considered when deciding whether to partition the
# bundle, but if the bundle is partitioned then each orphan will be
# replicated in a single partition
return sum(map(partial(self._contains, partition), self.bundle.entities))

def transform(self,
Expand All @@ -188,7 +191,9 @@ def _transform(self,
raise NotImplementedError

def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
return f'anvil_{entity.entity_type}', self.bundle.entities[entity]
replica_type = f'anvil_{entity.entity_type}'
content = ChainMap(self.bundle.entities, self.bundle.orphans)[entity]
return replica_type, content

def _pluralize(self, entity_type: str) -> str:
if entity_type == 'diagnosis':
Expand Down Expand Up @@ -400,7 +405,10 @@ def _file(self, file: EntityReference) -> MutableJSON:
uuid=file.entity_id)

def _only_dataset(self) -> EntityReference:
return one(self._entities_by_type['dataset'])
try:
return one(self._entities_by_type['dataset'])
except ValueError:
return one(o for o in self.bundle.orphans if o.entity_type == 'dataset')

_activity_polymorphic_types = {
'activity',
Expand Down Expand Up @@ -495,7 +503,9 @@ def _dataset(self, dataset: EntityReference) -> MutableJSON:
return super()._dataset(dataset)

def _list_entities(self) -> Iterable[EntityReference]:
yield self._singleton()
# Suppress contributions for bundles that only contain orphans
if self.bundle.entities:
yield self._singleton()

@abstractmethod
def _singleton(self) -> EntityReference:
Expand Down Expand Up @@ -553,6 +563,14 @@ def _singleton(self) -> EntityReference:
return EntityReference(entity_type='bundle',
entity_id=self.bundle.uuid)

def transform(self,
partition: BundlePartition
) -> Iterable[Contribution | Replica]:
yield from super().transform(partition)
for orphan in self.bundle.orphans:
if partition.contains(UUID(orphan.entity_id)):
yield self._replica(orphan, file_hub=None)


class DatasetTransformer(SingletonTransformer):

Expand Down
10 changes: 8 additions & 2 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,15 @@ def canning_qualifier(cls) -> str:
def add_entity(self,
entity: EntityReference,
version: str,
row: MutableJSON
row: MutableJSON,
*,
is_orphan: bool = False
) -> None:
assert entity not in self.entities, entity
dst = self.orphans if is_orphan else self.entities
# In DUOS bundles, the dataset is represented as both an entity and an
# orphan
assert entity not in dst, entity
metadata = dict(row,
version=version)
if entity.entity_type == 'file':
Expand All @@ -147,7 +153,7 @@ def add_entity(self,
metadata.update(drs_uri=drs_uri,
sha256='',
crc32='')
self.entities[entity] = metadata
dst[entity] = metadata

def add_links(self, links: Iterable[EntityLink]):
self.links.update(links)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

200 changes: 200 additions & 0 deletions test/indexer/data/abc00000-0000-a000-0000-000000000000.tdr.anvil.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c00ca1c

Please sign in to comment.