Skip to content

Commit

Permalink
Merge pull request #168 from jameshcorbett/finalizer-hang
Browse files Browse the repository at this point in the history
dws: enforce k8s finalizer removal
  • Loading branch information
mergify[bot] authored Jul 1, 2024
2 parents d291879 + d48b646 commit 6316fa0
Showing 1 changed file with 37 additions and 13 deletions.
50 changes: 37 additions & 13 deletions src/modules/coral2_dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import sys
import syslog
import json
import re
import functools
import argparse
import logging
Expand Down Expand Up @@ -120,6 +119,23 @@ def wrapper(handle, arg, msg, k8s_api):
return wrapper


def remove_finalizer(workflow_name, k8s_api, workflow=None):
"""Remove the finalizer from the workflow so it can be deleted."""
if workflow is None:
workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name)
try:
workflow["metadata"]["finalizers"].remove(_FINALIZER)
except ValueError:
# finalizer is not present, nothing to do
pass
else:
k8s_api.patch_namespaced_custom_object(
*WORKFLOW_CRD,
workflow_name,
{"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
)


def move_workflow_desiredstate(workflow_name, desiredstate, k8s_api):
"""Helper function for moving workflow to a desiredState."""
k8s_api.patch_namespaced_custom_object(
Expand Down Expand Up @@ -293,6 +309,8 @@ def post_run_cb(handle, _t, msg, k8s_api):
# the job hit an exception before beginning to run; transition
# the workflow immediately to 'teardown'
move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
# Remove the finalizer so the resource can be deleted.
remove_finalizer(winfo.name, k8s_api)
winfo.toredown = True
else:
move_workflow_desiredstate(winfo.name, "PostRun", k8s_api)
Expand All @@ -306,6 +324,11 @@ def state_complete(workflow, state):
)


def state_active(workflow, state):
"""Helper function for checking whether a workflow is working on a given state."""
return workflow["spec"]["desiredState"] == workflow["status"]["state"] == state


def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion):
"""Exception-catching wrapper around _workflow_state_change_cb_inner."""
try:
Expand Down Expand Up @@ -333,6 +356,8 @@ def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion):
)
try:
move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
# Remove the finalizer so the resource can be deleted.
remove_finalizer(winfo.name, k8s_api, workflow)
except ApiException:
LOGGER.exception(
"Failed to move workflow with jobid %s to 'teardown' "
Expand All @@ -354,18 +379,15 @@ def _workflow_state_change_cb_inner(
if winfo.deleted:
# deletion request has been submitted, nothing to do
return
if state_complete(workflow, "Teardown"):
# delete workflow object and tell DWS jobtap plugin that the job is done
try:
workflow["metadata"]["finalizers"].remove(_FINALIZER)
except ValueError:
pass
else:
k8s_api.patch_namespaced_custom_object(
*WORKFLOW_CRD,
winfo.name,
{"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
)
if state_active(workflow, "Teardown") and not state_complete(workflow, "Teardown"):
# Remove the finalizer as soon as the workflow begins working on its
# teardown state.
remove_finalizer(winfo.name, k8s_api, workflow)
elif state_complete(workflow, "Teardown"):
# Delete workflow object and tell DWS jobtap plugin that the job is done.
# Attempt to remove the finalizer again in case the state transitioned
# too quickly for it to be noticed earlier.
remove_finalizer(winfo.name, k8s_api, workflow)
k8s_api.delete_namespaced_custom_object(*WORKFLOW_CRD, winfo.name)
winfo.deleted = True
handle.rpc("job-manager.dws.epilog-remove", payload={"id": jobid}).then(
Expand Down Expand Up @@ -422,6 +444,8 @@ def _workflow_state_change_cb_inner(
elif state_complete(workflow, "DataOut"):
# move workflow to next stage, teardown
move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
# Remove the finalizer so the resource can be deleted.
remove_finalizer(winfo.name, k8s_api, workflow)
winfo.toredown = True
if workflow["status"].get("status") == "Error":
# a fatal error has occurred
Expand Down

0 comments on commit 6316fa0

Please sign in to comment.