Skip to content

Commit

Permalink
Add track for jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
gerardsegarra committed Apr 29, 2024
1 parent 13b54f3 commit d6a97d0
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 13 deletions.
21 changes: 10 additions & 11 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,20 @@ def process_workflow_job():

@scheduler.task("interval", id="monitor_jobs", seconds=15)
def monitor_jobs():
queued_nodes = [
job.node_id for job in job_handler.values() if job.status == "queued"
]
queued_nodes = [job.node_id for job in job_handler.queued.values()]
jobs_data = query_jobs(queued_nodes)

for job_data in jobs_data["nodes"]:
job = job_handler.queued[job_data["id"]]
job = job_handler.queued.get(job_data["id"])
if job_data["status"] != "QUEUED":
job = job_handler.queued.pop(job["id"], None)
job.status = job_data["status"].lower()
job.in_progress_at = parse_datetime(job_data["startedAt"])
job.completed_at = parse_datetime(job_data["completedAt"])
job.final_queued_time_updated = True

job.send_queued_metrics()
job = job_handler.queued.pop(job_data["id"], None)
if job:
job.status = job_data["status"].lower()
job.in_progress_at = parse_datetime(job_data["startedAt"])
job.completed_at = parse_datetime(job_data["completedAt"])
job.final_queued_time_updated = True

job.send_queued_metric()


@scheduler.task("interval", id="monitor_queued", seconds=30)
Expand Down
88 changes: 86 additions & 2 deletions tests/test_app.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,86 @@
def test_monitor_jobs():
assert False
from unittest.mock import patch
import pytest
import app
from app import monitor_jobs
from github import GithubJob
from jobs import Job, JobEventsHandler


@pytest.fixture
def queued_job():
return Job(
GithubJob(
{
"workflow_job": {
"id": "workflow_id_queued",
"name": "workflow name",
"run_id": 1234567890,
"started_at": "2024-04-29T12:43:16Z",
"completed_at": None,
"node_id": "CR_queued_1234",
"runner_name": "test runner",
"runner_group_name": "Runner Group Test",
},
"repository": {"full_name": "test/repo"},
"action": "queued",
}
)
)


@pytest.fixture
def in_progress_job():
return Job(
GithubJob(
{
"workflow_job": {
"id": "workflow_id_in_progress",
"name": "workflow name",
"run_id": 1234567890,
"started_at": "2024-04-29T12:43:16Z",
"completed_at": None,
"node_id": "CR_in_progress_1234",
"runner_name": "test runner",
"runner_group_name": "Runner Group Test",
},
"repository": {"full_name": "test/repo"},
"action": "queued",
}
)
)


@patch("jobs.Job.send_queued_metric")
@patch("app.query_jobs")
def test_monitor_jobs(
query_jobs_mock, send_queued_metric_mock, queued_job, in_progress_job
):
app.job_handler = JobEventsHandler()
app.job_handler.queued = {
"workflow_id_queued": queued_job,
"workflow_id_in_progress": in_progress_job,
}

query_jobs_mock.return_value = {
"nodes": [
{
"id": "workflow_id_queued",
"status": "QUEUED",
"startedAt": "2024-04-29T12:43:16Z",
"completedAt": None,
},
{
"id": "workflow_id_in_progress",
"status": "IN_PROGRESS",
"startedAt": "2024-04-29T12:43:32Z",
"completedAt": None,
},
]
}

monitor_jobs()

assert "workflow_id_in_progress" not in app.job_handler.queued
assert in_progress_job.status == "in_progress"
assert in_progress_job.in_progress_at is not None
send_queued_metric_mock.assert_called()

0 comments on commit d6a97d0

Please sign in to comment.