diff --git a/task/bq2bq/executor/bumblebee/bigquery_service.py b/task/bq2bq/executor/bumblebee/bigquery_service.py index 09be950..0bb16c1 100644 --- a/task/bq2bq/executor/bumblebee/bigquery_service.py +++ b/task/bq2bq/executor/bumblebee/bigquery_service.py @@ -53,7 +53,7 @@ def get_table(self, full_table_name): class BigqueryService(BaseBigqueryService): - def __init__(self, client, labels, writer, on_job_finish = None): + def __init__(self, client, labels, writer, on_job_finish = None, on_job_cancelled = None): """ :rtype: @@ -62,6 +62,7 @@ def __init__(self, client, labels, writer, on_job_finish = None): self.labels = labels self.writer = writer self.on_job_finish = on_job_finish + self.on_job_cancelled = on_job_cancelled def execute_query(self, query): query_job_config = QueryJobConfig() @@ -76,6 +77,10 @@ def execute_query(self, query): job_config=query_job_config) logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state, query_job.project)) + + if self.on_job_cancelled: + self.on_job_cancelled(self.client, query_job) + try: result = query_job.result() except (GoogleCloudError, Forbidden, BadRequest) as ex: @@ -124,6 +129,9 @@ def transform_load(self, logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state, query_job.project)) + if self.on_job_cancelled: + self.on_job_cancelled(self.client, query_job) + try: result = query_job.result() except (GoogleCloudError, Forbidden, BadRequest) as ex: @@ -166,7 +174,7 @@ def get_table(self, full_table_name): return self.client.get_table(table_ref) -def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer, on_job_finish = None): +def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer, on_job_finish = None, on_job_cancelled = None): if writer is None: writer = writer.StdWriter() @@ -175,7 +183,7 @@ def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer, on_j default_query_job_config.priority = task_config.query_priority default_query_job_config.allow_field_addition = task_config.allow_field_addition client = bigquery.Client(project=task_config.execution_project, credentials=credentials, default_query_job_config=default_query_job_config) - return BigqueryService(client, labels, writer, on_job_finish=on_job_finish) + return BigqueryService(client, labels, writer, on_job_finish=on_job_finish, on_job_cancelled=on_job_cancelled) def _get_bigquery_credentials(): diff --git a/task/bq2bq/executor/bumblebee/bq2bq.py b/task/bq2bq/executor/bumblebee/bq2bq.py index 8dfec35..f638110 100644 --- a/task/bq2bq/executor/bumblebee/bq2bq.py +++ b/task/bq2bq/executor/bumblebee/bq2bq.py @@ -22,6 +22,7 @@ def bq2bq(properties_file: str, labels: dict = {}, output_on: str = './return.json', on_job_finish = None, + on_job_cancelled = None, ): logger.info("Using bumblebee version: {}".format(VERSION)) @@ -38,7 +39,7 @@ def bq2bq(properties_file: str, bigquery_service = DummyService() if not dry_run: - bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish) + bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish, on_job_cancelled=on_job_cancelled) transformation = Transformation(bigquery_service, task_config, diff --git a/task/bq2bq/executor/bumblebee/handler.py b/task/bq2bq/executor/bumblebee/handler.py index df6416a..4ef9fc4 100644 --- a/task/bq2bq/executor/bumblebee/handler.py +++ b/task/bq2bq/executor/bumblebee/handler.py @@ -1,3 +1,9 @@ +import signal +import sys +from bumblebee.log import get_logger + +logger = get_logger(__name__) + class BigqueryJobHandler: def __init__(self) -> None: self._sum_slot_millis = 0 @@ -7,6 +13,16 @@ def handle_job_finish(self, job) -> None: self._sum_slot_millis += job.slot_millis self._sum_total_bytes_processed += job.total_bytes_processed + def handle_job_cancelled(self, client, job): + c = client + job_id = job.job_id + def handler(signum, frame): + c.cancel_job(job_id) + logger.info(f"{job_id} successfully cancelled") + sys.exit(1) + + signal.signal(signal.SIGTERM, handler) + def get_sum_slot_millis(self) -> int: return self._sum_slot_millis diff --git a/task/bq2bq/executor/main.py b/task/bq2bq/executor/main.py index 949cb08..cc8cbb0 100644 --- a/task/bq2bq/executor/main.py +++ b/task/bq2bq/executor/main.py @@ -26,6 +26,7 @@ app_config.job_labels, app_config.xcom_path, on_job_finish = job_handler.handle_job_finish, + on_job_cancelled = job_handler.handle_job_cancelled, ) xcom_data['monitoring'] = { diff --git a/task/bq2bq/optimus-plugin-bq2bq.yaml b/task/bq2bq/optimus-plugin-bq2bq.yaml index e8286b1..effb935 100644 --- a/task/bq2bq/optimus-plugin-bq2bq.yaml +++ b/task/bq2bq/optimus-plugin-bq2bq.yaml @@ -1,8 +1,8 @@ name: bq2bq description: BigQuery to BigQuery transformation task plugintype: task -pluginversion: 0.3.10 # update this with expected tag before release -image: docker.io/gotocompany/optimus-task-bq2bq-executor:0.3.10 +pluginversion: 0.3.11 # update this with expected tag before release +image: docker.io/gotocompany/optimus-task-bq2bq-executor:0.3.11 entrypoint: script: "python3 /opt/bumblebee/main.py" questions: