Skip to content

Commit

Permalink
Merge pull request #4662 from opsmill/dga-20241018-metric-context
Browse files Browse the repository at this point in the history
Add support for additional context to the metrics for query execution time
  • Loading branch information
dgarros authored Oct 25, 2024
2 parents 59f6700 + 0e78474 commit cc53b44
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 12 deletions.
6 changes: 3 additions & 3 deletions backend/infrahub/core/protocols_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ async def transaction(self, name: Optional[str]) -> AsyncTransaction: ...
async def close(self) -> None: ...

async def execute_query(
self, query: str, params: Optional[dict[str, Any]] = None, name: Optional[str] = "undefined"
self, query: str, params: Optional[dict[str, Any]] = None, name: str = "undefined"
) -> list[Record]: ...

async def execute_query_with_metadata(
self, query: str, params: Optional[dict[str, Any]] = None, name: Optional[str] = "undefined"
self, query: str, params: Optional[dict[str, Any]] = None, name: str = "undefined"
) -> tuple[list[Record], dict[str, Any]]: ...

async def run_query(
self, query: str, params: Optional[dict[str, Any]] = None, name: Optional[str] = "undefined"
self, query: str, params: Optional[dict[str, Any]] = None, name: str = "undefined"
) -> AsyncResult: ...

def render_list_comprehension(self, items: str, item_name: str) -> str: ...
Expand Down
12 changes: 10 additions & 2 deletions backend/infrahub/core/query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ async def init(
async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None:
raise NotImplementedError

def get_context(self) -> dict[str, str]:
"""Provide additional context for this query, beyond the name.
Right now it's mainly used to add more labels to the metrics."""
return {}

def add_to_query(self, query: Union[str, list[str]]) -> None:
"""Add a new section at the end of the query.
Expand Down Expand Up @@ -529,13 +534,15 @@ async def execute(self, db: InfrahubDatabase) -> Self:

if self.type == QueryType.READ:
if self.limit or self.offset:
results = await db.execute_query(query=query_str, params=self.params, name=self.name)
results = await db.execute_query(
query=query_str, params=self.params, name=self.name, context=self.get_context()
)
else:
results = await self.query_with_size_limit(db=db)

elif self.type == QueryType.WRITE:
results, metadata = await db.execute_query_with_metadata(
query=query_str, params=self.params, name=self.name
query=query_str, params=self.params, name=self.name, context=self.get_context()
)
if "stats" in metadata:
self.stats.add(metadata.get("stats"))
Expand All @@ -560,6 +567,7 @@ async def query_with_size_limit(self, db: InfrahubDatabase) -> list[Record]:
query=self.get_query(limit=query_limit, offset=offset),
params=self.params,
name=self.name,
context=self.get_context(),
)
if "stats" in metadata:
self.stats.add(metadata.get("stats"))
Expand Down
3 changes: 3 additions & 0 deletions backend/infrahub/core/validators/uniqueness/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def __init__(
self.min_count_required = min_count_required
super().__init__(**kwargs)

def get_context(self) -> dict[str, str]:
return {"kind": self.query_request.kind}

async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None:
branch_filter, branch_params = self.branch.get_query_filter_path(at=self.at.to_string(), is_isolated=False)
self.params.update(branch_params)
Expand Down
36 changes: 31 additions & 5 deletions backend/infrahub/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,31 +297,57 @@ async def close(self) -> None:
await self._driver.close()

async def execute_query(
self, query: str, params: Optional[dict[str, Any]] = None, name: Optional[str] = "undefined"
self,
query: str,
params: dict[str, Any] | None = None,
name: str = "undefined",
context: dict[str, str] | None = None,
) -> list[Record]:
results, _ = await self.execute_query_with_metadata(query=query, params=params, name=name)
results, _ = await self.execute_query_with_metadata(query=query, params=params, name=name, context=context)
return results

async def execute_query_with_metadata(
self, query: str, params: Optional[dict[str, Any]] = None, name: Optional[str] = "undefined"
self,
query: str,
params: dict[str, Any] | None = None,
name: str = "undefined",
context: dict[str, str] | None = None,
) -> tuple[list[Record], dict[str, Any]]:
with trace.get_tracer(__name__).start_as_current_span("execute_db_query_with_metadata") as span:
span.set_attribute("query", query)
if name:
span.set_attribute("query_name", name)

runtime = Neo4jRuntime.UNDEFINED

try:
query_config = self.queries_names_to_config[name]
if self.db_type == DatabaseType.NEO4J:
runtime = self.queries_names_to_config[name].neo4j_runtime
if runtime != Neo4jRuntime.DEFAULT:
if runtime not in [Neo4jRuntime.DEFAULT, Neo4jRuntime.UNDEFINED]:
query = f"CYPHER runtime = {runtime.value}\n" + query
if query_config.profile_memory:
query = "PROFILE\n" + query
except KeyError:
pass # No specific config for this query

with QUERY_EXECUTION_METRICS.labels(self._session_mode.value, name).time():
labels = {
"type": self._session_mode.value,
"query": name,
"runtime": runtime.value,
"context1": "",
"context2": "",
}
if context:
labels.update(
{
f"context{idx + 1}": f"{key}__{value}"
for idx, (key, value) in enumerate(context.items())
if idx <= 1
}
)

with QUERY_EXECUTION_METRICS.labels(**labels).time():
response = await self.run_query(query=query, params=params, name=name)
results = [item async for item in response]
return results, response._metadata or {}
Expand Down
1 change: 1 addition & 0 deletions backend/infrahub/database/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Neo4jRuntime(str, Enum):
SLOTTED = "slotted"
PIPELINED = "pipelined"
PARALLEL = "parallel"
UNDEFINED = "undefined"


class IndexType(str, Enum):
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/database/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
QUERY_EXECUTION_METRICS = Histogram(
f"{METRIC_PREFIX}_query_execution_seconds",
"Execution time to query the database",
labelnames=["type", "query"],
labelnames=["type", "query", "runtime", "context1", "context2"],
buckets=[0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.5, 1],
)

Expand Down
6 changes: 5 additions & 1 deletion backend/tests/helpers/query_benchmark/db_query_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ def get_context(self) -> dict[str, Any]:
return ctx

async def execute_query_with_metadata(
self, query: str, params: dict[str, Any] | None = None, name: str | None = "undefined"
self,
query: str,
params: dict[str, Any] | None = None,
name: str = "undefined",
context: dict[str, str] | None = None,
) -> tuple[list[Record], dict[str, Any]]:
if not self.profiling_enabled:
# Profiling might be disabled to avoid capturing queries while loading data
Expand Down

0 comments on commit cc53b44

Please sign in to comment.