Skip to content

Commit

Permalink
Delete process from supervisor processes if it terminate normally
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Dec 12, 2023
1 parent ce16eb4 commit 72fa123
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
78 changes: 50 additions & 28 deletions src/Visor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,39 @@ mutable struct Supervisor <: Supervised
task::Task
function Supervisor(
id,
processes = OrderedDict{String, Supervised}(),
processes=OrderedDict{String,Supervised}(),
intensity=1,
period=5,
strategy=:one_for_one,
terminateif=:empty,
evhandler=nothing
evhandler=nothing,
)
return new(id, idle, processes, intensity, period, strategy, terminateif, evhandler, [])
return new(
id, idle, processes, intensity, period, strategy, terminateif, evhandler, []
)
end
function Supervisor(
parent::Supervisor,
id,
processes = OrderedDict{String, Supervised}(),
processes=OrderedDict{String,Supervised}(),
intensity=1,
period=5,
strategy=:one_for_one,
terminateif=:empty,
evhandler=nothing
evhandler=nothing,
)
return new(id, idle, processes, intensity, period, strategy, terminateif, evhandler, [], parent)
return new(
id,
idle,
processes,
intensity,
period,
strategy,
terminateif,
evhandler,
[],
parent,
)
end
end

Expand Down Expand Up @@ -170,7 +183,8 @@ Base.show(io::IO, process::Supervised) = print(io, "$(process.id)")

function dump(node::Supervisor)
children = [
isa(p, Process) ? "$(p.id)($(p.status))" : "supervisor:$(p.id)($(p.status))" for p in values(node.processes)
isa(p, Process) ? "$(p.id)($(p.status))" : "supervisor:$(p.id)($(p.status))" for
p in values(node.processes)
]
println("[$node] nodes: $children")
for (id, el) in node.processes
Expand Down Expand Up @@ -253,9 +267,14 @@ function supervisor(
error("immediate shutdown of supervisor [$id] with no processes")
end

return Supervisor(id,
return Supervisor(
id,
OrderedDict{String,Supervised}(map(proc -> proc.id => proc, processes)),
intensity, period, strategy, terminateif)
intensity,
period,
strategy,
terminateif,
)
end

function supervisor(
Expand All @@ -265,7 +284,9 @@ function supervisor(
error("wrong shutdown value $shutdown: must be one of :empty, :shutdown")
end

return Supervisor(id, OrderedDict(proc.id=>proc), intensity, period, strategy, terminateif)
return Supervisor(
id, OrderedDict(proc.id => proc), intensity, period, strategy, terminateif
)
end

"""
Expand Down Expand Up @@ -344,7 +365,6 @@ process(
args=args,
namedargs=namedargs,
force_interrupt_after=force_interrupt_after,

debounce_time=debounce_time,
stop_waiting_after=stop_waiting_after,
thread=thread,
Expand Down Expand Up @@ -439,7 +459,7 @@ function restart_policy(supervisor, process)
# and then all child processes, including the terminated one, are restarted.
@debug "[$supervisor] restart strategy: $(supervisor.strategy)"
stopped = supervisor_shutdown(supervisor)
supervisor.restarts = stopped
supervisor.restarts = stopped
elseif supervisor.strategy === :rest_for_one
stopped = supervisor_shutdown(supervisor, process)
supervisor.restarts = stopped
Expand All @@ -457,14 +477,11 @@ function restart_processes(supervisor, procs)
if istaskfailed(proc.task)
@debug "[$proc] task failed"
break
else
clear_hold(proc)
end
end
supervisor.restarts = []
return supervisor.restarts = []
end


"""
startup(proc::Supervised)
Expand Down Expand Up @@ -494,8 +511,9 @@ function startup(supervisor::Supervisor, proc::Supervised)
@async supervise(Supervised[])
yield()
end
if haskey(supervisor.processes, proc.id)
@warn "[$supervisor] already supervisioning proc [$proc]" && !istaskdone(supervisor.processes[proc.id].task)
if haskey(supervisor.processes, proc.id) &&
!istaskdone(supervisor.processes[proc.id].task)
@warn "[$supervisor] already supervisioning proc [$proc]"
else
call(supervisor, proc)
end
Expand All @@ -516,13 +534,13 @@ function add_node(id::String, supervisor::Supervisor, proc::Supervised)
terminateif=proc.terminateif,
)
@async wait_child(supervisor, proc)
return proc
else
proc.supervisor = supervisor
supervisor.processes[proc.id] = proc
start(proc)
return proc
end
clear_hold(proc)
return proc
catch e
@error "[$supervisor] starting proc [$proc]: $e"
end
Expand All @@ -537,7 +555,6 @@ end
# For thread safety use the `startup` method.
add_node(supervisor::Supervisor, proc::Supervised) = add_node(proc.id, supervisor, proc)


# Start processes defined by `specs` specification.
# A dedicated supervisor is created and attached to parent supervisor if `parent`
# is a supervisor instance.
Expand Down Expand Up @@ -586,6 +603,7 @@ function supervisor_shutdown(
stopped_procs = []
revs = reverse(collect(values(supervisor.processes)))
for p in revs
@debug "[$p] set onhold"
hold(p)
push!(stopped_procs, p)
if p === failed_proc
Expand Down Expand Up @@ -617,7 +635,10 @@ function normal_return(supervisor::Supervisor, child::Process)
if child.restart === :permanent
!child.onhold && restart_policy(supervisor, child)
else
@debug "[$supervisor] normal_return: [$child] done"
@debug "[$supervisor] normal_return: [$child] done, onhold:$(child.onhold)"
if !child.onhold
delete!(supervisor.processes, child.id)
end
child.status = done
end
end
Expand Down Expand Up @@ -661,7 +682,7 @@ end

function isalldone(supervisor)
res = all(proc -> proc.status === done, values(supervisor.processes))
res
return res
end

# Supervisor main loop.
Expand Down Expand Up @@ -712,11 +733,10 @@ function manage(supervisor)
if !isempty(supervisor.restarts)
@debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))"
# check all required processes are terminated
if all(proc->proc.status!==running, supervisor.restarts)
if all(proc -> proc.status !== running, supervisor.restarts)
@debug "[$supervisor] restarting processes"
restart_processes(supervisor, supervisor.restarts)
end

end

@debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)"
Expand Down Expand Up @@ -837,7 +857,7 @@ end
Break the loop if a shutdown control message is received.
"""
macro isshutdown(msg)
:(isshutdown($(esc(msg))) && break)
return :(isshutdown($(esc(msg))) && break)
end

"""
Expand Down Expand Up @@ -1254,7 +1274,7 @@ end

function wait_child(supervisor::Supervisor, process::Supervisor)
wait(process.task)
put!(supervisor.inbox, ProcessReturn(process))
return put!(supervisor.inbox, ProcessReturn(process))
end

function evhandler(process, event)
Expand All @@ -1263,6 +1283,8 @@ function evhandler(process, event)
end
end

const __ROOT__::Supervisor = Supervisor(ROOT_SUPERVISOR, OrderedDict{String, Supervised}(), 1, 5, :one_for_one, :empty)
const __ROOT__::Supervisor = Supervisor(
ROOT_SUPERVISOR, OrderedDict{String,Supervised}(), 1, 5, :one_for_one, :empty
)

end
2 changes: 2 additions & 0 deletions test/test_permanent_supervisor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,5 @@ wait(handle)
delta = time() - t
@info "delta time for transient supervisor: $delta secs"
@test delta < 0.1

shutdown(handle)

0 comments on commit 72fa123

Please sign in to comment.