From 66b4e62a80cedbf0100c9444d29e0891de2582b9 Mon Sep 17 00:00:00 2001 From: Gerard Segarra Date: Fri, 26 Apr 2024 12:29:27 +0200 Subject: [PATCH] Get job --- src/app.py | 12 ++++++------ src/job_processor.py | 13 ++++++------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/app.py b/src/app.py index c47d55b..df6f859 100644 --- a/src/app.py +++ b/src/app.py @@ -37,7 +37,7 @@ logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) jobs = dict() -node_ids = set() +node_ids = dict() # check all calls are valid @@ -79,10 +79,10 @@ def process_workflow_job(): if job.action == "queued": # add to memory jobs[job.id] = job - node_ids.add(job.node_id) + node_ids[job.node_id] = job elif job.action == "in_progress": - node_ids.discard(job.node_id) + node_ids.pop(job.node_id, None) job_requested = jobs.get(job.id) time_to_start = None if not job_requested: @@ -110,7 +110,7 @@ def process_workflow_job(): jobs[job.id] = job elif job.action == "completed": - node_ids.discard(job.node_id) + node_ids.pop(job.node_id, None) job_requested = jobs.get(job.id) if not job_requested: app.logger.warning(f"Job {job.id} is {job.action} but not stored!") @@ -151,11 +151,11 @@ def monitor_queued_jobs(): if not node_ids: return - jobs_data = query_nodes(list(node_ids)) + jobs_data = query_nodes(node_ids.keys()) details = extract_jobs_metrics_from_data(jobs_data, node_ids) for run in details: - job = jobs[run["job_id"]] + job = node_ids[run["job_id"]] app.logger.info(f"Got job {job}") statsd.histogram( 'midokura.github_runners.jobs.seconds_in_queue.histogram', diff --git a/src/job_processor.py b/src/job_processor.py index b378331..10de9e0 100644 --- a/src/job_processor.py +++ b/src/job_processor.py @@ -1,22 +1,16 @@ from datetime import datetime -def extract_jobs_metrics_from_data(jobs_data: dict, queued_node_ids_set: set): +def extract_jobs_metrics_from_data(jobs_data: dict, queued_node_ids: dict): jobs_metrics = [] for job in jobs_data["nodes"]: - - if job["status"] != "QUEUED": - queued_node_ids_set.discard(job["id"]) - continue - started_at = datetime.strptime(job["startedAt"], "%Y-%m-%dT%H:%M:%SZ") now = datetime.now() context_details = { "action": "monitor_queued", "job_id": job["id"], - "job_run": job["checkSuite"]["workflowRun"]["runNumber"], "job_name": job["name"], "status": job["status"], "started_at": job["startedAt"], @@ -25,4 +19,9 @@ def extract_jobs_metrics_from_data(jobs_data: dict, queued_node_ids_set: set): } jobs_metrics.append(context_details) + + if job["status"] != "QUEUED": + queued_node_ids.pop(job["id"], None) + continue + return jobs_metrics