Skip to content

Commit

Permalink
feature: notify-styled watchdogs
Browse files Browse the repository at this point in the history
  • Loading branch information
sisungo committed Aug 17, 2024
1 parent 3c308cd commit c98ce58
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 41 deletions.
13 changes: 8 additions & 5 deletions airup-sdk/src/files/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,16 @@ pub struct Watchdog {
/// Kind of the watchdog to use.
pub kind: Option<WatchdogKind>,

/// Time interval of polling health check command.
#[serde(default = "Watchdog::default_health_check_interval")]
pub health_check_interval: u32,
/// Time interval between watchdog checks, in milliseconds.
#[serde(default = "Watchdog::default_interval")]
pub health_interval: u32,

/// Also mark the service as failed on successful exits (`$? == 0`).
#[serde(default)]
pub successful_exit: bool,
}
impl Watchdog {
fn default_health_check_interval() -> u32 {
fn default_interval() -> u32 {
5000
}
}
Expand All @@ -331,8 +331,11 @@ impl Watchdog {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum WatchdogKind {
/// The supervisor periodically executes the health check command each time the watchdog timer fires.
HealthCheck,

/// The service periodically notifies the supervisor that it is running normally. If the watchdog timer
/// fires before a notification resets it, the supervisor treats this as a watchdog failure.
Notify,
}

/// Resource limitation.
Expand Down
17 changes: 5 additions & 12 deletions airupd/src/storage/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,18 @@ impl Services {
files::read_merge(paths).await
}

/// Lists names of all services installed on the system, including sideloaded ones and on-filesystem ones.
/// Lists the names of all services installed in the storage.
pub async fn list(&self) -> Vec<String> {
let mut result = Vec::new();
self.base_chain
.read_chain()
.await
.map(IntoIterator::into_iter)
.into_iter()
.flatten()
.filter(|x| {
let x = x.to_string_lossy();
!x.starts_with('.') && x.ends_with(".airs")
})
.for_each(|x| {
let name = x.to_string_lossy();
let name = name.strip_suffix(".airs").unwrap_or(&name);
result.push(name.into());
});
result
.map(|x| String::from(x.to_string_lossy()))
.filter(|x| !x.starts_with('.') && x.ends_with(".airs"))
.map(|x| x.strip_suffix(".airs").unwrap_or(&x).into())
.collect()
}
}
impl Default for Services {
Expand Down
57 changes: 37 additions & 20 deletions airupd/src/supervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl Manager {
continue;
}
};
if let Err(err) = v.update_def(Box::new(new)).await {
if let Err(err) = v.update_manifest(Box::new(new)).await {
errors.push((k.into(), err));
}
}
Expand Down Expand Up @@ -232,9 +232,12 @@ impl SupervisorHandle {
}
}

