Skip to content

Commit

Permalink
Merge pull request #23 from midokura/Add-job-event-handler
Browse files Browse the repository at this point in the history
Add job event handler
  • Loading branch information
gerardsegarra authored Apr 29, 2024
2 parents e3f267b + 595ec09 commit b3bd49c
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ jobs:
flake8
- name: Test with pytest
run: |
pytest --cov=src
pytest --cov=src tests/
12 changes: 8 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v4.6.0
hooks:
- id: trailing-whitespace
- id: check-docstring-first
Expand All @@ -12,16 +12,20 @@ repos:
- id: debug-statements
- id: end-of-file-fixer
- repo: https://github.com/myint/docformatter
rev: v1.5.1
rev: v1.7.5
hooks:
- id: docformatter
args: [--in-place]
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
rev: v3.15.2
hooks:
- id: pyupgrade
args: [--py38-plus]
- repo: https://github.com/PyCQA/flake8
rev: 6.0.0
rev: 7.0.0
hooks:
- id: flake8
- repo: https://github.com/psf/black
rev: 24.4.2
hooks:
- id: black
19 changes: 10 additions & 9 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from const import GithubHeaders, LOGGING_CONFIG
from github import GithubJob
from jobs import JobEventsHandler
from utils import dict_to_logfmt

dictConfig(LOGGING_CONFIG)
Expand All @@ -23,9 +24,10 @@
if hasattr(logging, loglevel_flask):
loglevel_flask = getattr(logging, loglevel_flask)
log.setLevel(loglevel_flask)
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
logging.getLogger("apscheduler.executors.default").setLevel(logging.WARNING)

jobs = dict()
job_handler = JobEventsHandler()


# check all calls are valid
Expand All @@ -50,7 +52,10 @@ def validate_origin_github():


def process_workflow_job():
job = GithubJob(request.get_json())
event = request.get_json()
job_handler.process_event(event)

job = GithubJob(event)

