Skip to content

Commit

Permalink
Merge pull request #4 from cardo-org/version-0.4.0
Browse files Browse the repository at this point in the history
Version 0.4.0
  • Loading branch information
attdona authored Feb 1, 2024
2 parents f795c0d + 6a3d355 commit ffd5055
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 50 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## 0.4.0

- Update supervisor status at process termination [#3](https://github.com/cardo-org/Visor.jl/issues/3)

## 0.3.0

- Setup supervisors settings with `setsupervisor` and `setroot` functions.
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Visor"
uuid = "cf786855-3531-4b86-ba6e-3e33dce7dcdb"
authors = ["Attilio Donà"]
version = "0.3.0"
version = "0.4.0"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
2 changes: 1 addition & 1 deletion docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function producer(td)
end
end
# Tasks are started following the list order and are shutted down in reverse order:
# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
process(consumer)
Expand Down
2 changes: 1 addition & 1 deletion examples/producer_consumer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function producer(self)
end
end

# Tasks are started following the list order and are shutted down in reverse order:
# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
process(consumer)
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher_tcp.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ function producer(self)
end
end

# Tasks are started following the list order and are shutted down in reverse order:
# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
process(publisher; debounce_time=1)
Expand Down
125 changes: 79 additions & 46 deletions src/Visor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ export application
export call
export cast
export from
export getphase
export ifrestart
export process
export hassupervised
export isprocstarted
export isrequest
Expand All @@ -34,6 +34,7 @@ export process
export procs
export receive
export reply
export setphase
export shutdown
export startup
export supervise
Expand All @@ -52,7 +53,7 @@ struct Request <: Message
request::Any
end

Base.show(io::IO, message::Request) = print(io, "$(message.request)")
Base.show(io::IO, message::Request) = show(io, message.request)

Base.@kwdef struct Shutdown <: Command
reset::Bool = true
Expand All @@ -78,6 +79,7 @@ abstract type Supervised end
mutable struct Supervisor <: Supervised
id::String
status::SupervisedStatus
phase::Symbol
processes::OrderedDict{String,Supervised}
intensity::Int
period::Int
Expand All @@ -98,7 +100,16 @@ mutable struct Supervisor <: Supervised
evhandler=nothing,
)
return new(
id, idle, processes, intensity, period, strategy, terminateif, evhandler, []
id,
idle,
:undef,
processes,
intensity,
period,
strategy,
terminateif,
evhandler,
[],
)
end
function Supervisor(
Expand All @@ -114,6 +125,7 @@ mutable struct Supervisor <: Supervised
return new(
id,
idle,
:undef,
processes,
intensity,
period,
Expand All @@ -131,6 +143,7 @@ nproc(process::Supervisor) = length(process.processes)
mutable struct Process <: Supervised
id::String
status::SupervisedStatus
phase::Symbol
fn::Function
args::Tuple
namedargs::NamedTuple
Expand Down Expand Up @@ -161,6 +174,7 @@ mutable struct Process <: Supervised
new(
id,
idle,
:undef,
fn,
args,
namedargs,
Expand All @@ -179,6 +193,10 @@ mutable struct Process <: Supervised
end
end

setphase(node::Supervised, phase::Symbol) = node.phase = phase

getphase(node::Supervised) = node.phase

function __init__()
@async wait_signal(__ROOT__)
end
Expand All @@ -189,7 +207,7 @@ clear_hold(::Supervisor) = nothing
hold(process::Process) = process.onhold = true
hold(::Supervisor) = nothing

Base.show(io::IO, process::Supervised) = print(io, "$(process.id)")
Base.show(io::IO, process::Supervised) = print(io, process.id)

function dump(node::Supervisor)
children = [
Expand Down Expand Up @@ -303,7 +321,7 @@ end
process(id, fn;
args=(),
namedargs=(;),
force_interrupt_after::Real=0,
force_interrupt_after::Real=1.0,
stop_waiting_after::Real=Inf,
debounce_time=NaN,
thread=false,
Expand Down Expand Up @@ -414,6 +432,9 @@ struct ProcessInterrupt <: Exception
id::String
end

"Trigger a supervisor resync"
struct SupervisorResync end

## include("supervisor.jl")
# Returns the list of running nodes supervised by `supervisor` (processes and supervisors direct children).
function running_nodes(supervisor)
Expand Down Expand Up @@ -470,7 +491,7 @@ function start(sv::Supervisor)
end

function startchain(proc)
if isdefined(proc, :supervisor) &&
if isdefined(proc, :supervisor) &&
(!isdefined(proc.supervisor, :task) || istaskdone(proc.supervisor.task))
startchain(proc.supervisor)
else
Expand All @@ -493,7 +514,12 @@ function restart_policy(supervisor, process)
if !isnan(process.debounce_time)
sleep(process.debounce_time)
end
if supervisor.strategy === :one_for_one

if process.status === idle
# a shutdown was issued, terminate the restarts
@debug "[$process]: honore the shutdown request"
delete!(supervisor.processes, process.id)
elseif supervisor.strategy === :one_for_one
process.task = start(process)
elseif supervisor.strategy === :one_for_all
# If a child process terminates, all other child processes are terminated,
Expand Down Expand Up @@ -637,7 +663,7 @@ function supervisor_shutdown(
@debug "[$p] skipping shutdown: task already done"
else
# shutdown is sequential because in case a node refuses
# to shutdown remaining nodes aren't shutted down.
# to shutdown remaining nodes aren't shut down.
shutdown(p, reset)
end
end
Expand Down Expand Up @@ -708,29 +734,44 @@ function isalldone(supervisor)
return res
end

"""
resync(supervisor)
Restart processes previously stopped by supervisor policies.
Return true if all supervised processes terminated.
"""
function resync(supervisor)
if !isempty(supervisor.restarts)
@debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))"
# check all required processes are terminated
if all(proc -> proc.status !== running, supervisor.restarts)
@debug "[$supervisor] restarting processes"
restart_processes(supervisor, supervisor.restarts)
end
end

@debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)"
if supervisor.terminateif === :empty && isalldone(supervisor)
return true
end
return false
end

# Supervisor main loop.
function manage(supervisor)
@debug "[$supervisor] start supervisor event loop"
try
for msg in supervisor.inbox
trace(supervisor, msg)
@debug "[$supervisor] recv: $msg"
if isa(msg, Shutdown)
supervisor_shutdown(supervisor, nothing, msg.reset)
break
elseif isa(msg, ProcessReturn)
@debug "[$supervisor]: process [$(msg.process)] normal termination"
normal_return(supervisor, msg.process)
elseif isa(msg, ProcessInterrupted)
@debug "[$supervisor]: process [$(msg.process)] forcibly interrupted"
exitby_forced_shutdown(supervisor, msg.process)
elseif isa(msg, ProcessError)
@debug "[$supervisor]: applying restart policy for [$(msg.process)] ($(Int.(floor.(msg.process.startstamps))))"
msg.process.status = failed
exitby_exception(supervisor, msg.process)
elseif isa(msg, SupervisorResync)
# do nothing here, just a resync() is needed
elseif isa(msg, ProcessFatal)
@async evhandler(msg.process, msg)
@debug "[$supervisor] manage process fatal: delete [$(msg.process)]"
@debug "[$supervisor] manage process fatal: process done [$(msg.process)]"
msg.process.status = done
elseif isa(msg, Supervised)
add_node(supervisor, msg)
Expand All @@ -749,24 +790,13 @@ function manage(supervisor)
put!(msg.inbox, ErrorException("unknown message [$(msg.request)]"))
end
catch e
#showerror(stdout, e, catch_backtrace())
put!(msg.inbox, e)
end
else
unknown_message(supervisor, msg)
end

if !isempty(supervisor.restarts)
@debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))"
# check all required processes are terminated
if all(proc -> proc.status !== running, supervisor.restarts)
@debug "[$supervisor] restarting processes"
restart_processes(supervisor, supervisor.restarts)
end
end

@debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)"
if supervisor.terminateif === :empty && isalldone(supervisor)
if resync(supervisor)
break
end
end
Expand All @@ -778,7 +808,7 @@ function manage(supervisor)
while isready(supervisor.inbox)
msg = take!(supervisor.inbox)
if isrequest(msg)
put!(msg.inbox, ErrorException("[$(msg.request)]: supervisor shutted down"))
put!(msg.inbox, ErrorException("[$(msg.request)]: supervisor shut down"))
else
@debug "[$supervisor]: skipped msg: $msg"
end
Expand Down Expand Up @@ -1290,39 +1320,42 @@ end
function wait_child(supervisor::Supervisor, process::Process)
try
wait(process.task)
put!(supervisor.inbox, ProcessReturn(process))
normal_return(supervisor, process)
trace(supervisor, ProcessReturn(process))
catch e
taskerr = e.task.exception
process.status = failed
if isa(taskerr, ProcessInterrupt)
@debug "[$process] exit on exception: $taskerr"
put!(supervisor.inbox, ProcessInterrupted(process))
# elseif isa(taskerr, MethodError)
# @warn "[$process]: task failed: $taskerr"
# put!(supervisor.inbox, ProcessFatal(process))
@debug "[$supervisor]: process [$process] forcibly interrupted"
exitby_forced_shutdown(supervisor, process)
trace(supervisor, ProcessInterrupted(process))
else
@debug "[$process] exception: $taskerr"
evhandler(process, taskerr)
#showerror(stdout, e, catch_backtrace())
@debug "[$supervisor]: applying restart policy for [$process] ($(Int.(floor.(process.startstamps))))"
exitby_exception(supervisor, process)
trace(supervisor, ProcessInterrupted(process))

