diff --git a/eodag/plugins/search/qssearch.py b/eodag/plugins/search/qssearch.py index 3d3b2b50d..f12d522f5 100644 --- a/eodag/plugins/search/qssearch.py +++ b/eodag/plugins/search/qssearch.py @@ -17,10 +17,12 @@ # limitations under the License. from __future__ import annotations +import concurrent import logging import re from collections.abc import Iterable from copy import copy as copy_copy +from queue import Empty, Queue from typing import ( TYPE_CHECKING, Any, @@ -55,7 +57,7 @@ from requests.adapters import HTTPAdapter from requests.auth import AuthBase -from eodag.api.product import EOProduct +from eodag.api.product import AssetsDict, EOProduct from eodag.api.product.metadata_mapping import ( NOT_AVAILABLE, format_query_params, @@ -81,6 +83,7 @@ format_dict_items, get_args, get_ssl_context, + guess_file_type, quote, string_to_jsonpath, update_nested_dict, @@ -99,8 +102,14 @@ ) if TYPE_CHECKING: + from concurrent.futures import Future + from eodag.config import PluginConfig + +DATA_EXTENSIONS = ["jp2", "tiff", "nc", "grib"] + + logger = logging.getLogger("eodag.search.qssearch") @@ -1243,9 +1252,122 @@ def normalize_results( # once metadata pre-mapping applied execute QueryStringSearch.normalize_results products = super(ODataV4Search, self).normalize_results(results, **kwargs) + _ODataV4NodeCrawler(self).discover_assets(products) + return products +class _ODataV4NodeCrawler: + """A class that crawls through the node tree structure of OData v.4 products. + + Add the all files (node leaves) as assets of the product.""" + + def __init__(self, search_plugin: ODataV4Search, max_connections: int = 100): + """Init the crawler. + + :param search_plugin: The OData search plugin + :type search_plugin: :class:`~eodag.plugins.search.ODataV4Search` + :param max_connections: (optional) Maximum number of connections for HTTP requests + :type max_connections: int + """ + self.search_plugin = search_plugin + self.max_connections = max_connections + self.nodes_queue: Queue = Queue() + + def _fetch_node(self, product: EOProduct, url: str) -> Tuple[EOProduct, str, Dict]: + """Send the HTTP request of a single node. + + :param product: The EO product associated with the given URL + :type product: :class:`~eodag.api.product._product.EOProduct` + :param url: URL of the node to fetch + :type url: string + :returns: The EO Product, the node URL, the JSON response + :rtype: Tuple[:class:`~eodag.api.product._product.EOProduct`, str, Dict] + """ + response = QueryStringSearch._request( + self.search_plugin, + PreparedSearch( + url=url, + exception_message=f"Skipping error while fetching node {url}", + ), + ) + return (product, url, response.json()) + + def _scrape_nodes(self, future: Future): + """Scrape child nodes. + + Given the a node, scrape all the child nodes and add them a new jobs to the executor. + + :param future: The result of `_fetch_node()` + :type product: :class:`~concurrent.futures.Future` + """ + product, url, json_response = future.result() + # loop over the list of children + for node in json_response["result"]: + if node["ChildrenNumber"] > 0: + # explore sub-nodes + self.nodes_queue.put(node["Nodes"]["uri"]) + elif node["ContentLength"] > 0: + # this is a file + # replace suffix "/Nodes" with "/$value" in the download link + file_url = node["Nodes"]["uri"] + file_url = file_url[: -len("/Nodes")] + "/$value" + # build the asset's key as the sequence of all the node's names (skip the first node) + url_parse = urlparse(file_url) + node_path = [ + p for p in url_parse.path.split("/") if p.startswith("Nodes(") + ] + if node_path: + # skip the first node + node_path = node_path[1:] + if node_path: + # extract the names by removing "Nodes(...)" + node_path = [p[6:-1] for p in node_path] + asset_key = "/".join(node_path) + role = ( + "data" + if node["Id"].split(".")[-1] in DATA_EXTENSIONS + else "metadata" + ) + product.assets[asset_key] = { + "title": node["Id"], + "roles": [role], + "href": file_url, + } + if mime_type := guess_file_type(node["Id"]): + product.assets[asset_key]["type"] = mime_type + + def discover_assets(self, products: list[EOProduct]) -> None: + """Add discovered assets to the products in the given list. + + :param products: List of EO products whose assets are discovered + :type products: list[:class:`~eodag.api.product._product.EOProduct`] + """ + + if not getattr(self.search_plugin.config, "discover_assets", False): + return + + discover_endpoint = self.search_plugin.config.api_endpoint.rstrip("/") + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.max_connections + ) as executor: + self.nodes_queue = Queue() + # init the queue with the root node of each product + for product in products: + product.assets = AssetsDict(product) + id = product.properties.get("uid") + base_url = f"{discover_endpoint}({id})" + self.nodes_queue.put(f"{base_url}/Nodes") + # start consuming the queue and submitting jobs + while True: + try: + url = self.nodes_queue.get(timeout=HTTP_REQ_TIMEOUT) + job = executor.submit(self._fetch_node, product, url) + job.add_done_callback(self._scrape_nodes) + except Empty: + return + + class PostJsonSearch(QueryStringSearch): """A specialisation of a QueryStringSearch that uses POST method""" diff --git a/eodag/resources/providers.yml b/eodag/resources/providers.yml index 79f569e0d..105079dfd 100644 --- a/eodag/resources/providers.yml +++ b/eodag/resources/providers.yml @@ -3898,6 +3898,7 @@ - "ContentDate/Start lt {completionTimeFromAscendingNode#to_iso_utc_datetime}" - "ContentDate/End gt {startTimeFromAscendingNode#to_iso_utc_datetime}" - contains(Name,'{id}') + discover_assets: true discover_metadata: auto_discovery: true metadata_pattern: '^(?!collection)[a-zA-Z0-9]+$' @@ -4003,7 +4004,7 @@ realm: 'CDSE' client_id: 'cdse-public' client_secret: null - token_provision: qs + token_provision: header token_qs_key: 'token' auth_error_code: 401 ssl_verify: true diff --git a/tests/units/test_download_plugins.py b/tests/units/test_download_plugins.py index 83b61ccdb..d4b8f14d3 100644 --- a/tests/units/test_download_plugins.py +++ b/tests/units/test_download_plugins.py @@ -32,7 +32,7 @@ import yaml from eodag.api.product.metadata_mapping import DEFAULT_METADATA_MAPPING -from eodag.utils import ProgressCallback +from eodag.utils import MockResponse, ProgressCallback from eodag.utils.exceptions import DownloadError from tests import TEST_RESOURCES_PATH from tests.context import ( @@ -1999,3 +1999,95 @@ def test_plugins_download_creodias_s3( self.assertEqual(mock_finalize_s2_safe_product.call_count, 0) self.assertEqual(mock_check_manifest_file_list.call_count, 0) self.assertEqual(mock_flatten_top_directories.call_count, 1) + + +class TestDownloadPluginODataV4(BaseDownloadPluginTest): + def setUp(self): + super(TestDownloadPluginODataV4, self).setUp() + + @mock.patch("eodag.plugins.download.http.ODataV4Download._request", autospec=True) + def test_plugins_download_odatav4_discover_assets(self, mock__request): + """ODataV4Download.discover_assets must add additional nodes as assets if configured""" + product = EOProduct( + "cop_dataspace", + dict( + geometry="POINT (0 0)", + title="dummy_product", + id="dummy", + ), + ) + plugin = self.get_download_plugin(product) + api_endpoint = plugin.config.discover_assets_endpoint.rstrip("/") + # response to /Products(1234-5678)/Nodes(dummy)/Nodes + response_1 = { + "result": [ + { + "Id": "bar", + "Name": "bar", + "ContentLength": 0, + "ChildrenNumber": 2, + "Nodes": { + "uri": f"{api_endpoint}(1234-5678)/Nodes(dummy)/Nodes(bar)/Nodes" + }, + }, + { + "Id": "metadata.xml", + "Name": "metadata.xml", + "ContentLength": 100, + "ChildrenNumber": 0, + "Nodes": { + "uri": f"{api_endpoint}(1234-5678)/Nodes(dummy)/Nodes(metadata.xml)/Nodes" + }, + }, + ] + } + # response to /Products(1234-5678)/Nodes(dummy)/Nodes(bar)/Nodes + response_2 = { + "result": [ + { + "Id": "img_1.jp2", + "Name": "img_1.jp2", + "ContentLength": 100, + "ChildrenNumber": 0, + "Nodes": { + "uri": f"{api_endpoint}(1234-5678)/Nodes(dummy)/Nodes(bar)/Nodes(img_1.jp2)/Nodes" + }, + }, + { + "Id": "img_2.jp2", + "Name": "img_2.jp2", + "ContentLength": 100, + "ChildrenNumber": 0, + "Nodes": { + "uri": f"{api_endpoint}(1234-5678)/Nodes(dummy)/Nodes(bar)/Nodes(img_2.jp2)/Nodes" + }, + }, + ] + } + mock__request.side_effect = [ + MockResponse(response_1, 200), + MockResponse(response_2, 200), + ] + + expected_assets = { + "metadata.xml": { + "title": "metadata.xml", + "roles": ["metadata"], + "href": f"{api_endpoint}(1234-5678)/Nodes(dummy)/Nodes(metadata.xml)/$value", + "type": "text/xml", + }, + "bar/img_1.jp2": { + "title": "img_1.jp2", + "roles": ["data"], + "href": f"{api_endpoint}(1234-5678)/Nodes(dummy)/Nodes(bar)/Nodes(img_1.jp2)/$value", + "type": "image/jpeg2000", + }, + "bar/img_2.jp2": { + "title": "img_2.jp2", + "roles": ["data"], + "href": f"{api_endpoint}(1234-5678)/Nodes(dummy)/Nodes(bar)/Nodes(img_2.jp2)/$value", + "type": "image/jpeg2000", + }, + } + product = plugin.discover_assets(product) + self.assertDictEqual(product.assets.as_dict(), expected_assets)