Skip to content

Commit

Permalink
Refactor table name/entity type conversion in AnVIL plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Oct 14, 2024
1 parent 9fdb89e commit 2c030f2
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 74 deletions.
119 changes: 61 additions & 58 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
KeyLinks = set[KeyLink]


class BundleEntityType(Enum):
class BundleType(Enum):
"""
AnVIL snapshots have no inherent notion of a "bundle". When indexing these
snapshots, we dynamically construct bundles by selecting individual entities
Expand Down Expand Up @@ -110,22 +110,22 @@ class BundleEntityType(Enum):
dataset fields during aggregation. This bundle contains only a single
dataset entity with only the `description` field populated.
"""
primary: EntityType = 'biosample'
supplementary: EntityType = 'file'
duos: EntityType = 'dataset'
primary = 'anvil_biosample'
supplementary = 'anvil_file'
duos = 'anvil_dataset'


class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON):
entity_type: str
table_name: str


@attrs.frozen(kw_only=True)
class TDRAnvilBundleFQID(TDRBundleFQID):
entity_type: BundleEntityType = attrs.field(converter=BundleEntityType)
table_name: str

def to_json(self) -> TDRAnvilBundleFQIDJSON:
return dict(super().to_json(),
entity_type=self.entity_type.value)
table_name=self.table_name)


class TDRAnvilBundle(AnvilBundle[TDRAnvilBundleFQID], TDRBundle):
Expand Down Expand Up @@ -172,10 +172,10 @@ def _version(self):
def _count_subgraphs(self, source: TDRSourceSpec) -> int:
rows = self._run_sql(f'''
SELECT COUNT(*) AS count
FROM {backtick(self._full_table_name(source, BundleEntityType.primary.value))}
FROM {backtick(self._full_table_name(source, BundleType.primary.value))}
UNION ALL
SELECT COUNT(*) AS count
FROM {backtick(self._full_table_name(source, BundleEntityType.supplementary.value))}
FROM {backtick(self._full_table_name(source, BundleType.supplementary.value))}
WHERE is_supplementary
''')
return sum(row['count'] for row in rows)
Expand All @@ -185,23 +185,23 @@ def _list_bundles(self,
prefix: str
) -> list[TDRAnvilBundleFQID]:
spec = source.spec
primary = BundleEntityType.primary.value
supplementary = BundleEntityType.supplementary.value
duos = BundleEntityType.duos.value
primary = BundleType.primary.value
supplementary = BundleType.supplementary.value
duos = BundleType.duos.value
rows = list(self._run_sql(f'''
SELECT datarepo_row_id, {primary!r} AS entity_type
SELECT datarepo_row_id, {primary!r} AS table_name
FROM {backtick(self._full_table_name(spec, primary))}
WHERE STARTS_WITH(datarepo_row_id, '{prefix}')
UNION ALL
SELECT datarepo_row_id, {supplementary!r} AS entity_type
SELECT datarepo_row_id, {supplementary!r} AS table_name
FROM {backtick(self._full_table_name(spec, supplementary))} AS supp
WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{prefix}')
''' + (
''
if config.duos_service_url is None else
f'''
UNION ALL
SELECT datarepo_row_id, {duos!r} AS entity_type
SELECT datarepo_row_id, {duos!r} AS table_name
FROM {backtick(self._full_table_name(spec, duos))}
'''
)))
Expand All @@ -218,7 +218,7 @@ def _list_bundles(self,
# single dataset. This verification is performed independently and
# concurrently for every partition, but only one partition actually
# emits the bundle.
if row['entity_type'] == duos:
if row['table_name'] == duos:
require(0 == duos_count)
duos_count += 1
# Ensure that one partition will always contain the DUOS bundle
Expand All @@ -229,43 +229,43 @@ def _list_bundles(self,
source=source,
uuid=bundle_uuid,
version=self._version,
entity_type=BundleEntityType(row['entity_type'])
table_name=row['table_name']
))
return bundles