if isa(taskerr, Exception)
put!(supervisor.inbox, ProcessError(process, taskerr))
trace(supervisor, ProcessError(process, taskerr))
else
put!(
supervisor.inbox,
ProcessError(process, SystemError("process exception")),
)
trace(supervisor, ProcessError(process, SystemError("process exception")))
end
end
finally
if process.restart === :temporary
@debug "removing temporary process $process"
delete!(supervisor.processes, process.id)
end
put!(supervisor.inbox, SupervisorResync())
end
end

function wait_child(supervisor::Supervisor, process::Supervisor)
wait(process.task)
return put!(supervisor.inbox, ProcessReturn(process))
normal_return(supervisor, process)
put!(supervisor.inbox, SupervisorResync())
return nothing
end

function evhandler(process, event)
Expand Down

2 comments on commit ffd5055

@attdona
Copy link
Member Author

@attdona attdona commented on ffd5055 Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register()

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/100068

Tip: Release Notes

Did you know you can add release notes too? Just add markdown formatted text underneath the comment after the text
"Release notes:" and it will be added to the registry PR, and if TagBot is installed it will also be added to the
release that TagBot creates. i.e.

@JuliaRegistrator register

Release notes:

## Breaking changes

- blah

To add them here just re-invoke and the PR will be updated.

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.4.0 -m "<description of version>" ffd50556af12a403ec0ce9e808eba3354351f223
git push origin v0.4.0

Please sign in to comment.