diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 71273aaa2a..c1bc950eaf 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -97,6 +97,19 @@ deploy: paths: - terraform/plan.json +import: + extends: .base_on_push + stage: deploy + # The 1000G snapshot on `anvildev` takes about 3.5 minutes to import. There + # are currently 257 snapshots on `anvilprod`. 257 * 3.5 / 60 = 14.99 hours, + # which we choose to double. + timeout: 30h + needs: + - build_image + - deploy + script: + - make import + deploy_browser: extends: .base_on_push stage: deploy diff --git a/Makefile b/Makefile index 717574bc3e..1804fdf3fd 100644 --- a/Makefile +++ b/Makefile @@ -102,12 +102,16 @@ $(1)terraform: lambdas .PHONY: $(1)deploy $(1)deploy: check_python $(1)terraform - python $(project_root)/scripts/post_deploy_tdr.py endef $(eval $(call deploy,)) $(eval $(call deploy,auto_)) +.PHONY: import +import: check_python + python $(project_root)/scripts/reindex.py --import --sources "tdr:parquet:gcp:${GOOGLE_PROJECT}:*" + python $(project_root)/scripts/verify_tdr_sources.py + .PHONY: destroy destroy: $(MAKE) -C terraform destroy diff --git a/UPGRADING.rst b/UPGRADING.rst index ee374c9626..72becfcb9e 100644 --- a/UPGRADING.rst +++ b/UPGRADING.rst @@ -19,6 +19,14 @@ branch that does not have the listed changes, the steps would need to be reverted. This is all fairly informal and loosely defined. Hopefully we won't have too many entries in this file. +#6355 Parquet to BigQuery import for GCP-backed AnVIL snapshots +=============================================================== + +For all personal deployments colocated with ``anvilbox``, update +``environment.py`` to use source type "parquet" and Google project +"platform-anvil-dev" for snapshot +"ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732". 
+ #6355 Explicitly configure source type in environment files =========================================================== diff --git a/bin/wheels/runtime/idna-3.10-py3-none-any.whl b/bin/wheels/runtime/idna-3.10-py3-none-any.whl new file mode 100644 index 0000000000..52759bdd23 Binary files /dev/null and b/bin/wheels/runtime/idna-3.10-py3-none-any.whl differ diff --git a/bin/wheels/runtime/idna-3.8-py3-none-any.whl b/bin/wheels/runtime/idna-3.8-py3-none-any.whl deleted file mode 100644 index 84d04cdf9f..0000000000 Binary files a/bin/wheels/runtime/idna-3.8-py3-none-any.whl and /dev/null differ diff --git a/bin/wheels/runtime/pyasn1-0.6.0-py2.py3-none-any.whl b/bin/wheels/runtime/pyasn1-0.6.0-py2.py3-none-any.whl deleted file mode 100644 index a6a9380b06..0000000000 Binary files a/bin/wheels/runtime/pyasn1-0.6.0-py2.py3-none-any.whl and /dev/null differ diff --git a/bin/wheels/runtime/pyasn1-0.6.1-py3-none-any.whl b/bin/wheels/runtime/pyasn1-0.6.1-py3-none-any.whl new file mode 100644 index 0000000000..eef3fa5340 Binary files /dev/null and b/bin/wheels/runtime/pyasn1-0.6.1-py3-none-any.whl differ diff --git a/bin/wheels/runtime/pyasn1_modules-0.4.0-py3-none-any.whl b/bin/wheels/runtime/pyasn1_modules-0.4.0-py3-none-any.whl deleted file mode 100644 index c719fed167..0000000000 Binary files a/bin/wheels/runtime/pyasn1_modules-0.4.0-py3-none-any.whl and /dev/null differ diff --git a/bin/wheels/runtime/pyasn1_modules-0.4.1-py3-none-any.whl b/bin/wheels/runtime/pyasn1_modules-0.4.1-py3-none-any.whl new file mode 100644 index 0000000000..cddf2d6e2c Binary files /dev/null and b/bin/wheels/runtime/pyasn1_modules-0.4.1-py3-none-any.whl differ diff --git a/bin/wheels/runtime/pytz-2024.1-py2.py3-none-any.whl b/bin/wheels/runtime/pytz-2024.1-py2.py3-none-any.whl deleted file mode 100644 index 571f586a0d..0000000000 Binary files a/bin/wheels/runtime/pytz-2024.1-py2.py3-none-any.whl and /dev/null differ diff --git a/bin/wheels/runtime/pytz-2024.2-py2.py3-none-any.whl b/bin/wheels/runtime/pytz-2024.2-py2.py3-none-any.whl new file mode 100644 index 0000000000..b4644561e8 Binary files /dev/null and b/bin/wheels/runtime/pytz-2024.2-py2.py3-none-any.whl differ diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py index f8bfaf3feb..77ebbe91d5 100644 --- a/deployments/anvilbox/environment.py +++ b/deployments/anvilbox/environment.py @@ -73,7 +73,7 @@ def mkdict(previous_catalog: dict[str, str], anvil_sources = mkdict({}, 3, mkdelta([ - mksrc('bigquery', 'datarepo-dev-e53e74aa', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804), + mksrc('parquet', 'platform-anvil-dev', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804), mksrc('bigquery', 'datarepo-dev-42c70e6a', 'ANVIL_CCDG_Sample_1_20230228_ANV5_202302281520', 28), mksrc('bigquery', 'datarepo-dev-97ad270b', 'ANVIL_CMG_Sample_1_20230225_ANV5_202302281509', 25) ])) diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py index bfe5990e0e..9c3fae5c27 100644 --- a/deployments/anvildev/environment.py +++ b/deployments/anvildev/environment.py @@ -64,7 +64,7 @@ def mkdict(previous_catalog: dict[str, str], anvil_sources = mkdict({}, 3, mkdelta([ - mksrc('bigquery', 'datarepo-dev-e53e74aa', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804), + mksrc('parquet', 'platform-anvil-dev', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804), mksrc('bigquery', 'datarepo-dev-42c70e6a', 'ANVIL_CCDG_Sample_1_20230228_ANV5_202302281520', 28), mksrc('bigquery', 'datarepo-dev-97ad270b', 
'ANVIL_CMG_Sample_1_20230225_ANV5_202302281509', 25) ])) diff --git a/environment.py b/environment.py index f7200b2327..474f7f62ba 100644 --- a/environment.py +++ b/environment.py @@ -710,6 +710,10 @@ def env() -> Mapping[str, Optional[str]]: # configured to index. All configured snapshots must reside in the same # location. # + # This variable is used both to *verify* that the sources' actual + # location matches our expectations, and to *determine* the location of + # any sources we create ourselves. + # # https://cloud.google.com/bigquery/docs/locations # 'AZUL_TDR_SOURCE_LOCATION': None, diff --git a/requirements.all.txt b/requirements.all.txt index faca0ab3cc..3156b73bf4 100644 --- a/requirements.all.txt +++ b/requirements.all.txt @@ -10,7 +10,7 @@ blinker==1.8.2 boto3==1.28.63 boto3-stubs==1.28.63 botocore==1.31.63 -botocore-stubs==1.35.15 +botocore-stubs==1.35.22 brotli==1.1.0 cachetools==5.5.0 certifi==2024.8.30 @@ -50,13 +50,13 @@ google-cloud-storage==2.12.0 google-crc32c==1.6.0 google-resumable-media==2.7.2 googleapis-common-protos==1.65.0 -greenlet==3.0.3 +greenlet==3.1.0 grpcio==1.66.1 grpcio-status==1.62.3 http-message-signatures==0.4.4 http_sfv==0.9.9 httplib2==0.22.0 -idna==3.8 +idna==3.10 importlib-resources==5.13.0 inquirer==2.10.1 itsdangerous==2.2.0 @@ -94,8 +94,8 @@ proto-plus==1.24.0 protobuf==4.25.4 psutil==6.0.0 py-partiql-parser==0.3.3 -pyasn1==0.6.0 -pyasn1_modules==0.4.0 +pyasn1==0.6.1 +pyasn1_modules==0.4.1 pycodestyle==2.9.1 pycparser==2.22 pyflakes==2.5.0 @@ -109,7 +109,7 @@ python-dateutil==2.9.0.post0 python-dxf==11.4.0 python-editor==1.0.4 python-gitlab==3.13.0 -pytz==2024.1 +pytz==2024.2 pyyaml==6.0.1 pyzmq==26.2.0 readchar==4.2.0 diff --git a/requirements.dev.trans.txt b/requirements.dev.trans.txt index 23de37512c..b83c416189 100644 --- a/requirements.dev.trans.txt +++ b/requirements.dev.trans.txt @@ -1,6 +1,6 @@ blessed==1.20.0 blinker==1.8.2 -botocore-stubs==1.35.15 +botocore-stubs==1.35.22 brotli==1.1.0 click==8.1.7 colorama==0.4.4 @@ -13,7 +13,7 @@ flask-cors==5.0.0 geventhttpclient==2.3.1 gitdb==4.0.11 google-auth-httplib2==0.2.0 -greenlet==3.0.3 +greenlet==3.1.0 httplib2==0.22.0 importlib-resources==5.13.0 inquirer==2.10.1 diff --git a/requirements.trans.txt b/requirements.trans.txt index 69273cf90f..f2f8a1db1a 100644 --- a/requirements.trans.txt +++ b/requirements.trans.txt @@ -12,18 +12,18 @@ googleapis-common-protos==1.65.0 grpcio==1.66.1 grpcio-status==1.62.3 http_sfv==0.9.9 -idna==3.8 +idna==3.10 markupsafe==2.1.5 orderedmultidict==1.0.1 packaging==24.1 proto-plus==1.24.0 protobuf==4.25.4 -pyasn1==0.6.0 -pyasn1_modules==0.4.0 +pyasn1==0.6.1 +pyasn1_modules==0.4.1 pycparser==2.22 pyopenssl==24.2.1 python-dateutil==2.9.0.post0 -pytz==2024.1 +pytz==2024.2 s3transfer==0.7.0 setuptools-scm==5.0.2 six==1.16.0 diff --git a/scripts/download_tdr_parquet.py b/scripts/download_tdr_parquet.py new file mode 100644 index 0000000000..41f07a1478 --- /dev/null +++ b/scripts/download_tdr_parquet.py @@ -0,0 +1,103 @@ +""" +Export Parquet files from TDR and download them to local storage. 
+""" +from argparse import ( + ArgumentParser, +) +import logging +from pathlib import ( + Path, +) +import sys +from typing import ( + Iterator, +) +from uuid import ( + UUID, +) + +import attrs +from furl import ( + furl, +) + +from azul import ( + cached_property, + config, + reject, +) +from azul.http import ( + HasCachedHttpClient, +) +from azul.logging import ( + configure_script_logging, +) +from azul.terra import ( + TDRClient, + TerraStatusException, +) + +log = logging.getLogger(__name__) + + +@attrs.frozen +class ParquetDownloader(HasCachedHttpClient): + snapshot_id: str + + @cached_property + def tdr(self) -> TDRClient: + return TDRClient.for_indexer() + + def get_download_urls(self) -> dict[str, list[furl]]: + urls = self.tdr.export_parquet_urls(self.snapshot_id) + reject(urls is None, + 'No Parquet access information is available for snapshot %r', self.snapshot_id) + return urls + + def get_data(self, parquet_urls: list[furl]) -> Iterator[bytes]: + for url in parquet_urls: + response = self._http_client.request('GET', str(url)) + if response.status != 200: + raise TerraStatusException(url, response) + if response.headers.get('x-ms-resource-type') == 'directory': + log.info('Skipping Azure directory URL') + else: + yield response.data + + def download_table(self, + table_name: str, + download_urls: list[furl], + location: Path): + data = None + for i, data in enumerate(self.get_data(download_urls)): + output_path = location / f'{self.snapshot_id}_{table_name}_{i}.parquet' + log.info('Writing to %s', output_path) + with open(output_path, 'wb') as f: + f.write(data) + reject(data is None, + 'No Parquet files found for snapshot %r. Tried URLs: %r', + self.snapshot_id, download_urls) + + +def main(argv): + parser = ArgumentParser(add_help=True, description=__doc__) + parser.add_argument('snapshot_id', + type=UUID, + help='The UUID of the snapshot') + parser.add_argument('-O', + '--output-dir', + type=Path, + default=Path(config.project_root) / 'parquet', + help='Where to save the downloaded files') + args = parser.parse_args(argv) + + downloader = ParquetDownloader(args.snapshot_id) + + urls_by_table = downloader.get_download_urls() + for table_name, urls in urls_by_table.items(): + downloader.download_table(table_name, urls, args.output_dir) + + +if __name__ == '__main__': + configure_script_logging(log) + main(sys.argv[1:]) diff --git a/scripts/reindex.py b/scripts/reindex.py index cd13cc6882..f18ab9f951 100755 --- a/scripts/reindex.py +++ b/scripts/reindex.py @@ -26,9 +26,15 @@ from azul.logging import ( configure_script_logging, ) +from azul.plugins.repository import ( + tdr_anvil, +) from azul.plugins.repository.tdr import ( TDRPlugin, ) +from azul.terra import ( + TDRSourceSpec, +) log = logging.getLogger(__name__) @@ -105,6 +111,11 @@ default=False, action='store_true', help='Purge the queues before taking any action on the indices.') +parser.add_argument('--import', + default=False, + action='store_true', + dest='import_', + help='Import sources into BigQuery data from TDR') parser.add_argument('--nowait', '--no-wait', dest='wait', default=True, @@ -159,6 +170,23 @@ def main(argv: list[str]): parser.error('Cannot specify sources when performing a local reindex') assert False + if args.import_: + if config.deployment.is_personal: + log.warning('Skipping table import for all catalogs. 
Usually, the ' + 'import is only be performed in shared deployments.') + else: + for catalog, sources in sources_by_catalog.items(): + if config.is_tdr_enabled(catalog) and config.is_anvil_enabled(catalog) and sources: + plugin = azul.repository_plugin(catalog) + assert isinstance(plugin, tdr_anvil.Plugin) + for source in sources: + spec = TDRSourceSpec.parse(source) + if spec.type == TDRSourceSpec.Type.parquet: + source = plugin.resolve_source(source) + plugin.import_tables(source) + else: + log.info('Skipping table import for catalog %r', catalog) + if args.deindex: require(not any((args.index, args.delete, args.create)), '--deindex is incompatible with --index, --create, and --delete.') diff --git a/scripts/post_deploy_tdr.py b/scripts/verify_tdr_sources.py similarity index 100% rename from scripts/post_deploy_tdr.py rename to scripts/verify_tdr_sources.py diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index d5873ea5d9..069cbda2ad 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -17,6 +17,9 @@ ) import attrs +from furl import ( + furl, +) from more_itertools import ( one, ) @@ -24,6 +27,7 @@ from azul import ( cached_property, config, + reject, require, uuids, ) @@ -740,3 +744,34 @@ def _columns(self, entity_type: EntityType) -> set[str]: entity_columns = {column['name'] for column in table['columns']} entity_columns.add('datarepo_row_id') return entity_columns + + def import_tables(self, source: TDRSourceRef): + """ + Import tables for an AnVIL snapshot into BigQuery via TDR's Parquet + export API. Only tables defined in the AnVIL schema will be imported. + Currently, only GS-backed snapshots are supported. 
+ """ + require(source.spec.subdomain == config.google_project(), source) + + dataset_name = source.spec.name + self.tdr.create_dataset(dataset_name) + + urls_by_table = self.tdr.export_parquet_urls(source.id) + reject(urls_by_table is None, + 'No Parquet access information is available for snapshot %r.', source.spec) + + for table in anvil_schema['tables']: + table_name = table['name'] + uris = urls_by_table[table_name] + for uri in uris: + require(uri.origin == 'https://storage.googleapis.com', + 'Unsupported storage location for snapshot %r: %r', + source.spec, uri) + uri.load(furl(scheme='gs', + netloc=uri.path.segments[0], + path=uri.path.segments[1:])) + self.tdr.create_table(dataset_name=dataset_name, + table_name=table_name, + import_uris=uris, + overwrite=False, + clustering_fields=table['primaryKey']) diff --git a/src/azul/terra.py b/src/azul/terra.py index bbe69397e7..8313303f40 100644 --- a/src/azul/terra.py +++ b/src/azul/terra.py @@ -16,7 +16,6 @@ ) from typing import ( ClassVar, - Optional, ) import attrs @@ -39,9 +38,15 @@ bigquery, ) from google.cloud.bigquery import ( + Dataset, + DatasetReference, + LoadJobConfig, + ParquetOptions, QueryJob, QueryJobConfig, QueryPriority, + SourceFormat, + WriteDisposition, ) from more_itertools import ( one, @@ -149,7 +154,6 @@ def parse(cls, spec: str) -> 'TDRSourceSpec': service, type, domain, subdomain, name = rest.split(':') assert service == 'tdr', service type = cls.Type(type) - reject(type == cls.Type.parquet, 'Parquet sources are not yet supported') domain = cls.Domain(domain) reject(domain == cls.Domain.azure, 'Azure sources are not yet supported') self = cls(prefix=prefix, @@ -256,7 +260,7 @@ def oauth2_scopes(self) -> Sequence[str]: return [ *super().oauth2_scopes(), 'https://www.googleapis.com/auth/devstorage.read_only', - 'https://www.googleapis.com/auth/bigquery.readonly' + 'https://www.googleapis.com/auth/bigquery' ] @@ -409,15 +413,11 @@ def lookup_source(self, source_spec: TDRSourceSpec) -> str: """ source = self._lookup_source(source_spec) actual_project = source['dataProject'] - require(actual_project == source_spec.subdomain, - 'Actual Google project of TDR source differs from configured one', - actual_project, source_spec.subdomain) - storage = one( - resource - for resource in source['storage'] - if resource['cloudResource'] == 'bigquery' - ) - actual_location = storage['region'] + if source_spec.subdomain != config.google_project(): + require(actual_project == source_spec.subdomain, + 'Actual Google project of TDR source differs from configured one', + actual_project, source_spec.subdomain) + actual_location = self._get_region(source, 'bigquery') # Uppercase is standard for multi-regions in the documentation but TDR # returns 'us' in lowercase require(actual_location.lower() == config.tdr_source_location.lower(), @@ -425,6 +425,14 @@ def lookup_source(self, source_spec: TDRSourceSpec) -> str: actual_location, config.tdr_source_location) return source['id'] + def _get_region(self, source: JSON, resource_type: str) -> str: + storage = one( + resource + for resource in source['storage'] + if resource['cloudResource'] == resource_type + ) + return storage['region'] + def _retrieve_source(self, source: TDRSourceRef) -> MutableJSON: endpoint = self._repository_endpoint('snapshots', source.id) response = self._request('GET', endpoint) @@ -534,7 +542,8 @@ def _check_response(self, endpoint: furl, response: urllib3.HTTPResponse ) -> MutableJSON: - if response.status == 200: + # 202 is observed while waiting for the Parquet 
export + if response.status in (200, 202): return json.loads(response.data) # FIXME: Azul sometimes conflates 401 and 403 # https://github.com/DataBiosphere/azul/issues/4463 @@ -557,7 +566,7 @@ def snapshot_ids(self) -> set[str]: def snapshot_names_by_id(self, *, - filter: Optional[str] = None + filter: str | None = None ) -> dict[str, str]: """ List the TDR snapshots accessible to the current credentials. @@ -635,7 +644,7 @@ def for_registered_user(cls, authentication: OAuth2) -> 'TDRClient': def drs_client(self) -> DRSClient: return DRSClient(http_client=self._http_client) - def get_duos(self, source: TDRSourceRef) -> Optional[MutableJSON]: + def get_duos(self, source: TDRSourceRef) -> MutableJSON | None: response = self._retrieve_source(source) try: duos_id = response['duosFirecloudGroup']['duosId'] @@ -651,3 +660,137 @@ def get_duos(self, source: TDRSourceRef) -> Optional[MutableJSON]: return None else: return self._check_response(url, response) + + def create_dataset(self, dataset_name: str): + """ + Create a BigQuery dataset in the GCP project associated with the current + credentials and the GCP region configured for the current deployment. + + :param dataset_name: Unqualified name of the dataset to create. + `google.cloud.exceptions.Conflict` will be raised + if a dataset with the same name already exists. + """ + bigquery = self._bigquery(self.credentials.project_id) + ref = DatasetReference(bigquery.project, dataset_name) + location = config.tdr_source_location + # We get a false warning from PyCharm here, probably because of + # + # https://youtrack.jetbrains.com/issue/PY-23400/regression-PEP484-type-annotations-in-docstrings-nearly-completely-broken + # + # Google uses the docstring syntax to annotate types in its BQ client. + # + # noinspection PyTypeChecker + dataset = Dataset(ref) + dataset.location = location + log.info('Creating BigQuery dataset %r in region %r', + dataset.dataset_id, dataset.location) + actual_dataset = bigquery.create_dataset(dataset) + require(actual_dataset.reference == ref) + require(actual_dataset.project == self.credentials.project_id) + require(actual_dataset.location == location) + + def create_table(self, + dataset_name: str, + table_name: str, + import_uris: Sequence[furl], + *, + overwrite: bool, + clustering_fields: Sequence[str] | None = None): + """ + Create a BigQuery table in the project and region configured for the + current deployment. + + :param dataset_name: Unqualified name of the dataset to contain the new + table + + :param table_name: Unqualified name of the new table + + :param import_uris: URIs of Parquet file(s) to populate the table. The + URI scheme must be `gs://` and the GCS bucket's region + must be compatible with the target dataset's. See + https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet#limitations + + :param overwrite: Overwrite existing table with the same ID as the table + we're trying to create (true) or raise an exception if + such a table exists (false) + + :param clustering_fields: Fields defining clustering for the table.
See + https://cloud.google.com/bigquery/docs/clustered-tables + """ + for uri in import_uris: + require(uri.scheme == 'gs', 'Expected gs:// URI', uri) + table_id = f'{dataset_name}.{table_name}' + bigquery = self._bigquery(self.credentials.project_id) + write_disposition = ( + WriteDisposition.WRITE_TRUNCATE if overwrite else WriteDisposition.WRITE_EMPTY + ) + job_config = LoadJobConfig( + write_disposition=write_disposition, + clustering_fields=clustering_fields, + source_format=SourceFormat.PARQUET, + # With this option, array columns such as `anvil_diagnosis.disease` + # are created with the type `ARRAY<STRING>`, as desired. Without it, + # they're given convoluted types like + # `STRUCT<list ARRAY<STRUCT<element STRING>>>`. + parquet_options=ParquetOptions.from_api_repr(dict(enable_list_inference=True)) + ) + table_ref = f'{bigquery.project}.{dataset_name}.{table_name}' + log.info('Creating BigQuery table %r', table_ref) + load_job = bigquery.load_table_from_uri(source_uris=list(map(str, import_uris)), + destination=table_id, + job_config=job_config) + load_job.result() + log.info('Table %r created successfully', table_ref) + + def export_parquet_urls(self, + snapshot_id: str + ) -> dict[str, list[mutable_furl]] | None: + """ + Obtain URLs of Parquet files for the data tables of the specified + snapshot. This is a time-consuming operation that usually takes on the + order of one minute to complete. + + :param snapshot_id: The UUID of the snapshot + + :return: A mapping of table names to lists of Parquet file download + URLs, or `None` if no Parquet downloads are available for + the specified snapshot. The URLs are typically expiring signed + URLs pointing to a cloud storage service such as GCS or Azure. + """ + url = self._repository_endpoint('snapshots', snapshot_id, 'export') + # Required for Azure-backed snapshots + url.args.add('validatePrimaryKeyUniqueness', False) + delays = [10, 20, 40, 80] + for delay in [*delays, None]: + response = self._request('GET', url) + response_body = self._check_response(url, response) + jobs_status = response_body['job_status'] + job_id = response_body['id'] + if jobs_status == 'running': + url = self._repository_endpoint('jobs', job_id) + log.info('Waiting for job %r ...', job_id) + if delay is None: + raise RuntimeError(f'TDR export job {job_id} timed out after {sum(delays)}s') + else: + sleep(delay) + continue + elif jobs_status == 'succeeded': + break + else: + raise TerraStatusException(url, response) + else: + assert False + url = self._repository_endpoint('jobs', job_id, 'result') + response = self._request('GET', url) + response_body = self._check_response(url, response) + parquet = response_body['format'].get('parquet') + if parquet is not None: + dataset = one(response_body['snapshot']['source'])['dataset'] + region = self._get_region(dataset, 'bucket') + require(config.tdr_source_location == region, + config.tdr_source_location, region) + parquet = { + table['name']: list(map(furl, table['paths'])) + for table in parquet['location']['tables'] + } + return parquet diff --git a/terraform/authentication.tf.json.template.py b/terraform/authentication.tf.json.template.py index 782c1581b9..07f29b3c3e 100644 --- a/terraform/authentication.tf.json.template.py +++ b/terraform/authentication.tf.json.template.py @@ -60,6 +60,9 @@ "title": f"azul_{config.deployment_stage}", "permissions": [ "bigquery.jobs.create", + "bigquery.datasets.create", + "bigquery.tables.create", + "bigquery.tables.updateData", *[ f'bigquery.{resource}.{action}' for resource in ('capacityCommitments',
'reservations')
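
The following is a rough sketch, not part of the change set above, of how the new pieces compose: it approximates what ``reindex.py --import`` does for a single Parquet source, using only the ``TDRClient`` methods introduced in this change. The snapshot UUID and dataset name are placeholders and error handling is omitted; ``Plugin.import_tables`` in ``tdr_anvil/__init__.py`` is the authoritative implementation.

    # Rough sketch only: a condensed Parquet-to-BigQuery import for one
    # snapshot, built from the new TDRClient methods. SNAPSHOT_ID and
    # DATASET_NAME are placeholders.
    from furl import furl

    from azul.terra import TDRClient

    SNAPSHOT_ID = '00000000-0000-0000-0000-000000000000'  # placeholder UUID
    DATASET_NAME = 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732'

    tdr = TDRClient.for_indexer()

    # Start the TDR export job and poll until the signed Parquet URLs are ready
    urls_by_table = tdr.export_parquet_urls(SNAPSHOT_ID)
    assert urls_by_table is not None, 'Snapshot has no Parquet export'

    # The dataset is created in the region given by config.tdr_source_location
    tdr.create_dataset(DATASET_NAME)

    for table_name, urls in urls_by_table.items():
        for url in urls:
            # TDR returns https://storage.googleapis.com/<bucket>/<object> URLs,
            # while create_table() expects gs:// URIs, so rewrite them in place
            url.load(furl(scheme='gs',
                          netloc=url.path.segments[0],
                          path=url.path.segments[1:]))
        tdr.create_table(dataset_name=DATASET_NAME,
                         table_name=table_name,
                         import_uris=urls,
                         overwrite=False)

Unlike this sketch, ``import_tables`` restricts the import to tables defined in the AnVIL schema, verifies that the exported files reside on GCS, and clusters each table on its primary key.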