Skip to content

Commit

Permalink
perf: trying to tackle some errors in airflow tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco Stablum committed Nov 23, 2021
1 parent f32c78b commit 3075aa5
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 5 deletions.
File renamed without changes.
4 changes: 4 additions & 0 deletions airflow/launch_airflow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ screen -X -S airflow_webserver quit

sleep 1

ps aux | grep airflow | grep -v launch | awk '{print $2}' | xargs kill

sleep 1

screen -L -Logfile logs/airflow_scheduler_${TS}.log -S airflow_scheduler -d -m airflow scheduler
screen -L -Logfile logs/airflow_webserver_${TS}.log -S airflow_webserver -d -m airflow webserver

Expand Down
2 changes: 1 addition & 1 deletion models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def train_model(rel,ti):
train_cmd = f"cd {project_root_dir}; python3 models/dspn_autoencoder.py {config.models_dag_config_name} --rel_name={rel.name}"

t_train_model = BashOperator(
task_id=f"train_dsp_model_{rel.name}",
task_id=f"train_dspn_model_{rel.name}",
depends_on_past=False,
bash_command=train_cmd,
start_date=in_days((rel_i-1)*days_interval),
Expand Down
4 changes: 2 additions & 2 deletions models/text_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ def __init__(self):
full_path = os.path.join(directory,MODEL_FILENAME)
self.model = Doc2Vec.load(full_path)
def encode(self, text):
logging.info("encoding "+str(text))
#logging.info("encoding "+str(text))
tokens = nltk.word_tokenize(text)
tokens = [curr.lower() for curr in tokens]
ret = self.model.infer_vector(tokens)
logging.info("text encode returning "+str(ret))
#logging.info("text encode returning "+str(ret))
return ret

def decode(self, code):
Expand Down
10 changes: 8 additions & 2 deletions preprocess/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sklearn.model_selection
import os
import sys
import pymongo

# since airflow's DAG modules are imported elsewhere (likely ~/airflow)
# we have to explicitly add the path of the parent directory to this module to python's path
Expand All @@ -27,7 +28,7 @@
DATASTORE_ACTIVITY_URL = "https://datastore.iati.cloud/api/v2/activity"
DATASTORE_CODELIST_URL = "https://datastore.iati.cloud/api/codelists/{}/"
PAGE_SIZE = 1000
MAX_PAGES = 500
MAX_PAGES = 3


def extract_codelists(_rels):
Expand Down Expand Up @@ -200,8 +201,13 @@ def encode(rel, ti):
logging.info(document)
raise Exception(msg)
coll_out.delete_one({'activity_id': document['activity_id']}) # remove pre-existing set for this activity
coll_out.insert_one(document)

try:
coll_out.insert_one(document)
except pymongo.errors.DocumentTooLarge as e:
for field in rel.fields:
logging.info(f"{field.name} shape: {set_[field.name].shape}")
raise Exception(f"cannot insert document into relation {rel.name} because {str(e)}")

def arrayfy(rel, ti):
"""
Expand Down

0 comments on commit 3075aa5

Please sign in to comment.