context_details = {
"action": job.action,
Expand All @@ -77,9 +82,7 @@ def process_workflow_job():
app.logger.error(f"Job {job.id} was in progress before being queued")
del jobs[job.id]
else:
time_to_start = (
job.time_start - job_requested.time_start
).seconds
time_to_start = (job.time_start - job_requested.time_start).seconds

context_details = {
**context_details,
Expand All @@ -100,9 +103,7 @@ def process_workflow_job():
app.logger.warning(f"Job {job.id} is {job.action} but not stored!")
time_to_finish = 0
else:
time_to_finish = (
job.time_completed - job.time_start
).seconds
time_to_finish = (job.time_completed - job.time_start).seconds
# delete from memory
del jobs[job.id]

Expand All @@ -124,7 +125,7 @@ def process_workflow_job():
return True


@scheduler.task('interval', id='monitor_queued', seconds=30)
@scheduler.task("interval", id="monitor_queued", seconds=30)
def monitor_queued_jobs():
"""Return the job that has been queued and not starting for long time."""
app.logger.debug("Starting monitor_queued_jobs")
Expand Down
2 changes: 1 addition & 1 deletion src/github.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from utils import parse_datetime


class GithubJob():
class GithubJob:
def __init__(self, json_body: str):
self.data = json_body

Expand Down
57 changes: 57 additions & 0 deletions src/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from github import GithubJob


class Job:
    """Wrapper tracking a single workflow job across its lifecycle.

    Holds the most recent ``GithubJob`` event wrapper seen for this job;
    the handler replaces ``github_job`` as newer events arrive.
    """

    def __init__(self, github_job: "GithubJob") -> None:
        # Latest event wrapper for this job (updated on state transitions).
        self.github_job = github_job

    def __repr__(self) -> str:
        # Debug-friendly representation for logs and test failures.
        return f"{type(self).__name__}(github_job={self.github_job!r})"


class JobEventsHandler:
    """Track workflow jobs through the queued -> in_progress -> completed
    lifecycle driven by GitHub ``workflow_job`` webhook events.

    Jobs are keyed by workflow job id in two dicts, one per
    non-terminal state.
    """

    def __init__(self) -> None:
        # job_id -> Job, for jobs waiting for a runner.
        self.queued: dict = {}
        # job_id -> Job, for jobs currently executing.
        self.in_progress: dict = {}

    def process_event(self, event: dict) -> None:
        """Dispatch a ``workflow_job`` event by its ``action`` field.

        Unknown actions are deliberately ignored.
        """
        status = event["action"]

        if status == "queued":
            self._process_queued_event(event)
        elif status == "in_progress":
            self._process_in_progress_event(event)
        elif status == "completed":
            self._process_completed_event(event)
        # Any other action is a no-op.

    def _get_event_job_id(self, event: dict):
        # The workflow_job id uniquely identifies one job run.
        return event["workflow_job"]["id"]

    def _create_job(self, github_job: "GithubJob") -> "Job":
        # One-line factory so subclasses/tests can override Job creation.
        return Job(github_job=github_job)

    def _process_queued_event(self, event: dict) -> None:
        job = self._create_job(GithubJob(event))
        self.queued[self._get_event_job_id(event)] = job

    def _process_in_progress_event(self, event: dict) -> None:
        job_id = self._get_event_job_id(event)
        job = self.queued.pop(job_id, None)

        if job is None:
            # "in_progress" may arrive without a prior "queued"
            # (e.g. a missed webhook delivery); start tracking it now.
            job = self._create_job(GithubJob(event))
        else:
            # Refresh the stored payload with the newer event.
            job.github_job = GithubJob(event)

        self.in_progress[job_id] = job

        # TODO send final time in queue

    def _process_completed_event(self, event: dict) -> None:
        job_id = self._get_event_job_id(event)
        # A job can complete from either state (e.g. cancelled while still
        # queued), so drop it from both dicts to avoid leaking entries.
        self.queued.pop(job_id, None)
        self.in_progress.pop(job_id, None)

        # TODO send final time in progress
10 changes: 5 additions & 5 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Flask
Flask-APScheduler==1.13.1
pytest
pytest-cov
flake8
-e .
pytest==8.2.0
pytest-cov==5.0.0
flake8==7.0.0
black==24.4.2
56 changes: 56 additions & 0 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pytest

from unittest.mock import Mock

from jobs import JobEventsHandler


@pytest.fixture
def new_job_event():
    """Webhook payload for a job that has just been queued."""
    return {"action": "queued", "workflow_job": {"id": "workflow_id"}}


@pytest.fixture
def in_progress_job_event():
    """Webhook payload for a job that has started running."""
    return {"action": "in_progress", "workflow_job": {"id": "workflow_id"}}


@pytest.fixture
def completed_job_event():
    """Webhook payload for a job that has finished."""
    return {"action": "completed", "workflow_job": {"id": "workflow_id"}}


def test_new_job(new_job_event):
    """A "queued" event registers the job under its workflow id."""
    events_handler = JobEventsHandler()

    events_handler.process_event(new_job_event)

    assert events_handler.queued.get("workflow_id")


def test_in_progress_job(in_progress_job_event):
    """An "in_progress" event moves a queued job into in_progress."""
    events_handler = JobEventsHandler()
    queued_job = Mock()
    events_handler.queued["workflow_id"] = queued_job

    events_handler.process_event(in_progress_job_event)

    assert not events_handler.queued.get("workflow_id")
    assert events_handler.in_progress.get("workflow_id") == queued_job


def test_unprocessed_in_progress_job(in_progress_job_event):
    """An "in_progress" event with no prior "queued" still tracks the job."""
    events_handler = JobEventsHandler()

    events_handler.process_event(in_progress_job_event)

    assert events_handler.in_progress.get("workflow_id")


def test_completed_job(completed_job_event):
    """A "completed" event removes the job from all tracking dicts."""
    events_handler = JobEventsHandler()
    events_handler.in_progress["workflow_id"] = Mock()

    events_handler.process_event(completed_job_event)

    assert not events_handler.queued.get("workflow_id")
    assert not events_handler.in_progress.get("workflow_id")
4 changes: 2 additions & 2 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_method_not_allowed(client):


def test_headers_not_correct(client, caplog):
response = client.post(HOOK_ENDPOINT, headers={'User-Agent': 'foo'})
response = client.post(HOOK_ENDPOINT, headers={"User-Agent": "foo"})
assert response.status_code == 401
assert caplog.messages == [
"User-Agent is foo",
Expand Down Expand Up @@ -197,5 +197,5 @@ def test_line_break_in_job_name(client, caplog):
assert caplog.messages == [
'action=queued repository=foo/foo branch=new-feature-branch job_id=6 run_id=10 job_name="Build and push '
'images (actions-runner-dind, NPROC=2 , runner-images/devops/actions-runner-dind, l..."'
' workflow=CI requestor=testerbot'
" workflow=CI requestor=testerbot"
]

0 comments on commit b3bd49c

Please sign in to comment.