Parquet to BigQuery import for GCP-backed AnVIL snapshots (#6355) #6392

Open · wants to merge 4 commits into base: develop
13 changes: 13 additions & 0 deletions .gitlab-ci.yml
@@ -97,6 +97,19 @@ deploy:
paths:
- terraform/plan.json

import:
  extends: .base_on_push
  stage: deploy
  # The 1000G snapshot on `anvildev` takes about 3.5 minutes to import. There
Review comment (Member):
I'm not sure that assuming every snapshot is as big as 1000G leads to a practical timeout.

A timeout is a heuristic defense against hung workloads, i.e., workloads that stop making significant progress. We don't want to constantly update the timeout, we don't want it to prematurely kill workloads that are progressing at the average rate, and we don't want the workload to be in the hung state for more than 80% of its running time. A 5min timeout goes against the first rule; a 30h timeout goes against the last.

  # are currently 257 snapshots on `anvilprod`. 257 * 3.5 / 60 = 14.99 hours,
  # which we choose to double.
  timeout: 30h
  needs:
    - build_image
    - deploy
  script:
    - make import
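For reference, the arithmetic behind the 30h value, written out as a back-of-the-envelope sketch (not part of the change; the figures are taken from the comment above):

# Rough timeout estimate, assuming every snapshot imports in ~3.5 minutes,
# the observed time for the 1000G snapshot on `anvildev`.
snapshots = 257                     # current snapshot count on `anvilprod`
minutes_per_snapshot = 3.5
estimated_hours = snapshots * minutes_per_snapshot / 60   # ~= 14.99 hours
timeout_hours = 2 * estimated_hours                       # ~= 30 hours, the value above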

deploy_browser:
  extends: .base_on_push
  stage: deploy
6 changes: 5 additions & 1 deletion Makefile
@@ -102,12 +102,16 @@ $(1)terraform: lambdas

.PHONY: $(1)deploy
$(1)deploy: check_python $(1)terraform
python $(project_root)/scripts/post_deploy_tdr.py
endef

$(eval $(call deploy,))
$(eval $(call deploy,auto_))

.PHONY: import
import: check_python
python $(project_root)/scripts/reindex.py --import --sources "tdr:parquet:gcp:${GOOGLE_PROJECT}:*"
Review comment (Member):
Suggested change
python $(project_root)/scripts/reindex.py --import --sources "tdr:parquet:gcp:${GOOGLE_PROJECT}:*"
python $(project_root)/scripts/reindex.py --import --sources "tdr:parquet:gcp:*"

python $(project_root)/scripts/verify_tdr_sources.py

nadove-ucsc marked this conversation as resolved.
.PHONY: destroy
destroy:
$(MAKE) -C terraform destroy
8 changes: 8 additions & 0 deletions UPGRADING.rst
@@ -19,6 +19,14 @@ branch that does not have the listed changes, the steps would need to be
reverted. This is all fairly informal and loosely defined. Hopefully we won't
have too many entries in this file.

#6355 Parquet to BigQuery import for GCP-backed AnVIL snapshots
===============================================================

For all personal deployments colocated with ``anvilbox``, update
``environment.py`` to use source type "parquet" and Google project
"platform-anvil-dev" for snapshot
"ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732".


#6355 Explicitly configure source type in environment files
===========================================================
Binary file added bin/wheels/runtime/idna-3.10-py3-none-any.whl
Binary file not shown.
Binary file removed bin/wheels/runtime/idna-3.8-py3-none-any.whl
Binary file not shown.
2 changes: 1 addition & 1 deletion deployments/anvilbox/environment.py
@@ -73,7 +73,7 @@ def mkdict(previous_catalog: dict[str, str],


anvil_sources = mkdict({}, 3, mkdelta([
mksrc('bigquery', 'datarepo-dev-e53e74aa', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),
mksrc('parquet', 'platform-anvil-dev', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),
Review comment (Member):
The source spec should state where the source is, not where it will be when it is imported. The logic should be to import every parquet source. So I think this should read

Suggested change
mksrc('parquet', 'platform-anvil-dev', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),
mksrc('bigquery', 'platform-anvil-dev', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),

mksrc('bigquery', 'datarepo-dev-42c70e6a', 'ANVIL_CCDG_Sample_1_20230228_ANV5_202302281520', 28),
mksrc('bigquery', 'datarepo-dev-97ad270b', 'ANVIL_CMG_Sample_1_20230225_ANV5_202302281509', 25)
]))
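For context, the selection the reviewer describes (import every source whose declared type is `parquet`, regardless of Google project) is what the scripts/reindex.py change further down implements. A minimal sketch using names from that diff, for illustration only:

for source in sources:
    spec = TDRSourceSpec.parse(source)
    if spec.type == TDRSourceSpec.Type.parquet:
        # `plugin` is the catalog's repository plugin, as in reindex.py below
        plugin.import_tables(plugin.resolve_source(source))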
2 changes: 1 addition & 1 deletion deployments/anvildev/environment.py
@@ -64,7 +64,7 @@ def mkdict(previous_catalog: dict[str, str],


anvil_sources = mkdict({}, 3, mkdelta([
mksrc('bigquery', 'datarepo-dev-e53e74aa', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),
mksrc('parquet', 'platform-anvil-dev', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),
Review comment (Member):
Suggested change
mksrc('parquet', 'platform-anvil-dev', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),
mksrc('parquet', 'datarepo-dev-e53e74aa', 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', 6804),

mksrc('bigquery', 'datarepo-dev-42c70e6a', 'ANVIL_CCDG_Sample_1_20230228_ANV5_202302281520', 28),
mksrc('bigquery', 'datarepo-dev-97ad270b', 'ANVIL_CMG_Sample_1_20230225_ANV5_202302281509', 25)
]))
4 changes: 4 additions & 0 deletions environment.py
@@ -710,6 +710,10 @@ def env() -> Mapping[str, Optional[str]]:
# configured to index. All configured snapshots must reside in the same
# location.
#
# This variable is used both to *verify* that the sources' actual
# location matches our expectations, and to *determine* the location of
# any sources we create ourselves.
#
# https://cloud.google.com/bigquery/docs/locations
#
'AZUL_TDR_SOURCE_LOCATION': None,
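A hedged illustration of how this is typically set in a deployment's environment.py. The value 'US' is only an example of a valid BigQuery location, not a recommendation; it must match where the configured snapshots actually reside:

# Example only; all configured TDR snapshots must reside in this location.
'AZUL_TDR_SOURCE_LOCATION': 'US',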
12 changes: 6 additions & 6 deletions requirements.all.txt
@@ -10,7 +10,7 @@ blinker==1.8.2
boto3==1.28.63
boto3-stubs==1.28.63
botocore==1.31.63
botocore-stubs==1.35.15
botocore-stubs==1.35.22
brotli==1.1.0
cachetools==5.5.0
certifi==2024.8.30
@@ -50,13 +50,13 @@ google-cloud-storage==2.12.0
google-crc32c==1.6.0
google-resumable-media==2.7.2
googleapis-common-protos==1.65.0
greenlet==3.0.3
greenlet==3.1.0
grpcio==1.66.1
grpcio-status==1.62.3
http-message-signatures==0.4.4
http_sfv==0.9.9
httplib2==0.22.0
idna==3.8
idna==3.10
importlib-resources==5.13.0
inquirer==2.10.1
itsdangerous==2.2.0
@@ -94,8 +94,8 @@ proto-plus==1.24.0
protobuf==4.25.4
psutil==6.0.0
py-partiql-parser==0.3.3
pyasn1==0.6.0
pyasn1_modules==0.4.0
pyasn1==0.6.1
pyasn1_modules==0.4.1
pycodestyle==2.9.1
pycparser==2.22
pyflakes==2.5.0
@@ -109,7 +109,7 @@ python-dateutil==2.9.0.post0
python-dxf==11.4.0
python-editor==1.0.4
python-gitlab==3.13.0
pytz==2024.1
pytz==2024.2
pyyaml==6.0.1
pyzmq==26.2.0
readchar==4.2.0
4 changes: 2 additions & 2 deletions requirements.dev.trans.txt
@@ -1,6 +1,6 @@
blessed==1.20.0
blinker==1.8.2
botocore-stubs==1.35.15
botocore-stubs==1.35.22
brotli==1.1.0
click==8.1.7
colorama==0.4.4
@@ -13,7 +13,7 @@ flask-cors==5.0.0
geventhttpclient==2.3.1
gitdb==4.0.11
google-auth-httplib2==0.2.0
greenlet==3.0.3
greenlet==3.1.0
httplib2==0.22.0
importlib-resources==5.13.0
inquirer==2.10.1
8 changes: 4 additions & 4 deletions requirements.trans.txt
@@ -12,18 +12,18 @@ googleapis-common-protos==1.65.0
grpcio==1.66.1
grpcio-status==1.62.3
http_sfv==0.9.9
idna==3.8
idna==3.10
markupsafe==2.1.5
orderedmultidict==1.0.1
packaging==24.1
proto-plus==1.24.0
protobuf==4.25.4
pyasn1==0.6.0
pyasn1_modules==0.4.0
pyasn1==0.6.1
pyasn1_modules==0.4.1
pycparser==2.22
pyopenssl==24.2.1
python-dateutil==2.9.0.post0
pytz==2024.1
pytz==2024.2
s3transfer==0.7.0
setuptools-scm==5.0.2
six==1.16.0
103 changes: 103 additions & 0 deletions scripts/download_tdr_parquet.py
@@ -0,0 +1,103 @@
"""
Export Parquet files from TDR and download them to local storage.
"""
from argparse import (
ArgumentParser,
)
import logging
from pathlib import (
Path,
)
import sys
from typing import (
Iterator,
)
from uuid import (
UUID,
)

import attrs
from furl import (
furl,
)

from azul import (
cached_property,
config,
reject,
)
from azul.http import (
HasCachedHttpClient,
)
from azul.logging import (
configure_script_logging,
)
from azul.terra import (
TDRClient,
TerraStatusException,
)

log = logging.getLogger(__name__)


@attrs.frozen
class ParquetDownloader(HasCachedHttpClient):
snapshot_id: str

@cached_property
def tdr(self) -> TDRClient:
return TDRClient.for_indexer()

def get_download_urls(self) -> dict[str, list[furl]]:
urls = self.tdr.export_parquet_urls(self.snapshot_id)
reject(urls is None,
'No Parquet access information is available for snapshot %r', self.snapshot_id)
return urls

def get_data(self, parquet_urls: list[furl]) -> Iterator[bytes]:
for url in parquet_urls:
response = self._http_client.request('GET', str(url))
if response.status != 200:
raise TerraStatusException(url, response)
if response.headers.get('x-ms-resource-type') == 'directory':
log.info('Skipping Azure directory URL')
else:
yield response.data

def download_table(self,
table_name: str,
download_urls: list[furl],
location: Path):
data = None
for i, data in enumerate(self.get_data(download_urls)):
output_path = location / f'{self.snapshot_id}_{table_name}_{i}.parquet'
log.info('Writing to %s', output_path)
with open(output_path, 'wb') as f:
f.write(data)
reject(data is None,
'No Parquet files found for snapshot %r. Tried URLs: %r',
self.snapshot_id, download_urls)


def main(argv):
parser = ArgumentParser(add_help=True, description=__doc__)
parser.add_argument('snapshot_id',
type=UUID,
help='The UUID of the snapshot')
parser.add_argument('-O',
'--output-dir',
type=Path,
default=Path(config.project_root) / 'parquet',
help='Where to save the downloaded files')
args = parser.parse_args(argv)

downloader = ParquetDownloader(args.snapshot_id)

urls_by_table = downloader.get_download_urls()
for table_name, urls in urls_by_table.items():
downloader.download_table(table_name, urls, args.output_dir)


if __name__ == '__main__':
configure_script_logging(log)
main(sys.argv[1:])
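A sketch of how the script is used. The snapshot UUID below is a made-up placeholder, and the output directory must already exist (the script writes into it but does not create it):

# From the command line, at the project root:
#
#   python scripts/download_tdr_parquet.py 01234567-89ab-cdef-0123-456789abcdef -O /tmp/parquet
#
# Programmatic equivalent, assuming ParquetDownloader from the script above is in scope:
from pathlib import Path

downloader = ParquetDownloader('01234567-89ab-cdef-0123-456789abcdef')
for table_name, urls in downloader.get_download_urls().items():
    downloader.download_table(table_name, urls, Path('/tmp/parquet'))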
28 changes: 28 additions & 0 deletions scripts/reindex.py
@@ -26,9 +26,15 @@
from azul.logging import (
    configure_script_logging,
)
from azul.plugins.repository import (
    tdr_anvil,
)
from azul.plugins.repository.tdr import (
    TDRPlugin,
)
from azul.terra import (
    TDRSourceSpec,
)

log = logging.getLogger(__name__)

@@ -105,6 +111,11 @@
                    default=False,
                    action='store_true',
                    help='Purge the queues before taking any action on the indices.')
parser.add_argument('--import',
                    default=False,
                    action='store_true',
                    dest='import_',
                    help='Import data for sources from TDR into BigQuery')
parser.add_argument('--nowait', '--no-wait',
                    dest='wait',
                    default=True,
@@ -159,6 +170,23 @@ def main(argv: list[str]):
parser.error('Cannot specify sources when performing a local reindex')
assert False

    if args.import_:
        if config.deployment.is_personal:
            log.warning('Skipping table import for all catalogs. Usually, the '
                        'import is only performed in shared deployments.')
        else:
            for catalog, sources in sources_by_catalog.items():
                if config.is_tdr_enabled(catalog) and config.is_anvil_enabled(catalog) and sources:
                    plugin = azul.repository_plugin(catalog)
                    assert isinstance(plugin, tdr_anvil.Plugin)
                    for source in sources:
                        spec = TDRSourceSpec.parse(source)
                        if spec.type == TDRSourceSpec.Type.parquet:
                            source = plugin.resolve_source(source)
                            plugin.import_tables(source)
                else:
                    log.info('Skipping table import for catalog %r', catalog)

    if args.deindex:
        require(not any((args.index, args.delete, args.create)),
                '--deindex is incompatible with --index, --create, and --delete.')
File renamed without changes.
35 changes: 35 additions & 0 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -17,13 +17,17 @@
)

import attrs
from furl import (
    furl,
)
from more_itertools import (
    one,
)

from azul import (
    cached_property,
    config,
    reject,
    require,
    uuids,
)
@@ -740,3 +744,34 @@ def _columns(self, entity_type: EntityType) -> set[str]:
        entity_columns = {column['name'] for column in table['columns']}
        entity_columns.add('datarepo_row_id')
        return entity_columns

    def import_tables(self, source: TDRSourceRef):
        """
        Import tables for an AnVIL snapshot into BigQuery via TDR's Parquet
        export API. Only tables defined in the AnVIL schema will be imported.
        Currently, only GS-backed snapshots are supported.
        """
        require(source.spec.subdomain == config.google_project(), source)

        dataset_name = source.spec.name
        self.tdr.create_dataset(dataset_name)

        urls_by_table = self.tdr.export_parquet_urls(source.id)
        reject(urls_by_table is None,
               'No Parquet access information is available for snapshot %r.', source.spec)

        for table in anvil_schema['tables']:
            table_name = table['name']
            uris = urls_by_table[table_name]
            for uri in uris:
                require(uri.origin == 'https://storage.googleapis.com',
                        'Unsupported storage location for snapshot %r: %r',
                        source.spec, uri)
                uri.load(furl(scheme='gs',
                              netloc=uri.path.segments[0],
                              path=uri.path.segments[1:]))
            self.tdr.create_table(dataset_name=dataset_name,
                                  table_name=table_name,
                                  import_uris=uris,
                                  overwrite=False,
                                  clustering_fields=table['primaryKey'])
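A worked example of the URL rewrite in the inner loop above: the export API hands back https:// URLs on storage.googleapis.com, while BigQuery load jobs expect gs:// URIs, so the bucket (the first path segment) becomes the netloc and the remaining segments stay as the path. The URL below is a hypothetical stand-in for what TDR returns:

from furl import furl

uri = furl('https://storage.googleapis.com/example-bucket/parquet/file_0.parquet')
uri.load(furl(scheme='gs',
              netloc=uri.path.segments[0],
              path=uri.path.segments[1:]))
print(uri)  # gs://example-bucket/parquet/file_0.parquet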