def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> TDRAnvilBundleFQID:
if 'entity_type' not in fqid:
# Resolution of bundles without entity type is expensive, so we only
# support it during canning.
assert not config.is_in_lambda, ('Bundle FQID lacks entity type', fqid)
if 'table_name' not in fqid:
# Resolution of bundles without the table name is expensive, so we
# only support it during canning.
assert not config.is_in_lambda, ('Bundle FQID lacks table name', fqid)
source = self.source_from_json(fqid['source'])
entity_id = uuids.change_version(fqid['uuid'],
self.bundle_uuid_version,
self.datarepo_row_uuid_version)
rows = self._run_sql(' UNION ALL '.join((
f'''
SELECT {entity_type.value!r} AS entity_type
FROM {backtick(self._full_table_name(source.spec, entity_type.value))}
SELECT {bundle_type.value!r} AS table_name
FROM {backtick(self._full_table_name(source.spec, bundle_type.value))}
WHERE datarepo_row_id = {entity_id!r}
'''
for entity_type in BundleEntityType
for bundle_type in BundleType
)))
fqid = {**fqid, **one(rows)}
return super().resolve_bundle(fqid)

def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
if bundle_fqid.entity_type is BundleEntityType.primary:
if bundle_fqid.table_name == BundleType.primary.value:
log.info('Bundle %r is a primary bundle', bundle_fqid.uuid)
return self._primary_bundle(bundle_fqid)
elif bundle_fqid.entity_type is BundleEntityType.supplementary:
elif bundle_fqid.table_name == BundleType.supplementary.value:
log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid)
return self._supplementary_bundle(bundle_fqid)
elif bundle_fqid.entity_type is BundleEntityType.duos:
elif bundle_fqid.table_name == BundleType.duos.value:
assert config.duos_service_url is not None, bundle_fqid
log.info('Bundle %r is a DUOS bundle', bundle_fqid.uuid)
return self._duos_bundle(bundle_fqid)
else:
assert False, bundle_fqid.entity_type
assert False, bundle_fqid.table_name

