Skip to content

Commit

Permalink
[r] Index orphaned replicas (#6626)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Oct 23, 2024
1 parent 70fe838 commit 3e954ba
Show file tree
Hide file tree
Showing 15 changed files with 557 additions and 173 deletions.
8 changes: 8 additions & 0 deletions scripts/can_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def main(argv):
help='The version of the bundle to can. Required for HCA, ignored for AnVIL.')
parser.add_argument('--table-name', '-t',
help='The BigQuery table of the bundle to can. Only applicable for AnVIL.')
parser.add_argument('--batch-prefix', '-p',
help='The batch prefix of the bundle to can. Only applicable for AnVIL. '
'Use "null" for non-batched bundle formats.')
parser.add_argument('--output-dir', '-O',
default=os.path.join(config.project_root, 'test', 'indexer', 'data'),
help='The path to the output directory (default: %(default)s).')
Expand Down Expand Up @@ -99,6 +102,11 @@ def parse_fqid_fields(args: argparse.Namespace) -> JSON:
table_name = args.table_name
if table_name is not None:
fields['table_name'] = table_name
batch_prefix = args.batch_prefix
if batch_prefix is not None:
if batch_prefix == 'null':
batch_prefix = None
fields['batch_prefix'] = batch_prefix
return fields


Expand Down
3 changes: 2 additions & 1 deletion src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None:
for contributions, replicas in transforms:
tallies.update(self.contribute(catalog, contributions))
self.replicate(catalog, replicas)
self.aggregate(tallies)
if tallies:
self.aggregate(tallies)

def delete(self, catalog: CatalogName, bundle: Bundle) -> None:
"""
Expand Down
27 changes: 18 additions & 9 deletions src/azul/plugins/metadata/anvil/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,28 +122,37 @@ class AnvilBundle(Bundle[BUNDLE_FQID], ABC):
# the former to the latter during transformation.
entities: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)
links: set[EntityLink] = attrs.field(factory=set)
orphans: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)

def reject_joiner(self, catalog: CatalogName):
# FIXME: Optimize joiner rejection and re-enable it for AnVIL
# https://github.com/DataBiosphere/azul/issues/5256
pass

def to_json(self) -> MutableJSON:
return {
'entities': {
def serialize_entities(entities):
return {
str(entity_ref): entity
for entity_ref, entity in sorted(self.entities.items())
},
for entity_ref, entity in sorted(entities.items())
}

return {
'entities': serialize_entities(self.entities),
'orphans': serialize_entities(self.orphans),
'links': [link.to_json() for link in sorted(self.links)]
}

@classmethod
def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self:
def deserialize_entities(json_entities):
return {
EntityReference.parse(entity_ref): entity
for entity_ref, entity in json_entities.items()
}

return cls(
fqid=fqid,
entities={
EntityReference.parse(entity_ref): entity
for entity_ref, entity in json_['entities'].items()
},
links=set(map(EntityLink.from_json, json_['links']))
entities=deserialize_entities(json_['entities']),
links=set(map(EntityLink.from_json, json_['links'])),
orphans=deserialize_entities(json_['orphans'])
)
23 changes: 20 additions & 3 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ def aggregator(cls, entity_type) -> EntityAggregator:
assert False, entity_type

def estimate(self, partition: BundlePartition) -> int:
# Orphans are not considered when deciding whether to partition the
# bundle, but if the bundle is partitioned then each orphan will be
# replicated in a single partition
return sum(map(partial(self._contains, partition), self.bundle.entities))

def transform(self,
Expand All @@ -188,7 +191,8 @@ def _transform(self,
raise NotImplementedError

def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
return entity.entity_type, self.bundle.entities[entity]
content = ChainMap(self.bundle.entities, self.bundle.orphans)[entity]
return entity.entity_type, content

def _convert_entity_type(self, entity_type: str) -> str:
assert entity_type == 'bundle' or entity_type.startswith('anvil_'), entity_type
Expand Down Expand Up @@ -406,7 +410,10 @@ def _file(self, file: EntityReference) -> MutableJSON:
uuid=file.entity_id)

def _only_dataset(self) -> EntityReference:
return one(self._entities_by_type['anvil_dataset'])
try:
return one(self._entities_by_type['anvil_dataset'])
except ValueError:
return one(o for o in self.bundle.orphans if o.entity_type == 'anvil_dataset')

@cached_property
def _activity_polymorphic_types(self) -> AbstractSet[str]:
Expand Down Expand Up @@ -506,7 +513,9 @@ def _dataset(self, dataset: EntityReference) -> MutableJSON:
return super()._dataset(dataset)

def _list_entities(self) -> Iterable[EntityReference]:
yield self._singleton()
# Suppress contributions for bundles that only contain orphans
if self.bundle.entities:
yield self._singleton()

@abstractmethod
def _singleton(self) -> EntityReference:
Expand Down Expand Up @@ -564,6 +573,14 @@ def _singleton(self) -> EntityReference:
return EntityReference(entity_type='bundle',
entity_id=self.bundle.uuid)

def transform(self,
partition: BundlePartition
) -> Iterable[Contribution | Replica]:
yield from super().transform(partition)
for orphan in self.bundle.orphans:
if partition.contains(UUID(orphan.entity_id)):
yield self._replica(orphan, file_hub=None)


class DatasetTransformer(SingletonTransformer):

Expand Down
Loading

0 comments on commit 3e954ba

Please sign in to comment.