Improved dataset processing and dataset lifecycle management #2821

Open · wants to merge 24 commits into base: main

Changes from all 24 commits:
db68de5  chore: update initial monthly currency (sylvanr, Feb 2, 2024)
264a6ef  chore: update IATI Data Dump url (sylvanr, Feb 2, 2024)
f8f86b3  chore: temporarily disabled code4iati datadump status check until upd… (sylvanr, Feb 2, 2024)
c5a8569  feat: updated old dataset drop functionality to better match with alw… (sylvanr, Feb 5, 2024)
7dd644b  chore: better logging and error handling for index_to_core (sylvanr, Feb 5, 2024)
9ac1c1e  chore: update codelists (sylvanr, Feb 5, 2024)
3100c9e  fix: added list to type checking for attribute cleaning (sylvanr, Feb 5, 2024)
5f4448d  feat: additional dataset metadata and include retry to failing datase… (sylvanr, Feb 5, 2024)
1a62b42  feat: update managed schema for dataset metadata (sylvanr, Feb 5, 2024)
8787aa5  feat: improve dataset updates with hash and id checking (sylvanr, Feb 5, 2024)
bbbb9fc  chore: update logging for dataset processing (sylvanr, Feb 5, 2024)
62cf9b8  refactor: remove double dump in favour of celery retry (sylvanr, Feb 6, 2024)
368548c  chore: update tests (sylvanr, Feb 6, 2024)
abe6cf5  chore: style update and fix tests (sylvanr, Feb 6, 2024)
c1ff7c8  refactor: maintainability (sylvanr, Feb 6, 2024)
9638d43  fix: drop data only if data exists (sylvanr, Feb 7, 2024)
554c6b2  chore: update formatting and coverage (sylvanr, Feb 12, 2024)
fb441ec  feat: added create and delete custom dataset feature (sylvanr, Feb 12, 2024)
b75dc43  feat: don't drop custom datasets in regular indexing cycle (sylvanr, Feb 12, 2024)
d0f272b  docs: update usage documentation (sylvanr, Feb 12, 2024)
897fa22  chore: update Django to 4.2.10 (sylvanr, Feb 12, 2024)
456d56f  feat: updated parsing, parsing status and post-parse clean (sylvanr, Feb 12, 2024)
b42c6f7  fix: currency aggregation returning multiple activities on related ac… (sylvanr, Feb 27, 2024)
f43cd22  chore: dependency update (sylvanr, Aug 22, 2024)
Binary file modified .coverage
Binary file not shown.
2 changes: 1 addition & 1 deletion .env.example.docker
@@ -31,8 +31,8 @@ MEM_SOLR_MIN=20
MEM_SOLR_MAX=40

# Flower password
CELERYFLOWER_PASSWORD=zz
CELERYFLOWER_USER=zz
CELERYFLOWER_PASSWORD=zz

# DJANGO DEFAULT SUPERUSER WHEN FIRST RUN
DJANGO_SUPERUSER_USERNAME=admin_example
5 changes: 4 additions & 1 deletion direct_indexing/cleaning/dataset.py
@@ -17,6 +17,9 @@ def recursive_attribute_cleaning(data):
data = {key.replace('@', ''): item for key, item in data.items()}
# Remove the lang xml tag
data = {key.replace(XML_LANG_STR_STRIPPED, LANG_STR): item for key, item in data.items()}
data = {key: item for key, item in data.items() if '._' not in key}
data = {key: item for key, item in data.items() if 'http' not in key}

# A list of fields that need to be appended to the dataset
add_fields = {}
for key, value in data.items():
@@ -55,7 +58,7 @@ def extract_key_value_fields(data, add_fields, key, value):
add_fields = extract_single_values(add_fields, value, key, data)
# If the fields are not yet at the lowest level of key-value pair,
# process the underlying field.
elif type(value) in [OrderedDict, dict]: # was list instead of dict
elif type(value) in [OrderedDict, dict, list]: # was list instead of dict
data[key] = recursive_attribute_cleaning(value)
return add_fields

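For context, the two comprehensions added to `recursive_attribute_cleaning` drop any key containing `'._'` or `'http'` before further processing, and the type check now recurses into lists as well as dicts. A minimal standalone sketch of the key filtering; the helper name and sample data are illustrative only:

```python
from collections import OrderedDict


def drop_unwanted_keys(data):
    # Mirrors the two comprehensions added above: any key containing '._' or
    # 'http' (e.g. leftover namespace URLs) is dropped from the dictionary.
    data = {key: item for key, item in data.items() if '._' not in key}
    data = {key: item for key, item in data.items() if 'http' not in key}
    return data


sample = OrderedDict([
    ('currency', 'USD'),
    ('narrative._text', 'dropped: key contains "._"'),
    ('xmlns_http://example.org/ns', 'dropped: key contains "http"'),
    ('value', 100),
])
print(drop_unwanted_keys(sample))  # {'currency': 'USD', 'value': 100}
```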
7 changes: 7 additions & 0 deletions direct_indexing/custom_fields/currency_aggregation.py
@@ -350,6 +350,13 @@ def get_child_aggregations(dba, aggregation_fields):
# {MONGO_UNWIND: "$related-activity"},
{"$unwind": "$related-activity"},
{"$match": {"related-activity.type": 1}},
{'$group': {
'_id': '$_id',
'uniqueActivity': {
'$first': '$$ROOT'
}
}},
{'$replaceRoot': {'newRoot': '$uniqueActivity'}},
{"$group": group_object}
# {MONGO_GROUP: group_object}
]))
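The added `$group` on `_id` with `$first: '$$ROOT'` followed by `$replaceRoot` deduplicates activities that matched more than one type-1 related-activity after the `$unwind`, so each activity contributes once to the final aggregation (see commit b42c6f7). A reduced sketch of the pattern, assuming pymongo and illustrative database/collection names:

```python
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed local MongoDB
activities = client["direct_indexing"]["activity"]  # illustrative names

pipeline = [
    {"$unwind": "$related-activity"},
    {"$match": {"related-activity.type": 1}},
    # After the unwind, an activity with several type-1 related activities
    # appears once per match; keep only the first copy of each _id.
    {"$group": {"_id": "$_id", "uniqueActivity": {"$first": "$$ROOT"}}},
    {"$replaceRoot": {"newRoot": "$uniqueActivity"}},
    # The real pipeline then applies the PR's `group_object` aggregation stage.
]

for doc in activities.aggregate(pipeline):
    print(doc["_id"])
```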
(file name not rendered)
@@ -121,7 +121,6 @@ def iterate_third_level_children(child, data, field, target, target_field, total
field_index = -1
elif child in item:
field_index = total_field
print(item[child])
if not isinstance(item[child], list):
total_field += 1
else:
2 changes: 1 addition & 1 deletion direct_indexing/data_sources/codelists_dict.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion direct_indexing/data_sources/currency_monthlyaverage.json

Large diffs are not rendered by default.

38 changes: 27 additions & 11 deletions direct_indexing/direct_indexing.py
@@ -1,3 +1,4 @@
import datetime
import json
import logging

@@ -76,27 +77,42 @@ def drop_removed_data():
existing = []

# Get the datasets that have been indexed
url = f'{settings.SOLR_DATASET}/select?fl=name%2Cid%2Ciati_cloud_indexed&indent=true&q.op=OR&q=*%3A*&rows=10000000'
url = f'{settings.SOLR_DATASET}/select?fl=name%2Cid%2Ciati_cloud_indexed%2Ciati_cloud_custom&indent=true&q.op=OR&q=*%3A*&rows=10000000' # NOQA: E501
data = requests.get(url)
data = data.json()['response']['docs']
if len(data) == 0:
logging.info('drop_removed_data:: No data found in the dataset index, skipping drop')
return

# Get a list of dataset names from the dataset metadata file
with open(f'{settings.BASE_DIR}/direct_indexing/data_sources/datasets/dataset_metadata.json') as f:
meta = json.load(f)
for dataset in meta:
existing.append(dataset['name'])
existing.append(dataset['id'])

for d in data:
if d['name'] not in existing:
dropped_list.append(d['name'])
if 'iati_cloud_custom' not in d and d['id'] not in existing:
dropped_list.append(d['id'])

# For every core with dataset data, delete the data for the dropped datasets identified with the dataset.name field
# For every core with dataset data, delete the data for the dropped datasets identified with the dataset.id field
for core in ['activity', 'transaction', 'result', 'budget']:
solr = pysolr.Solr(f'{settings.SOLR_URL}/{core}', always_commit=True)
for d_name in dropped_list:
if len(solr.search(f'dataset.name:"{d_name}"')) > 0:
solr.delete(q=f'dataset.name:"{d_name}"')
for d_id in dropped_list:
if len(solr.search(f'dataset.id:"{d_id}"')) > 0:
solr.delete(q=f'dataset.id:"{d_id}"')
solr = pysolr.Solr(settings.SOLR_DATASET, always_commit=True)
for d_name in dropped_list:
if len(solr.search(f'name:"{d_name}"')) > 0:
solr.delete(q='name:{d_name}')
for d_id in dropped_list:
if len(solr.search(f'id:"{d_id}"')) > 0:
# solr.delete(q=f'id:{d_id}')
iati_cloud_removed_date = str(datetime.datetime.now().isoformat())
# remove last three characters from the string and add Z
iati_cloud_removed_date = iati_cloud_removed_date[:-3] + 'Z'
update_data = {
'id': d_id,
'iati_cloud_removed_date': {'set': iati_cloud_removed_date},
'iati_cloud_removed_reason': {'set': 'The dataset was not available in the latest dataset download.'},
'iati_cloud_indexed': {'set': False},
'iati_cloud_should_be_indexed': {'set': False}
}
# Perform the partial update using atomic update syntax
solr.add([update_data])
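Rather than deleting removed datasets from the dataset core, the new loop flags them in place with a Solr atomic update. A minimal sketch of that update with pysolr; the Solr URL and dataset id are placeholders:

```python
import datetime

import pysolr

solr = pysolr.Solr("http://localhost:8983/solr/dataset", always_commit=True)  # placeholder URL

# Solr pdate fields expect an ISO-8601 timestamp ending in 'Z'; as in the PR,
# trim the microseconds to milliseconds before appending the 'Z'.
removed_date = datetime.datetime.now().isoformat()[:-3] + "Z"

solr.add([{
    "id": "example-dataset-id",  # placeholder dataset id
    "iati_cloud_removed_date": {"set": removed_date},
    "iati_cloud_removed_reason": {"set": "The dataset was not available in the latest dataset download."},
    "iati_cloud_indexed": {"set": False},
    "iati_cloud_should_be_indexed": {"set": False},
}])
```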
22 changes: 16 additions & 6 deletions direct_indexing/metadata/dataset.py
@@ -16,13 +16,16 @@ def __init__(self, message):

@shared_task
def subtask_process_dataset(dataset, update):
dataset_indexing_result, result = dataset_processing.fun(dataset, update)
dataset_indexing_result, result, should_retry = dataset_processing.fun(dataset, update)
if result == 'Successfully indexed' and dataset_indexing_result == 'Successfully indexed':
return result
elif dataset_indexing_result == 'Dataset invalid':
return dataset_indexing_result
elif should_retry:
raise subtask_process_dataset.retry(countdown=60, max_retries=2, exc=DatasetException(message=f'Error indexing dataset {dataset["id"]}\nDataset metadata:\n{result}\nDataset indexing:\n{str(dataset_indexing_result)}')) # NOQA
else:
raise DatasetException(message=f'Error indexing dataset {dataset["id"]}\nDataset metadata:\n{result}\nDataset indexing:\n{str(dataset_indexing_result)}') # NOQA
return "Dataset was not indexed"
# commented to prevent false positive exceptions. raise DatasetException(message=f'Error indexing dataset {dataset["id"]}\nDataset metadata:\n{result}\nDataset indexing:\n{str(dataset_indexing_result)}') # NOQA


def index_datasets_and_dataset_metadata(update, force_update):
@@ -74,13 +77,19 @@ def load_codelists():

def _get_existing_datasets():
url = settings.SOLR_DATASET + (
'/select?q=resources.hash:* AND extras.filetype:*'
'/select?q=*:*'
' AND id:*&rows=100000&wt=json&fl=resources.hash,id,extras.filetype'
)
data = requests.get(url).json()['response']['docs']
datasets = {}
for doc in data:
datasets[doc['id']] = {'hash': doc['resources.hash'][0], 'filetype': doc['extras.filetype']}
_hash = ""
if 'resources.hash' in doc:
_hash = doc['resources.hash'][0]
_filetype = ""
if 'extras.filetype' in doc:
_filetype = doc['extras.filetype']
datasets[doc['id']] = {'hash': _hash, 'filetype': _filetype}
return datasets


@@ -90,8 +99,9 @@ def prepare_update(dataset_metadata):
new_datasets = [d for d in dataset_metadata if d['id'] not in existing_datasets]
old_datasets = [d for d in dataset_metadata if d['id'] in existing_datasets]
changed_datasets = [
d for d in old_datasets if d['resources'][0]['hash'] != existing_datasets[d['id']]['hash']
] # Skip organisation files for incremental updates
d for d in old_datasets if
('' if 'hash' not in d['resources'][0] else d['resources'][0]['hash']) != existing_datasets[d['id']]['hash']
]
updated_datasets = new_datasets + changed_datasets
updated_datasets_bools = [False for _ in new_datasets] + [True for _ in changed_datasets]
return updated_datasets, updated_datasets_bools
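For context on the retry branch in `subtask_process_dataset`: Celery's `Task.retry()` re-queues the task and raises `celery.exceptions.Retry`, which is why the call is wrapped in `raise`. A reduced, self-contained sketch of the pattern; the task body and names are illustrative:

```python
from celery import shared_task


class DatasetException(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(message)


@shared_task
def process_dataset_sketch(dataset_id):
    # Placeholder for dataset_processing.fun(...), which now returns a third
    # value indicating whether a failed dataset is worth retrying.
    indexed, result, should_retry = False, "Solr temporarily unavailable", True

    if indexed:
        return result
    if should_retry:
        # retry() re-queues the task (here after 60s, at most twice) and raises
        # celery.exceptions.Retry, hence the `raise` in front of it.
        raise process_dataset_sketch.retry(
            countdown=60,
            max_retries=2,
            exc=DatasetException(message=f"Error indexing dataset {dataset_id}: {result}"),
        )
    return "Dataset was not indexed"
```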
96 changes: 61 additions & 35 deletions direct_indexing/processing/dataset.py
@@ -8,7 +8,7 @@
from pysolr import Solr
from xmljson import badgerfish as bf

from direct_indexing.cleaning.dataset import recursive_attribute_cleaning
from direct_indexing.cleaning.dataset import recursive_attribute_cleaning#, broken_dataset
from direct_indexing.cleaning.metadata import clean_dataset_metadata
from direct_indexing.custom_fields import custom_fields, organisation_custom_fields
from direct_indexing.custom_fields.models import codelists
@@ -45,7 +45,8 @@ def fun(dataset, update=False):
dataset_metadata = custom_fields.get_custom_metadata(dataset)
# Validate the relevant files, mark others as invalid
validation_status = 'Valid'
if valid_version and 'dataset.extras.validation_status' in dataset_metadata:
should_be_indexed = False
if 'dataset.extras.validation_status' in dataset_metadata:
validation_status = 'Invalid' if dataset_metadata['dataset.extras.validation_status'] == 'Critical' else 'Valid'

# Add the validation status to the dataset
@@ -60,13 +61,15 @@

# Index the relevant datasets,
# these are activity files of a valid version and that have been successfully validated (not critical)
if validation_status == 'Valid':
indexed, dataset_indexing_result = index_dataset(dataset_filepath, dataset_filetype, codelist, currencies,
dataset_metadata)
if validation_status == 'Valid' and valid_version:
indexed, dataset_indexing_result, should_be_indexed = index_dataset(dataset_filepath, dataset_filetype,
codelist, currencies, dataset_metadata)
# Add an indexing status to the dataset metadata.
dataset['iati_cloud_indexed'] = indexed
dataset['iati_cloud_indexed_datetime'] = str(datetime.now())

dataset['iati_cloud_should_be_indexed'] = should_be_indexed
if not indexed:
dataset['iati_cloud_removed_reason'] = dataset_indexing_result
# Index the dataset metadata
logging.info('-- Save the dataset metadata')
result = index(
@@ -75,7 +78,9 @@
settings.SOLR_DATASET_URL
)

return dataset_indexing_result, result
should_retry = should_be_indexed and not indexed

return dataset_indexing_result, result, should_retry


def index_dataset(internal_url, dataset_filetype, codelist, currencies, dataset_metadata):
Expand All @@ -88,20 +93,23 @@ def index_dataset(internal_url, dataset_filetype, codelist, currencies, dataset_
:param currencies: An initialized currencies object
:return: true if indexing successful, false if failed.
"""
should_be_indexed = False
try:
core_url = settings.SOLR_ACTIVITY_URL if dataset_filetype == 'activity' else settings.SOLR_ORGANISATION_URL
json_path = convert_and_save_xml_to_processed_json(internal_url, dataset_filetype, codelist, currencies,
dataset_metadata)
logging.info("-- Get JSON path")
json_path, should_be_indexed, p_res = convert_and_save_xml_to_processed_json(internal_url, dataset_filetype,
codelist, currencies, dataset_metadata)
if json_path:
logging.info("-- INDEXING JSON PATH")
result = index_to_core(core_url, json_path, remove=True)
logging.debug(f'result of indexing {result}')
logging.info(f'result of indexing {result}')
if result == 'Successfully indexed':
return True, result
return False, result
return False, "No JSON Path found"
return True, result, should_be_indexed
return False, "Unable to index the processed dataset.", False
return False, p_res, should_be_indexed
except Exception as e: # NOQA
logging.warning(f'Exception occurred while indexing {dataset_filetype} dataset:\n{internal_url}\n{e}\nTherefore the dataset will not be indexed.') # NOQA
return False, str(e)
return False, str(e), should_be_indexed


def convert_and_save_xml_to_processed_json(filepath, filetype, codelist, currencies, dataset_metadata):
@@ -116,27 +124,22 @@ def convert_and_save_xml_to_processed_json(filepath, filetype, codelist, currenc
:param dataset_metadata: The metadata of the dataset.
:return: The filepath of the json file.
"""
should_be_indexed = False
parser = ET.XMLParser(encoding='utf-8')
try:
etree = ET.parse(filepath, parser=parser)
tree = ET.tostring(etree.getroot())
except ET.ParseError:
return None
logging.info(f'-- Error parsing {filepath}')
return None, should_be_indexed, "Unable to read XML file."
# Convert the tree to json using BadgerFish method.
data = bf.data(ET.fromstring(tree))
# Retrieve activities
data_found = False
if filetype == 'activity' and 'iati-activities' in data:
if 'iati-activity' in data['iati-activities']:
data = data['iati-activities']['iati-activity']
data_found = True
elif filetype == 'organisation' and 'iati-organisations' in data:
if 'iati-organisation' in data['iati-organisations']:
data = data['iati-organisations']['iati-organisation']
data_found = True
data, data_found = extract_activity_or_organisation_data(filetype, data)

if not data_found:
return data_found
logging.info(f'-- No data found in {filepath}')
return data_found, should_be_indexed, "Data was not present in the data dump."
# Clean the dataset
data = recursive_attribute_cleaning(data)

@@ -145,17 +148,37 @@
data = custom_fields.add_all(data, codelist, currencies, dataset_metadata)
if filetype == 'organisation':
data = organisation_custom_fields.add_all(data)

json_path = json_filepath(filepath)
if not json_path:
return False
with open(json_path, 'w') as json_file:
json.dump(data, json_file)
logging.info(f'-- Error creating json path for {filepath}')
return False, should_be_indexed, "A JSON path could not be created for the dataset."
should_be_indexed = True
logging.info(f'-- Saving to {json_path}')
try:
with open(json_path, 'w') as json_file:
json.dump(data, json_file)
except Exception:
logging.info(f'-- Error saving to {json_path}, failed')
return False, should_be_indexed, "Processed data could not be saved as JSON."

if not settings.FCDO_INSTANCE:
dataset_subtypes(filetype, data, json_path)

return json_path
return json_path, should_be_indexed, "Success"


def extract_activity_or_organisation_data(filetype, data):
data_found = False
if filetype == 'activity' and 'iati-activities' in data:
if 'iati-activity' in data['iati-activities']:
data = data['iati-activities']['iati-activity']
data_found = True
elif filetype == 'organisation' and 'iati-organisations' in data:
if 'iati-organisation' in data['iati-organisations']:
data = data['iati-organisations']['iati-organisation']
data_found = True
return data, data_found


def json_filepath(filepath):
@@ -204,8 +227,11 @@ def index_subtypes(json_path, subtypes):
"""
for subtype in subtypes:
subtype_json_path = f'{os.path.splitext(json_path)[0]}_{subtype}.json'
with open(subtype_json_path, 'w') as json_file:
json.dump(subtypes[subtype], json_file)

solr_url = activity_subtypes.AVAILABLE_SUBTYPES[subtype]
index_to_core(solr_url, subtype_json_path, remove=True)
try:
with open(subtype_json_path, 'w') as json_file:
json.dump(subtypes[subtype], json_file)

solr_url = activity_subtypes.AVAILABLE_SUBTYPES[subtype]
index_to_core(solr_url, subtype_json_path, remove=True)
except Exception as e:
logging.error(f'Error indexing subtype {subtype} of {json_path}:\n{e}')
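`convert_and_save_xml_to_processed_json` keeps using the BadgerFish convention from xmljson: attributes become `@`-prefixed keys (later stripped by the cleaning step) and element text goes under `$`. A small standalone sketch of that conversion on an illustrative activity snippet:

```python
import xml.etree.ElementTree as ET

from xmljson import badgerfish as bf

sample_xml = """
<iati-activities version="2.03">
  <iati-activity>
    <iati-identifier>XM-EXAMPLE-0001</iati-identifier>
  </iati-activity>
</iati-activities>
"""  # illustrative snippet, not a real dataset

data = bf.data(ET.fromstring(sample_xml))

# Attributes are keyed with '@', text content with '$'.
activity = data["iati-activities"]["iati-activity"]
print(data["iati-activities"]["@version"])   # 2.03
print(activity["iati-identifier"]["$"])      # XM-EXAMPLE-0001
```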
5 changes: 5 additions & 0 deletions direct_indexing/solr/cores/dataset/managed-schema
@@ -496,6 +496,11 @@
<field name="extras.verified" type="text_general_single"/>
<field name="extras.validation_status" type="text_general_single"/>
<field name="iati_cloud_indexed" type="boolean"/>
<field name="iati_cloud_indexed_datetime" type="pdate"/>
<field name="iati_cloud_removed_date" type="pdate"/>
<field name="iati_cloud_removed_reason" type="text_general_single"/>
<field name="iati_cloud_should_be_indexed" type="boolean"/>
<field name="iati_cloud_custom" type="boolean"/>
<field name="id" type="string" multiValued="false" indexed="true" required="true" stored="true"/>
<field name="isopen" type="boolean"/>
<field name="license_id" type="text_general_single"/>
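With the added fields, the dataset lifecycle becomes queryable from the dataset core. As an illustration (the Solr URL is a placeholder), listing datasets flagged as removed together with the recorded reason:

```python
import requests

SOLR_DATASET = "http://localhost:8983/solr/dataset"  # placeholder core URL

params = {
    "q": "iati_cloud_removed_date:[* TO *]",
    "fl": "id,iati_cloud_removed_date,iati_cloud_removed_reason,iati_cloud_should_be_indexed",
    "rows": 10,
    "wt": "json",
}
response = requests.get(f"{SOLR_DATASET}/select", params=params).json()
for doc in response["response"]["docs"]:
    print(doc["id"], doc.get("iati_cloud_removed_reason"))
```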