From abbac568c8fe97aab6df2f2d0621f62e1eaaf8c6 Mon Sep 17 00:00:00 2001
From: Matt Goldberg
Date: Thu, 21 Nov 2024 08:51:18 -0500
Subject: [PATCH] shacl utilities: Add new SHACL path building utility with
 corresponding tests

Add build_shacl_path(), which converts a URIRef or an rdflib Path into the
triples of the equivalent SHACL property path. Path types that have no
SHACL representation raise SHACLPathError instead of silently producing an
empty blank node.
---
 rdflib/extras/shacl.py                | 133 +++++++++++++++++++++++++-
 test/test_extras/test_shacl_extras.py |  52 ++++++++++-
 2 files changed, 182 insertions(+), 3 deletions(-)

diff --git a/rdflib/extras/shacl.py b/rdflib/extras/shacl.py
index e4f873572..bf7f4b92c 100644
--- a/rdflib/extras/shacl.py
+++ b/rdflib/extras/shacl.py
@@ -6,18 +6,28 @@
 
 from typing import TYPE_CHECKING
 
-from rdflib import Graph, Literal, URIRef, paths
+from rdflib import BNode, Graph, Literal, URIRef, paths
+from rdflib.collection import Collection
 from rdflib.namespace import RDF, SH
 from rdflib.paths import Path
 
 if TYPE_CHECKING:
     from rdflib.graph import _ObjectType
+    from rdflib.term import IdentifiedNode
 
 
 class SHACLPathError(Exception):
     pass
 
 
