From 9cbf90e38d854d066629e10eee270d089710c4d3 Mon Sep 17 00:00:00 2001 From: Bruno Antonellini Date: Wed, 7 Feb 2024 21:09:59 -0300 Subject: [PATCH] Adapt existent dags (#201) --- orchestrate/dags/daily_loan_run.py | 4 +-- orchestrate/dags/datacoves_sample_dag.py | 29 +++++++++---------- .../dags_yml_definitions/daily_loan_run.yml | 1 - 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/orchestrate/dags/daily_loan_run.py b/orchestrate/dags/daily_loan_run.py index 02425ce6..bb8c7681 100644 --- a/orchestrate/dags/daily_loan_run.py +++ b/orchestrate/dags/daily_loan_run.py @@ -1,8 +1,7 @@ import datetime from airflow.decorators import dag, task_group -from airflow.providers.airbyte.operators.airbyte import \ - AirbyteTriggerSyncOperator +from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator from fivetran_provider.operators.fivetran import FivetranOperator from fivetran_provider.sensors.fivetran import FivetranSensor from operators.datacoves.bash import DatacovesBashOperator @@ -56,7 +55,6 @@ def extract_and_load_fivetran(): tg_extract_and_load_fivetran = extract_and_load_fivetran() extract_and_load_dlt = DatacovesBashOperator( task_id="extract_and_load_dlt", - activate_venv=True, tooltip="dlt Extract and Load", bash_command="python load/dlt/csv_to_snowflake/load_csv_data.py", ) diff --git a/orchestrate/dags/datacoves_sample_dag.py b/orchestrate/dags/datacoves_sample_dag.py index fcebc9fb..150f6e7c 100644 --- a/orchestrate/dags/datacoves_sample_dag.py +++ b/orchestrate/dags/datacoves_sample_dag.py @@ -1,14 +1,15 @@ """## Datacoves Bash Operator DAG This DAG is a sample using the Datacoves Airflow Operators""" -from pendulum import datetime from airflow import DAG from airflow.decorators import dag, task -from operators.datacoves.dbt import DatacovesDbtOperator from operators.datacoves.bash import DatacovesBashOperator +from operators.datacoves.dbt import DatacovesDbtOperator +from pendulum import datetime # Only here for reference, this is automatically activated by Datacoves Operator -DATACOVES_VIRTIAL_ENV = '/opt/datacoves/virtualenvs/main/bin/activate' +DATACOVES_VIRTIAL_ENV = "/opt/datacoves/virtualenvs/main/bin/activate" + @dag( default_args={ @@ -17,39 +18,37 @@ "email": "gomezn@example.com", "email_on_failure": True, }, - catchup=False, - tags = ["version_6"], - description = "Datacoves Sample dag", - + tags=["version_6"], + description="Datacoves Sample dag", # This is a regular CRON schedule. Helpful resources # https://cron-ai.vercel.app/ # https://crontab.guru/ - schedule_interval = "0 0 1 */12 *" + schedule_interval="0 0 1 */12 *", ) def datacoves_sample_dag(): # Calling dbt commands dbt_task = DatacovesDbtOperator( - task_id = "run_dbt_task", - bash_command = "dbt debug", - doc_md = """\ + task_id="run_dbt_task", + bash_command="dbt debug", + doc_md="""\ #### Task Documentation This task leveraged the DatacovesDbtOperator - """ + """, ) # This is calling an external Python file after activating the venv # use this instead of the Python Operator python_task = DatacovesBashOperator( - task_id = "run_python_script", - activate_venv=True, - bash_command = "python orchestrate/python_scripts/sample_script.py" + task_id="run_python_script", + bash_command="python orchestrate/python_scripts/sample_script.py", ) # Define task dependencies python_task.set_upstream([dbt_task]) + # Invoke Dag dag = datacoves_sample_dag() dag.doc_md = __doc__ diff --git a/orchestrate/dags_yml_definitions/daily_loan_run.yml b/orchestrate/dags_yml_definitions/daily_loan_run.yml index fe269de2..b79f6525 100644 --- a/orchestrate/dags_yml_definitions/daily_loan_run.yml +++ b/orchestrate/dags_yml_definitions/daily_loan_run.yml @@ -26,7 +26,6 @@ nodes: extract_and_load_dlt: type: task operator: operators.datacoves.bash.DatacovesBashOperator - activate_venv: true tooltip: "dlt Extract and Load" bash_command: "python load/dlt/csv_to_snowflake/load_csv_data.py"