Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: graceful shutdown to kill bq job #12

Merged
merged 3 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions task/bq2bq/executor/bumblebee/bigquery_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def get_table(self, full_table_name):

class BigqueryService(BaseBigqueryService):

def __init__(self, client, labels, writer, on_job_finish = None):
def __init__(self, client, labels, writer, on_job_finish = None, on_job_cancelled = None):
"""

:rtype:
Expand All @@ -62,6 +62,7 @@ def __init__(self, client, labels, writer, on_job_finish = None):
self.labels = labels
self.writer = writer
self.on_job_finish = on_job_finish
self.on_job_cancelled = on_job_cancelled

def execute_query(self, query):
query_job_config = QueryJobConfig()
Expand All @@ -76,6 +77,10 @@ def execute_query(self, query):
job_config=query_job_config)
logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state,
query_job.project))

if self.on_job_cancelled:
self.on_job_cancelled(self.client, query_job)

try:
result = query_job.result()
except (GoogleCloudError, Forbidden, BadRequest) as ex:
Expand Down Expand Up @@ -124,6 +129,9 @@ def transform_load(self,
logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state,
query_job.project))

if self.on_job_cancelled:
self.on_job_cancelled(self.client, query_job)

try:
result = query_job.result()
except (GoogleCloudError, Forbidden, BadRequest) as ex:
Expand Down Expand Up @@ -166,7 +174,7 @@ def get_table(self, full_table_name):
return self.client.get_table(table_ref)


def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer, on_job_finish = None):
def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer, on_job_finish = None, on_job_cancelled = None):
if writer is None:
writer = writer.StdWriter()

Expand All @@ -175,7 +183,7 @@ def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer, on_j
default_query_job_config.priority = task_config.query_priority
default_query_job_config.allow_field_addition = task_config.allow_field_addition
client = bigquery.Client(project=task_config.execution_project, credentials=credentials, default_query_job_config=default_query_job_config)
return BigqueryService(client, labels, writer, on_job_finish=on_job_finish)
return BigqueryService(client, labels, writer, on_job_finish=on_job_finish, on_job_cancelled=on_job_cancelled)


def _get_bigquery_credentials():
Expand Down
3 changes: 2 additions & 1 deletion task/bq2bq/executor/bumblebee/bq2bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def bq2bq(properties_file: str,
labels: dict = {},
output_on: str = './return.json',
on_job_finish = None,
on_job_cancelled = None,
):

logger.info("Using bumblebee version: {}".format(VERSION))
Expand All @@ -38,7 +39,7 @@ def bq2bq(properties_file: str,

bigquery_service = DummyService()
if not dry_run:
bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish)
bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish, on_job_cancelled=on_job_cancelled)

transformation = Transformation(bigquery_service,
task_config,
Expand Down
16 changes: 16 additions & 0 deletions task/bq2bq/executor/bumblebee/handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import signal
import sys
from bumblebee.log import get_logger

logger = get_logger(__name__)

class BigqueryJobHandler:
def __init__(self) -> None:
self._sum_slot_millis = 0
Expand All @@ -7,6 +13,16 @@ def handle_job_finish(self, job) -> None:
self._sum_slot_millis += job.slot_millis
self._sum_total_bytes_processed += job.total_bytes_processed

def handle_job_cancelled(self, client, job):
    """Install a SIGTERM handler that cancels ``job`` and exits with status 1.

    Args:
        client: Object exposing ``cancel_job``; used to issue the cancel
            request when SIGTERM arrives.
        job: The submitted job. Its ``job_id`` (and ``project`` /
            ``location`` when the object exposes them) are captured now,
            at registration time, not when the signal fires.

    Every call replaces the SIGTERM handler installed by the previous call,
    so only the most recently registered job is cancelled on shutdown.
    """
    job_id = job.job_id
    # Forward the job's own project/location so the cancel request does not
    # depend on the client's defaults; ``None`` keeps the client defaults.
    project = getattr(job, "project", None)
    location = getattr(job, "location", None)

    def handler(signum, frame):
        try:
            client.cancel_job(job_id, project=project, location=location)
        except Exception:
            # The process is shutting down either way: record the failure
            # instead of letting it escape the signal handler and skip the
            # exit below.
            logger.exception(f"{job_id} could not be cancelled")
        else:
            logger.info(f"{job_id} successfully cancelled")
        sys.exit(1)

    signal.signal(signal.SIGTERM, handler)

def get_sum_slot_millis(self) -> int:
    """Return the slot-millisecond total accumulated on this handler."""
    total_slot_millis = self._sum_slot_millis
    return total_slot_millis

Expand Down
1 change: 1 addition & 0 deletions task/bq2bq/executor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
app_config.job_labels,
app_config.xcom_path,
on_job_finish = job_handler.handle_job_finish,
on_job_cancelled = job_handler.handle_job_cancelled,
)

xcom_data['monitoring'] = {
Expand Down
4 changes: 2 additions & 2 deletions task/bq2bq/optimus-plugin-bq2bq.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: bq2bq
description: BigQuery to BigQuery transformation task
plugintype: task
pluginversion: 0.3.10 # update this with expected tag before release
image: docker.io/gotocompany/optimus-task-bq2bq-executor:0.3.10
pluginversion: 0.3.11 # update this with expected tag before release
image: docker.io/gotocompany/optimus-task-bq2bq-executor:0.3.11
entrypoint:
script: "python3 /opt/bumblebee/main.py"
questions:
Expand Down
Loading