add influx parser (#10) #13

Merged
34 commits merged on Jul 11, 2024

Changes from 22 commits

Commits
ee9fc41
add influx parser (#10)
Treesarj Jul 10, 2024
281fc8c
Update test_influx_db_parser.py
Treesarj Jul 10, 2024
7e40f59
Update ci_tests.yml for jinja2 safety fail
Treesarj Jul 10, 2024
ab1b3d9
Apply suggestions from code review
Treesarj Jul 10, 2024
a2a90cc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
ece677f
code review changes
Treesarj Jul 10, 2024
e5a5b78
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
6de8ce4
fix imports
Treesarj Jul 10, 2024
05e658b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
f138f9c
code review suggestions
Treesarj Jul 10, 2024
048141d
Merge branch '12-merge-in-influx-db-parser-and-tests' of github.com:S…
Treesarj Jul 10, 2024
7eef732
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
b93bf78
fix models
Treesarj Jul 10, 2024
ff2c003
Update oceanlab_influx_parser.py
Treesarj Jul 10, 2024
9f50a8d
Update oceanlab_influx_parser.py
Treesarj Jul 10, 2024
e26c51a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
2040ede
fix models
Treesarj Jul 10, 2024
d9ea89a
Merge branch '12-merge-in-influx-db-parser-and-tests' of github.com:S…
Treesarj Jul 10, 2024
b63bc64
fix models
Treesarj Jul 10, 2024
13bfdab
remove unused imports
Treesarj Jul 10, 2024
bf2fde1
fix line too long
Treesarj Jul 10, 2024
26695ae
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
9522f75
Apply suggestions from code review
Treesarj Jul 10, 2024
817b038
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
721d114
fix logging and models
Treesarj Jul 10, 2024
1773af0
remove print statements
Treesarj Jul 10, 2024
bd30546
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
8ac1339
Update .pre-commit-config.yaml
Treesarj Jul 11, 2024
5d74161
Update .pre-commit-config.yaml
Treesarj Jul 11, 2024
71252ca
Update .pre-commit-config.yaml
Treesarj Jul 11, 2024
5d22078
fix pre-commit errors
Treesarj Jul 11, 2024
b12a630
fix pylint errors
Treesarj Jul 11, 2024
e6db3c4
Update oceanlab_influx_parser.py
Treesarj Jul 11, 2024
c41973d
fix pre-commit errors
Treesarj Jul 11, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci_tests.yml
@@ -75,7 +75,7 @@ jobs:
# Remove ignoring 48547 as soon as RDFLib/rdflib#1844 has been fixed and the fix
# has been released.
- name: Run safety
run: pip freeze | safety check --stdin --ignore 44715 --ignore 48547
run: pip freeze | safety check --stdin --ignore 44715 --ignore 48547 --ignore=70612

pytest-linux:
runs-on: ubuntu-latest
5 changes: 5 additions & 0 deletions docs/api_reference/strategies/oceanlab_influx_parser.md
@@ -0,0 +1,5 @@
# oceanlab_influx_parser

::: oteapi_dlite.strategies.oceanlab_influx_parser
options:
show_if_no_docstring: true
2 changes: 1 addition & 1 deletion oteapi_dlite/strategies/generate.py
@@ -1,6 +1,6 @@
"""Generic generate strategy using DLite storage plugin."""

# pylint: disable=unused-argument,invalid-name
# pylint: disable=unused-argument,invalid-name,possibly-used-before-assignment
import tempfile
from typing import TYPE_CHECKING, Annotated, Optional

279 changes: 279 additions & 0 deletions oteapi_dlite/strategies/oceanlab_influx_parser.py
@@ -0,0 +1,279 @@
"""Strategy for oceanlab data parsing from Influx DB."""

import sys
from typing import Annotated, Optional

import cachetools # type: ignore
import dlite
import influxdb_client
import jinja2
from fastapi import logger
from oteapi.models import AttrDict, HostlessAnyUrl, ParserConfig, ResourceConfig
from pandas import DataFrame
from pydantic import Field, SecretStr
from pydantic.dataclasses import dataclass

from oteapi_dlite.models import DLiteSessionUpdate
from oteapi_dlite.utils import get_collection, update_collection
from oteapi_dlite.utils.utils import get_meta

if sys.version_info >= (3, 10):
from typing import Literal
else:
from typing_extensions import Literal


class InfluxParseParseConfig(AttrDict):
"""Configuration for DLite Excel parser."""

id: Annotated[
Optional[str], Field(description="Optional id on new instance.")
] = None

label: Annotated[
Optional[str],
Field(
description="Optional label for new instance in collection.",
),
] = "json-data"

resourceType: Annotated[
Optional[Literal["resource/url"]],
Field(
description=ResourceConfig.model_fields["resourceType"].description,
),
] = "resource/url"
downloadUrl: Annotated[
Optional[HostlessAnyUrl],
Field(
description=ResourceConfig.model_fields["downloadUrl"].description,
),
] = None
mediaType: Annotated[
Optional[str],
Field(
description=ResourceConfig.model_fields["mediaType"].description,
),
] = None
storage_path: Annotated[
Optional[str],
Field(
description="Path to metadata storage",
),
] = None
collection_id: Annotated[
Optional[str], Field(description="A reference to a DLite collection.")
] = None

url: Annotated[Optional[str], Field(description="url to the db")] = None

USER: Annotated[Optional[str], Field(description="user to the db")] = None

PASSWORD: Annotated[
Optional[SecretStr], Field(description="user pwd to the db")
] = None

DATABASE: Annotated[Optional[str], Field(description="database name")] = (
None
)

RETPOLICY: Annotated[
Optional[str], Field(description="retpolicy for db")
] = None

time_range: Annotated[
str, Field(description="timerange of values. eg : -12h")
] = "-12h"

size_limit: Annotated[str, Field(description="rows to be extracted")] = "50"

measurements: Annotated[
list[dict],
Field(description="Measurement and field values as list of dictionary"),
] = [
{
"measurement": "ctd_conductivity_munkholmen",
"field": "conductivity",
},
{
"measurement": "ctd_density_munkholmen",
"field": "density",
},
{
"measurement": "ctd_salinity_munkholmen",
"field": "salinity",
},
{
"measurement": "ctd_pressure_munkholmen",
"field": "pressure",
},
]


class InfluxParseStrategyConfig(ParserConfig):
"""DLite excel parse strategy config."""

configuration: Annotated[
InfluxParseParseConfig,
Field(
description="DLite InfluxDB parse strategy-specific configuration."
),
]


class InfluxParserUpdate(DLiteSessionUpdate):
"""Class for returning values from DLite json parser."""

inst_uuid: Annotated[
str,
Field(
description="UUID of new instance.",
),
]
label: Annotated[
str,
Field(
description="Label of the new instance in the collection.",
),
]


@dataclass
class InfluxParseStrategy:
"""Parse strategy for Influx DB.

**Registers strategies**:

- `("parserType",
"influx/vnd.dlite-influx")`

"""

parse_config: InfluxParseStrategyConfig

def initialize(self) -> DLiteSessionUpdate:
"""Initialize."""
collection_id = (
self.parse_config.configuration.collection_id
or get_collection().uuid
)
return DLiteSessionUpdate(collection_id=collection_id)

def get(self) -> InfluxParserUpdate:
"""Execute the strategy.

This method will be called through the strategy-specific endpoint
of the OTE-API Services.

Returns:
An `InfluxParserUpdate` with the collection id, the UUID of the new DLite instance, and its label.

"""
config = self.parse_config.configuration
try:
# Update dlite storage paths if provided
if config.storage_path:
for storage_path in config.storage_path.split("|"):
dlite.storage_path.append(storage_path)
except Exception as e:
logger.error(f"Error during update of DLite storage path: {e}")
raise RuntimeError("Failed to update DLite storage path.") from e

try:
env = jinja2.Environment(loader=jinja2.BaseLoader, autoescape=True)
env.globals.update(enumerate=enumerate, str=str)
bucket = f"{config.DATABASE}/{config.RETPOLICY}"
configuration = {
"bucket": bucket,
"timeRange": config.time_range,
"limitSize": config.size_limit,
"measurements": config.measurements,
}
tmpl = env.from_string(TEMPLATE)
flux_query = tmpl.render(configuration).strip()
columns = query_to_df(
flux_query,
config.url,
config.USER,
config.PASSWORD.get_secret_value(), # type: ignore
)
except Exception as e:
# Handle errors that occur during query construction or data
# retrieval from InfluxDB. Log the exception and raise a
# custom exception:
logger.error(f"Error during InfluxDB parsing: {e}")
raise RuntimeError("Failed to parse InfluxDB data.") from e

# Create DLite instance
meta = get_meta(self.parse_config.entity)
inst = meta(dims=[configuration["limitSize"]])

for name in [
measurement["field"] # type: ignore
for measurement in configuration["measurements"]
]:
inst[name] = columns[name]
inst["time"] = [
d.strftime("%m/%d/%Y, %H:%M:%S") for d in columns["_time"]
]
# Add collection and add the entity instance
coll = get_collection(
collection_id=self.parse_config.configuration.collection_id
)
coll.add(config.label, inst)
update_collection(coll)

return InfluxParserUpdate(
collection_id=coll.uuid,
inst_uuid=inst.uuid,
label=config.label,
)


@cachetools.cached(cache=cachetools.LRUCache(maxsize=128))
def query_to_df(
query: str, url: str, USER: str, PASSWORD: SecretStr
) -> DataFrame:
"""query_to_df"""
with influxdb_client.InfluxDBClient(
url=url, token=f"{USER}:{PASSWORD}"
) as client:
return client.query_api().query_data_frame(query)


# Define the Jinja2 template:
# This creates the query to fetch data from
# InfluxDB based on the measurement and DB field.
TEMPLATE = """{% macro fetchData(measurement, field) %}
from(bucket: "{{ bucket }}")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "{{ measurement }}")
|> filter(fn: (r) => r._field == "{{ field }}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> limit(n: 50)
{% endmacro %}

{%- for index, measurement in enumerate(measurements, 1) %}
data{{ index }} = {{ fetchData(measurement.measurement, measurement.field) }}
{%- endfor %}

{%- for index in range(1, measurements | length) %}
join{{ index }} = join(
tables: {
left: {{ "data" + str(index) if index == 1 else "join" + str(index - 1) }},
right: data{{ index + 1 }}
},
on: ["_time"]
)
{%- endfor %}

finalData = join{{ measurements | length - 1 }}
|> keep(columns: ["_time",
{%- for measurement in measurements %}
"{{ measurement.field }}"{% if not loop.last %}, {% endif %}
{%- endfor %}
])

finalData
"""
9 changes: 7 additions & 2 deletions requirements.txt
@@ -1,7 +1,12 @@
cachetools>=5.3.3
DLite-Python>=0.4.5,<1.0
fastapi>=0.111.0
influxdb_client>=1.44.0
jinja2>=3.1.4
numpy>=1.21,<2
oteapi-core @ git+https://github.com/EMMC-ASBL/oteapi-core@master
pandas>=2.2.2
Pillow>=9.0.1,<11
# psycopg2-binary!=2.9.6
# pydantic>=2.4.2,<3 # Indirect requirement
rdflib>=7.0.0
SPARQLWrapper>=2.0.0
tripper==0.2.15
31 changes: 1 addition & 30 deletions setup.cfg
@@ -1,39 +1,10 @@
[options.entry_points]
oteapi.parse =
oteapi_dlite.application/vnd.dlite-parse = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/json = oteapi_dlite.strageties.parse:DLiteParseStrategy
#oteapi_dlite.application/yaml = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/x-hdf5 = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/octet-stream = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/bson = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/x-sqlite = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/n-triples = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.text/turtle = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.text/csv = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.image/gif = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
#oteapi_dlite.image/jpeg = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
#oteapi_dlite.image/jpg = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
#oteapi_dlite.image/png = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
#oteapi_dlite.image/tiff = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy

oteapi_dlite.application/vnd.dlite-xlsx = oteapi_dlite.strategies.parse_excel:DLiteExcelStrategy
oteapi_dlite.influx/vnd.dlite-influx = oteapi_dlite.strategies.oceanlab_influx_parser:InfluxParseStrategy
oteapi_dlite.json/vnd.dlite-json = oteapi_dlite.strategies.parse_json:DLiteJsonStrategy
oteapi_dlite.image/vnd.dlite-image = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy

# To be removed
oteapi_dlite.application/vnd.dlite-json = oteapi_dlite.strategies.parse:DLiteParseStrategy
oteapi_dlite.application/vnd.dlite-yaml = oteapi_dlite.strategies.parse:DLiteParseStrategy
oteapi_dlite.image/vnd.dlite-gif = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
oteapi_dlite.image/vnd.dlite-jpeg = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
oteapi_dlite.image/vnd.dlite-jpg = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
oteapi_dlite.image/vnd.dlite-jp2 = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
oteapi_dlite.image/vnd.dlite-png = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy
oteapi_dlite.image/vnd.dlite-tiff = oteapi_dlite.strategies.parse_image:DLiteImageParseStrategy


oteapi.function =
oteapi_dlite.application/vnd.dlite-generate = oteapi_dlite.strategies.generate:DLiteGenerateStrategy
oteapi_dlite.application/vnd.dlite-convert = oteapi_dlite.strategies.convert:DLiteConvertStrategy

oteapi.mapping =
oteapi_dlite.mappings = oteapi_dlite.strategies.mapping:DLiteMappingStrategy
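The `oteapi.parse` group above is how oteapi-core discovers parse strategies at runtime. A quick sketch for checking that the new `influx/vnd.dlite-influx` registration is visible after installing the package (assumes Python 3.10+ for the `group` keyword):

```python
# List the registered oteapi.parse entry points and look for the new parser.
from importlib.metadata import entry_points

for ep in entry_points(group="oteapi.parse"):
    if "influx" in ep.name:
        print(ep.name, "->", ep.value)

# Expected, per the setup.cfg above:
# oteapi_dlite.influx/vnd.dlite-influx -> oteapi_dlite.strategies.oceanlab_influx_parser:InfluxParseStrategy
```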
28 changes: 28 additions & 0 deletions tests/entities/salinity.json
@@ -0,0 +1,28 @@
{
"uri": "http://onto-ns.com/meta/oceanlab/1/ctd_salinity_munkholmen",
"description": "ctd_salinity_munkholmen",
"dimensions": [
{
"name": "N",
"description": "Number of samples"
}
],
"properties": [
{
"name": "salinity",
"type": "float",
"dims": [
"N"
],
"description": "salinity"
},
{
"name": "time",
"type": "string",
"dims": [
"N"
],
"description": "timestamp"
}
]
}
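A sketch of how this test entity could be loaded and instantiated with DLite, mirroring the `get_meta()` / `meta(dims=...)` pattern used by the strategy; the path and sample values below are illustrative, and it assumes dlite resolves the metadata from the appended storage path.

```python
# Load the salinity entity from the test directory and fill a small instance.
import dlite

dlite.storage_path.append("tests/entities")  # assumes repository root as CWD
meta = dlite.get_instance(
    "http://onto-ns.com/meta/oceanlab/1/ctd_salinity_munkholmen"
)

inst = meta(dims=[3])  # N = 3 samples
inst["salinity"] = [34.1, 34.2, 34.0]
inst["time"] = [
    "07/10/2024, 00:00:00",
    "07/10/2024, 00:10:00",
    "07/10/2024, 00:20:00",
]
print(inst.uuid)
```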