Skip to content

Commit

Permalink
Make (un)deploy more fault tolerant
Browse files Browse the repository at this point in the history
  • Loading branch information
Drew Meyers committed May 14, 2024
1 parent 581709b commit 84cdee2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 30 deletions.
6 changes: 3 additions & 3 deletions app/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from pydantic import HttpUrl
from pydantic import HttpUrl, SecretStr
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
DB_URL: str = "sqlite:///:memory:"
REDIS_HOST: str = "http://localhost"
REDIS_PORT: str = "6379"
REDIS_PORT: int = 6379
EMS_API_URL: HttpUrl = "http://localhost:8080/api/v1"
EMS_API_AUTH_USERNAME: str = "username"
EMS_API_AUTH_PASSWORD: str = "password"
EMS_API_AUTH_PASSWORD: SecretStr = "password"
DAG_CATALOG_DIRECTORY: str = "/dag-catalog"
DEPLOYED_DAGS_DIRECTORY: str = "/deployed-dags"
57 changes: 30 additions & 27 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
from functools import lru_cache

import requests

# from airflow_client.client.api import dag_api
from airflow_client.client.api_client import ApiClient
from airflow_client.client.configuration import Configuration

# from airflow_client.client.model.dag import DAG
# from airflow_client.client.model.error import Error
from fastapi import BackgroundTasks, Body, Depends, FastAPI, HTTPException
from fastapi import status as fastapi_status
from requests.auth import HTTPBasicAuth
Expand Down Expand Up @@ -69,15 +62,15 @@ def get_redis_locking_client():
return RedisLock(host=settings.REDIS_HOST, port=settings.REDIS_PORT)


@lru_cache()
def get_ems_client():
settings = get_settings()
configuration = Configuration(
host=settings.EMS_API_URL,
username=settings.EMS_API_AUTH_USERNAME,
password=settings.EMS_API_AUTH_PASSWORD,
)
return ApiClient(configuration)
# @lru_cache()
# def get_ems_client():
# settings = get_settings()
# configuration = Configuration(
# host=settings.EMS_API_URL,
# username=settings.EMS_API_AUTH_USERNAME,
# password=settings.EMS_API_AUTH_PASSWORD.get_secret_value(),
# )
# return ApiClient(configuration)


def get_db():
Expand All @@ -92,6 +85,8 @@ def check_process_integrity(db: Session, process_id: str, new_process: bool):
process = None
try:
process = crud.get_process(db, process_id)
# TODO If not a new process, check if deployment_status is complete
# If not, raise an exception stating that its deployment status is not complete
if new_process and process is not None:
raise ValueError
except NoResultFound:
Expand Down Expand Up @@ -273,7 +268,6 @@ def deploy_process(
background_tasks: BackgroundTasks,
settings: Annotated[config.Settings, Depends(get_settings)],
redis_locking_client: Annotated[RedisLock, Depends(get_redis_locking_client)],
ems_client: Annotated[ApiClient, Depends(get_ems_client)],
db: Session = Depends(get_db),
process: Process = Body(...),
):
Expand All @@ -282,23 +276,21 @@ def deploy_process(
**Note:** This is not an officially supported endpoint in the OGC Processes specification.
"""
# api_instance = dag_api.DAGApi(ems_client)
# api_response = api_instance.get_dags()
# print(api_response)
check_process_integrity(db, process.id, new_process=True)

with redis_locking_client.lock("deploy_process_" + process.id): # as lock:
pass

# Acquire lock
# Create process in DB w/ deployment_status field "deploying"
# Check if DAG exists in Airflow
# Check if file exists in DAG folder
# Check if file exists in DAG catalog
# Copy file to DAG folder
# Poll Airflow until DAG appears
# Unpause DAG
# Check if DAG is_active is True
# Create process in DB
# Update process in DB w/ deployment_status field "deployed"
# Release lock

# Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
Expand Down Expand Up @@ -332,7 +324,9 @@ def deploy_process(
)

# Poll the EMS API to verify DAG existence
ems_api_auth = HTTPBasicAuth(settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD)
ems_api_auth = HTTPBasicAuth(
settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD.get_secret_value()
)
timeout = 20
start_time = time.time()
while time.time() - start_time < timeout:
Expand Down Expand Up @@ -361,7 +355,6 @@ def undeploy_process(
background_tasks: BackgroundTasks,
settings: Annotated[config.Settings, Depends(get_settings)],
redis_locking_client: Annotated[RedisLock, Depends(get_redis_locking_client)],
ems_client: Annotated[ApiClient, Depends(get_ems_client)],
process_id: str,
db: Session = Depends(get_db),
force: bool = False,
Expand All @@ -377,6 +370,7 @@ def undeploy_process(
pass

# Acquire lock
# Update process in DB w/ deployment_status field "undeploying"
# Check if DAG exists in Airflow
# Pause the DAG
# Stop all DAG runs and tasks
Expand All @@ -385,7 +379,9 @@ def undeploy_process(
# Delete process from DB
# Release lock

ems_api_auth = HTTPBasicAuth(settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD)
ems_api_auth = HTTPBasicAuth(
settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD.get_secret_value()
)
# response = requests.get(f"{settings.EMS_API_URL}/dags/{process_id}", auth=ems_api_auth)
# if response.status_code == 200:
# return True # DAG exists
Expand Down Expand Up @@ -493,7 +489,9 @@ def execute(
For more information, see [Section 7.11](https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_create_job).
"""
check_process_integrity(db, process_id, new_process=False)
ems_api_auth = HTTPBasicAuth(settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD)
ems_api_auth = HTTPBasicAuth(
settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD.get_secret_value()
)
try:
response = requests.get(f"{settings.EMS_API_URL}/dags/{process_id}", auth=ems_api_auth)
response.raise_for_status()
Expand Down Expand Up @@ -545,7 +543,9 @@ def status(
"""
job = check_job_integrity(db, job_id, new_job=False)
job = StatusInfo.model_validate(job)
ems_api_auth = HTTPBasicAuth(settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD)
ems_api_auth = HTTPBasicAuth(
settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD.get_secret_value()
)
try:
response = requests.get(
f"{settings.EMS_API_URL}/dags/{job.processID}/dagRuns/{job.jobID}",
Expand Down Expand Up @@ -594,8 +594,11 @@ def dismiss(
For more information, see [Section 13](https://docs.ogc.org/is/18-062r2/18-062r2.html#Dismiss).
"""
job = check_job_integrity(db, job_id, new_job=False)
ems_api_auth = HTTPBasicAuth(settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD)
ems_api_auth = HTTPBasicAuth(
settings.EMS_API_AUTH_USERNAME, settings.EMS_API_AUTH_PASSWORD.get_secret_value()
)
try:
# TODO also need to cancel all task instances
response = requests.delete(
f"{settings.EMS_API_URL}/dags/{job.processID}/dagRuns/{job.jobID}",
auth=ems_api_auth,
Expand Down

0 comments on commit 84cdee2

Please sign in to comment.