diff --git a/emgapi/management/commands/populate_metagenomics_exchange.py b/emgapi/management/commands/populate_metagenomics_exchange.py
index 8707cd8e9..da7e7c252 100644
--- a/emgapi/management/commands/populate_metagenomics_exchange.py
+++ b/emgapi/management/commands/populate_metagenomics_exchange.py
@@ -15,15 +15,15 @@
 # limitations under the License.

 import logging
+from datetime import timedelta

 from django.conf import settings
 from django.core.management import BaseCommand
-from django.utils import timezone
 from django.core.paginator import Paginator
-from datetime import timedelta
+from django.utils import timezone

-from emgapi.models import AnalysisJob
 from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI
+from emgapi.models import AnalysisJob

 logger = logging.getLogger(__name__)

@@ -65,7 +65,9 @@ def handle(self, *args, **options):
         self.dry_run = options.get("dry_run")
         self.pipeline_version = options.get("pipeline")

-        self.mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)
+        self.mgx_api = MetagenomicsExchangeAPI(
+            base_url=settings.METAGENOMICS_EXCHANGE_API
+        )

         # never indexed or updated after indexed
         analyses_to_index_and_update = AnalysisJob.objects_for_mgx_indexing.to_add()
@@ -96,63 +98,87 @@ def handle(self, *args, **options):

     def process_to_index_and_update_records(self, analyses_to_index_and_update):
         logging.info(f"Indexing {len(analyses_to_index_and_update)} new analyses")
-        for page in Paginator(analyses_to_index_and_update, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER):
+        for page in Paginator(
+            analyses_to_index_and_update,
+            settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER,
+        ):
             jobs_to_update = []
-            for ajob in page:
+            for annotation_job in page:
+                sequence_accession = ""
+                if annotation_job.run:
+                    sequence_accession = annotation_job.run.accession
+                if annotation_job.assembly:
+                    sequence_accession = annotation_job.assembly.accession
+
                 metadata = self.mgx_api.generate_metadata(
-                    mgya=ajob.accession,
-                    run_accession=ajob.run,
-                    status="public" if not ajob.is_private else "private",
+                    mgya=annotation_job.accession, sequence_accession=sequence_accession
                 )
                 registry_id, metadata_match = self.mgx_api.check_analysis(
-                    source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
+                    mgya=annotation_job.accession,
+                    sequence_accession=sequence_accession,
+                    metadata=metadata,
                 )
                 # The job is not registered
                 if not registry_id:
-                    logging.info(f"Add new {ajob}")
+                    logging.info(f"Add new {annotation_job}")
                     if self.dry_run:
-                        logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
+                        logging.info(
+                            f"Dry-mode run: no addition to real ME for {annotation_job}"
+                        )
                         continue

                     response = self.mgx_api.add_analysis(
-                        mgya=ajob.accession,
-                        run_accession=ajob.run,
-                        public=not ajob.is_private,
+                        mgya=annotation_job.accession,
+                        sequence_accession=sequence_accession,
                     )
                     if response.ok:
-                        logging.info(f"Successfully added {ajob}")
+                        logging.info(f"Successfully added {annotation_job}")
                         registry_id, metadata_match = self.mgx_api.check_analysis(
-                            source_id=ajob.accession, sequence_id=ajob.run)
-                        ajob.mgx_accession = registry_id
-                        ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
-                        jobs_to_update.append(ajob)
+                            mgya=annotation_job.accession,
+                            sequence_accession=sequence_accession,
+                        )
+                        annotation_job.mgx_accession = registry_id
+                        annotation_job.last_mgx_indexed = timezone.now() + timedelta(
+                            minutes=1
+                        )
+                        jobs_to_update.append(annotation_job)
                     else:
-                        logging.error(f"Error adding {ajob}: {response.message}")
+                        logging.error(
+ f"Error adding {annotation_job}: {response.message}" + ) # else we have to check if the metadata matches, if not we need to update it else: if not metadata_match: - logging.info(f"Patch existing {ajob}") + logging.info(f"Patch existing {annotation_job}") if self.dry_run: logging.info( - f"Dry-mode run: no patch to real ME for {ajob}" + f"Dry-mode run: no patch to real ME for {annotation_job}" ) continue if self.mgx_api.patch_analysis( - registry_id=registry_id, data=metadata + registry_id=registry_id, data=metadata ): - logging.info(f"Analysis {ajob} updated successfully") + logging.info( + f"Analysis {annotation_job} updated successfully" + ) # Just to be safe, update the MGX accession - ajob.mgx_accession = registry_id - ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1) - jobs_to_update.append(ajob) + annotation_job.mgx_accession = registry_id + annotation_job.last_mgx_indexed = ( + timezone.now() + timedelta(minutes=1) + ) + jobs_to_update.append(annotation_job) else: - logging.error(f"Analysis {ajob} update failed") + logging.error(f"Analysis {annotation_job} update failed") else: - logging.debug(f"No edit for {ajob}, metadata is correct") + logging.debug( + f"No edit for {annotation_job}, metadata is correct" + ) AnalysisJob.objects.bulk_update( - jobs_to_update, ["last_mgx_indexed", "mgx_accession"], batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER + jobs_to_update, + ["last_mgx_indexed", "mgx_accession"], + batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, ) def process_to_delete_records(self, analyses_to_delete): @@ -161,36 +187,48 @@ def process_to_delete_records(self, analyses_to_delete): """ logging.info(f"Processing {len(analyses_to_delete)} analyses to remove") - for page in Paginator(analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER): + for page in Paginator( + analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER + ): jobs_to_update = [] - for ajob in page: + for annotation_job in page: + sequence_accession = "" + if annotation_job.run: + sequence_accession = annotation_job.run.accession + if annotation_job.assembly: + sequence_accession = annotation_job.assembly.accession + metadata = self.mgx_api.generate_metadata( - mgya=ajob.accession, - run_accession=ajob.run, - status="public" if not ajob.is_private else "private", + mgya=annotation_job.accession, sequence_accession=sequence_accession ) registry_id, _ = self.mgx_api.check_analysis( - source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + metadata=metadata, ) if registry_id: - logging.info(f"Deleting {ajob}") + logging.info(f"Deleting {annotation_job}") if self.dry_run: - logging.info(f"Dry-mode run: no delete from real ME for {ajob}") + logging.info( + f"Dry-mode run: no delete from real ME for {annotation_job}" + ) continue if self.mgx_api.delete_analysis(registry_id): - logging.info(f"{ajob} successfully deleted") - ajob.last_mgx_indexed = timezone.now() - jobs_to_update.append(ajob) + logging.info(f"{annotation_job} successfully deleted") + annotation_job.last_mgx_indexed = timezone.now() + jobs_to_update.append(annotation_job) else: - logging.info(f"{ajob} failed on delete") + logging.info(f"{annotation_job} failed on delete") else: logging.info( - f"{ajob} doesn't exist in the registry, nothing to delete" + f"{annotation_job} doesn't exist in the registry, nothing to delete" ) # BULK UPDATE # AnalysisJob.objects.bulk_update( - jobs_to_update, ["last_mgx_indexed"], 
+                jobs_to_update,
+                ["last_mgx_indexed"],
+                batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER,
             )
diff --git a/emgapi/metagenomics_exchange.py b/emgapi/metagenomics_exchange.py
index dd69f5902..55ad381a3 100644
--- a/emgapi/metagenomics_exchange.py
+++ b/emgapi/metagenomics_exchange.py
@@ -15,9 +15,10 @@
 # limitations under the License.

 import logging
-import requests

+import requests
 from django.conf import settings
+from requests.exceptions import HTTPError


 class MetagenomicsExchangeAPI:
@@ -25,7 +26,7 @@ class MetagenomicsExchangeAPI:

     def __init__(self, base_url=None):
         self.base_url = base_url or settings.METAGENOMICS_EXCHANGE_API
-        self.__token = settings.METAGENOMICS_EXCHANGE_API_TOKEN
+        self.__token = f"mgx {settings.METAGENOMICS_EXCHANGE_API_TOKEN}"
         self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER

     def get_request(self, endpoint: str, params: dict):
@@ -68,40 +69,95 @@ def patch_request(self, endpoint: str, data: dict):
         )
         return response

-    def generate_metadata(self, mgya, run_accession, status):
+    def generate_metadata(self, mgya, sequence_accession):
+        """Generate the metadata object for the Metagenomics Exchange API.
+
+        Parameters:
+            mgya : str
+                The MGnify Analysis accession.
+            sequence_accession : str
+                Either the Run accession or the Assembly accession related to the MGYA.
+
+        Returns:
+            dict
+                A dictionary containing metadata for the Metagenomics Exchange API.
+        """
         return {
             "confidence": "full",
             "endPoint": f"https://www.ebi.ac.uk/metagenomics/analyses/{mgya}",
             "method": ["other_metadata"],
             "sourceID": mgya,
-            "sequenceID": run_accession,
-            "status": status,
+            "sequenceID": sequence_accession,
+            "status": "public",
             "brokerID": self.broker,
         }

-    def add_analysis(self, mgya: str, run_accession: str, public: bool):
-        data = self.generate_metadata(mgya, run_accession, public)
-        response = self.post_request(endpoint="datasets", data=data)
+    def add_analysis(self, mgya: str, sequence_accession: str):
+        """Add an analysis to the Metagenomics Exchange.
+
+        Parameters:
+            mgya : str
+                The MGnify Analysis accession.
+            sequence_accession : str
+                Either the Run accession or the Assembly accession related to the MGYA.
+
+        Returns:
+            requests.models.Response
+                The response object from the API request.
+        """
+        data = self.generate_metadata(mgya, sequence_accession)
+        try:
+            response = self.post_request(endpoint="datasets", data=data)
+        except HTTPError as http_error:
+            try:
+                response_json = http_error.response.json()
+                logging.error(f"API response content: {response_json}")
+            except ValueError:
+                pass
+            raise http_error
         return response

-    def check_analysis(
-        self, source_id: str, sequence_id: str, public=None, metadata=None
-    ):
-        logging.info(f"Check {source_id} {sequence_id}")
-        params = {}
-        if public:
-            params = {
-                "status": "public" if public else "private",
-                "broker": self.broker,
-            }
-        endpoint = f"sequences/{sequence_id}/datasets"
+    def check_analysis(self, mgya: str, sequence_accession: str, metadata=None):
+        """Check if an analysis exists for the given sequence in the Metagenomics Exchange.
+
+        Parameters:
+            mgya : str
+                The MGnify Analysis accession.
+            sequence_accession : str
+                Either the Run accession or the Assembly accession related to the MGYA.
+
+        Returns:
+            tuple
+                A tuple containing two elements:
+                - analysis_registry_id : str
+                    The analysis registry ID.
+                - metadata_match : boolean
+                    True, if the metadata matches.
+ """ + if not mgya: + raise ValueError(f"mgya is mandatory.") + if not sequence_accession: + raise ValueError(f"sequence_accession is mandatory.") + + logging.info(f"Checking {mgya} - {sequence_accession}") + + params = { + "broker": self.broker, + } + + endpoint = f"sequences/{sequence_accession}/datasets" analysis_registry_id = None - metadata_match = True + metadata_match = False try: response = self.get_request(endpoint=endpoint, params=params) - except: - logging.error(f"Get API request failed") + except HTTPError as http_error: + logging.error(f"Get API request failed. HTTP Error: {http_error}") + try: + response_json = http_error.response.json() + logging.error(f"API response content: {response_json}") + except: + pass return analysis_registry_id, metadata_match data = response.json() @@ -109,28 +165,39 @@ def check_analysis( # The API will return an emtpy datasets array if it can find the accession if not len(datasets): - logging.info(f"{source_id} does not exist in ME") + logging.info(f"{mgya} does not exist in ME") return analysis_registry_id, metadata_match + # TODO: this code needs some refactoring to improve it: + """ + try: + found_record = next(item for item in datasets if item.get("sourceID") == mgya) + except StopIteration + ... + """ sourceIDs = [item.get("sourceID") for item in datasets] - if source_id in sourceIDs: - found_record = [ - item for item in datasets if item.get("sourceID") == source_id - ][0] - logging.info(f"{source_id} exists in ME") + if mgya in sourceIDs: + found_record = [item for item in datasets if item.get("sourceID") == mgya][ + 0 + ] + logging.info(f"{mgya} exists in ME") analysis_registry_id = found_record.get("registryID") + if not analysis_registry_id: + raise ValueError(f"The Metagenomics Exchange 'registryID' for {mgya} is null.") + if metadata: for metadata_record in metadata: if not (metadata_record in found_record): - metadata_match = False - return analysis_registry_id, metadata_match + return analysis_registry_id, False else: if metadata[metadata_record] != found_record[metadata_record]: metadata_match = False logging.info( - f"Incorrect field {metadata[metadata_record]} != {found_record[metadata_record]})" + f"The metadata doesn't match, for field {metadata[metadata_record]} != {found_record[metadata_record]})" ) - return analysis_registry_id, metadata_match + else: + metadata_match = True + return analysis_registry_id, metadata_match return analysis_registry_id, metadata_match return analysis_registry_id, metadata_match diff --git a/tests/me/test_metagenomics_exchange.py b/tests/me/test_metagenomics_exchange.py index c964c9d85..05148d497 100644 --- a/tests/me/test_metagenomics_exchange.py +++ b/tests/me/test_metagenomics_exchange.py @@ -2,31 +2,43 @@ # -*- coding: utf-8 -*- import pytest -import logging - +import requests +import responses from django.conf import settings from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI -import requests -import responses -from unittest import mock - class TestME: - def test_check_existing_analysis_me(self): + @responses.activate + def test_check_existing_analysis_me(self, settings): + # FIXME: this test doesn't check if the metadata matches or not. 
+ mgya = "MGYA00293719" + sequence_accession = "ERR3063408" + responses.add( + responses.GET, + f"{settings.METAGENOMICS_EXCHANGE_API}/sequences/{sequence_accession}/datasets", + json={"datasets": [{"sourceID": mgya, "registryID": "MGX_FAKE"}]}, + status=200, + ) me_api = MetagenomicsExchangeAPI() - source_id = "MGYA00293719" - seq_id = "ERR3063408" - return_values = me_api.check_analysis(source_id, seq_id, True) - assert return_values[0] + registry_id, _ = me_api.check_analysis(mgya, sequence_accession) + assert registry_id == "MGX_FAKE" + + @responses.activate def test_check_not_existing_analysis_me(self): + mgya = "MGYA10293719" + sequence_accession = "ERR3063408" + responses.add( + responses.GET, + f"{settings.METAGENOMICS_EXCHANGE_API}/sequences/{sequence_accession}/datasets", + json={"datasets": []}, + status=200, + ) me_api = MetagenomicsExchangeAPI() - source_id = "MGYA10293719" - seq_id = "ERR3063408" - return_values = me_api.check_analysis(source_id, seq_id, True) + return_values = me_api.check_analysis(mgya, sequence_accession) assert not return_values[0] @pytest.mark.skip(reason="Error on ME API side") @@ -35,9 +47,7 @@ def test_post_existing_analysis_me(self): source_id = "MGYA00293719" # Should return -> https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409 with pytest.raises(requests.HTTPError, match="401 Client Error"): - me_api.add_analysis( - mgya=source_id, run_accession="ERR3063408", public=True - ).json() + me_api.add_analysis(mgya=source_id, sequence_accession="ERR3063408").json() @responses.activate def test_mock_post_new_analysis(self): @@ -48,7 +58,7 @@ def test_mock_post_new_analysis(self): responses.add(responses.POST, url, json={"success": True}, status=201) response = me_api.add_analysis( - mgya="MGYA00593709", run_accession="SRR3960575", public=True + mgya="MGYA00593709", sequence_accession="SRR3960575" ) assert response.status_code == 201 @@ -67,38 +77,41 @@ def test_mock_delete_analysis_from_me(self): assert response.status_code == 201 assert response.json() == {"success": True} - def test_wrong_delete_request_me(self): + @responses.activate + def test_incorrect_delete_request_me(self): + # TODO: this test doesn't make much sense me_api = MetagenomicsExchangeAPI() + responses.add( + responses.DELETE, + f"{settings.METAGENOMICS_EXCHANGE_API}/dataset/MGX0000780", + status=401, + ) registry_id = "MGX0000780" endpoint = f"dataset/{registry_id}" - assert not me_api.delete_request(endpoint) + response = me_api.delete_request(endpoint) + assert response.status_code == 401 - @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.patch_request") - def test_patch_analysis_me(self, mock_patch_request): + @responses.activate + def test_patch_analysis_me(self): me_api = MetagenomicsExchangeAPI() - class MockResponse: - def __init__(self, json_data, status_code): - self.json_data = json_data - self.status_code = status_code - self.ok = True - - def json(self): - return self.json_data - - mock_patch_request.return_value = MockResponse({}, 200) registry_id = "MGX0000788" mgya = "MGYA00593709" run_accession = "SRR3960575" - public = False - data = { "confidence": "full", "endPoint": f"https://www.ebi.ac.uk/metagenomics/analyses/{mgya}", "method": ["other_metadata"], "sourceID": mgya, "sequenceID": run_accession, - "status": "public" if public else "private", + "status": "public", "brokerID": "EMG", } + + responses.add( + responses.PATCH, + f"{settings.METAGENOMICS_EXCHANGE_API}/datasets/{registry_id}", + status=200, + ) + assert 
diff --git a/tests/me/test_populate_metagenomics_exchange.py b/tests/me/test_populate_metagenomics_exchange.py
index d9a35f32d..d10083ac1 100644
--- a/tests/me/test_populate_metagenomics_exchange.py
+++ b/tests/me/test_populate_metagenomics_exchange.py
@@ -42,8 +42,10 @@ def test_population_dry_mode(self, caplog):
         assert "Dry-mode run: no addition to real ME for MGYA00466090" in caplog.text
         assert "Dry-mode run: no addition to real ME for MGYA00466091" in caplog.text
         assert "Processing 1 analyses to remove" in caplog.text
-        assert "MGYA00005678 doesn't exist in the registry, nothing to delete" in caplog.text
-
+        assert (
+            "MGYA00005678 doesn't exist in the registry, nothing to delete"
+            in caplog.text
+        )

     @pytest.mark.usefixtures("run_multiple_analysis_me")
     @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.add_analysis")
@@ -56,16 +58,18 @@ def test_add_new_analysis(self, mock_check_analysis, mock_add_analysis, caplog):
         """
         pipeline = 4.1
         registry_id = "MGX1"
+
         class MockResponse:
             def __init__(self, json_data, status_code):
                 self.json_data = json_data
                 self.status_code = status_code
                 self.ok = True
+
             def json(self):
                 return self.json_data

         def mock_check_process(*args, **kwargs):
-            if 'metadata' in kwargs:
+            if "metadata" in kwargs:
                 return None, True
             else:
                 return registry_id, True
@@ -84,14 +88,10 @@ def mock_check_process(*args, **kwargs):
             assert ajob.last_mgx_indexed
             assert ajob.mgx_accession == registry_id

-
     @pytest.mark.usefixtures("run_multiple_analysis_me")
     @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
     @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.delete_analysis")
-    def test_removals(self,
-                      mock_delete_analysis,
-                      mock_check_analysis,
-                      caplog):
+    def test_removals(self, mock_delete_analysis, mock_check_analysis, caplog):
         """
         Test delete process.
         1 analysis should be removed and updated indexed field in DB
@@ -100,10 +100,7 @@ def test_removals(self,
         mock_check_analysis.return_value = True, True
         mock_delete_analysis.return_value = True

-        call_command(
-            "populate_metagenomics_exchange",
-            pipeline=pipeline
-        )
+        call_command("populate_metagenomics_exchange", pipeline=pipeline)
         assert "Indexing 0 new analyses" in caplog.text
         assert "Processing 1 analyses to remove" in caplog.text
         assert "Deleting MGYA00005678" in caplog.text
@@ -114,10 +111,7 @@ def test_removals(self,
     @pytest.mark.usefixtures("run_multiple_analysis_me")
     @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
     @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.patch_analysis")
-    def test_update(self,
-                    mock_patch_analysis,
-                    mock_check_analysis,
-                    caplog):
+    def test_update(self, mock_patch_analysis, mock_check_analysis, caplog):
         """
         Test update process for job that was indexed before updated.
         MGX accession and last_mgx_indexed should be updated
@@ -138,5 +132,3 @@ def test_update(self,
         ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first()
         assert ajob.last_mgx_indexed.date() == timezone.now().date()
         assert ajob.mgx_accession == registry_id
-
-
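
For reviewers, a minimal sketch of how the refactored client is meant to be driven, mirroring `process_to_index_and_update_records` in the management command above. It assumes a configured Django environment (with `METAGENOMICS_EXCHANGE_API`, `METAGENOMICS_EXCHANGE_API_TOKEN` and `METAGENOMICS_EXCHANGE_MGNIFY_BROKER` set); the accessions are illustrative placeholders, not real records.

```python
# Sketch only (not part of the patch): driving the refactored
# MetagenomicsExchangeAPI the same way the management command does.
from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI

mgx_api = MetagenomicsExchangeAPI()

mgya = "MGYA00593709"  # MGnify analysis accession (placeholder)
sequence_accession = "SRR3960575"  # run or assembly accession (placeholder)

# Build the payload once; check_analysis compares it field by field
# against the record already registered in the Metagenomics Exchange.
metadata = mgx_api.generate_metadata(mgya, sequence_accession)
registry_id, metadata_match = mgx_api.check_analysis(
    mgya=mgya, sequence_accession=sequence_accession, metadata=metadata
)

if not registry_id:
    # Not registered yet: create the dataset, then re-check to pick up
    # the newly assigned registryID.
    response = mgx_api.add_analysis(mgya=mgya, sequence_accession=sequence_accession)
    if response.ok:
        registry_id, _ = mgx_api.check_analysis(
            mgya=mgya, sequence_accession=sequence_accession
        )
elif not metadata_match:
    # Registered but stale: patch the existing record in place.
    mgx_api.patch_analysis(registry_id=registry_id, data=metadata)
```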