forked from opensafely-core/job-runner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.py
138 lines (109 loc) · 3.81 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
Script which polls the job-server endpoint for active JobRequests and POSTs
back any associated Jobs.
"""
import logging
import sys
import time
import requests
from .log_utils import configure_logging, set_log_context
from . import config
from .create_or_update_jobs import create_or_update_jobs
from .database import find_where
from .models import JobRequest, Job
# Module-level session so TCP connections to the job-server are reused
# across polling cycles (auth header is set per-request in api_request)
session = requests.Session()
log = logging.getLogger(__name__)
class SyncAPIError(Exception):
    """Raised when a request to the job-server API fails."""
def main():
    """
    Poll the job-server forever, running one sync pass every
    config.POLL_INTERVAL seconds.
    """
    endpoint = config.JOB_SERVER_ENDPOINT.rstrip("/")
    log.info(f"Polling for JobRequests at: {endpoint}/job-requests/")
    while True:
        sync()
        time.sleep(config.POLL_INTERVAL)
def sync():
    """
    Run a single sync pass: fetch active JobRequests for this backend,
    create/update the corresponding local Jobs, then POST their current
    state back to the job-server.
    """
    # We're deliberately not paginating here on the assumption that the set
    # of active jobs is always going to be small enough that we can fetch
    # them in a single request and we don't need the extra complexity
    response = api_get("job-requests", params={"backend": config.BACKEND})
    job_requests = [
        job_request_from_remote_format(result) for result in response["results"]
    ]
    if not job_requests:
        # Nothing active on the server, so there's nothing to update or report
        return
    for job_request in job_requests:
        with set_log_context(job_request=job_request):
            create_or_update_jobs(job_request)
    request_ids = [job_request.id for job_request in job_requests]
    jobs = find_where(Job, job_request_id__in=request_ids)
    jobs_data = [job_to_remote_format(job) for job in jobs]
    log.debug(f"Syncing {len(jobs_data)} jobs back to job-server")
    api_post("jobs", json=jobs_data)
def api_get(*args, **kwargs):
    """Make a GET request against the job-server API."""
    return api_request("get", *args, **kwargs)
def api_post(*args, **kwargs):
    """Make a POST request against the job-server API."""
    return api_request("post", *args, **kwargs)
def api_request(method, path, *args, **kwargs):
    """
    Make an authenticated request to the job-server API and return the
    decoded JSON body.

    Extra args/kwargs are passed through to `requests.Session.request`.
    Raises SyncAPIError (wrapping the underlying HTTPError) on any non-2xx
    response.
    """
    url = "{}/{}/".format(config.JOB_SERVER_ENDPOINT.rstrip("/"), path.strip("/"))
    # We could do this just once on import, but it makes changing the config in
    # tests more fiddly. Set the single header rather than replacing the whole
    # dict so requests' default headers (User-Agent, Accept-Encoding etc) are
    # preserved.
    session.headers["Authorization"] = config.JOB_SERVER_TOKEN
    response = session.request(method, url, *args, **kwargs)
    # Lazy %-style args: the message is only formatted if DEBUG is enabled
    log.debug(
        "%s %s %s post_data=%s %s",
        method.upper(),
        response.status_code,
        url,
        kwargs.get("json", '""'),
        response.text,
    )
    try:
        response.raise_for_status()
    except requests.HTTPError as e:
        # Wrap in our own exception type so callers don't need to know about
        # the requests library
        raise SyncAPIError(e) from e
    return response.json()
def job_request_from_remote_format(job_request):
    """
    Convert a JobRequest as received from the job-server into our own internal
    representation
    """
    workspace = job_request["workspace"]
    return JobRequest(
        id=str(job_request["identifier"]),
        repo_url=workspace["repo"],
        commit=job_request.get("sha"),
        branch=workspace["branch"],
        requested_actions=job_request["requested_actions"],
        workspace=workspace["name"],
        database_name=workspace["db"],
        force_run_dependencies=job_request["force_run_dependencies"],
        # Keep the raw payload around for reference/debugging
        original=job_request,
    )
def job_to_remote_format(job):
    """
    Convert our internal representation of a Job into whatever format the
    job-server expects
    """
    # status_code may be unset, in which case the server expects empty string
    status_code = job.status_code.value if job.status_code else ""
    return {
        "identifier": job.id,
        "job_request_id": job.job_request_id,
        "action": job.action,
        "status": job.state.value,
        "status_code": status_code,
        "status_message": job.status_message or "",
        "created_at": job.created_at_isoformat,
        "updated_at": job.updated_at_isoformat,
        "started_at": job.started_at_isoformat,
        "completed_at": job.completed_at_isoformat,
    }
if __name__ == "__main__":
    configure_logging()
    try:
        main()
    except KeyboardInterrupt:
        # Treat Ctrl-C as a clean shutdown rather than dumping a traceback
        sys.exit(0)