Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Dec 6, 2023
1 parent fb66a05 commit 7b73582
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 86 deletions.
111 changes: 28 additions & 83 deletions src/Visor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ end
uid()::String = string(uuid4())

format4print(dict::AbstractDict) = join(["$k=>$v($(v.status))" for (k, v) in dict], ", ")
format4print(lst::AbstractArray) = join(["$(v.id)=>$v($(v.status))" for v in lst], ", ")

@enum SupervisedStatus idle = 1 running = 2 failed = 3 done = 4

Expand All @@ -75,6 +76,7 @@ mutable struct Supervisor <: Supervised
strategy::Symbol
terminateif::Symbol
evhandler::Union{Nothing,Function}
restarts::Vector{Supervised}
supervisor::Supervisor
inbox::Channel
task::Task
Expand All @@ -87,7 +89,7 @@ mutable struct Supervisor <: Supervised
terminateif=:empty,
evhandler=nothing
)
return new(id, idle, processes, intensity, period, strategy, terminateif, evhandler)
return new(id, idle, processes, intensity, period, strategy, terminateif, evhandler, [])
end
function Supervisor(
parent::Supervisor,
Expand All @@ -99,7 +101,7 @@ mutable struct Supervisor <: Supervised
terminateif=:empty,
evhandler=nothing
)
return new(id, idle, processes, intensity, period, strategy, terminateif, evhandler, parent)
return new(id, idle, processes, intensity, period, strategy, terminateif, evhandler, [], parent)
end
end

Expand Down Expand Up @@ -159,10 +161,10 @@ mutable struct Process <: Supervised
end

clear_hold(process::Process) = process.onhold = false
clear_hold(process::Supervisor) = nothing
clear_hold(::Supervisor) = nothing

hold(process::Process) = process.onhold = true
hold(process::Supervisor) = nothing
hold(::Supervisor) = nothing

Base.show(io::IO, process::Supervised) = print(io, "$(process.id)")