+# Map the variable length path operators to the corresponding SHACL path predicates
+_PATH_MOD_TO_PRED = {
+    paths.ZeroOrMore: SH.zeroOrMorePath,
+    paths.OneOrMore: SH.oneOrMorePath,
+    paths.ZeroOrOne: SH.zeroOrOnePath,
+}
+
+
 # This implementation is roughly based on
 # pyshacl.helper.sparql_query_helper::SPARQLQueryHelper._shacl_path_to_sparql_path
 def parse_shacl_path(
@@ -93,3 +103,124 @@ def parse_shacl_path(
         raise SHACLPathError(f"Cannot parse {repr(path_identifier)} as a SHACL Path.")
 
     return path
+
+
+def _build_path_component(
+    graph: Graph, path_component: URIRef | Path
+) -> IdentifiedNode:
+    """
+    Helper method that implements the recursive component of SHACL path
+    triple construction.
+
+    :param graph: A :class:`~rdflib.graph.Graph` into which to insert triples
+    :param path_component: A :class:`~rdflib.term.URIRef` or
+        :class:`~rdflib.paths.Path` that is part of a path expression
+    :return: The :class:`~rdflib.term.IdentifiedNode` of the resource in the
+        graph that corresponds to the provided `path_component`
+    :raises TypeError: If `path_component` is neither a
+        :class:`~rdflib.term.URIRef` nor a :class:`~rdflib.paths.Path`
+    :raises SHACLPathError: If `path_component` cannot be expressed as a
+        SHACL path (too few sequence/alternative items, an unknown path
+        modifier, or a path type that SHACL does not support)
+    """
+    # Literals or other types are not allowed
+    if not isinstance(path_component, (URIRef, Path)):
+        raise TypeError(
+            f"Objects of type {type(path_component)} are not valid "
+            "components of a SHACL path."
+        )
+
+    # If the path component is a URI, return it
+    if isinstance(path_component, URIRef):
+        return path_component
+    # Otherwise, the path component is represented as a blank node
+    bnode = BNode()
+
+    # Handle Sequence Paths
+    if isinstance(path_component, paths.SequencePath):
+        # Sequence paths are a Collection directly with at least two items
+        if len(path_component.args) < 2:
+            raise SHACLPathError(
+                "A list of SHACL Sequence Paths must contain at least two path items."
+            )
+        Collection(
+            graph,
+            bnode,
+            [_build_path_component(graph, arg) for arg in path_component.args],
+        )
+
+    # Handle Inverse Paths
+    elif isinstance(path_component, paths.InvPath):
+        graph.add(
+            (bnode, SH.inversePath, _build_path_component(graph, path_component.arg))
+        )
+
+    # Handle Alternative Paths
+    elif isinstance(path_component, paths.AlternativePath):
+        # Alternative paths are a Collection but referenced by sh:alternativePath
+        # with at least two items
+        if len(path_component.args) < 2:
+            raise SHACLPathError(
+                "List of SHACL alternate paths must have at least two path items."
+            )
+        coll = Collection(
+            graph,
+            BNode(),
+            [_build_path_component(graph, arg) for arg in path_component.args],
+        )
+        graph.add((bnode, SH.alternativePath, coll.uri))
+
+    # Handle Variable Length Paths
+    elif isinstance(path_component, paths.MulPath):
+        # Get the predicate corresponding to the path modifier
+        pred = _PATH_MOD_TO_PRED.get(path_component.mod)
+        if pred is None:
+            raise SHACLPathError(f"Unknown path modifier {path_component.mod}")
+        graph.add((bnode, pred, _build_path_component(graph, path_component.path)))
+
+    # Reject path types that SHACL cannot express (e.g. negated paths) rather
+    # than returning a blank node that has no triples describing it
+    else:
+        raise SHACLPathError(
+            f"Paths of type {type(path_component)} have no SHACL representation."
+        )
+
+    # Return the blank node created for the provided path_component
+    return bnode
+
+
+def build_shacl_path(
+    path: URIRef | Path, target_graph: Graph | None = None
+) -> tuple[IdentifiedNode, Graph | None]:
+    """
+    Build the SHACL Path triples for a path given by a :class:`~rdflib.term.URIRef` for
+    simple paths or a :class:`~rdflib.paths.Path` for complex paths.
+
+    Returns an :class:`~rdflib.term.IdentifiedNode` for the path (which should be
+    the object of a triple with predicate `sh:path`) and the graph into which any
+    new triples were added.
+
+    :param path: A :class:`~rdflib.term.URIRef` or a :class:`~rdflib.paths.Path`
+    :param target_graph: Optionally, a :class:`~rdflib.graph.Graph` into which to put
+        constructed triples. If not provided, a new graph will be created
+    :return: A (`path_identifier`, `graph`) tuple where:
+        - `path_identifier`: If `path` is a :class:`~rdflib.term.URIRef`, this is simply
+        the provided `path`. If `path` is a :class:`~rdflib.paths.Path`, this is
+        the :class:`~rdflib.term.BNode` corresponding to the root of the SHACL
+        path expression added to the graph.
+        - `graph`: `None` if `path` is a :class:`~rdflib.term.URIRef` (as no new triples
+        are constructed). If `path` is a :class:`~rdflib.paths.Path`, this is either the
+        `target_graph` provided or a new graph into which the path triples were added.
+    :raises TypeError: If `path` (or any nested component) is neither a
+        :class:`~rdflib.term.URIRef` nor a :class:`~rdflib.paths.Path`
+    :raises SHACLPathError: If `path` cannot be expressed as a SHACL path
+    """
+    # If a path is a URI, that's the whole path. No graph needs to be constructed.
+    if isinstance(path, URIRef):
+        return path, None
+
+    # Create a graph if one was not provided
+    if target_graph is None:
+        target_graph = Graph()
+
+    # Recurse through the path to build the graph representation
+    return _build_path_component(target_graph, path), target_graph
diff --git a/test/test_extras/test_shacl_extras.py b/test/test_extras/test_shacl_extras.py
index 33d3c892a..9fe5d6d36 100644
--- a/test/test_extras/test_shacl_extras.py
+++ b/test/test_extras/test_shacl_extras.py
@@ -4,8 +4,9 @@
 
 import pytest
 
-from rdflib import Graph, URIRef
-from rdflib.extras.shacl import SHACLPathError, parse_shacl_path
+from rdflib import Graph, Literal, URIRef, paths
+from rdflib.compare import graph_diff
+from rdflib.extras.shacl import SHACLPathError, build_shacl_path, parse_shacl_path
 from rdflib.namespace import SH, Namespace
 from rdflib.paths import Path
 
@@ -248,3 +249,50 @@ def test_parse_shacl_path(
             parse_shacl_path(path_source_data, path_root)  # type: ignore[arg-type]
     else:
         assert parse_shacl_path(path_source_data, path_root) == expected  # type: ignore[arg-type]
+
+
+@pytest.mark.parametrize(
+    ("resource", "path"),
+    (
+        # Single SHACL Path
+        (EX.TestPropShape1, EX.pred1),
+        (EX.TestPropShape2a, EX.pred1 / EX.pred2 / EX.pred3),
+        (EX.TestPropShape3, ~EX.pred1),
+        (EX.TestPropShape4a, EX.pred1 | EX.pred2 | EX.pred3),
+        (EX.TestPropShape5, EX.pred1 * "*"),  # type: ignore[operator]
+        (EX.TestPropShape6, EX.pred1 * "+"),  # type: ignore[operator]
+        (EX.TestPropShape7, EX.pred1 * "?"),  # type: ignore[operator]
+        # SHACL Path Combinations
+        (EX.TestPropShape8, ~EX.pred1 * "*"),
+        (
+            EX.TestPropShape10a,
+            ~EX.pred1
+            * "*"
+            / (~EX.pred1 * "*" | EX.pred1 | EX.pred2 * "+" | EX.pred3 * "*"),  # type: ignore[operator]
+        ),
+        (TypeError, Literal("Not a valid path")),
+        (SHACLPathError, paths.SequencePath(SH.targetClass)),
+        (SHACLPathError, paths.AlternativePath(SH.targetClass)),
+        (SHACLPathError, paths.NegatedPath(EX.pred1)),
+    ),
+)
+def test_build_shacl_path(
+    path_source_data: Graph, resource: Union[URIRef, type], path: Union[URIRef, Path]
+):
+    if isinstance(resource, type):
+        with pytest.raises(resource):  # type: ignore[arg-type]
+            build_shacl_path(path)
+    else:
+        expected_path_root = path_source_data.value(resource, SH.path)
+        actual_path_root, actual_path_graph = build_shacl_path(path)
+        if isinstance(expected_path_root, URIRef):
+            assert actual_path_root == expected_path_root
+            assert actual_path_graph is None
+        else:
+            assert isinstance(actual_path_graph, Graph)
+            expected_path_graph = path_source_data.cbd(expected_path_root)  # type: ignore[arg-type]
+            _, in_first, in_second = graph_diff(
+                expected_path_graph, actual_path_graph
+            )
+            assert len(in_first) == 0
+            assert len(in_second) == 0