diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 076b4f0..dadfc28 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -11,7 +11,6 @@ import sys import syslog import json -import re import functools import argparse import logging @@ -120,6 +119,23 @@ def wrapper(handle, arg, msg, k8s_api): return wrapper +def remove_finalizer(workflow_name, k8s_api, workflow=None): + """Remove the finalizer from the workflow so it can be deleted.""" + if workflow is None: + workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name) + try: + workflow["metadata"]["finalizers"].remove(_FINALIZER) + except ValueError: + # finalizer is not present, nothing to do + pass + else: + k8s_api.patch_namespaced_custom_object( + *WORKFLOW_CRD, + workflow_name, + {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}}, + ) + + def move_workflow_desiredstate(workflow_name, desiredstate, k8s_api): """Helper function for moving workflow to a desiredState.""" k8s_api.patch_namespaced_custom_object( @@ -293,6 +309,8 @@ def post_run_cb(handle, _t, msg, k8s_api): # the job hit an exception before beginning to run; transition # the workflow immediately to 'teardown' move_workflow_desiredstate(winfo.name, "Teardown", k8s_api) + # Remove the finalizer so the resource can be deleted. + remove_finalizer(winfo.name, k8s_api) winfo.toredown = True else: move_workflow_desiredstate(winfo.name, "PostRun", k8s_api) @@ -306,6 +324,11 @@ def state_complete(workflow, state): ) +def state_active(workflow, state): + """Helper function for checking whether a workflow is working on a given state.""" + return workflow["spec"]["desiredState"] == workflow["status"]["state"] == state + + def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion): """Exception-catching wrapper around _workflow_state_change_cb_inner.""" try: @@ -333,6 +356,8 @@ def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion): ) try: move_workflow_desiredstate(winfo.name, "Teardown", k8s_api) + # Remove the finalizer so the resource can be deleted. + remove_finalizer(winfo.name, k8s_api, workflow) except ApiException: LOGGER.exception( "Failed to move workflow with jobid %s to 'teardown' " @@ -354,18 +379,15 @@ def _workflow_state_change_cb_inner( if winfo.deleted: # deletion request has been submitted, nothing to do return - if state_complete(workflow, "Teardown"): - # delete workflow object and tell DWS jobtap plugin that the job is done - try: - workflow["metadata"]["finalizers"].remove(_FINALIZER) - except ValueError: - pass - else: - k8s_api.patch_namespaced_custom_object( - *WORKFLOW_CRD, - winfo.name, - {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}}, - ) + if state_active(workflow, "Teardown") and not state_complete(workflow, "Teardown"): + # Remove the finalizer as soon as the workflow begins working on its + # teardown state. + remove_finalizer(winfo.name, k8s_api, workflow) + elif state_complete(workflow, "Teardown"): + # Delete workflow object and tell DWS jobtap plugin that the job is done. + # Attempt to remove the finalizer again in case the state transitioned + # too quickly for it to be noticed earlier. + remove_finalizer(winfo.name, k8s_api, workflow) k8s_api.delete_namespaced_custom_object(*WORKFLOW_CRD, winfo.name) winfo.deleted = True handle.rpc("job-manager.dws.epilog-remove", payload={"id": jobid}).then( @@ -422,6 +444,8 @@ def _workflow_state_change_cb_inner( elif state_complete(workflow, "DataOut"): # move workflow to next stage, teardown move_workflow_desiredstate(winfo.name, "Teardown", k8s_api) + # Remove the finalizer so the resource can be deleted. + remove_finalizer(winfo.name, k8s_api, workflow) winfo.toredown = True if workflow["status"].get("status") == "Error": # a fatal error has occurred