Skip to content

Commit

Permalink
fix: clearing some of the mongodb database before the other tasks; re…
Browse files Browse the repository at this point in the history
…moved some prints
  • Loading branch information
Francesco Stablum committed Nov 29, 2021
1 parent b44db9d commit bf95ae5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
4 changes: 0 additions & 4 deletions models/models_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,17 @@ def filenames(self,rel,extension):
"trained_models",
f"DSPNAE_{rel.name}*.{extension}"
)
print("filenames_glob",filenames_glob)
ret = {}
filenames = glob.glob(filenames_glob)
for curr in filenames:
m = re.match(f'.*-v(\d+).{extension}', curr)
if m:
# has version number
print(curr, m.groups())
version = int(m.groups()[0])
else:
# no version number in filename: this was the first
print(f'filename {curr} not matching versioned pattern')
version = 0
ret[version] = curr
print("ret",ret)
return ret

def kwargs_filenames(self,rel):
Expand Down
24 changes: 18 additions & 6 deletions preprocess/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ def parse(page, ti):
large_mp.send(ti, rels_vals)
large_mp.clear_recv(ti, f"download_{page}")

def clear(ti, rel):
db = persistency.mongo_db()

# remove all data previously stored for this relation
db[rel].remove({})

def persist(page, ti):
"""
Expand All @@ -118,9 +123,6 @@ def persist(page, ti):
# FIXME: per-rel tasks
for rel, sets in data.items():

# remove all data previously stored for this relation
db[rel].remove({})

for activity_id, set_ in sets.items():

# remove previously stored activity, if present FIXME: deprecated?
Expand Down Expand Up @@ -183,13 +185,12 @@ def encode(rel, ti):
# remove existing data in the collection
coll_out.remove({})

# how much time does each item require to be encoded

for document in coll_in.find(no_cursor_timeout=True):
document = dict(document) # copy
set_ = document['set_']
set_size = get_set_size(set_)

# how much time does each item require to be encoded
start = time.time()

for field in rel.fields:
Expand Down Expand Up @@ -341,6 +342,15 @@ def to_tsets(rel, ti):
op_kwargs={}
)

t_clear = {}
for rel in rels:
t_clear[rel] = PythonOperator(
task_id="clear",
python_callable=clear,
start_date=days_ago(2),
op_kwargs={'rel': rel}
)

t_persist = {}
for page in pages:
start = page*config.download_page_size
Expand All @@ -364,7 +374,9 @@ def to_tsets(rel, ti):
start_date=days_ago(2),
op_kwargs={'page': page}
)
t_download >> t_parse >> t_persist[page]
for rel in rels:
t_clear[rel] >> t_download
t_download>> t_parse >> t_persist[page]

for rel in rels:
t_to_npa = PythonOperator(
Expand Down

0 comments on commit bf95ae5

Please sign in to comment.