Skip to content

Commit

Permalink
定时任务添加时区,添加回调
Browse files Browse the repository at this point in the history
  • Loading branch information
woshiyanghai committed Sep 4, 2024
1 parent 7dc9e9f commit 1fce5c5
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions sql/utils/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@
from celery.result import AsyncResult
from celery.exceptions import SoftTimeLimitExceeded
import logging
import pytz

logger = logging.getLogger("default")

def add_sql_schedule(name, run_date, workflow_id):
# 使用 Celery 的 apply_async 方法来调度任务
# 因这里存在循环调用问题,所以不能直接import execute
sig = signature('sql.utils.execute_sql.execute', args=(workflow_id,))
sig.apply_async(eta=run_date, task_id=name)
tz = pytz.timezone('Asia/Shanghai')
if run_date.tzinfo is None or run_date.tzinfo.utcoffset(run_date) is None:
run_date = tz.localize(run_date) # 确保 run_date 带有时区信息
execute_sql_sig = signature('sql.utils.execute_sql.execute', args=(workflow_id,))
# 创建回调任务的签名
callback_sig = signature('sql.utils.execute_sql.execute_callback', args=(name,workflow_id))
# 调度主任务,并链接回调任务
execute_sql_sig.apply_async(eta=run_date, task_id=name, link=callback_sig)
logger.warning(f"添加 SQL 定时执行任务:{name} 执行时间:{run_date}")



def add_kill_conn_schedule(name, run_date, instance_id, thread_id):
"""添加/修改终止数据库连接的定时任务"""
del_schedule(name)
Expand Down

0 comments on commit 1fce5c5

Please sign in to comment.