From d48b6465bac899a920eaf9bd99578ea901d9586b Mon Sep 17 00:00:00 2001 From: James Corbett Date: Fri, 28 Jun 2024 18:04:15 -0700 Subject: [PATCH] dws: enforce k8s finalizer removal Problem: users have reported on elcap that some of their workflows hang in deletion because Flux attempts to delete them without first removing the finalizer. Add additional logic for removing Flux's finalizer from workflows. Fixes #165. --- src/modules/coral2_dws.py | 50 +++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 076b4f0..dadfc28 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -11,7 +11,6 @@ import sys import syslog import json -import re import functools import argparse import logging @@ -120,6 +119,23 @@ def wrapper(handle, arg, msg, k8s_api): return wrapper +def remove_finalizer(workflow_name, k8s_api, workflow=None): + """Remove the finalizer from the workflow so it can be deleted.""" + if workflow is None: + workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name) + try: + workflow["metadata"]["finalizers"].remove(_FINALIZER) + except ValueError: + # finalizer is not present, nothing to do + pass + else: + k8s_api.patch_namespaced_custom_object( + *WORKFLOW_CRD, + workflow_name, + {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}}, + ) + + def move_workflow_desiredstate(workflow_name, desiredstate, k8s_api): """Helper function for moving workflow to a desiredState.""" k8s_api.patch_namespaced_custom_object( @@ -293,6 +309,8 @@ def post_run_cb(handle, _t, msg, k8s_api): # the job hit an exception before beginning to run; transition # the workflow immediately to 'teardown' move_workflow_desiredstate(winfo.name, "Teardown", k8s_api) + # Remove the finalizer so the resource can be deleted.
+ remove_finalizer(winfo.name, k8s_api) winfo.toredown = True else: move_workflow_desiredstate(winfo.name, "PostRun", k8s_api) @@ -306,6 +324,11 @@ def state_complete(workflow, state): ) +def state_active(workflow, state): + """Helper function for checking whether a workflow is working on a given state.""" + return workflow["spec"]["desiredState"] == workflow["status"]["state"] == state + + def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion): """Exception-catching wrapper around _workflow_state_change_cb_inner.""" try: @@ -333,6 +356,8 @@ def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion): ) try: move_workflow_desiredstate(winfo.name, "Teardown", k8s_api) + # Remove the finalizer so the resource can be deleted. + remove_finalizer(winfo.name, k8s_api, workflow) except ApiException: LOGGER.exception( "Failed to move workflow with jobid %s to 'teardown' " @@ -354,18 +379,15 @@ def _workflow_state_change_cb_inner( if winfo.deleted: # deletion request has been submitted, nothing to do return - if state_complete(workflow, "Teardown"): - # delete workflow object and tell DWS jobtap plugin that the job is done - try: - workflow["metadata"]["finalizers"].remove(_FINALIZER) - except ValueError: - pass - else: - k8s_api.patch_namespaced_custom_object( - *WORKFLOW_CRD, - winfo.name, - {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}}, - ) + if state_active(workflow, "Teardown") and not state_complete(workflow, "Teardown"): + # Remove the finalizer as soon as the workflow begins working on its + # teardown state. + remove_finalizer(winfo.name, k8s_api, workflow) + elif state_complete(workflow, "Teardown"): + # Delete workflow object and tell DWS jobtap plugin that the job is done. + # Attempt to remove the finalizer again in case the state transitioned + # too quickly for it to be noticed earlier. 
+ remove_finalizer(winfo.name, k8s_api, workflow) k8s_api.delete_namespaced_custom_object(*WORKFLOW_CRD, winfo.name) winfo.deleted = True handle.rpc("job-manager.dws.epilog-remove", payload={"id": jobid}).then( @@ -422,6 +444,8 @@ def _workflow_state_change_cb_inner( elif state_complete(workflow, "DataOut"): # move workflow to next stage, teardown move_workflow_desiredstate(winfo.name, "Teardown", k8s_api) + # Remove the finalizer so the resource can be deleted. + remove_finalizer(winfo.name, k8s_api, workflow) winfo.toredown = True if workflow["status"].get("status") == "Error": # a fatal error has occurred