Commit

feat: airflow's DAG for vectorization of activities

Francesco Stablum committed Nov 30, 2021
1 parent bf95ae5 commit ae2e198
Showing 3 changed files with 172 additions and 23 deletions.
73 changes: 54 additions & 19 deletions preprocess/dag.py → preprocess/download_sets_dag.py
@@ -57,8 +57,22 @@ def download(start, ti):
    data = response.json()
    large_mp.send(ti, data)

def persist_activity_data(page, ti):
    db = persistency.mongo_db()
    coll_out = db['activity_data']

    # this large message is cleared in parse_sets_*,
    # which is subsequent to persist_activity_data_*
    data = large_mp.recv(ti, f"download_{page}")

    for activity in data['response']['docs']:
        activity_id = activity['iati_identifier']

        coll_out.insert_one({
            'activity_id': activity_id
        })

def parse(page, ti):
def parse_sets(page, ti):
"""
Airflow task: parse downloaded page of activities
:param page: index of the downloaded page to be parsed
@@ -104,21 +118,27 @@ def parse(page, ti):
    large_mp.send(ti, rels_vals)
    large_mp.clear_recv(ti, f"download_{page}")

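For context, large_mp implements a message-passing pattern between Airflow tasks: a task send()s a payload under its own task id, downstream tasks recv() it by naming the producer task, and the last consumer clear_recv()s it, which is why persist_activity_data must read the download message before parse_sets clears it. A minimal sketch of that lifecycle, using only the send/recv/clear_recv calls visible in this diff (task names are hypothetical):

    def producer(ti):
        # payload is stored under this task's id ("producer")
        large_mp.send(ti, {"docs": [...]})

    def first_consumer(ti):
        data = large_mp.recv(ti, "producer")   # read, but do not clear
        ...

    def last_consumer(ti):
        data = large_mp.recv(ti, "producer")
        ...
        large_mp.clear_recv(ti, "producer")    # safe only after all readers ran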
def clear(ti, rel):
def clear_activity_data(rel, ti):
    db = persistency.mongo_db()

    # remove all previously stored activity data
    db['activity_data'].remove({})

def clear(rel, ti):
    db = persistency.mongo_db()

    # remove all data previously stored for this relation
    db[rel].remove({})
    db[rel.name].remove({})

def persist(page, ti):
def persist_sets(page, ti):
    """
    Airflow task: store previously parsed page of activities in MongoDB
    :param page: index of the downloaded page to be stored
    :param ti (str): task id
    :return: None
    """
    db = persistency.mongo_db()
    data = large_mp.recv(ti, f'parse_{page}')
    data = large_mp.recv(ti, f'parse_sets_{page}')

    # FIXME: per-rel tasks
    for rel, sets in data.items():
@@ -133,7 +153,7 @@ def persist(page, ti):
                'set_': set_
            })

    large_mp.clear_recv(ti, f'parse_{page}')
    large_mp.clear_recv(ti, f'parse_sets_{page}')

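Based on the insert_one call above, each relation collection ends up holding one document per (activity, set) pair; an illustrative stored document (field values are hypothetical):

    {
        'activity_id': 'XM-DAC-XXXX-EXAMPLE',  # hypothetical IATI identifier
        'set_': [[0, 3, 1], [2, 3, 7]]         # the relation's rows for this activity
    }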

def codelists(ti):
@@ -342,16 +362,21 @@ def to_tsets(rel, ti):
        op_kwargs={}
    )

    t_clear = {}
    t_clear_activity_data = PythonOperator(
        task_id="clear_activity_data",
        python_callable=clear_activity_data,
        start_date=days_ago(2)
    )
    t_clear_sets = {}
    for rel in rels:
        t_clear[rel] = PythonOperator(
            task_id="clear",
        t_clear_sets[rel.name] = PythonOperator(
            task_id=f"clear_sets_{rel.name}",
            python_callable=clear,
            start_date=days_ago(2),
            op_kwargs={'rel': rel}
        )

    t_persist = {}
    t_persist_sets = {}
    for page in pages:
        start = page * config.download_page_size
        t_download = PythonOperator(
@@ -361,22 +386,31 @@ def to_tsets(rel, ti):
            op_kwargs={'start': start}
        )

        t_parse = PythonOperator(
            task_id=f"parse_{page}",
            python_callable=parse,
        t_persist_activity_data = PythonOperator(
            task_id=f"persist_activity_data_{page}",
            python_callable=persist_activity_data,
            start_date=days_ago(2),
            op_kwargs={'page': page}
        )

        t_parse_sets = PythonOperator(
            task_id=f"parse_sets_{page}",
            python_callable=parse_sets,
            start_date=days_ago(2),
            op_kwargs={'page': page}
        )

        t_persist[page] = PythonOperator(
            task_id=f"persist_{page}",
            python_callable=persist,
        t_persist_sets[page] = PythonOperator(
            task_id=f"persist_sets_{page}",
            python_callable=persist_sets,
            start_date=days_ago(2),
            op_kwargs={'page': page}
        )
        for rel in rels:
            t_clear[rel] >> t_download
            t_download >> t_parse >> t_persist[page]
            t_clear_activity_data >> t_clear_sets[rel.name]
            t_clear_sets[rel.name] >> t_download
        # persist_activity_data must read the download message before
        # parse_sets clears it
        t_download >> t_persist_activity_data >> t_parse_sets
        t_parse_sets >> t_persist_sets[page]

    for rel in rels:
        t_to_npa = PythonOperator(
@@ -407,7 +441,8 @@ def to_tsets(rel, ti):
        )

    for page in pages:
        t_persist[page] >> t_encode
        t_persist_sets[page] >> t_encode
    t_codelists >> t_encode >> t_arrayfy
    t_arrayfy >> t_to_npa
    t_arrayfy >> t_to_tsets

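Taken together, the wiring above gives each page of this DAG the following task chain (a reconstruction from the operators and dependencies in this file, not code from the diff):

    # clear_activity_data >> clear_sets_<rel>      (for every rel)
    # clear_sets_<rel> >> download_<page>
    # download_<page> >> persist_activity_data_<page> >> parse_sets_<page> >> persist_sets_<page>
    # persist_sets_<page> >> encode >> arrayfy >> to_npa / to_tsets   (with codelists >> encode)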
112 changes: 112 additions & 0 deletions preprocess/vectorize_activities_dag.py
@@ -0,0 +1,112 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import datetime
import sys
import os
import collections

# since airflow imports DAG modules from elsewhere (likely ~/airflow),
# we have to explicitly add this module's parent directory to python's path
path = os.path.abspath(os.path.dirname(os.path.abspath(__file__))+"/..")
sys.path = [path]+sys.path

from common import relspecs, persistency, utils
from preprocess import vectorize_activity

default_args = {
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=1)
}

rels = relspecs.rels.downloadable

def clear(ti):
    """
    removes the previous outputs of this DAG
    :param ti:
    :return:
    """
    db = persistency.mongo_db()
    db['activity_data_encoded'].remove({})
    db['activity_vectors'].remove({})

def collect(ti):
    """
    gets all the encoded sets and groups them by activity_id,
    stores all this in an ad-hoc collection
    :param ti:
    :return:
    """
    db = persistency.mongo_db()

    # open all necessary collections
    coll_sets = {}
    coll_activity = db['activity_data']
    coll_out = db['activity_data_encoded']
    for rel in rels:
        coll_sets[rel.name] = db[rel.name + "_encoded"]

    activity_docs = coll_activity.find({}, {'activity_id': 1})
    activity_sets = collections.OrderedDict()
    for activity_doc in activity_docs:
        encoded_sets = collections.OrderedDict()
        activity_id = activity_doc['activity_id']
        for rel in rels:
            # materialize the cursor: a pymongo Cursor itself cannot be
            # embedded in a document
            encoded_sets[rel.name] = list(coll_sets[rel.name].find({'activity_id': activity_id}))
        new_document = {
            'activity_id': activity_id,
            'encoded_sets': encoded_sets
        }
        coll_out.insert_one(new_document)

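The documents collect() writes to activity_data_encoded therefore look roughly like this (shape inferred from the code above; relation names and values are hypothetical):

    {
        'activity_id': 'XM-DAC-XXXX-EXAMPLE',
        'encoded_sets': {
            'budget': [...],   # documents from db['budget_encoded'] for this activity
            'sector': [...]    # documents from db['sector_encoded'] for this activity
        }
    }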
def vectorize(ti):
    """
    takes all relation sets belonging to an activity and generates
    their latent codes using the respective previously-trained Set AutoEncoder models
    (implemented in ActivityVectorizer).
    The resulting codes are stored in an ad-hoc collection.
    :param ti:
    :return:
    """
    db = persistency.mongo_db()
    coll_in = db['activity_data_encoded']
    coll_out = db['activity_vectors']
    activity_vectorizer = vectorize_activity.ActivityVectorizer()
    for input_document in coll_in.find({}):
        # vectorize_activity expects the rel_name -> encoded set mapping
        activity_vector = activity_vectorizer.vectorize_activity(input_document['encoded_sets'])
        activity_vector_serialized = utils.serialize(activity_vector)
        new_document = {
            'activity_id': input_document['activity_id'],
            'activity_vector': activity_vector_serialized
        }
        coll_out.insert_one(new_document)

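Reading a vector back is the mirror of the write above. Assuming utils exposes a deserialize counterpart to utils.serialize (only serialize appears in this commit, so this is an assumption):

    # hypothetical read-back of a stored activity vector
    doc = db['activity_vectors'].find_one({'activity_id': some_activity_id})
    vector = utils.deserialize(doc['activity_vector'])  # numpy array built by vectorize_activity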
with DAG(
    'vectorize_activities',
    description='Vectorize activities',
    tags=['vectorize', 'preprocess', 'activities'],
    default_args=default_args,
    schedule_interval=None
) as dag:

    t_clear = PythonOperator(
        task_id="clear",
        python_callable=clear,
        start_date=days_ago(2)
    )

    t_collect = PythonOperator(
        task_id="collect",
        python_callable=collect,
        start_date=days_ago(2)
    )

    t_vectorize = PythonOperator(
        task_id="vectorize",
        python_callable=vectorize,
        start_date=days_ago(2)
    )

    t_clear >> t_collect >> t_vectorize
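Since schedule_interval is None, this DAG never runs on a schedule; it has to be triggered explicitly, e.g. with the Airflow 2 CLI:

    airflow dags trigger vectorize_activities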
10 changes: 6 additions & 4 deletions preprocess/vectorize_activity.py
@@ -19,12 +19,14 @@ class ActivityVectorizer(object):
"""
def __init__(self):
self.model_storage = models_storage.DSPNAEModelsStorage()
self.model_storage.load_all_models()

def process(self, activity):

def vectorize_activity(self, activity):
vectorized_fields = []
for rel in relspecs:
field_data = activity[rel.name]
vectorized_field = self.model_storage[rel.name].encoder(field_data)
for rel_name, encoded_set in activity.items():
encoded_set_npa = np.array(encoded_set)
vectorized_field = self.model_storage[rel_name].encoder(encoded_set_npa)
vectorized_fields.append(vectorized_field)
ret = np.hstack(vectorized_fields)
return ret

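For illustration, the renamed ActivityVectorizer API would be used like this (a sketch; the relation names and encoded values are hypothetical):

    from preprocess import vectorize_activity

    vectorizer = vectorize_activity.ActivityVectorizer()  # loads all trained Set AutoEncoder models
    encoded_sets = {
        'budget': [[0.1, 0.2], [0.3, 0.4]],  # hypothetical rel_name -> encoded rows
        'sector': [[0.5, 0.6]]
    }
    vec = vectorizer.vectorize_activity(encoded_sets)     # 1-D vector: np.hstack of per-relation codes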