From e95f34396692e62ec59b62eebf4bd9426539cff5 Mon Sep 17 00:00:00 2001 From: Francesco Stablum Date: Tue, 30 Nov 2021 15:23:15 +0100 Subject: [PATCH] perf: added indices to mongodb collections; using instead of deprecated --- preprocess/download_sets_dag.py | 37 ++++++++++++++++++-------- preprocess/vectorize_activities_dag.py | 9 ++++--- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/preprocess/download_sets_dag.py b/preprocess/download_sets_dag.py index c52ac36..123ddcc 100644 --- a/preprocess/download_sets_dag.py +++ b/preprocess/download_sets_dag.py @@ -120,15 +120,19 @@ def parse_sets(page, ti): def clear_activity_data(rel, ti): db = persistency.mongo_db() + coll = db['activity_data'] # remove all data previously stored for this relation - db['activity_data'].remove({}) + coll.delete_many({}) + coll.create_index([("activity_id", -1)]) def clear(rel, ti): db = persistency.mongo_db() + coll = db[rel.name] # remove all data previously stored for this relation - db[rel.name].remove({}) + coll.delete_many({}) + coll.create_index([("activity_id", -1)]) def persist_sets(page, ti): """ @@ -141,14 +145,14 @@ def persist_sets(page, ti): data = large_mp.recv(ti, f'parse_sets_{page}') # FIXME: per-rel tasks - for rel, sets in data.items(): + for rel_name, sets in data.items(): for activity_id, set_ in sets.items(): - # remove previously stored activity, if present FIXME: deprecated? - db[rel].delete_one({'activity_id': activity_id}) # remove pre-existing set for this activity + # remove pre-existing set for this activity + db[rel_name].delete_one({'activity_id': activity_id}) - db[rel].insert_one({ + db[rel_name].insert_one({ 'activity_id': activity_id, 'set_': set_ }) @@ -163,6 +167,8 @@ def codelists(ti): :return: None """ db = persistency.mongo_db() + coll_out = db['codelists'] + coll_out.create_index([("name", -1)]) for codelist_name in extract_codelists(rels): url = DATASTORE_CODELIST_URL.format(codelist_name) params = {'format': 'json'} @@ -171,8 +177,8 @@ def codelists(ti): lst = [] for curr in data: lst.append(curr['code']) - db['codelists'].delete_many({'name': codelist_name}) - db['codelists'].insert({ + coll_out.delete_many({'name': codelist_name}) + coll_out.insert({ 'name': codelist_name, 'codelist': lst }) @@ -203,7 +209,8 @@ def encode(rel, ti): coll_out = db[rel.name + "_encoded"] # remove existing data in the collection - coll_out.remove({}) + coll_out.delete_many({}) + coll_out.create_index([("activity_id", -1)]) for document in coll_in.find(no_cursor_timeout=True): document = dict(document) # copy @@ -250,6 +257,7 @@ def arrayfy(rel, ti): coll_in = db[rel.name+"_encoded"] coll_out = db[rel.name+"_arrayfied"] coll_out.delete_many({}) # empty the collection + coll_out.create_index([("activity_id", -1)]) for set_index, document in enumerate(coll_in.find()): set_npas = [] set_ = document['set_'] @@ -279,6 +287,9 @@ def to_npa(rel, ti): db = persistency.mongo_db() coll_in = db[rel.name+"_arrayfied"] coll_out = db['npas'] + coll_out.create_index([("rel", -1)]) + coll_out.create_index([("creation_date", -1)]) + coll_out.create_index([("npa_file_id", -1)]) rel_npas = [] for document in coll_in.find(): set_npa = utils.deserialize(document['npa']) @@ -286,7 +297,7 @@ def to_npa(rel, ti): set_index_col = np.ones((set_npa.shape[0], 1))*set_index rel_npas.append(np.hstack([set_index_col, set_npa])) rel_npa = np.vstack(rel_npas) - coll_out.remove({'rel': rel.name}) + coll_out.delete_many({'rel': rel.name}) coll_out.insert_one({ 'rel': rel.name, @@ -311,6 +322,10 @@ def to_tsets(rel, ti): set_indices = list(set(map(lambda document: document['set_index'], set_indices_results))) train_indices, test_indices = sklearn.model_selection.train_test_split(set_indices, train_size=0.90) coll_out = db['npas_tsets'] + coll_out.create_index([("rel", -1)]) + coll_out.create_index([("creation_date", -1)]) + coll_out.create_index([("train_npa_file_id", -1)]) + coll_out.create_index([("test_npa_file_id", -1)]) train_npas = [] test_npas = [] for document in coll_in.find(): @@ -326,7 +341,7 @@ def to_tsets(rel, ti): train_npa = np.vstack(train_npas) test_npa = np.vstack(test_npas) - coll_out.remove({'rel': rel.name}) + coll_out.delete_many({'rel': rel.name}) coll_out.insert_one({ 'rel': rel.name, 'creation_time': utils.strnow_iso(), diff --git a/preprocess/vectorize_activities_dag.py b/preprocess/vectorize_activities_dag.py index 33ca90c..199da99 100644 --- a/preprocess/vectorize_activities_dag.py +++ b/preprocess/vectorize_activities_dag.py @@ -29,8 +29,10 @@ def clear(ti): :return: """ db = persistency.mongo_db() - db['activity_data_encoded'].remove() - db['activity_vectors'].remove() + db['activity_data_encoded'].delete_many({}) + db['activity_data_encoded'].create_index([("activity_id", -1)]) + db['activity_vectors'].delete_many({}) + db['activity_vectors'].create_index([("activity_id", -1)]) def collect(ti): """ @@ -49,7 +51,6 @@ def collect(ti): coll_sets[rel.name] = db[rel.name + "_encoded"] activity_docs = coll_activity.find({}, {'activity_id':1}) - activity_sets = collections.OrderedDict() for activity_doc in activity_docs: encoded_sets = collections.OrderedDict() activity_id = activity_doc['activity_id'] @@ -106,7 +107,7 @@ def vectorize(ti): t_vectorize = PythonOperator( task_id="vectorize", python_callable=vectorize, - + start_date=days_ago(2) ) t_clear >> t_collect >> t_vectorize