Skip to content

Commit

Permalink
Implement OGC (un)register with airflow
Browse files: browse the repository at this point in the history
  • Loading branch information
Drew Meyers committed May 7, 2024
1 parent e8f283d commit 950f847
Showing 1 changed file with 65 additions and 65 deletions.
130 changes: 65 additions & 65 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

from __future__ import annotations

# import os
# import shutil
# import time
import os
import shutil
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
Expand Down Expand Up @@ -612,37 +612,37 @@ def register_process(
# # TODO should probably wrap in a try except that unregisters the DAG
# # TODO verify that the DAG does not already exist in the registered dags directory and does not exist in Airflow

# # Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
# dag_filename = process.id + ".py"
# os.path.isfile(os.path.join(settings.dag_catalog_directory, dag_filename))

# # Copy DAG from the DAG catalog PVC to deployed PVC
# shutil.copy2(
# os.path.join(settings.dag_catalog_directory, dag_filename),
# settings.registered_dags_directory,
# )

# ems_api_auth = HTTPBasicAuth(settings.ems_api_auth_username, settings.ems_api_auth_password)

# # Poll the EMS API to verify DAG existence
# timeout = 20 # Timeout after 20 seconds
# start_time = time.time()
# while time.time() - start_time < timeout:
# try:
# response = requests.get(f"{settings.ems_api_url}/dags/{process.id}", auth=ems_api_auth)
# response.raise_for_status()
# break # Exit the loop if the DAG is found
# except requests.exceptions.HTTPError:
# time.sleep(5) # Wait for a few seconds before retrying
# else:
# # If we exit the loop without breaking, it means we timed out
# raise HTTPException(
# status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT,
# detail=f"Timeout waiting for DAG {process.id} to be available in Airflow."
# )

# # Unpause DAG
# pause_dag(settings.ems_api_url, process.id, ems_api_auth, pause=False)
# Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
dag_filename = process.id + ".py"
os.path.isfile(os.path.join(settings.dag_catalog_directory, dag_filename))

# Copy DAG from the DAG catalog PVC to deployed PVC
shutil.copy2(
os.path.join(settings.dag_catalog_directory, dag_filename),
settings.registered_dags_directory,
)

ems_api_auth = HTTPBasicAuth(settings.ems_api_auth_username, settings.ems_api_auth_password)

# Poll the EMS API to verify DAG existence
timeout = 20 # Timeout after 20 seconds
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(f"{settings.ems_api_url}/dags/{process.id}", auth=ems_api_auth)
response.raise_for_status()
break # Exit the loop if the DAG is found
except requests.exceptions.HTTPError:
time.sleep(5) # Wait for a few seconds before retrying
else:
# If we exit the loop without breaking, it means we timed out
raise HTTPException(
status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT,
detail=f"Timeout waiting for DAG {process.id} to be available in Airflow.",
)

# Unpause DAG
pause_dag(settings.ems_api_url, process.id, ems_api_auth, pause=False)
return crud.create_process(db, process)


Expand All @@ -661,37 +661,37 @@ def unregister_process(
"""
# # TODO should first check existence of DAG in the registered DAGs directory and in Airflow
# # TODO should probably wrap in a try except that keeps it registered if anything fails
# ems_api_auth = HTTPBasicAuth(settings.ems_api_auth_username, settings.ems_api_auth_password)

# # Pause the DAG first
# pause_dag(settings.ems_api_url, process_id, ems_api_auth, pause=True)

# # List and stop active DAG runs and their task instances
# active_dag_runs = list_active_dag_runs(settings.ems_api_url, process_id, ems_api_auth)
# for dag_run in active_dag_runs:
# stop_dag_run(settings.ems_api_url, process_id, dag_run["dag_run_id"], ems_api_auth)
# stop_task_instances(settings.ems_api_url, process_id, dag_run["dag_run_id"], ems_api_auth)

# try:
# os.remove(os.path.join(settings.registered_dags_directory, process_id + ".py"))
# except OSError as e:
# # If it fails, inform the user.
# print("Error: %s - %s." % (e.filename, e.strerror))

# # # Poll for the removal of the DAG from the Airflow API
# # timeout = 20 # Timeout after 20 seconds
# # start_time = time.time()
# # while time.time() - start_time < timeout:
# # response = requests.get(f"{settings.ems_api_url}/dags/{process_id}", auth=ems_api_auth)
# # if response.status_code == 404:
# # break # Exit loop if DAG is confirmed removed
# # time.sleep(5) # Wait before retrying
# # else:
# # # If we timeout waiting for the DAG to disappear, raise an error
# # raise HTTPException(
# # status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT,
# # detail="Timeout waiting for DAG to be fully removed from Airflow."
# # )
ems_api_auth = HTTPBasicAuth(settings.ems_api_auth_username, settings.ems_api_auth_password)

# Pause the DAG first
pause_dag(settings.ems_api_url, process_id, ems_api_auth, pause=True)

# List and stop active DAG runs and their task instances
active_dag_runs = list_active_dag_runs(settings.ems_api_url, process_id, ems_api_auth)
for dag_run in active_dag_runs:
stop_dag_run(settings.ems_api_url, process_id, dag_run["dag_run_id"], ems_api_auth)
stop_task_instances(settings.ems_api_url, process_id, dag_run["dag_run_id"], ems_api_auth)

try:
os.remove(os.path.join(settings.registered_dags_directory, process_id + ".py"))
except OSError as e:
# If it fails, inform the user.
print("Error: %s - %s." % (e.filename, e.strerror))

# Poll for the removal of the DAG from the Airflow API
timeout = 20 # Timeout after 20 seconds
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.get(f"{settings.ems_api_url}/dags/{process_id}", auth=ems_api_auth)
if response.status_code == 404:
break # Exit loop if DAG is confirmed removed
time.sleep(5) # Wait before retrying
else:
# If we timeout waiting for the DAG to disappear, raise an error
raise HTTPException(
status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT,
detail="Timeout waiting for DAG to be fully removed from Airflow.",
)

process = check_process_integrity(db, process_id, new_process=False)
crud.delete_process(db, process)
Expand Down

0 comments on commit 950f847

Please sign in to comment.