diff --git a/pynautobot/core/endpoint.py b/pynautobot/core/endpoint.py index df43a11..14b4109 100644 --- a/pynautobot/core/endpoint.py +++ b/pynautobot/core/endpoint.py @@ -14,6 +14,8 @@ # # This file has been modified by NetworktoCode, LLC. +from time import sleep + from typing import List, Dict, Any from uuid import UUID from pynautobot.core.query import Request, RequestError @@ -675,6 +677,72 @@ def run(self, *args, api_version=None, **kwargs): return response_loader(req, self.return_obj, self) + def run_and_wait(self, *args, api_version=None, interval=5, max_rechecks=50, **kwargs): + """Runs a job and waits for the response. + + Args: + *args (str, optional): Freeform search string that's + accepted on given endpoint. + **kwargs (str, optional): Any search argument the + endpoint accepts can be added as a keyword arg. + api_version (str, optional): Override default or globally-set + Nautobot REST API version for this single request. + interval (int, optional): Time in seconds to wait between + checking job results. + max_rechecks (int, optional): Number of times to check job result + before exiting the method. + + Returns: + obj: Job details: job_result object uuid found at `obj.result.id`. + + Examples: + To run a job for verifying hostnames: + >>> nb.extras.jobs.run_and_wait( + class_path="local/data_quality/VerifyHostnames", + data={"hostname_regex": ".*"}, + commit=True, + interval=5, + max_rechecks=10, + ) + """ + if max_rechecks <= 0: + raise ValueError("Attribute `max_rechecks` must be a postive integer to prevent recursive loops.") + + job_obj = self.run(*args, api_version=api_version, **kwargs) + job_result_id = job_obj.job_result.id + job_result_url = f"{self.base_url}/extras/job-results/{job_result_id}/" + + # Job statuses which indicate a job not yet started or in progress. + # If the job status is not in this list, it will consider the job complete and return the job result object. + active_job_statuses = ( + "RECEIVED", + "PENDING", + "STARTED", + "RETRY", + ) + + interval_counter = 0 + + while interval_counter <= max_rechecks: + # Sleep for interval and increment counter + sleep(interval) + interval_counter += 1 + + req = Request( + base=job_result_url, + token=self.token, + http_session=self.api.http_session, + api_version=api_version, + ).get() + + result = req.get("job_result", {}) + status = result.get("status", {}).get("value") + + if status not in active_job_statuses: + return response_loader(req, self.return_obj, self) + + raise ValueError("Did not receieve completed job result for job.") + class GraphqlEndpoint(Endpoint): """Extend Endpoint class to support run method for graphql queries.""" diff --git a/tests/unit/test_endpoint.py b/tests/unit/test_endpoint.py index abacdab..b4fb437 100644 --- a/tests/unit/test_endpoint.py +++ b/tests/unit/test_endpoint.py @@ -258,6 +258,42 @@ def test_run_greater_v1_3(self): test = test_obj.run(job_id="test") self.assertEqual(len(test), 1) + # Run and Wait Tests + # ================================================= + + @patch("pynautobot.core.query.Request.get", return_value=Mock()) + @patch("pynautobot.core.query.Request.post", return_value=Mock()) + def test_run_and_wait_less_v1_3(self, mock_post, mock_get): + api = Mock(base_url="http://localhost:8000/api", api_version="1.2") + app = Mock(name="test") + test_obj = JobsEndpoint(api, app, "test") + mock_post.return_value = {"schedule": {"id": 123}, "job_result": {"id": 123, "status": {"value": "PENDING"}}} + mock_get.return_value = {"schedule": {"id": 123}, "job_result": {"id": 123, "status": {"value": "SUCCESS"}}} + test = test_obj.run_and_wait(class_path="test", interval=1, max_rechecks=5) + self.assertEqual(test.job_result.status.value, "SUCCESS") + + @patch("pynautobot.core.query.Request.get", return_value=Mock()) + @patch("pynautobot.core.query.Request.post", return_value=Mock()) + def test_run_and_wait_greater_v1_3(self, mock_post, mock_get): + api = Mock(base_url="http://localhost:8000/api", api_version="1.3") + app = Mock(name="test") + test_obj = JobsEndpoint(api, app, "test") + mock_post.return_value = {"schedule": {"id": 123}, "job_result": {"id": 123, "status": {"value": "PENDING"}}} + mock_get.return_value = {"schedule": {"id": 123}, "job_result": {"id": 123, "status": {"value": "SUCCESS"}}} + test = test_obj.run_and_wait(job_id="test", interval=1, max_rechecks=5) + self.assertEqual(test.job_result.status.value, "SUCCESS") + + @patch("pynautobot.core.query.Request.get", return_value=Mock()) + @patch("pynautobot.core.query.Request.post", return_value=Mock()) + def test_run_and_wait_no_complete(self, mock_post, mock_get): + api = Mock(base_url="http://localhost:8000/api", api_version="1.3") + app = Mock(name="test") + test_obj = JobsEndpoint(api, app, "test") + mock_post.return_value = {"schedule": {"id": 123}, "job_result": {"id": 123, "status": {"value": "PENDING"}}} + mock_get.return_value = {"schedule": {"id": 123}, "job_result": {"id": 123, "status": {"value": "PENDING"}}} + with self.assertRaises(ValueError): + test_obj.run_and_wait(job_id="test", interval=1, max_rechecks=2) + class GraphqlEndPointTestCase(unittest.TestCase): def test_invalid_arg(self):