From 9cf48c1ed5f5be285b3a73090a8e4b145054764d Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Fri, 12 Jan 2024 14:00:21 +0000 Subject: [PATCH] Retry tasks. Signed-off-by: Klaus Ma --- common/src/apis.rs | 10 ++++---- session_manager/src/storage/engine/mod.rs | 1 + session_manager/src/storage/engine/sqlite.rs | 24 ++++++++++++++++++++ session_manager/src/storage/mod.rs | 5 ++++ 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/common/src/apis.rs b/common/src/apis.rs index e233a60..8e9d5ba 100644 --- a/common/src/apis.rs +++ b/common/src/apis.rs @@ -89,6 +89,10 @@ pub struct Task { } impl Task { + pub fn is_completed(&self) -> bool { + self.state == TaskState::Succeed || self.state == TaskState::Failed + } + pub fn gid(&self) -> TaskGID { TaskGID { ssn_id: self.ssn_id, @@ -155,12 +159,6 @@ pub struct SessionContext { pub common_data: Option, } -impl Task { - pub fn is_completed(&self) -> bool { - self.state == TaskState::Succeed || self.state == TaskState::Failed - } -} - impl Session { pub fn is_closed(&self) -> bool { self.status.state == SessionState::Closed diff --git a/session_manager/src/storage/engine/mod.rs b/session_manager/src/storage/engine/mod.rs index 49129ff..e502f92 100644 --- a/session_manager/src/storage/engine/mod.rs +++ b/session_manager/src/storage/engine/mod.rs @@ -41,6 +41,7 @@ pub trait Engine: Send + Sync + 'static { task_input: Option, ) -> Result; async fn get_task(&self, gid: TaskGID) -> Result; + async fn retry_task(&self, gid: TaskGID) -> Result; async fn delete_task(&self, gid: TaskGID) -> Result; async fn update_task_state(&self, gid: TaskGID, state: TaskState) -> Result; async fn find_tasks(&self, ssn_id: SessionID) -> Result, FlameError>; diff --git a/session_manager/src/storage/engine/sqlite.rs b/session_manager/src/storage/engine/sqlite.rs index 65dda8d..20804d8 100644 --- a/session_manager/src/storage/engine/sqlite.rs +++ b/session_manager/src/storage/engine/sqlite.rs @@ -298,6 +298,30 @@ impl Engine for SqliteEngine { task.try_into() } + + async fn retry_task(&self, gid: TaskGID) -> Result { + let mut tx = self + .pool + .begin() + .await + .map_err(|e| FlameError::Storage(e.to_string()))?; + + let sql = r#"UPDATE tasks SET state=? WHERE id=? AND ssn_id=? RETURNING *"#; + let task: TaskDao = sqlx::query_as(sql) + .bind(TaskState::Pending as i32) + .bind(gid.task_id) + .bind(gid.ssn_id) + .fetch_one(&mut *tx) + .await + .map_err(|e| FlameError::Storage(e.to_string()))?; + + tx.commit() + .await + .map_err(|e| FlameError::Storage(e.to_string()))?; + + task.try_into() + } + async fn update_task_state(&self, gid: TaskGID, state: TaskState) -> Result { let mut tx = self .pool diff --git a/session_manager/src/storage/mod.rs b/session_manager/src/storage/mod.rs index 8bc10a1..dc20423 100644 --- a/session_manager/src/storage/mod.rs +++ b/session_manager/src/storage/mod.rs @@ -90,6 +90,11 @@ impl Storage { let task_list = self.engine.find_tasks(ssn.id).await?; let mut ssn = ssn.clone(); for task in task_list { + let task = match task.state { + TaskState::Running => self.engine.retry_task(task.gid()).await?, + _ => task, + }; + ssn.update_task(&task); }