pub async fn update_def(&self, new: Box<Service>) -> Result<Service, Error> {
pub async fn update_manifest(&self, new: Box<Service>) -> Result<Service, Error> {
let (tx, rx) = oneshot::channel();
self.sender.send(Request::UpdateDef(new, tx)).await.unwrap();
self.sender
.send(Request::UpdateManifest(new, tx))
.await
.unwrap();
rx.await.unwrap()
}
}
Expand Down Expand Up @@ -263,7 +266,7 @@ impl Supervisor {
req = self.receiver.recv() => self.handle_req(req?).await,
Some(wait) = do_child(&self.context, self.current_task.has_task()) => self.handle_wait(wait).await,
Some(handle) = self.current_task.wait() => self.handle_wait_task(handle).await,
Some(_) = Timers::wait(&mut self.timers.health_check) => self.handle_health_check().await,
Some(_) = Timers::wait(&mut self.timers.watchdog) => self.handle_watchdog().await,
Ok(event) = self.events.recv() => self.handle_event(&event).await,
}
}
Expand All @@ -290,8 +293,8 @@ impl Supervisor {
Request::Reload(chan) => {
_ = chan.send(self.reload_service().await);
}
Request::UpdateDef(new, chan) => {
_ = chan.send(self.update_def(*new).await);
Request::UpdateManifest(new, chan) => {
_ = chan.send(self.update_manifest(*new).await);
}
Request::InterruptTask(chan) => {
let handle = self.current_task.interrupt();
Expand All @@ -315,6 +318,12 @@ impl Supervisor {

/// Called when an event is triggered.
async fn handle_event(&mut self, event: &Event) {
if event.id == "notify_watchdog" && event.payload == self.context.service.name {
if let Some(x) = &mut self.timers.watchdog {
x.reset();
}
return;
}
if let Some(exec) = self.context.service.event_handlers.get(&event.id) {
let Ok(mut ace) = task::ace(&self.context).await else {
return;
Expand Down Expand Up @@ -345,19 +354,22 @@ impl Supervisor {
return;
};
if handle.task_class() == "HealthCheck" {
self.context.last_error.set(Error::Watchdog);
_ = self.stop_service().await;
self.on_watchdog_failure().await;
}
if self.context.last_error.take_autosave() {
self.context.last_error.set(error);
}
}

/// Called when the health check timer goes off.
async fn handle_health_check(&mut self) {
/// Called when the watchdog timer goes off.
async fn handle_watchdog(&mut self) {
if self.context.status.get() == Status::Active {
_ = self.health_check().await;
} else if let Some(alarm) = &mut self.timers.health_check {
match self.context.service.watchdog.kind {
Some(WatchdogKind::HealthCheck) => _ = self.health_check().await,
Some(WatchdogKind::Notify) => _ = self.on_watchdog_failure().await,
None => (),
};
} else if let Some(alarm) = &mut self.timers.watchdog {
alarm.disable();
}
}
Expand All @@ -381,12 +393,12 @@ impl Supervisor {
}
}

/// Updates service definition of the supervisor. On success, returns the elder service definition.
/// Updates the service manifest of the supervisor. On success, returns the previous service manifest.
///
/// # Errors
/// This method fails if the internal context has more than one reference, which usually happens when a task is
/// running for this supervisor.
async fn update_def(&mut self, new: Service) -> Result<Service, Error> {
async fn update_manifest(&mut self, new: Service) -> Result<Service, Error> {
self.current_task.interrupt_non_important().await;
let context = Arc::get_mut(&mut self.context).ok_or(Error::TaskExists)?;
if context.service != new {
Expand Down Expand Up @@ -483,6 +495,11 @@ impl Supervisor {
})
.await
}

async fn on_watchdog_failure(&mut self) {
self.context.last_error.set(Error::Watchdog);
_ = self.stop_service().await;
}
}

/// A container of current running task in the supervisor.
Expand Down Expand Up @@ -701,15 +718,15 @@ impl RetryContext {
/// A structure that provides a collection of supervisor timers.
#[derive(Debug, Default)]
struct Timers {
health_check: Option<Alarm>,
watchdog: Option<Alarm>,
}
impl From<&Service> for Timers {
fn from(service: &Service) -> Self {
let mut result = Self::default();

if matches!(service.watchdog.kind, Some(WatchdogKind::HealthCheck)) {
result.health_check = Some(Alarm::new(Duration::from_millis(
service.watchdog.health_check_interval as _,
if service.watchdog.kind.is_some() {
result.watchdog = Some(Alarm::new(Duration::from_millis(
service.watchdog.health_interval as _,
)));
}

Expand All @@ -727,7 +744,7 @@ impl Timers {

/// Called when the service started.
fn on_start(&mut self) {
if let Some(alarm) = &mut self.health_check {
if let Some(alarm) = &mut self.watchdog {
alarm.enable();
}
}
Expand Down Expand Up @@ -883,7 +900,7 @@ enum Request {
Stop(oneshot::Sender<Result<Arc<dyn TaskHandle>, Error>>),
Kill(oneshot::Sender<Result<(), Error>>),
Reload(oneshot::Sender<Result<Arc<dyn TaskHandle>, Error>>),
UpdateDef(Box<Service>, oneshot::Sender<Result<Service, Error>>),
UpdateManifest(Box<Service>, oneshot::Sender<Result<Service, Error>>),
InterruptTask(oneshot::Sender<Result<Arc<dyn TaskHandle>, Error>>),
MakeActive(oneshot::Sender<Result<Arc<dyn TaskHandle>, Error>>),
}
Expand Down
6 changes: 3 additions & 3 deletions airupd/src/supervisor/task/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl TaskHandle for CleanupServiceHandle {
}

fn is_important(&self) -> bool {
self.important.load(atomic::Ordering::SeqCst)
self.important.load(atomic::Ordering::Acquire)
}

fn send_interrupt(&self) {
Expand Down Expand Up @@ -94,15 +94,15 @@ impl CleanupService {
)
.await;

self.important.store(false, atomic::Ordering::SeqCst);
self.important.store(false, atomic::Ordering::Release);
self.helper
.would_interrupt(async {
tokio::time::sleep(Duration::from_millis(self.context.service.retry.delay)).await;
})
.await?;

if self.retry {
self.important.store(true, atomic::Ordering::SeqCst);
self.important.store(true, atomic::Ordering::Release);
let handle = super::start::start(self.context.clone());
tokio::select! {
_ = handle.wait() => {},
Expand Down
2 changes: 1 addition & 1 deletion airupfx/airupfx-time/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "airupfx-time"
authors = ["sisungo <[email protected]>"]
version = "0.9.2"
version = "0.10.6"
edition = "2021"
license = "MIT"
publish = false
Expand Down
6 changes: 6 additions & 0 deletions airupfx/airupfx-time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ impl Alarm {
self.interval = None;
}

/// Resets the underlying interval, if one is currently set.
///
/// Does nothing when the alarm holds no interval.
// NOTE(review): presumably this restarts the current waiting period — confirm against the
// `reset` method of the interval type.
pub fn reset(&mut self) {
    match &mut self.interval {
        Some(timer) => timer.reset(),
        None => {}
    }
}

pub async fn wait(&mut self) -> Option<()> {
match &mut self.interval {
Some(x) => {
Expand Down

0 comments on commit c98ce58

Please sign in to comment.