Skip to content

Commit

Permalink
Send queued time after receiving events (#25)
Browse files Browse the repository at this point in the history
* Send queued time after receiving events

* Add datadog dependency
  • Loading branch information
gerardsegarra authored Apr 29, 2024
1 parent afbb9a6 commit 36750c3
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[pytest]
testpaths = tests/tests.py
testpaths = tests/
pythonpath = src
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ packages = find:
install_requires =
Flask>=2.2,<3
Flask-APScheduler==1.13.1
datadog==0.49.1

[flake8]
max-line-length = 120
Expand Down
20 changes: 18 additions & 2 deletions src/jobs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import metrics

from datetime import datetime
from github import GithubJob

Expand All @@ -15,6 +17,14 @@ def __init__(self, github_job: GithubJob) -> None:

self.node_id = self.github_job.node_id

@property
def seconds_in_queue(self):
if self.status == "queued":
return (datetime.now() - self.queued_at).total_seconds()

if self.status == "in_progress" or self.status == "completed":
return (self.in_progress_at - self.queued_at).total_seconds()

def _update_attributes(self, github_job: GithubJob):
self.github_job: GithubJob = github_job
self.status = github_job.action
Expand Down Expand Up @@ -71,11 +81,17 @@ def _process_in_progress_event(self, event: dict):
job = self._create_job(GithubJob(event))
else:
job.update(GithubJob(event))
metrics.send_queued_job(
seconds_in_queue=job.seconds_in_queue,
job_name=job.github_job.job_name,
repository=job.github_job.repository,
runner=job.github_job.runner_name,
run_id=job.github_job.run_id,
public=job.github_job.runner_public,
)

self.in_progress[job_id] = job

# TODO send final time in queue

def _process_completed_event(self, event: dict):
job_id = self._get_event_job_id(event)
self.in_progress.pop(job_id, None)
Expand Down
29 changes: 29 additions & 0 deletions src/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from datadog import initialize, statsd

options = {
"statsd_host": "datadog-agent.datadog.svc.cluster.local",
"statsd_port": 8125,
}

initialize(**options)


def send_queued_job(
seconds_in_queue: int,
job_name: str,
repository: str,
runner: str,
run_id: str,
public: bool,
):
statsd.histogram(
"midokura.github_runners.jobs.seconds_in_queue.histogram",
seconds_in_queue,
tags=[
f"job:{job_name}",
f"repository:{repository}",
f"runner_name:{runner}",
f"run_id:run-{run_id}", # "run-" added to group by run-id in DD
f"public:{public}",
],
)
36 changes: 32 additions & 4 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from unittest.mock import Mock
from unittest.mock import Mock, patch

from datetime import datetime
from github import GithubJob
Expand All @@ -14,10 +14,15 @@ def new_job_event():
return {
"workflow_job": {
"id": "workflow_id",
"name": "workflow name",
"run_id": 1234567890,
"started_at": "2024-04-29T12:43:16Z",
"completed_at": None,
"node_id": "CR_kwDOHC6jj88AAAAFqGXrPQ",
"runner_name": "test runner",
"runner_group_name": "Runner Group Test",
},
"repository": {"full_name": "test/repo"},
"action": "queued",
}

Expand All @@ -27,10 +32,15 @@ def in_progress_job_event():
return {
"workflow_job": {
"id": "workflow_id",
"name": "workflow name",
"run_id": 1234567890,
"started_at": "2024-04-29T12:43:32Z",
"completed_at": None,
"node_id": "CR_kwDOHC6jj88AAAAFqGXrPQ",
"runner_name": "test runner",
"runner_group_name": "Runner Group Test",
},
"repository": {"full_name": "test/repo"},
"action": "in_progress",
}

Expand All @@ -40,10 +50,15 @@ def completed_job_event():
return {
"workflow_job": {
"id": "workflow_id",
"name": "workflow name",
"run_id": 1234567890,
"started_at": "2024-04-29T12:43:32Z",
"completed_at": "2024-04-29T12:45:09Z",
"node_id": "CR_kwDOHC6jj88AAAAFqGXrPQ",
"runner_name": "test runner",
"runner_group_name": "Runner Group Test",
},
"repository": {"full_name": "test/repo"},
"action": "completed",
}

Expand All @@ -56,22 +71,35 @@ def test_new_job_event(new_job_event):
assert handler.queued.get("workflow_id")


def test_in_progress_job_event(in_progress_job_event):
@patch("metrics.send_queued_job")
def test_in_progress_job_event(
send_queued_job_mock, new_job_event, in_progress_job_event
):
handler = JobEventsHandler()
job = Mock()
job = Job(GithubJob(new_job_event))
handler.queued["workflow_id"] = job

handler.process_event(in_progress_job_event)

assert not handler.queued.get("workflow_id")
assert handler.in_progress.get("workflow_id") == job
send_queued_job_mock.assert_called_with(
seconds_in_queue=16.0,
job_name="workflow name",
repository="test/repo",
runner="test runner",
run_id=1234567890,
public=False,
)


def test_unprocessed_in_progress_job_event(in_progress_job_event):
@patch("metrics.send_queued_job")
def test_unprocessed_in_progress_job_event(send_queued_job_mock, in_progress_job_event):
handler = JobEventsHandler()
handler.process_event(in_progress_job_event)

assert handler.in_progress.get("workflow_id")
send_queued_job_mock.assert_not_called()


def test_completed_job_event(completed_job_event):
Expand Down

0 comments on commit 36750c3

Please sign in to comment.