def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
source = bundle_fqid.source
Expand Down Expand Up @@ -321,24 +321,25 @@ def _supplementary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBund
self.bundle_uuid_version,
self.datarepo_row_uuid_version)
source = bundle_fqid.source.spec
bundle_entity_type = bundle_fqid.entity_type.value
table_name = bundle_fqid.table_name
result = TDRAnvilBundle(fqid=bundle_fqid)
columns = self._columns(bundle_entity_type)
columns = self._columns(table_name)
bundle_entity = dict(one(self._run_sql(f'''
SELECT {', '.join(sorted(columns))}
FROM {backtick(self._full_table_name(source, bundle_entity_type))}
FROM {backtick(self._full_table_name(source, table_name))}
WHERE datarepo_row_id = '{entity_id}'
''')))
linked_entity_type = 'dataset'
columns = self._columns(linked_entity_type)
dataset = 'dataset'
dataset_table = f'anvil_{dataset}'
columns = self._columns(dataset_table)
linked_entity = dict(one(self._run_sql(f'''
SELECT {', '.join(sorted(columns))}
FROM {backtick(self._full_table_name(source, linked_entity_type))}
FROM {backtick(self._full_table_name(source, dataset_table))}
''')))
link_args = {}
for entity_type, row, arg in [
(bundle_entity_type, bundle_entity, 'outputs'),
(linked_entity_type, linked_entity, 'inputs')
('file', bundle_entity, 'outputs'),
(dataset, linked_entity, 'inputs')
]:
entity_ref = EntityReference(entity_type=entity_type, entity_id=row['datarepo_row_id'])
result.add_entity(entity_ref, self._version, row)
Expand All @@ -352,7 +353,7 @@ def _duos_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
entity_id = change_version(bundle_fqid.uuid,
self.bundle_uuid_version,
self.datarepo_row_uuid_version)
entity = EntityReference(entity_type=bundle_fqid.entity_type.value,
entity = EntityReference(entity_type='dataset',
entity_id=entity_id)
bundle = TDRAnvilBundle(fqid=bundle_fqid)
bundle.add_entity(entity=entity,
Expand All @@ -366,16 +367,17 @@ def _bundle_entity(self, bundle_fqid: TDRAnvilBundleFQID) -> KeyReference:
entity_id = uuids.change_version(bundle_uuid,
self.bundle_uuid_version,
self.datarepo_row_uuid_version)
entity_type = bundle_fqid.entity_type.value
table_name = bundle_fqid.table_name
entity_type = table_name.removeprefix('anvil_')
pk_column = entity_type + '_id'
bundle_entity = one(self._run_sql(f'''
SELECT {pk_column}
FROM {backtick(self._full_table_name(source.spec, entity_type))}
FROM {backtick(self._full_table_name(source.spec, table_name))}
WHERE datarepo_row_id = '{entity_id}'
'''))[pk_column]
bundle_entity = KeyReference(key=bundle_entity, entity_type=entity_type)
log.info('Bundle UUID %r resolved to primary key %r in table %r',
bundle_uuid, bundle_entity.key, entity_type)
bundle_uuid, bundle_entity.key, table_name)
return bundle_entity

def _full_table_name(self, source: TDRSourceSpec, table_name: str) -> str:
Expand Down Expand Up @@ -445,7 +447,7 @@ def _upstream_from_biosamples(self,
if biosample_ids:
rows = self._run_sql(f'''
SELECT b.biosample_id, b.donor_id, b.part_of_dataset_id
FROM {backtick(self._full_table_name(source, 'biosample'))} AS b
FROM {backtick(self._full_table_name(source, 'anvil_biosample'))} AS b
WHERE b.biosample_id IN ({', '.join(map(repr, biosample_ids))})
''')
result: KeyLinks = set()
Expand All @@ -470,7 +472,7 @@ def _upstream_from_files(self,
if file_ids:
rows = self._run_sql(f'''
WITH file AS (
SELECT f.file_id FROM {backtick(self._full_table_name(source, 'file'))} AS f
SELECT f.file_id FROM {backtick(self._full_table_name(source, 'anvil_file'))} AS f
WHERE f.file_id IN ({', '.join(map(repr, file_ids))})
)
SELECT
Expand All @@ -480,7 +482,7 @@ def _upstream_from_files(self,
ama.used_file_id AS uses_file_id,
[] AS uses_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ama
JOIN {backtick(self._full_table_name(source, 'anvil_alignmentactivity'))} AS ama
ON f.file_id IN UNNEST(ama.generated_file_id)
UNION ALL SELECT
f.file_id,
Expand All @@ -489,7 +491,7 @@ def _upstream_from_files(self,
[],
aya.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'assayactivity'))} AS aya
JOIN {backtick(self._full_table_name(source, 'anvil_assayactivity'))} AS aya
ON f.file_id IN UNNEST(aya.generated_file_id)
UNION ALL SELECT
f.file_id,
Expand All @@ -498,7 +500,7 @@ def _upstream_from_files(self,
[],
sqa.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa
JOIN {backtick(self._full_table_name(source, 'anvil_sequencingactivity'))} AS sqa
ON f.file_id IN UNNEST(sqa.generated_file_id)
UNION ALL SELECT
f.file_id,
Expand All @@ -507,7 +509,7 @@ def _upstream_from_files(self,
vca.used_file_id,
[]
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'variantcallingactivity'))} AS vca
JOIN {backtick(self._full_table_name(source, 'anvil_variantcallingactivity'))} AS vca
ON f.file_id IN UNNEST(vca.generated_file_id)
UNION ALL SELECT
f.file_id,
Expand All @@ -516,7 +518,7 @@ def _upstream_from_files(self,
a.used_file_id,
a.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'activity'))} AS a
JOIN {backtick(self._full_table_name(source, 'anvil_activity'))} AS a
ON f.file_id IN UNNEST(a.generated_file_id)
''')
return {
Expand Down Expand Up @@ -545,7 +547,7 @@ def _diagnoses_from_donors(self,
if donor_ids:
rows = self._run_sql(f'''
SELECT dgn.donor_id, dgn.diagnosis_id
FROM {backtick(self._full_table_name(source, 'diagnosis'))} as dgn
FROM {backtick(self._full_table_name(source, 'anvil_diagnosis'))} as dgn
WHERE dgn.donor_id IN ({', '.join(map(repr, donor_ids))})
''')
return {
Expand All @@ -569,21 +571,21 @@ def _downstream_from_biosamples(self,
'sequencingactivity' as activity_table,
sqa.used_biosample_id,
sqa.generated_file_id
FROM {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa
FROM {backtick(self._full_table_name(source, 'anvil_sequencingactivity'))} AS sqa
UNION ALL
SELECT
aya.assayactivity_id,
'assayactivity',
aya.used_biosample_id,
aya.generated_file_id,
FROM {backtick(self._full_table_name(source, 'assayactivity'))} AS aya
FROM {backtick(self._full_table_name(source, 'anvil_assayactivity'))} AS aya
UNION ALL
SELECT
a.activity_id,
'activity',
a.used_biosample_id,
a.generated_file_id,
FROM {backtick(self._full_table_name(source, 'activity'))} AS a
FROM {backtick(self._full_table_name(source, 'anvil_activity'))} AS a
)
SELECT
biosample_id,
Expand Down Expand Up @@ -617,19 +619,19 @@ def _downstream_from_files(self,
'alignmentactivity' AS activity_table,
ala.used_file_id,
ala.generated_file_id
FROM {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ala
FROM {backtick(self._full_table_name(source, 'anvil_alignmentactivity'))} AS ala
UNION ALL SELECT
vca.variantcallingactivity_id,
'variantcallingactivity',
vca.used_file_id,
vca.generated_file_id
FROM {backtick(self._full_table_name(source, 'variantcallingactivity'))} AS vca
FROM {backtick(self._full_table_name(source, 'anvil_variantcallingactivity'))} AS vca
UNION ALL SELECT
a.activity_id,
'activity',
a.used_file_id,
a.generated_file_id
FROM {backtick(self._full_table_name(source, 'activity'))} AS a
FROM {backtick(self._full_table_name(source, 'anvil_activity'))} AS a
)
SELECT
used_file_id,
Expand Down Expand Up @@ -657,8 +659,9 @@ def _retrieve_entities(self,
keys: AbstractSet[Key],
) -> MutableJSONs:
if keys:
table_name = self._full_table_name(source, entity_type)
columns = self._columns(entity_type)
table_name = f'anvil_{entity_type}'
columns = self._columns(table_name)
table_name = self._full_table_name(source, table_name)
pk_column = entity_type + '_id'
assert pk_column in columns, entity_type
log.debug('Retrieving %i entities of type %r ...', len(keys), entity_type)
Expand Down Expand Up @@ -693,7 +696,7 @@ def convert_column(value):
for table in anvil_schema['tables']
}

def _columns(self, entity_type: EntityType) -> set[str]:
columns = set(self._schema_columns[f'anvil_{entity_type}'])
def _columns(self, table_name: str) -> set[str]:
columns = set(self._schema_columns[table_name])
columns.add('datarepo_row_id')
return columns
10 changes: 5 additions & 5 deletions test/indexer/test_anvil.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
tdr_anvil,
)
from azul.plugins.repository.tdr_anvil import (
BundleEntityType,
BundleType,
TDRAnvilBundle,
TDRAnvilBundleFQID,
)
Expand Down Expand Up @@ -105,13 +105,13 @@ def bundle_fqid(cls,
*,
uuid,
version=None,
entity_type=BundleEntityType.primary
table_name=BundleType.primary.value
) -> TDRAnvilBundleFQID:
assert version is None, 'All AnVIL bundles should use the same version'
return TDRAnvilBundleFQID(source=cls.source,
uuid=uuid,
version=cls.version,
entity_type=entity_type)
table_name=table_name)

@classmethod
def primary_bundle(cls) -> TDRAnvilBundleFQID:
Expand All @@ -120,12 +120,12 @@ def primary_bundle(cls) -> TDRAnvilBundleFQID:
@classmethod
def supplementary_bundle(cls) -> TDRAnvilBundleFQID:
return cls.bundle_fqid(uuid='6b0f6c0f-5d80-a242-accb-840921351cd5',
entity_type=BundleEntityType.supplementary)
table_name=BundleType.supplementary.value)

@classmethod
def duos_bundle(cls) -> TDRAnvilBundleFQID:
return cls.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf',
entity_type=BundleEntityType.duos)
table_name=BundleType.duos.value)


class TestAnvilIndexer(AnvilIndexerTestCase,
Expand Down
Loading

0 comments on commit 2c030f2

Please sign in to comment.