diff --git a/README.md b/README.md index 2f26260f..95387057 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Further reading: - [OTE-API Services Documentation](https://github.com/EMMC-ASBL/oteapi-services) - [DLite](https://github.com/SINTEF/dlite) -A list and documentation for all the strategies in this plugin can be found [here](https://EMMC-ASBL.github.io/oteapi-dlite/latest/all_strategies). + ## License and copyright diff --git a/docs/all_strategies.md b/docs/all_strategies.md deleted file mode 100644 index 13e60eda..00000000 --- a/docs/all_strategies.md +++ /dev/null @@ -1,10 +0,0 @@ -# OTEAPI DLite Plugin Strategies - -This page provides documentation for the `oteapi_dlite.strategies` submodule, where all the OTEAPI DLite Plugin strategies are located. - -These strategies will be available when setting up a server in an environment with oteapi-dlite installed. - -::: oteapi_dlite.strategies - options: - show_submodules: true - show_if_no_docstring: true diff --git a/docs/api_reference/strategies/oceanlab_influx_parser.md b/docs/api_reference/strategies/oceanlab_influx_parser.md deleted file mode 100644 index 73cd9fcf..00000000 --- a/docs/api_reference/strategies/oceanlab_influx_parser.md +++ /dev/null @@ -1,5 +0,0 @@ -# oceanlab_influx_parser - -::: oteapi_dlite.strategies.oceanlab_influx_parser - options: - show_if_no_docstring: true diff --git a/docs/index.md b/docs/index.md index d25d6816..edb34259 100644 --- a/docs/index.md +++ b/docs/index.md @@ -20,7 +20,7 @@ Further reading: - [OTE-API Services Documentation](https://github.com/EMMC-ASBL/oteapi-services) - [DLite](https://github.com/SINTEF/dlite) -A list and documentation for all the strategies in this plugin can be found [here](all_strategies). + ## License and copyright diff --git a/mkdocs.yml b/mkdocs.yml index ada976cc..2d542d9d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -84,7 +84,6 @@ nav: - Home: index.md - License: LICENSE.md - Changelog: CHANGELOG.md - - all_strategies.md - ... | api_reference/** watch: diff --git a/oteapi_dlite/strategies/oceanlab_influx_parser.py b/oteapi_dlite/strategies/oceanlab_influx_parser.py deleted file mode 100644 index cad99752..00000000 --- a/oteapi_dlite/strategies/oceanlab_influx_parser.py +++ /dev/null @@ -1,297 +0,0 @@ -"""Strategy for oceanlab data parsing from Influx DB.""" - -import logging -import sys -from typing import Annotated, Optional - -import cachetools # type: ignore -import dlite -import influxdb_client -import jinja2 -from oteapi.models import AttrDict, HostlessAnyUrl, ParserConfig, ResourceConfig -from pandas import DataFrame -from pydantic import BaseModel, Field, SecretStr -from pydantic.dataclasses import dataclass - -from oteapi_dlite.models import DLiteSessionUpdate -from oteapi_dlite.utils import get_collection, update_collection -from oteapi_dlite.utils.utils import get_meta - -if sys.version_info >= (3, 10): - from typing import Literal -else: - from typing_extensions import Literal -logger = logging.getLogger(__name__) - - -class Measurement(BaseModel): - """Measurement and field value from the Influx DB""" - - measurement: Annotated[str, Field(description="The measurement name")] - field: Annotated[ - str, Field(description="The measurement's field in the DB") - ] - - -class InfluxParseParseConfig(AttrDict): - """Configuration for DLite Excel parser.""" - - id: Annotated[ - Optional[str], Field(description="Optional id on new instance.") - ] = None - - label: Annotated[ - Optional[str], - Field( - description="Optional label for new instance in collection.", - ), - ] = "json-data" - - resourceType: Annotated[ - Optional[Literal["resource/url"]], - Field( - description=ResourceConfig.model_fields["resourceType"].description, - ), - ] = "resource/url" - downloadUrl: Annotated[ - Optional[HostlessAnyUrl], - Field( - description=ResourceConfig.model_fields["downloadUrl"].description, - ), - ] = None - mediaType: Annotated[ - Optional[str], - Field( - description=ResourceConfig.model_fields["mediaType"].description, - ), - ] = None - storage_path: Annotated[ - Optional[str], - Field( - description="Path to metadata storage", - ), - ] = None - collection_id: Annotated[ - Optional[str], Field(description="A reference to a DLite collection.") - ] = None - - url: Annotated[Optional[str], Field(description="url to the db")] = None - - USER: Annotated[Optional[str], Field(description="user to the db")] = None - - PASSWORD: Annotated[ - Optional[SecretStr], Field(description="user pwd to the db") - ] = None - - DATABASE: Annotated[Optional[str], Field(description="database name")] = ( - None - ) - - RETPOLICY: Annotated[ - Optional[str], Field(description="retpolicy for db") - ] = None - - time_range: Annotated[ - str, Field(description="timerange of values. eg : -12h") - ] = "-12h" - - size_limit: Annotated[ - int, Field(description="rows to be extracted", ge=0) - ] = 50 - - measurements: Annotated[ - list[Measurement], - Field(description="Measurement and field values as a list."), - ] = [ - Measurement(**_) - for _ in [ - { - "measurement": "ctd_conductivity_munkholmen", - "field": "conductivity", - }, - { - "measurement": "ctd_density_munkholmen", - "field": "density", - }, - { - "measurement": "ctd_salinity_munkholmen", - "field": "salinity", - }, - { - "measurement": "ctd_pressure_munkholmen", - "field": "pressure", - }, - ] - ] - - -class InfluxParseStrategyConfig(ParserConfig): - """DLite excel parse strategy config.""" - - configuration: Annotated[ - InfluxParseParseConfig, - Field( - description="DLite InfluxDB parse strategy-specific configuration." - ), - ] - - -class InfluxParserUpdate(DLiteSessionUpdate): - """Class for returning values from DLite json parser.""" - - inst_uuid: Annotated[ - str, - Field( - description="UUID of new instance.", - ), - ] - label: Annotated[ - str, - Field( - description="Label of the new instance in the collection.", - ), - ] - - -@dataclass -class InfluxParseStrategy: - """Parse strategy for Influx DB. - - **Registers strategies**: - - - `("parserType", - "influx/vnd.dlite-influx")` - - """ - - parse_config: InfluxParseStrategyConfig - - def initialize(self) -> DLiteSessionUpdate: - """Initialize.""" - collection_id = ( - self.parse_config.configuration.collection_id - or get_collection().uuid - ) - return DLiteSessionUpdate(collection_id=collection_id) - - def get(self) -> InfluxParserUpdate: - """Execute the strategy. - - This method will be called through the strategy-specific endpoint - of the OTE-API Services. - - Returns: - DLite instance. - - """ - config = self.parse_config.configuration - try: - # Update dlite storage paths if provided - if config.storage_path: - for storage_path in config.storage_path.split("|"): - dlite.storage_path.append(storage_path) - except Exception as e: - logger.error("Error during update of DLite storage path: %s", e) - raise RuntimeError("Failed to update DLite storage path.") from e - - try: - env = jinja2.Environment(loader=jinja2.BaseLoader, autoescape=True) - env.globals.update(enumerate=enumerate, str=str) - bucket = f"{config.DATABASE}/{config.RETPOLICY}" - configuration = { - "bucket": bucket, - "timeRange": config.time_range, - "limitSize": str(config.size_limit), - "measurements": [ - measurement.model_dump() - for measurement in config.measurements - ], - } - tmpl = env.from_string(TEMPLATE) - flux_query = tmpl.render(configuration).strip() - columns = query_to_df( - flux_query, - config.url, - config.USER, - config.PASSWORD.get_secret_value(), # type: ignore - ) - except Exception as e: - # Handle errors that occur during JSON parser instantiation or - # data retrieval. You can log the exception, raise a custom - # exception, or handle it as needed. For example, logging the - # error and raising a custom exception: - logger.error("Error during JSON parsing: %s", e) - raise RuntimeError("Failed to parse JSON data.") from e - - # Create DLite instance - meta = get_meta(self.parse_config.entity) - inst = meta(dims=[configuration["limitSize"]]) - - for name in [ - measurement["field"] # type: ignore - for measurement in configuration["measurements"] - ]: - inst[name] = columns[name] - inst["time"] = [ - d.strftime("%m/%d/%Y, %H:%M:%S") for d in columns["_time"] - ] - # Add collection and add the entity instance - coll = get_collection( - collection_id=self.parse_config.configuration.collection_id - ) - coll.add(config.label, inst) - update_collection(coll) - - return InfluxParserUpdate( - collection_id=coll.uuid, - inst_uuid=inst.uuid, - label=config.label, - ) - - -@cachetools.cached(cache=cachetools.LRUCache(maxsize=128)) -def query_to_df( - query: str, url: str, USER: str, PASSWORD: SecretStr -) -> DataFrame: - """query_to_df""" - with influxdb_client.InfluxDBClient( - url=url, token=f"{USER}:{PASSWORD}" - ) as client: - return client.query_api().query_data_frame(query) - - -# Define the Jinja2 template : -# This creates the query to fetch data from the -# influxdb based on the measurement and DB field. -TEMPLATE = """{% macro fetchData(measurement, field) %} - from(bucket: "{{ bucket }}") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "{{ measurement }}") - |> filter(fn: (r) => r._field == "{{ field }}") - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") - |> limit(n: 50) - {% endmacro %} - - {%- for index, measurement in enumerate(measurements, 1) %} - data{{ index }} = {{ fetchData(measurement.measurement, measurement.field) }} - {%- endfor %} - - {%- for index in range(1, measurements | length) %} - join{{ index }} = join( - tables: { - left: {{ "data" + str(index) if index == 1 else "join" + str(index - 1) }}, - right: data{{ index + 1 }} - }, - on: ["_time"] - ) - {%- endfor %} - - finalData = join{{ measurements | length - 1 }} - |> keep(columns: ["_time", - {%- for measurement in measurements %} - "{{ measurement.field }}"{% if not loop.last %}, {% endif %} - {%- endfor %} - ]) - - finalData -""" diff --git a/requirements_docs.txt b/requirements_docs.txt index e1d77325..c0cfc810 100644 --- a/requirements_docs.txt +++ b/requirements_docs.txt @@ -1,3 +1,4 @@ +griffe<1.0 # This is necessary until mkdocstrings fully supports griffe v1 invoke~=2.2 mike~=2.0 mkdocs~=1.5 diff --git a/setup.cfg b/setup.cfg index c213d89e..cb619555 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,5 @@ [options.entry_points] oteapi.parse = - oteapi_dlite.influx/vnd.dlite-influx = oteapi_dlite.strategies.oceanlab_influx_parser:InfluxParseStrategy oteapi_dlite.json/vnd.dlite-json = oteapi_dlite.strategies.parse_json:DLiteJsonStrategy oteapi.function = diff --git a/tests/strategies/test_influx_db_parser.py b/tests/strategies/test_influx_db_parser.py deleted file mode 100644 index 131fa8b4..00000000 --- a/tests/strategies/test_influx_db_parser.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Test Influx db parser""" - -import unittest -from unittest.mock import MagicMock, patch - -from pydantic import ValidationError - -from oteapi_dlite.strategies.oceanlab_influx_parser import ( - InfluxParseParseConfig, - InfluxParseStrategyConfig, - query_to_df, -) - - -class TestInfluxParseParseConfig(unittest.TestCase): - """Test configuration""" - - def test_valid_config(self): - """Test configuration validity""" - config = InfluxParseParseConfig( - id="test_id", - label="test_label", - resourceType="resource/url", - downloadUrl="http://example.com", - mediaType="application/json", - storage_path="/path/to/storage", - collection_id="test_collection_id", - url="http://db.url", - USER="test_user", - PASSWORD="test_password", - DATABASE="test_db", - RETPOLICY="test_policy", - ) - assert config.id == "test_id" - assert config.label == "test_label" - - def test_invalid_config(self): - """Test validation error""" - with self.assertRaises(ValidationError): - InfluxParseParseConfig(id=123) # id should be a string or None - - -class TestInfluxParseStrategyConfig(unittest.TestCase): - """Test strategy config""" - - def test_valid_strategy_config(self): - """Test config instance""" - parse_config = InfluxParseParseConfig() - strategy_config = InfluxParseStrategyConfig( - parserType="influx/vnd.dlite-influx", - entity="http://onto-ns.com/meta/oceanlab/1/ctd_salinity_munkholmen", - configuration=parse_config, - ) - assert isinstance(strategy_config.configuration, InfluxParseParseConfig) - - -class TestInfluxParseStrategy(unittest.TestCase): - """Test functions in ParserStrategy""" - - @patch("influxdb_client.InfluxDBClient") - def test_query_to_df(self, mock_influxdb_client): - """Test query to df""" - mock_client = MagicMock() - mock_query_api = MagicMock() - mock_client.query_api.return_value = mock_query_api - mock_influxdb_client.return_value.__enter__.return_value = mock_client - - mock_df = MagicMock() - mock_query_api.query_data_frame.return_value = mock_df - - result = query_to_df( - "test_query", "http://db.url", "test_user", "test_password" - ) - assert result == mock_df - mock_influxdb_client.assert_called_once_with( - url="http://db.url", token="test_user:test_password" - )