Skip to content

Commit

Permalink
Stop using S3 Select in indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
sir-sigurd committed Nov 8, 2024
1 parent 7972243 commit 97155ef
Showing 1 changed file with 56 additions and 38 deletions.
94 changes: 56 additions & 38 deletions lambdas/indexer/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@


import datetime
import functools
import json
import os
import pathlib
Expand Down Expand Up @@ -168,12 +169,7 @@
# currently only affects .parquet, TODO: extend to other extensions
assert 'SKIP_ROWS_EXTS' in os.environ
SKIP_ROWS_EXTS = separated_env_to_iter('SKIP_ROWS_EXTS')
SELECT_PACKAGE_META = "SELECT * from S3Object o WHERE o.version IS NOT MISSING LIMIT 1"
# No WHERE clause needed for aggregations since S3 Select skips missing fields for aggs
SELECT_PACKAGE_STATS = (
"SELECT COALESCE(SUM(obj['size']), 0) as total_bytes,"
" COUNT(obj['size']) as total_files from S3Object obj"
)
DUCKDB_SELECT_LAMBDA_ARN = os.environ["DUCKDB_SELECT_LAMBDA_ARN"]
TEST_EVENT = "s3:TestEvent"
# we need to filter out GetObject and HeadObject calls generated by the present
# lambda in order to display accurate analytics in the Quilt catalog
Expand All @@ -182,6 +178,7 @@


logger = get_quilt_logger()
s3_client = boto3.client("s3", config=botocore.config.Config(user_agent_extra=USER_AGENT_EXTRA))


def now_like_boto3():
Expand Down Expand Up @@ -247,13 +244,10 @@ def select_manifest_meta(s3_client, bucket: str, key: str):
wrapper for retry and returning a string
"""
try:
raw = query_manifest_content(
s3_client,
bucket=bucket,
key=key,
sql_stmt=SELECT_PACKAGE_META
)
return json.load(raw)
body = s3_client.get_object(Bucket=bucket, Key=key)["Body"]
with body: # this *might* be needed to close the stream ASAP
for line in body.iter_lines():
return json.loads(line)
except (botocore.exceptions.ClientError, json.JSONDecodeError) as cle:
print(f"Unable to S3 select manifest: {cle}")

Expand Down Expand Up @@ -439,7 +433,7 @@ def get_pkg_data():
first = select_manifest_meta(s3_client, bucket, manifest_key)
if not first:
return
stats = select_package_stats(s3_client, bucket, manifest_key)
stats = select_package_stats(bucket, manifest_key)
if not stats:
return

Expand Down Expand Up @@ -472,33 +466,52 @@ def get_pkg_data():
return True


def select_package_stats(s3_client, bucket, manifest_key) -> str:
@functools.lru_cache(maxsize=None)
def get_bucket_region(bucket: str) -> str:
    """Return the AWS region *bucket* lives in (memoized per bucket name).

    The region is read from the ``x-amz-bucket-region`` response header of a
    HeadBucket call made with the module-level S3 client.
    """
    head_response = s3_client.head_bucket(Bucket=bucket)
    headers = head_response["ResponseMetadata"]["HTTPHeaders"]
    return headers["x-amz-bucket-region"]


@functools.lru_cache(maxsize=None)
def get_presigner_client(bucket: str):
    """Return a memoized S3 client pinned to *bucket*'s region for presigning.

    The client is configured with SigV4 signing (``"s3v4"``); one client is
    cached per bucket name.
    """
    signing_config = botocore.config.Config(signature_version="s3v4")
    return boto3.client(
        "s3",
        region_name=get_bucket_region(bucket),
        config=signing_config,
    )


def select_package_stats(bucket, manifest_key) -> Optional[dict]:
    """Compute aggregate file stats for a package from its manifest.

    Presigns a GET URL for the NDJSON manifest and asks the DuckDB "select"
    lambda (``DUCKDB_SELECT_LAMBDA_ARN``) to aggregate it.

    Args:
        bucket: bucket holding the manifest object.
        manifest_key: key of the NDJSON package manifest.

    Returns:
        dict with ``total_bytes`` and ``total_files``, or None when the query
        returns no rows or fails for any reason (stats are best-effort:
        callers treat a falsy result as "skip stats").
    """
    logger_ = get_quilt_logger()
    presigner_client = get_presigner_client(bucket)
    url = presigner_client.generate_presigned_url(
        ClientMethod="get_object",
        Params={
            "Bucket": bucket,
            "Key": manifest_key,
        },
    )
    # Manifest rows without a "size" field (e.g. the package-metadata row)
    # become NULLs that SUM/COUNT skip, so only file entries are counted.
    q = f"""
    SELECT COALESCE(SUM(size), 0) AS total_bytes,
        COUNT(size) AS total_files FROM read_ndjson('{url}', columns={{size='UBIGINT'}}) obj
    """
    try:
        resp = make_lambda_client().invoke(
            FunctionName=DUCKDB_SELECT_LAMBDA_ARN,
            Payload=json.dumps({"query": q}),
        )
        payload = resp["Payload"].read()
    except botocore.exceptions.ClientError:
        logger_.exception("Unable to invoke DuckDB select lambda")
        return None

    # Lambda-level failure (unhandled exception in the function).
    if "FunctionError" in resp:
        logger_.error("DuckDB select lambda error: %s", payload.decode())
        return None

    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError:
        logger_.exception("Unable to parse DuckDB select lambda response")
        return None
    # Application-level failure reported in the response body.
    if "error" in parsed:
        logger_.error("DuckDB select query error: %s", parsed["error"])
        return None

    rows = parsed["rows"]
    return rows[0] if rows else None


def extract_pptx(fileobj, max_size: int) -> str:
Expand Down Expand Up @@ -732,6 +745,11 @@ def make_s3_client():
return boto3.client("s3", config=configuration)


# Memoized so the same client object is reused across calls.
@functools.lru_cache(maxsize=None)
def make_lambda_client():
    """Return a cached boto3 Lambda client (used to invoke the DuckDB select lambda)."""
    return boto3.client("lambda")


def map_event_name(event: dict):
"""transform eventbridge names into S3-like ones"""
input_ = event["eventName"]
Expand Down

0 comments on commit 97155ef

Please sign in to comment.