Fix: Various problems in can_bundle script (#6669) #6670

42 changes: 21 additions & 21 deletions scripts/can_bundle.py
@@ -1,6 +1,8 @@
"""
Download manifest and metadata for a given bundle from the given repository
source and store them as separate JSON files in the index test data directory.
Download the contents of a given bundle from the given repository source and
store it as a single JSON file. Users are expected to be familiar with the
structure of the bundle FQIDs for the given source and provide the appropriate
attributes.

Note: silently overwrites the destination file.
"""
@@ -15,10 +17,6 @@
import sys
import uuid

from more_itertools import (
one,
)

from azul import (
cache,
config,
@@ -45,41 +43,45 @@
from azul.types import (
AnyJSON,
AnyMutableJSON,
JSON,
)

log = logging.getLogger(__name__)


def main(argv):
parser = argparse.ArgumentParser(description=__doc__, formatter_class=AzulArgumentHelpFormatter)
default_catalog = config.default_catalog
plugin_cls = RepositoryPlugin.load(default_catalog)
plugin = plugin_cls.create(default_catalog)
if len(plugin.sources) == 1:
source_arg = {'default': str(one(plugin.sources))}
else:
source_arg = {'required': True}
parser.add_argument('--source', '-s',
**source_arg,
required=True,
help='The repository source containing the bundle')
parser.add_argument('--uuid', '-b',
required=True,
help='The UUID of the bundle to can.')
parser.add_argument('--version', '-v',
help='The version of the bundle to can (default: the latest version).')
help='The version of the bundle to can. Required for HCA, ignored for AnVIL.')
parser.add_argument('--table-name',
help='The BigQuery table of the bundle to can. Only applicable for AnVIL.')
parser.add_argument('--output-dir', '-O',
default=os.path.join(config.project_root, 'test', 'indexer', 'data'),
help='The path to the output directory (default: %(default)s).')
parser.add_argument('--redaction-key', '-K',
help='Provide a key to redact confidential or sensitive information from the output files')
args = parser.parse_args(argv)
bundle = fetch_bundle(args.source, args.uuid, args.version)
fqid_fields = parse_fqid_fields(args)
bundle = fetch_bundle(args.source, fqid_fields)
if args.redaction_key:
redact_bundle(bundle, args.redaction_key.encode())
save_bundle(bundle, args.output_dir)


def fetch_bundle(source: str, bundle_uuid: str, bundle_version: str) -> Bundle:
def parse_fqid_fields(args: argparse.Namespace) -> JSON:
fields = {'uuid': args.uuid, 'version': args.version}
if args.table_name is not None:
fields['table_name'] = args.table_name
return fields


def fetch_bundle(source: str, fqid_args: JSON) -> Bundle:
for catalog in config.catalogs:
plugin = plugin_for(catalog)
try:
@@ -90,10 +92,8 @@ def fetch_bundle(source: str, bundle_uuid: str, bundle_version: str) -> Bundle:
log.debug('Searching for %r in catalog %r', source, catalog)
for plugin_source_spec in plugin.sources:
if source_ref.spec.eq_ignoring_prefix(plugin_source_spec):
fqid = SourcedBundleFQIDJSON(source=source_ref.to_json(),
uuid=bundle_uuid,
version=bundle_version)
fqid = plugin.resolve_bundle(fqid)
fqid = SourcedBundleFQIDJSON(source=source_ref.to_json(), **fqid_args)
fqid = plugin.bundle_fqid_from_json(fqid)
bundle = plugin.fetch_bundle(fqid)
log.info('Fetched bundle %r version %r from catalog %r.',
fqid.uuid, fqid.version, catalog)
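The net effect of these changes on the command line: `--version` is now documented as required for HCA and ignored for AnVIL, and the new `--table-name` flag supplies the BigQuery table for AnVIL bundles; both feed into the bundle FQID fields via the new `parse_fqid_fields` helper. The sketch below illustrates that mapping in isolation — the option names mirror the diff above, but the UUIDs, version, and table name are made up, and the real script goes on to fetch the bundle from the repository.

```python
import argparse


def parse_fqid_fields(args: argparse.Namespace) -> dict:
    # Same shape as the new helper in can_bundle.py: uuid and version are
    # always carried, table_name only when supplied (i.e. for AnVIL bundles).
    fields = {'uuid': args.uuid, 'version': args.version}
    if args.table_name is not None:
        fields['table_name'] = args.table_name
    return fields


parser = argparse.ArgumentParser()
parser.add_argument('--uuid', '-b', required=True)
parser.add_argument('--version', '-v')  # required for HCA, ignored for AnVIL
parser.add_argument('--table-name')     # only applicable for AnVIL

# HCA-style invocation (hypothetical UUID and version)
hca = parser.parse_args([
    '--uuid', 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
    '--version', '2021-01-01T00:00:00.000000Z',
])
print(parse_fqid_fields(hca))
# -> {'uuid': ..., 'version': '2021-01-01T00:00:00.000000Z'}

# AnVIL-style invocation (hypothetical UUID and table name, no version)
anvil = parser.parse_args([
    '--uuid', '11111111-2222-3333-4444-555555555555',
    '--table-name', 'anvil_biosample',
])
print(parse_fqid_fields(anvil))
# -> {'uuid': ..., 'version': None, 'table_name': 'anvil_biosample'}
```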
2 changes: 1 addition & 1 deletion src/azul/indexer/index_service.py
@@ -197,7 +197,7 @@ def fetch_bundle(self,
bundle_fqid: SourcedBundleFQIDJSON
) -> Bundle:
plugin = self.repository_plugin(catalog)
bundle_fqid = plugin.resolve_bundle(bundle_fqid)
bundle_fqid = plugin.bundle_fqid_from_json(bundle_fqid)
return plugin.fetch_bundle(bundle_fqid)

def index(self, catalog: CatalogName, bundle: Bundle) -> None:
10 changes: 7 additions & 3 deletions src/azul/plugins/__init__.py
@@ -583,6 +583,13 @@ def _bundle_fqid_cls(self) -> type[BUNDLE_FQID]:
bundle_cls, spec_cls, ref_cls, fqid_cls = self._generic_params
return fqid_cls

def bundle_fqid_from_json(self, fqid: SourcedBundleFQIDJSON) -> BUNDLE_FQID:
"""
Instantiate a :class:`SourcedBundleFQID` from its JSON representation.
The expected input matches the output format of `SourcedBundleFQID.to_json`.
"""
return self._bundle_fqid_cls.from_json(fqid)

@property
def _bundle_cls(self) -> type[BUNDLE]:
bundle_cls, spec_cls, ref_cls, fqid_cls = self._generic_params
@@ -607,9 +614,6 @@ def _lookup_source_id(self, spec: SOURCE_SPEC) -> str:
"""
raise NotImplementedError

def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> BUNDLE_FQID:
return self._bundle_fqid_cls.from_json(fqid)

@abstractmethod
def _count_subgraphs(self, source: SOURCE_SPEC) -> int:
"""
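For context on the rename: the base-class method is now a plain deserialization hook, symmetric with `SourcedBundleFQID.to_json`, with no repository lookups involved. The toy class below illustrates that round-trip; it is a stand-in for the real `SourcedBundleFQID` hierarchy, and the source spec, UUID, and version values are invented.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ToyBundleFQID:
    # Stand-in for SourcedBundleFQID; the real class carries a typed source ref.
    source: dict
    uuid: str
    version: str

    def to_json(self) -> dict:
        return asdict(self)

    @classmethod
    def from_json(cls, fqid: dict) -> 'ToyBundleFQID':
        return cls(**fqid)


fqid_json = {
    'source': {'id': 'deadbeef', 'spec': 'tdr:example_project:snapshot/example_snapshot:/0'},
    'uuid': '3f8b5f8d-1111-2222-3333-444455556666',
    'version': '2021-01-01T00:00:00.000000Z',
}
fqid = ToyBundleFQID.from_json(fqid_json)  # what bundle_fqid_from_json now does
assert fqid.to_json() == fqid_json         # symmetric with to_json, per the docstring
```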
20 changes: 0 additions & 20 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -233,26 +233,6 @@ def _list_bundles(self,
))
return bundles

def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> TDRAnvilBundleFQID:
if 'table_name' not in fqid:
# Resolution of bundles without the table name is expensive, so we
# only support it during canning.
assert not config.is_in_lambda, ('Bundle FQID lacks table name', fqid)
source = self.source_from_json(fqid['source'])
entity_id = uuids.change_version(fqid['uuid'],
self.bundle_uuid_version,
self.datarepo_row_uuid_version)
rows = self._run_sql(' UNION ALL '.join((
f'''
SELECT {bundle_type.value!r} AS table_name
FROM {backtick(self._full_table_name(source.spec, bundle_type.value))}
WHERE datarepo_row_id = {entity_id!r}
'''
for bundle_type in BundleType
)))
fqid = {**fqid, **one(rows)}
return super().resolve_bundle(fqid)

def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
if bundle_fqid.table_name is BundleType.primary:
log.info('Bundle %r is a primary bundle', bundle_fqid.uuid)
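To see why the deleted lookup was worth removing: with only a UUID to go on, the old `resolve_bundle` had to probe every bundle-type table in the snapshot to discover which one contains the row. The sketch below reconstructs the shape of that query from the deleted code above; the project, dataset, table names, and row id are placeholders. Requiring `--table-name` up front and carrying it in the FQID avoids that scan entirely.

```python
# Shape of the UNION ALL query the deleted resolve_bundle used to issue: one
# probe per bundle-type table, just to recover the table name for a single row.
table_names = ['anvil_biosample', 'anvil_file', 'anvil_dataset']  # placeholders
entity_id = 'c9e40b26-aaaa-bbbb-cccc-0123456789ab'                # placeholder row id

query = ' UNION ALL '.join(
    f"""
    SELECT '{name}' AS table_name
    FROM `example_project.example_dataset.{name}`
    WHERE datarepo_row_id = '{entity_id}'
    """
    for name in table_names
)
print(query)
```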
4 changes: 2 additions & 2 deletions test/indexer/test_indexer_controller.py
@@ -232,7 +232,7 @@ def test_contribute_and_aggregate(self):
notified_fqids = list(map(self._fqid_from_notification, notifications))
notified_bundles = [bundles[fqid] for fqid in notified_fqids]
mock_plugin.fetch_bundle.side_effect = notified_bundles
mock_plugin.resolve_bundle.side_effect = DSSBundleFQID.from_json
mock_plugin.bundle_fqid_from_json.side_effect = DSSBundleFQID.from_json
mock_plugin.sources = [source]
with patch.object(IndexService, 'repository_plugin', return_value=mock_plugin):
with patch.object(BundlePartition, 'max_partition_size', 4):
@@ -241,7 +241,7 @@

# Assert plugin calls by controller
expected_calls = [call(fqid.to_json()) for fqid in notified_fqids]
self.assertEqual(expected_calls, mock_plugin.resolve_bundle.mock_calls)
self.assertEqual(expected_calls, mock_plugin.bundle_fqid_from_json.mock_calls)
expected_calls = list(map(call, notified_fqids))
self.assertEqual(expected_calls, mock_plugin.fetch_bundle.mock_calls)

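The controller test only needed the mock's attribute renamed, but the pattern it relies on is worth spelling out: the plugin is a `MagicMock`, the renamed method is stubbed via `side_effect`, and the assertion compares recorded `mock_calls`. A minimal, self-contained version of that pattern, with made-up FQID JSON and a trivial stub in place of `DSSBundleFQID.from_json`:

```python
from unittest.mock import MagicMock, call

mock_plugin = MagicMock()
# Stub the renamed method; here it just echoes the uuid instead of building a real FQID.
mock_plugin.bundle_fqid_from_json.side_effect = lambda fqid: fqid['uuid']

fqid_json = {'uuid': '11111111-2222-3333-4444-555555555555', 'version': 'v1'}  # made up
assert mock_plugin.bundle_fqid_from_json(fqid_json) == '11111111-2222-3333-4444-555555555555'

# The test above asserts on the recorded calls the same way:
assert mock_plugin.bundle_fqid_from_json.mock_calls == [call(fqid_json)]
```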
45 changes: 26 additions & 19 deletions test/integration_test.py
@@ -119,6 +119,7 @@
from azul.indexer import (
SourceJSON,
SourceRef,
SourceSpec,
SourcedBundleFQID,
)
from azul.indexer.document import (
@@ -699,7 +700,7 @@ def _get_one_outer_file(self, catalog: CatalogName) -> JSON:
self.fail('No files found')
return one(hits)

def _source_spec(self, catalog: CatalogName, entity: JSON) -> TDRSourceSpec:
def _source_spec(self, catalog: CatalogName, entity: JSON) -> SourceSpec:
if config.is_hca_enabled(catalog):
field = 'sourceSpec'
elif config.is_anvil_enabled(catalog):
@@ -753,9 +754,10 @@ def _uuid_column_name(self, catalog: CatalogName) -> str:

def _test_dos_and_drs(self, catalog: CatalogName):
if config.is_dss_enabled(catalog) and config.dss_direct_access:
_, file = self._get_one_inner_file(catalog)
self._test_dos(catalog, file)
self._test_drs(catalog, file)
outer_file, inner_file = self._get_one_inner_file(catalog)
source = self._source_spec(catalog, outer_file)
self._test_dos(catalog, inner_file)
self._test_drs(catalog, source, inner_file)

@property
def _service_account_credentials(self) -> ContextManager:
@@ -1138,13 +1140,14 @@ def _validate_file_content(self, content: ReadableFileObject, file: FileInnerEnt

def _validate_file_response(self,
response: urllib3.HTTPResponse,
source: TDRSourceSpec,
source: SourceSpec,
file: FileInnerEntity):
"""
Note: The response object must have been obtained with stream=True
"""
try:
if source.name == 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732':
special = 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732'
if isinstance(source, TDRSourceSpec) and source.name == special:
# All files in this snapshot were truncated to zero bytes by the
# Broad to save costs. The metadata is not a reliable indication
# of these files' actual size.
@@ -1154,7 +1157,11 @@ def _validate_file_response(self,
finally:
response.close()

def _test_drs(self, catalog: CatalogName, file: FileInnerEntity):
def _test_drs(self,
catalog: CatalogName,
source: SourceSpec,
file: FileInnerEntity
) -> None:
repository_plugin = self.azul_client.repository_plugin(catalog)
drs = repository_plugin.drs_client()
for access_method in AccessMethod:
@@ -1165,7 +1172,7 @@ def _test_drs(self, catalog: CatalogName, file: FileInnerEntity):
self.assertIsNone(access.headers)
if access.method is AccessMethod.https:
response = self._get_url(GET, furl(access.url), stream=True)
self._validate_file_response(response, file)
self._validate_file_response(response, source, file)
elif access.method is AccessMethod.gs:
content = self._get_gs_url_content(furl(access.url), size=self.num_fastq_bytes)
self._validate_file_content(content, file)
@@ -1872,10 +1879,7 @@ def _test_catalog(self, catalog: config.Catalog):
fqid = self.bundle_fqid(catalog.name)
log.info('Canning bundle %r from catalog %r', fqid, catalog.name)
with tempfile.TemporaryDirectory() as d:
self._can_bundle(source=str(fqid.source.spec),
uuid=fqid.uuid,
version=fqid.version,
output_dir=d)
self._can_bundle(fqid, output_dir=d)
generated_file = one(os.listdir(d))
with open(os.path.join(d, generated_file)) as f:
bundle_json = json.load(f)
@@ -1952,16 +1956,19 @@ def bundle_fqid(self, catalog: CatalogName) -> SourcedBundleFQID:
return self.random.choice(sorted(bundle_fqids))

def _can_bundle(self,
source: str,
uuid: str,
version: str,
fqid: SourcedBundleFQID,
output_dir: str
) -> None:
args = [
'--source', source,
'--uuid', uuid,
'--version', version,
'--output-dir', output_dir
'--uuid', fqid.uuid,
'--version', fqid.version,
'--source', str(fqid.source.spec),
*(
['--table-name', fqid.table_name.value]
if isinstance(fqid, TDRAnvilBundleFQID) else
[]
),
'--output-dir', output_dir,
]
return self._can_bundle_main(args)

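Pulling the pieces together, the test helper now derives the whole argument list from the FQID itself, splicing in `--table-name` only for AnVIL-style FQIDs. The sketch below shows that conditional-splat idiom as a standalone function rather than a test method; the source specs, UUIDs, versions, and table name are invented.

```python
def can_bundle_args(uuid: str, version: str, source: str,
                    output_dir: str, table_name: str | None = None) -> list[str]:
    # Mirrors the conditional construction in _can_bundle: the --table-name
    # pair is spliced in only when a table name is known (AnVIL-style FQIDs).
    return [
        '--uuid', uuid,
        '--version', version,
        '--source', source,
        *(['--table-name', table_name] if table_name is not None else []),
        '--output-dir', output_dir,
    ]


# HCA-style: no table name
print(can_bundle_args('aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
                      '2021-01-01T00:00:00.000000Z',
                      'tdr:example_project:snapshot/example_snapshot:/0',
                      '/tmp/out'))

# AnVIL-style: table name included
print(can_bundle_args('11111111-2222-3333-4444-555555555555',
                      '2022-06-01T00:00:00.000000Z',
                      'tdr:example_project:snapshot/example_anvil_snapshot:/0',
                      '/tmp/out',
                      table_name='anvil_biosample'))
```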