Skip to content

Commit

Permalink
Merge pull request #93 from k82cn/retry_task
Browse files Browse the repository at this point in the history
Retry tasks.
  • Loading branch information
Klaus Ma authored Jan 12, 2024
2 parents e2b775c + 9cf48c1 commit 47e3b30
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
10 changes: 4 additions & 6 deletions common/src/apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ pub struct Task {
}

impl Task {
pub fn is_completed(&self) -> bool {
self.state == TaskState::Succeed || self.state == TaskState::Failed
}

pub fn gid(&self) -> TaskGID {
TaskGID {
ssn_id: self.ssn_id,
Expand Down Expand Up @@ -155,12 +159,6 @@ pub struct SessionContext {
pub common_data: Option<CommonData>,
}

impl Task {
pub fn is_completed(&self) -> bool {
self.state == TaskState::Succeed || self.state == TaskState::Failed
}
}

impl Session {
pub fn is_closed(&self) -> bool {
self.status.state == SessionState::Closed
Expand Down
1 change: 1 addition & 0 deletions session_manager/src/storage/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub trait Engine: Send + Sync + 'static {
task_input: Option<TaskInput>,
) -> Result<Task, FlameError>;
async fn get_task(&self, gid: TaskGID) -> Result<Task, FlameError>;
async fn retry_task(&self, gid: TaskGID) -> Result<Task, FlameError>;
async fn delete_task(&self, gid: TaskGID) -> Result<Task, FlameError>;
async fn update_task_state(&self, gid: TaskGID, state: TaskState) -> Result<Task, FlameError>;
async fn find_tasks(&self, ssn_id: SessionID) -> Result<Vec<Task>, FlameError>;
Expand Down
24 changes: 24 additions & 0 deletions session_manager/src/storage/engine/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,30 @@ impl Engine for SqliteEngine {

task.try_into()
}

async fn retry_task(&self, gid: TaskGID) -> Result<Task, FlameError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| FlameError::Storage(e.to_string()))?;

let sql = r#"UPDATE tasks SET state=? WHERE id=? AND ssn_id=? RETURNING *"#;
let task: TaskDao = sqlx::query_as(sql)
.bind(TaskState::Pending as i32)
.bind(gid.task_id)
.bind(gid.ssn_id)
.fetch_one(&mut *tx)
.await
.map_err(|e| FlameError::Storage(e.to_string()))?;

tx.commit()
.await
.map_err(|e| FlameError::Storage(e.to_string()))?;

task.try_into()
}

async fn update_task_state(&self, gid: TaskGID, state: TaskState) -> Result<Task, FlameError> {
let mut tx = self
.pool
Expand Down
5 changes: 5 additions & 0 deletions session_manager/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ impl Storage {
let task_list = self.engine.find_tasks(ssn.id).await?;
let mut ssn = ssn.clone();
for task in task_list {
let task = match task.state {
TaskState::Running => self.engine.retry_task(task.gid()).await?,
_ => task,
};

ssn.update_task(&task);
}

Expand Down

0 comments on commit 47e3b30

Please sign in to comment.