Expand Down Expand Up @@ -220,7 +222,7 @@ julia> using Visor
julia> mytask(pd) = ();
julia> supervisor("mysupervisor", process(mytask))
Visor.SupervisorSpec("mysupervisor", Visor.Spec[Visor.ProcessSpec("mytask", mytask, (), NamedTuple(), 1.0, Inf, NaN, false, :transient)], 1, 5, :one_for_one, :empty)
mysupervisor
```
```jldoctest
Expand All @@ -231,7 +233,7 @@ julia> tsk1(pd) = ();
julia> tsk2(pd) = ();
julia> supervisor("mysupervisor", [process(tsk1), process(tsk2)])
Visor.SupervisorSpec("mysupervisor", Visor.Spec[Visor.ProcessSpec("tsk1", tsk1, (), NamedTuple(), 1.0, Inf, NaN, false, :transient), Visor.ProcessSpec("tsk2", tsk2, (), NamedTuple(), 1.0, Inf, NaN, false, :transient)], 1, 5, :one_for_one, :empty)```
mysupervisor
See [Supervisor](@ref) documentation for more details.
"""
Expand All @@ -251,10 +253,6 @@ function supervisor(
error("immediate shutdown of supervisor [$id] with no processes")
end

## specmap = OrderedDict{String,Supervised}()
## for spec in specs
## specmap[id] = spec
## end
return Supervisor(id,
OrderedDict{String,Supervised}(map(spec -> spec.id => spec, specs)),
intensity, period, strategy, terminateif)
Expand Down Expand Up @@ -371,7 +369,6 @@ end

# The message `ProcessInterrupted` signals that
# process node termination was forced by supervisor.manage: Visor.ProcessError(serve_zeromq:79e6b5c0-05a3-41a4-99a7-8ebec038b2d0, ZMQ.StateError("Address already in use"))

struct ProcessInterrupted <: ExitReason
process::Supervised
end
Expand All @@ -394,7 +391,6 @@ function running_nodes(supervisor)
end

function start(proc::Process)
@debug "[$proc] starting task"
if proc.thread
proc.task = Threads.@spawn proc.fn(proc, proc.args...; proc.namedargs...)
else
Expand All @@ -412,8 +408,6 @@ function start(sv::Supervisor)
sv.task = @async manage(sv)
sv.status = running

yield()

@debug "[$sv] task: $(pointer_from_objref(sv.task))"
for node in values(sv.processes)
@debug "[$sv]: starting [$node]"
Expand Down Expand Up @@ -444,33 +438,15 @@ function restart_policy(supervisor, process)
# If a child process terminates, all other child processes are terminated,
# and then all child processes, including the terminated one, are restarted.
@debug "[$supervisor] restart strategy: $(supervisor.strategy)"
supervisor_shutdown(supervisor)
# start again from start
@debug "[$supervisor] restarting processes:[$(format4print(supervisor.processes))]"
restart_processes(supervisor, values(supervisor.processes))
stopped = supervisor_shutdown(supervisor)
supervisor.restarts = stopped
elseif supervisor.strategy === :rest_for_one
stopped = supervisor_shutdown(supervisor, process)
restart_processes(supervisor, reverse(stopped))
supervisor.restarts = stopped
end
end
end

##function restart_processes(procs)
## for proc in procs
## proc.task = start(proc)
## if isdefined(proc, :debounce_time) && !isnan(proc.debounce_time)
## @debug "[$proc] waiting for debounce_time $(proc.debounce_time) secs"
## sleep(proc.debounce_time)
## end
## if istaskfailed(proc.task)
## @debug "[$proc] task failed"
## break
## else
## clear_hold(proc)
## end
## end
##end

function restart_processes(supervisor, procs)
for proc in procs
add_node(supervisor, proc)
Expand All @@ -485,6 +461,7 @@ function restart_processes(supervisor, procs)
clear_hold(proc)
end
end
supervisor.restarts = []
end


Expand Down Expand Up @@ -515,22 +492,9 @@ function startup(supervisor::Supervisor, spec::Supervised)
return call(supervisor, spec)
end

#function startup(node::Supervised; start_last=true)
# start(node)
# if isdefined(node, :supervisor)
# if start_last
# node.supervisor.processes[node.id] = node
# else
# node.supervisor.processes = merge(
# OrderedDict(node.id => node), node.supervisor.processes
# )
# end
# end
#end

function add_node(id::String, supervisor::Supervisor, spec::Supervised)
try
@debug "[$supervisor] add proc: [$spec]"
@debug "[$supervisor] starting proc: [$spec]"
if isa(spec, Supervisor)
# it is a supervisor
proc = start_processes(
Expand All @@ -545,26 +509,13 @@ function add_node(id::String, supervisor::Supervisor, spec::Supervised)
@async wait_child(supervisor, proc)
return proc
else
#### proc = Process(
#### id,
#### spec.fn,
#### spec.args,
#### spec.namedargs,
#### spec.force_interrupt_after,
#### spec.stop_waiting_after,
#### spec.debounce_time,
#### spec.thread,
#### spec.restart,
#### supervisor,
#### )
spec.supervisor = supervisor
supervisor.processes[spec.id] = spec
start(spec)
return spec
end
catch e
@error "ERR: $e"
showerror(stdout, e, catch_backtrace())
@error "[$supervisor] starting proc [$spec]: $e"
end
end

Expand All @@ -575,16 +526,10 @@ end
#
# `add_node` is for internal use: it is not thread safe.
# For thread safety use the `startup` method.
function add_node(supervisor::Supervisor, spec::Supervised)
id::String = spec.id
add_node(supervisor::Supervisor, spec::Supervised) = add_node(spec.id, supervisor, spec)

#### if haskey(supervisor.processes, id)
#### id = id * ":" * uid()
#### end
return add_node(id, supervisor, spec)
end

# Starts processes defined by `specs` specification.
# Start processes defined by `specs` specification.
# A dedicated supervisor is created and attached to parent supervisor if `parent`
# is a supervisor instance.
# `intensity` and `period` define process restart policy.
Expand Down Expand Up @@ -644,10 +589,9 @@ function supervisor_shutdown(
# shutdown is sequential because in case a node refuses
# to shutdown remaining nodes aren't shutted down.
shutdown(p, reset)
p.status = done
end
end
return stopped_procs
return reverse(stopped_procs)
end

function root_supervisor(process::Supervised)
Expand All @@ -666,7 +610,6 @@ function normal_return(supervisor::Supervisor, child::Process)
else
@debug "[$supervisor] normal_return: [$child] done"
child.status = done
#### delete!(supervisor.processes, child.id)
end
end
end
Expand All @@ -677,15 +620,13 @@ function exitby_exception(supervisor::Supervisor, child::Process)
restart_policy(supervisor, child)
else
@debug "[$supervisor] exitby_exception: delete [$child]"
#### delete!(supervisor.processes, child.id)
end
end
end

function exitby_forced_shutdown(supervisor::Supervisor, child::Process)
if istaskdone(child.task)
@debug "[$supervisor] exitby_forced_shutdown: delete [$child]"
#### delete!(supervisor.processes, child.id)
end
end

Expand All @@ -694,7 +635,6 @@ function normal_return(supervisor::Supervisor, child::Supervisor)
if istaskdone(child.task)
@debug "[$supervisor] normal_return: supervisor [$child] done"
child.status = done
#### delete!(supervisor.processes, child.id)
else
@debug "[$child] spourious return signal: task was restarted"
end
Expand All @@ -711,14 +651,13 @@ function trace(supervisor, msg)
end

function isalldone(supervisor)
#### res = all(proc -> istaskdone(proc.task), values(supervisor.processes))
res = all(proc -> proc.status === done, values(supervisor.processes))
res
end

# Supervisor main loop.
function manage(supervisor)
@info "[$supervisor]: start manage"
@debug "[$supervisor] start supervisor event loop"
try
for msg in supervisor.inbox
trace(supervisor, msg)
Expand All @@ -740,9 +679,6 @@ function manage(supervisor)
@async evhandler(msg.process, msg)
@debug "[$supervisor] manage process fatal: delete [$(msg.process)]"
msg.process.status = done

### delete!(supervisor.processes, msg.process.id)
#supervisor_shutdown(supervisor)
elseif isa(msg, Supervised)
add_node(supervisor, msg)
elseif isrequest(msg)
Expand All @@ -764,6 +700,16 @@ function manage(supervisor)
unknown_message(supervisor, msg)
end

if !isempty(supervisor.restarts)
@debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))"
# check all required processes are terminated
if all(proc->proc.status!==running, supervisor.restarts)
@debug "[$supervisor] restarting processes"
restart_processes(supervisor, supervisor.restarts)
end

end

@debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)"
if supervisor.terminateif === :empty && isalldone(supervisor)
break
Expand Down Expand Up @@ -849,7 +795,6 @@ function shutdown(node::Process, _reset::Bool=true)
elseif node.force_interrupt_after == 0
force_shutdown(node)
else
## @debug "[$node] scheduling forced shutdown after $(node.force_interrupt_after)"
timer = Timer((tim) -> force_shutdown(node), node.force_interrupt_after)
end

Expand Down
1 change: 0 additions & 1 deletion test/test_combo.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ end, 8)
terminated = true

sleep(1)
##Visor.procs(Visor.__ROOT__)

wait(sv)
sleep(1)
Expand Down
2 changes: 0 additions & 2 deletions test/test_supervisor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ function terminateif_shutdown()
tim = Timer(request_shutdown, 2)
@async add_process()

@info "AAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
supervise(supervisor("dynamic"; terminateif=:shutdown))
@info "BBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
return close(tim)
end

Expand Down

0 comments on commit 7b73582

Please sign in to comment.