From 6bc1daf2284217b4cc454d4057f0d6487b9bc7f4 Mon Sep 17 00:00:00 2001 From: wouter Date: Fri, 25 Oct 2024 13:36:17 +0200 Subject: [PATCH 01/11] temp --- src/inmanta/env.py | 2 +- src/inmanta/server/protocol.py | 2 +- .../server/services/databaseservice.py | 151 ++++++++++++------ src/inmanta/util/__init__.py | 30 +++- tests/server/test_databaseservice.py | 4 +- 5 files changed, 127 insertions(+), 62 deletions(-) diff --git a/src/inmanta/env.py b/src/inmanta/env.py index 37e88b2d4b..9bc610b882 100644 --- a/src/inmanta/env.py +++ b/src/inmanta/env.py @@ -1135,7 +1135,7 @@ def are_installed(self, requirements: req_list) -> bool: Return True iff the given requirements are installed in this environment. """ assert self.is_using_virtual_env() - return PythonWorkingSet.are_installed(requirements) + return PythonWorkingSet.are_instadelled(requirements) def install_for_config( self, diff --git a/src/inmanta/server/protocol.py b/src/inmanta/server/protocol.py index 9ad4968abd..7de393de63 100644 --- a/src/inmanta/server/protocol.py +++ b/src/inmanta/server/protocol.py @@ -327,7 +327,7 @@ def schedule( :quiet_mode: Set to true to disable logging the recurring notification that the action is being called. Use this to avoid polluting the server log for very frequent actions. """ - self._sched.add_action(call, IntervalSchedule(interval, initial_delay), cancel_on_stop, quiet_mode) + self._sched.schedule(call, interval, initial_delay, cancel_on_stop, quiet_mode) def schedule_cron(self, call: TaskMethod, cron: str, cancel_on_stop: bool = True) -> None: """ diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index 7b84194b3a..b4263c39c9 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -16,11 +16,12 @@ Contact: code@inmanta.com """ +import dataclasses import logging from typing import Optional import asyncpg -from pyformance import gauge +from pyformance import gauge, global_registry from pyformance.meters import CallbackGauge from inmanta import data, util @@ -28,21 +29,111 @@ from inmanta.server import config as opt from inmanta.server import protocol from inmanta.types import ArgumentTypes +from inmanta.util import Scheduler LOGGER = logging.getLogger(__name__) +@dataclasses.dataclass +class DataBaseReport: + + connected: bool + database: str + host: str + max_pool: int + open_connections: int + free_connections: int + pool_exhaustion_count: int + + +class DatabaseMonitor: + + def __init__(self, pool: asyncpg.pool.Pool, db_name: str, db_host: str) -> None: + self._pool = pool + self._scheduler = Scheduler(f"Database monitor for {db_name}") + self._db_pool_watcher = util.ExhaustedPoolWatcher(self._pool) + self.dn_name = db_name + self.db_host = db_host + + def start(self) -> None: + self.start_monitor() + + # Schedule database pool exhaustion watch: + # Check for pool exhaustion every 200 ms + self._scheduler.schedule(self._check_database_pool_exhaustion, interval=0.2, cancel_on_stop=True, quiet_mode=True) + # Report pool exhaustion every 24h + self._scheduler.schedule(self._report_database_pool_exhaustion, interval=3_600 * 24, cancel_on_stop=True) + + async def stop(self) -> None: + self.stop_monitor() + await self._scheduler.stop() + + async def get_status(self) -> DataBaseReport: + """Get the status of the database connection""" + connected = await self.get_connection_status() + + return DataBaseReport( + connected=connected, + database=self.dn_name, + host=self.db_host, + 
max_pool=self._pool.get_max_size(), + open_connections=self._pool.get_size(), + free_connections=self._pool.get_idle_size(), + pool_exhaustion_count=self._db_pool_watcher._exhausted_pool_events_count, + ) + + def start_monitor(self) -> None: + """Attach to monitoring system""" + gauge( + "db.connected", + CallbackGauge( + callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0 + ), + ) + gauge("db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0)) + gauge("db.open_connections", CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0)) + gauge( + "db.free_connections", CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0) + ) + gauge("db.pool_exhaustion_count", CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count)) + + def stop_monitor(self): + global_registry()._gauges.pop("db.connected", None) + global_registry()._gauges.pop("db.max_pool", None) + global_registry()._gauges.pop("db.open_connections", None) + global_registry()._gauges.pop("db.free_connections", None) + global_registry()._gauges.pop("db.pool_exhaustion_count", None) + # Add sub-tagging + # Tag + + async def get_connection_status(self) -> bool: + if self._pool is not None and not self._pool._closing and not self._pool._closed: + try: + async with self._pool.acquire(timeout=10): + return True + except Exception: + LOGGER.exception("Connection to PostgreSQL failed") + return False + + async def _report_database_pool_exhaustion(self) -> None: + assert self._db_pool_watcher is not None # Make mypy happy + self._db_pool_watcher.report_and_reset(LOGGER) + + async def _check_database_pool_exhaustion(self) -> None: + assert self._db_pool_watcher is not None # Make mypy happy + self._db_pool_watcher.check_for_pool_exhaustion() + + class DatabaseService(protocol.ServerSlice): """Slice to initialize the database""" def __init__(self) -> None: super().__init__(SLICE_DATABASE) self._pool: Optional[asyncpg.pool.Pool] = None - self._db_pool_watcher: Optional[util.ExhaustedPoolWatcher] = None + self._db_monitor: Optional[DatabaseMonitor] = None async def start(self) -> None: await super().start() - self.start_monitor() await self.connect_database() # Schedule cleanup agentprocess and agentinstance tables @@ -53,16 +144,13 @@ async def start(self) -> None: ) assert self._pool is not None # Make mypy happy - self._db_pool_watcher = util.ExhaustedPoolWatcher(self._pool) - # Schedule database pool exhaustion watch: - # Check for pool exhaustion every 200 ms - self.schedule(self._check_database_pool_exhaustion, interval=0.2, cancel_on_stop=True, quiet_mode=True) - # Report pool exhaustion every 24h - self.schedule(self._report_database_pool_exhaustion, interval=3_600 * 24, cancel_on_stop=True) + self._db_monitor = DatabaseMonitor(self._pool, opt.db_name.get(), opt.db_host.get()) + self._db_monitor.start() async def stop(self) -> None: await super().stop() await self.disconnect_database() + await self._db_monitor.stop() self._pool = None def get_dependencies(self) -> list[str]: @@ -84,54 +172,13 @@ async def disconnect_database(self) -> None: async def get_status(self) -> dict[str, ArgumentTypes]: """Get the status of the database connection""" - connected = await self.get_connection_status() - status = { - "connected": connected, - "database": opt.db_name.get(), - "host": opt.db_host.get(), - } - if self._pool is not None: - status["max_pool"] = 
self._pool.get_max_size() - status["open_connections"] = self._pool.get_size() - status["free_connections"] = self._pool.get_idle_size() - - return status - - def start_monitor(self) -> None: - """Attach to monitoring system""" - gauge( - "db.connected", - CallbackGauge( - callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0 - ), - ) - gauge("db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0)) - gauge("db.open_connections", CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0)) - gauge( - "db.free_connections", CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0) - ) - - async def get_connection_status(self) -> bool: - if self._pool is not None and not self._pool._closing and not self._pool._closed: - try: - async with self._pool.acquire(timeout=10): - return True - except Exception: - LOGGER.exception("Connection to PostgreSQL failed") - return False + return dataclasses.asdict(await self._db_monitor.get_status()) async def _purge_agent_processes(self) -> None: + # Move to agent manager agent_processes_to_keep = opt.agent_processes_to_keep.get() await data.AgentProcess.cleanup(nr_expired_records_to_keep=agent_processes_to_keep) - async def _report_database_pool_exhaustion(self) -> None: - assert self._db_pool_watcher is not None # Make mypy happy - self._db_pool_watcher.report_and_reset(LOGGER) - - async def _check_database_pool_exhaustion(self) -> None: - assert self._db_pool_watcher is not None # Make mypy happy - self._db_pool_watcher.check_for_pool_exhaustion() - async def server_db_connect() -> asyncpg.pool.Pool: database_host = opt.db_host.get() diff --git a/src/inmanta/util/__init__.py b/src/inmanta/util/__init__.py index e3ea3662b4..cf534fa6d0 100644 --- a/src/inmanta/util/__init__.py +++ b/src/inmanta/util/__init__.py @@ -392,6 +392,25 @@ def action_function() -> None: self._scheduled[task_spec] = handle return task_spec + def schedule( + self, + call: TaskMethod, + interval: float = 60, + initial_delay: Optional[float] = None, + cancel_on_stop: bool = True, + quiet_mode: bool = False, + ) -> Optional[ScheduledTask]: + """ + Schedule a task repeatedly with a given interval. Tasks with the same call and the same schedule are considered the + same. Clients that wish to be able to delete tasks should make sure to use a unique `call` function. + + :param interval: The interval between executions of the task. + :param initial_delay: The delay to execute the task for the first time. If not set, interval is used. + :quiet_mode: Set to true to disable logging the recurring notification that the action is being called. Use this to + avoid polluting the server log for very frequent actions. + """ + return self.add_action(call, IntervalSchedule(interval, initial_delay), cancel_on_stop, quiet_mode) + @stable_api def remove(self, task: ScheduledTask) -> None: """ @@ -850,6 +869,7 @@ class ExhaustedPoolWatcher: def __init__(self, pool: asyncpg.pool.Pool) -> None: self._exhausted_pool_events_count: int = 0 + self._last_report: int = 0 self._pool: asyncpg.pool.Pool = pool def report_and_reset(self, logger: logging.Logger) -> None: @@ -857,9 +877,10 @@ def report_and_reset(self, logger: logging.Logger) -> None: Log how many exhausted pool events were recorded since the last time the counter was reset, if any, and reset the counter. 
""" - if self._exhausted_pool_events_count > 0: - logger.warning("Database pool was exhausted %d times in the past 24h.", self._exhausted_pool_events_count) - self._reset_counter() + since_last = self._exhausted_pool_events_count - self._last_report + if since_last > 0: + logger.warning("Database pool was exhausted %d times in the past 24h.", since_last) + self._last_report = self._exhausted_pool_events_count def check_for_pool_exhaustion(self) -> None: """ @@ -869,9 +890,6 @@ def check_for_pool_exhaustion(self) -> None: if pool_exhausted: self._exhausted_pool_events_count += 1 - def _reset_counter(self) -> None: - self._exhausted_pool_events_count = 0 - def remove_comment_part_from_specifier(to_clean: str) -> str: """ diff --git a/tests/server/test_databaseservice.py b/tests/server/test_databaseservice.py index 7645ee1287..5ac64ceb40 100644 --- a/tests/server/test_databaseservice.py +++ b/tests/server/test_databaseservice.py @@ -74,7 +74,7 @@ def exhaustion_events_recorded() -> bool: """ Returns true if some database exhaustion events have been recorded """ - n_events: int = database_slice._db_pool_watcher._exhausted_pool_events_count + n_events: int = database_slice._db_monitor._db_pool_watcher._exhausted_pool_events_count return n_events > 0 with caplog.at_level(logging.WARNING, "inmanta.server.services.databaseservice"): @@ -91,7 +91,7 @@ def exhaustion_events_recorded() -> bool: await connection.close() # Call _report_database_pool_exhaustion manually (scheduled to run every 24h) - await database_slice._report_database_pool_exhaustion() + await database_slice._db_monitor._report_database_pool_exhaustion() log_contains( caplog, From 66670dcec001447defa3bbbaa0fd1902d66ab20e Mon Sep 17 00:00:00 2001 From: wouter Date: Mon, 28 Oct 2024 16:33:41 +0100 Subject: [PATCH 02/11] All bits and pieces --- src/inmanta/agent/agent_new.py | 31 ++++- src/inmanta/data/model.py | 29 +++++ src/inmanta/protocol/methods_v2.py | 11 +- src/inmanta/reporter.py | 2 + src/inmanta/server/agentmanager.py | 10 +- .../server/services/databaseservice.py | 109 +++++++++++++----- tests/server/test_server_status.py | 2 +- 7 files changed, 162 insertions(+), 32 deletions(-) diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index 576b338292..f68155e111 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -24,15 +24,18 @@ from concurrent.futures.thread import ThreadPoolExecutor from typing import Any, Callable, Coroutine, Optional, Union +import inmanta.server.config as opt from inmanta import config, const, protocol from inmanta.agent import config as cfg from inmanta.agent import executor, forking_executor from inmanta.agent.reporting import collect_report from inmanta.const import AGENT_SCHEDULER_ID -from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr +from inmanta.data import Agent, Resource +from inmanta.data.model import AttributeStateChange, DataBaseReport, ResourceVersionIdStr from inmanta.deploy import scheduler from inmanta.deploy.work import TaskPriority from inmanta.protocol import SessionEndpoint, methods, methods_v2 +from inmanta.server.services.databaseservice import DatabaseMonitor from inmanta.types import Apireturn from inmanta.util import CronSchedule, IntervalSchedule, ScheduledTask, Scheduler, TaskMethod, TaskSchedule, join_threadpools @@ -75,6 +78,18 @@ def __init__( self._set_deploy_and_repair_intervals() + async def start(self) -> None: + # None of this very OO, this is a bit messy + self._db_monitor = DatabaseMonitor( 
+ Resource._connection_pool, + opt.db_name.get(), + opt.db_host.get(), + "scheduler", + str(self.environment), + ) + + await super().start() + def _set_deploy_and_repair_intervals(self) -> None: """ Fetch the settings related to automatic deploys and repairs from the config @@ -314,6 +329,20 @@ async def get_facts(self, env: uuid.UUID, agent: str, resource: dict[str, Any]) async def get_status(self) -> Apireturn: return 200, collect_report(self) + @protocol.handle(methods_v2.get_db_status) + async def get_db_status(self) -> DataBaseReport: + if self._db_monitor is None: + return DataBaseReport( + connected=False, + database="", + host="", + max_pool=0, + open_connections=0, + free_connections=0, + pool_exhaustion_count=0, + ) + return await self._db_monitor.get_status() + def check_storage(self) -> dict[str, str]: """ Check if the server storage is configured and ready to use. diff --git a/src/inmanta/data/model.py b/src/inmanta/data/model.py index 0fa09b9a55..37381423f2 100644 --- a/src/inmanta/data/model.py +++ b/src/inmanta/data/model.py @@ -16,6 +16,7 @@ Contact: code@inmanta.com """ +import dataclasses import datetime import typing import urllib @@ -859,3 +860,31 @@ def has_source(self) -> bool: LEGACY_PIP_DEFAULT = PipConfig(use_system_config=True) + + +class DataBaseReport(BaseModel): + + connected: bool + database: str + host: str + max_pool: int + open_connections: int + free_connections: int + pool_exhaustion_count: int + + def __add__(self, other: BaseModel) -> BaseModel: + if not isinstance(other, BaseModel): + return NotImplemented + if other.database != self.database: + return NotImplemented + if other.host != self.host: + return NotImplemented + return DataBaseReport( + connected=self.connected, + database=self.database, + host=self.host, + max_pool=self.max_pool + other.max_pool, + open_connections=self.open_connections + other.open_connections, + free_connections=self.free_connections + other.free_connections, + pool_exhaustion_count=self.pool_exhaustion_count + other.pool_exhaustion_count, + ) diff --git a/src/inmanta/protocol/methods_v2.py b/src/inmanta/protocol/methods_v2.py index 713e631338..8e90425ca8 100644 --- a/src/inmanta/protocol/methods_v2.py +++ b/src/inmanta/protocol/methods_v2.py @@ -24,7 +24,7 @@ from inmanta.const import AgentAction, ApiDocsFormat, Change, ClientType, ParameterSource, ResourceState from inmanta.data import model -from inmanta.data.model import LinkedDiscoveredResource, PipConfig, ResourceIdStr +from inmanta.data.model import DataBaseReport, LinkedDiscoveredResource, PipConfig, ResourceIdStr from inmanta.protocol import methods from inmanta.protocol.common import ReturnValue from inmanta.protocol.decorators import typedmethod @@ -496,6 +496,15 @@ def update_agent_map(agent_map: dict[str, str]) -> None: """ +@typedmethod( + path="/db_status", api=False, server_agent=True, enforce_auth=False, operation="POST", client_types=[], api_version=2 +) +def get_db_status() -> DataBaseReport: + """ + Get a report of the DB connection pool status + """ + + @typedmethod( path="/compiledata/", operation="GET", diff --git a/src/inmanta/reporter.py b/src/inmanta/reporter.py index 15bbab0bd1..ee470466f2 100644 --- a/src/inmanta/reporter.py +++ b/src/inmanta/reporter.py @@ -80,6 +80,8 @@ class InfluxReporter(AsyncReporter): """ InfluxDB reporter using native http api (based on https://influxdb.com/docs/v1.1/guides/writing_data.html) + + If metric name end with `,a=b` these will append as tags `a=b` """ def __init__( diff --git 
a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 5a8302b32b..2a1d0440e4 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -38,7 +38,7 @@ from inmanta import const, data, tracing from inmanta.agent import config as agent_cfg from inmanta.config import Config -from inmanta.const import UNDEPLOYABLE_NAMES, AgentAction, AgentStatus +from inmanta.const import AGENT_SCHEDULER_ID, UNDEPLOYABLE_NAMES, AgentAction, AgentStatus from inmanta.data import APILIMIT, InvalidSort, model from inmanta.data.model import ResourceIdStr from inmanta.protocol import encode_token, handle, methods, methods_v2 @@ -204,6 +204,14 @@ async def prestop(self) -> None: async def stop(self) -> None: await super().stop() + def get_all_schedulers(self) -> list[tuple[uuid.UUID, protocol.Session]]: + # Linear scan, but every item should be a hit + return [ + (env_id, session) + for (env_id, agent_id), session in self.tid_endpoint_to_session.items() + if agent_id == AGENT_SCHEDULER_ID + ] + async def halt_agents(self, env: data.Environment, connection: Optional[asyncpg.connection.Connection] = None) -> None: """ Halts all agents for an environment. Persists prior paused state. diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index b4263c39c9..799b92bd94 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -16,44 +16,44 @@ Contact: code@inmanta.com """ +import asyncio import dataclasses import logging -from typing import Optional +import typing +import uuid +from typing import Optional, cast import asyncpg from pyformance import gauge, global_registry from pyformance.meters import CallbackGauge from inmanta import data, util -from inmanta.server import SLICE_DATABASE +from inmanta.data.model import DataBaseReport +from inmanta.server import SLICE_AGENT_MANAGER, SLICE_DATABASE from inmanta.server import config as opt from inmanta.server import protocol +from inmanta.server.server import Server from inmanta.types import ArgumentTypes from inmanta.util import Scheduler LOGGER = logging.getLogger(__name__) - -@dataclasses.dataclass -class DataBaseReport: - - connected: bool - database: str - host: str - max_pool: int - open_connections: int - free_connections: int - pool_exhaustion_count: int +if typing.TYPE_CHECKING: + from inmanta.server import agentmanager class DatabaseMonitor: - def __init__(self, pool: asyncpg.pool.Pool, db_name: str, db_host: str) -> None: + def __init__( + self, pool: asyncpg.pool.Pool, db_name: str, db_host: str, component: str, environment: str | None = None + ) -> None: self._pool = pool self._scheduler = Scheduler(f"Database monitor for {db_name}") self._db_pool_watcher = util.ExhaustedPoolWatcher(self._pool) self.dn_name = db_name self.db_host = db_host + self.component = component + self.environment = environment def start(self) -> None: self.start_monitor() @@ -84,27 +84,42 @@ async def get_status(self) -> DataBaseReport: def start_monitor(self) -> None: """Attach to monitoring system""" + suffix = f",component={self.component}" + if self.environment: + suffix += f",environment={self.environment}" + gauge( - "db.connected", + "db.connected" + suffix, CallbackGauge( callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0 ), ) - gauge("db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0)) - 
gauge("db.open_connections", CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0)) gauge( - "db.free_connections", CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0) + "db.max_pool" + suffix, CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0) + ) + gauge( + "db.open_connections" + suffix, + CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0), + ) + gauge( + "db.free_connections" + suffix, + CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0), + ) + gauge( + "db.pool_exhaustion_count" + suffix, + CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count), ) - gauge("db.pool_exhaustion_count", CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count)) def stop_monitor(self): - global_registry()._gauges.pop("db.connected", None) - global_registry()._gauges.pop("db.max_pool", None) - global_registry()._gauges.pop("db.open_connections", None) - global_registry()._gauges.pop("db.free_connections", None) - global_registry()._gauges.pop("db.pool_exhaustion_count", None) - # Add sub-tagging - # Tag + suffix = f",component={self.component}" + if self.environment: + suffix += f",environment={self.environment}" + + global_registry()._gauges.pop("db.connected" + suffix, None) + global_registry()._gauges.pop("db.max_pool" + suffix, None) + global_registry()._gauges.pop("db.open_connections" + suffix, None) + global_registry()._gauges.pop("db.free_connections" + suffix, None) + global_registry()._gauges.pop("db.pool_exhaustion_count" + suffix, None) async def get_connection_status(self) -> bool: if self._pool is not None and not self._pool._closing and not self._pool._closed: @@ -131,6 +146,16 @@ def __init__(self) -> None: super().__init__(SLICE_DATABASE) self._pool: Optional[asyncpg.pool.Pool] = None self._db_monitor: Optional[DatabaseMonitor] = None + self._server_handler: Server | None = None + self.agentmanager_service: "agentmanager.AgentManager" + + async def prestart(self, server: Server) -> None: + """ + Called by the RestServer host prior to start, can be used to collect references to other server slices + Dependencies are not up yet. 
+ """ + self._server_handler = server + await super().prestart(server) async def start(self) -> None: await super().start() @@ -143,8 +168,10 @@ async def start(self) -> None: self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False ) + self.agentmanager_service = cast("agentmanager.AgentManager", self._server_handler.get_slice(SLICE_AGENT_MANAGER)) + assert self._pool is not None # Make mypy happy - self._db_monitor = DatabaseMonitor(self._pool, opt.db_name.get(), opt.db_host.get()) + self._db_monitor = DatabaseMonitor(self._pool, opt.db_name.get(), opt.db_host.get(), "server") self._db_monitor.start() async def stop(self) -> None: @@ -172,7 +199,33 @@ async def disconnect_database(self) -> None: async def get_status(self) -> dict[str, ArgumentTypes]: """Get the status of the database connection""" - return dataclasses.asdict(await self._db_monitor.get_status()) + schedulers = self.agentmanager_service.get_all_schedulers() + + deadline = 0.9 * Server.GET_SERVER_STATUS_TIMEOUT + + async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid, DataBaseReport]: + result = await asyncio.wait_for(session.client.get_db_status(), deadline) + assert result.code == 200 + + return (env, DataBaseReport(**result.result["data"])) + + self_status_job = asyncio.create_task(self._db_monitor.get_status()) + results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True) + + self_status = await self_status_job + + total = self_status + + for result in results: + if isinstance(result, Exception): + raise result + else: + total += result[1] + + # Overall + # Server + # Scheduler + return (total).dict() async def _purge_agent_processes(self) -> None: # Move to agent manager diff --git a/tests/server/test_server_status.py b/tests/server/test_server_status.py index a87096f427..0904b9ed0a 100644 --- a/tests/server/test_server_status.py +++ b/tests/server/test_server_status.py @@ -23,7 +23,7 @@ from inmanta.server.services.compilerservice import CompilerService -async def test_server_status(server, client): +async def test_server_status(server, client, agent): result = await client.get_server_status() assert result.code == 200 From 4966ef24b51ab8fc13a7bbe040bce6dba2214d80 Mon Sep 17 00:00:00 2001 From: wouter Date: Tue, 29 Oct 2024 16:51:50 +0100 Subject: [PATCH 03/11] added DB server refactor --- .../server/services/databaseservice.py | 28 +------------------ 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index 799b92bd94..e35a36a739 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -199,33 +199,7 @@ async def disconnect_database(self) -> None: async def get_status(self) -> dict[str, ArgumentTypes]: """Get the status of the database connection""" - schedulers = self.agentmanager_service.get_all_schedulers() - - deadline = 0.9 * Server.GET_SERVER_STATUS_TIMEOUT - - async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid, DataBaseReport]: - result = await asyncio.wait_for(session.client.get_db_status(), deadline) - assert result.code == 200 - - return (env, DataBaseReport(**result.result["data"])) - - self_status_job = asyncio.create_task(self._db_monitor.get_status()) - results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True) - - 
self_status = await self_status_job - - total = self_status - - for result in results: - if isinstance(result, Exception): - raise result - else: - total += result[1] - - # Overall - # Server - # Scheduler - return (total).dict() + return (await self._db_monitor.get_status()).dict() async def _purge_agent_processes(self) -> None: # Move to agent manager From 98e12fbc5ec721a8418f22c61692533ed03c4108 Mon Sep 17 00:00:00 2001 From: wouter Date: Wed, 30 Oct 2024 10:32:06 +0100 Subject: [PATCH 04/11] temp --- src/inmanta/agent/agent_new.py | 5 +-- src/inmanta/data/model.py | 6 +-- src/inmanta/server/__init__.py | 2 +- src/inmanta/server/agentmanager.py | 43 +++++++++++++++++-- src/inmanta/server/protocol.py | 8 ++-- .../server/services/compilerservice.py | 2 +- .../server/services/databaseservice.py | 4 +- 7 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index 0937e86ea1..9f02024184 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -25,12 +25,11 @@ from typing import Any, Callable, Coroutine, Optional, Union import inmanta.server.config as opt -from inmanta import config, const, protocol +from inmanta import config, const, protocol, data from inmanta.agent import config as cfg from inmanta.agent import executor, forking_executor from inmanta.agent.reporting import collect_report from inmanta.const import AGENT_SCHEDULER_ID -from inmanta.data import Agent, Resource from inmanta.data.model import AttributeStateChange, DataBaseReport, ResourceVersionIdStr from inmanta.deploy import scheduler from inmanta.deploy.work import TaskPriority @@ -90,7 +89,7 @@ def __init__( async def start(self) -> None: # None of this very OO, this is a bit messy self._db_monitor = DatabaseMonitor( - Resource._connection_pool, + data.Resource._connection_pool, opt.db_name.get(), opt.db_host.get(), "scheduler", diff --git a/src/inmanta/data/model.py b/src/inmanta/data/model.py index 37381423f2..616e7e4153 100644 --- a/src/inmanta/data/model.py +++ b/src/inmanta/data/model.py @@ -90,7 +90,7 @@ class SliceStatus(BaseModel): """ name: str - status: dict[str, ArgumentTypes] + status: dict[str, ArgumentTypes | dict[str, ArgumentTypes]] class FeatureStatus(BaseModel): @@ -872,8 +872,8 @@ class DataBaseReport(BaseModel): free_connections: int pool_exhaustion_count: int - def __add__(self, other: BaseModel) -> BaseModel: - if not isinstance(other, BaseModel): + def __add__(self, other: "DataBaseReport") -> "DataBaseReport": + if not isinstance(other, DataBaseReport): return NotImplemented if other.database != self.database: return NotImplemented diff --git a/src/inmanta/server/__init__.py b/src/inmanta/server/__init__.py index 09335c22be..c9d0a0adb0 100644 --- a/src/inmanta/server/__init__.py +++ b/src/inmanta/server/__init__.py @@ -19,7 +19,7 @@ # flake8: noqa: F401 SLICE_SERVER = "core.server" -SLICE_AGENT_MANAGER = "core.agentmanager" +SLICE_AGENT_MANAGER = "core.scheduler_manager" SLICE_AUTOSTARTED_AGENT_MANAGER = "core.autostarted_agent_manager" SLICE_SESSION_MANAGER = "core.session" SLICE_DATABASE = "core.database" diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 305bbcd1d3..70cc5167d5 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -27,6 +27,7 @@ from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence, Set from datetime import datetime from enum import Enum +from functools import reduce from typing import 
Any, Optional, Union, cast from uuid import UUID @@ -40,7 +41,7 @@ from inmanta.config import Config from inmanta.const import AGENT_SCHEDULER_ID, UNDEPLOYABLE_NAMES, AgentAction, AgentStatus from inmanta.data import APILIMIT, InvalidSort, model -from inmanta.data.model import ResourceIdStr +from inmanta.data.model import ResourceIdStr, DataBaseReport from inmanta.protocol import encode_token, handle, methods, methods_v2 from inmanta.protocol.common import ReturnValue from inmanta.protocol.exceptions import BadRequest, Forbidden, NotFound, ShutdownInProgress @@ -168,12 +169,46 @@ def __init__(self, closesessionsonstart: bool = True, fact_back_off: Optional[in self.closesessionsonstart: bool = closesessionsonstart - async def get_status(self) -> dict[str, ArgumentTypes]: - return { + async def get_status(self) -> Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]]: + schedulers = self.get_all_schedulers() + deadline = 0.9 * Server.GET_SERVER_STATUS_TIMEOUT + + async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UUID, DataBaseReport]: + result = await asyncio.wait_for(session.client.get_db_status(), deadline) + assert result.code == 200 + return (env, DataBaseReport(**result.result["data"])) + + results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True) + + total: DataBaseReport = reduce(lambda x,y: x+y,(x[1] for x in results if not isinstance(x, BaseException))) + + out: dict[str, ArgumentTypes | Mapping[str, ArgumentTypes]] = { "resource_facts": len(self._fact_resource_block_set), "sessions": len(self.sessions), } + def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: + return { + "connected": report.connected, + "max_pool": report.max_pool, + "open_connections": report.open_connections, + "free_connections": report.free_connections, + "pool_exhaustion_count": report.pool_exhaustion_count, + } + + if total: + out["database"] = total.database + out["host"] = total.host + out["total"] = short_report(total) + + for result in results: + if isinstance(result, BaseException): + logging.debug("Failed to collect database status for scheduler", exc_info=True) + else: + out[result[0]] = short_report(result[1]) + + return out + def get_dependencies(self) -> list[str]: return [SLICE_DATABASE, SLICE_SESSION_MANAGER] @@ -951,7 +986,7 @@ def __init__(self) -> None: self._agent_procs: dict[UUID, subprocess.Process] = {} # env uuid -> subprocess.Process self.agent_lock = asyncio.Lock() # Prevent concurrent updates on _agent_procs - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: return {"processes": len(self._agent_procs)} async def prestart(self, server: protocol.Server) -> None: diff --git a/src/inmanta/server/protocol.py b/src/inmanta/server/protocol.py index 7de393de63..113a24b65f 100644 --- a/src/inmanta/server/protocol.py +++ b/src/inmanta/server/protocol.py @@ -26,7 +26,7 @@ from collections import defaultdict from collections.abc import Sequence from datetime import timedelta -from typing import TYPE_CHECKING, Callable, Optional, Union +from typing import TYPE_CHECKING, Callable, Optional, Union, Mapping from tornado import gen, queues, routing, web @@ -399,7 +399,7 @@ def get_extension_statuses(cls, slices: list["ServerSlice"]) -> list[ExtensionSt result[ext_status.name] = ext_status return list(result.values()) - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, 
ArgumentTypes | Mapping[str, ArgumentTypes]]: """ Get the status of this slice. """ @@ -620,7 +620,7 @@ async def stop(self) -> None: await super().stop() await self.server._transport.join() - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: def format_socket(sock: socket.socket) -> str: sname = sock.getsockname() return f"{sname[0]}:{sname[1]}" @@ -667,7 +667,7 @@ def __init__(self) -> None: # Listeners self.listeners: list[SessionListener] = [] - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: return {"hangtime": self.hangtime, "interval": self.interval, "sessions": len(self._sessions)} def add_listener(self, listener: SessionListener) -> None: diff --git a/src/inmanta/server/services/compilerservice.py b/src/inmanta/server/services/compilerservice.py index 7720ad01c2..49badc55ae 100644 --- a/src/inmanta/server/services/compilerservice.py +++ b/src/inmanta/server/services/compilerservice.py @@ -540,7 +540,7 @@ def __init__(self) -> None: self._queue_count_cache: int = 0 self._queue_count_cache_lock = asyncio.locks.Lock() - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: return {"task_queue": self._queue_count_cache, "listeners": len(self.async_listeners) + len(self.blocking_listeners)} def add_listener(self, listener: CompileStateListener) -> None: diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index 1b43e633e6..8525d476c5 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -21,7 +21,7 @@ import logging import typing import uuid -from typing import Optional, cast +from typing import Optional, cast, Mapping import asyncpg from pyformance import gauge, global_registry @@ -207,7 +207,7 @@ async def disconnect_database(self) -> None: """Disconnect the database""" await data.disconnect() - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: """Get the status of the database connection""" return (await self._db_monitor.get_status()).dict() From 74ee0f9937827c4cd852fe5eceffa27acec11e1a Mon Sep 17 00:00:00 2001 From: wouter Date: Wed, 30 Oct 2024 14:35:28 +0100 Subject: [PATCH 05/11] Added reporting --- mypy-baseline.txt | 6 ++- src/inmanta/agent/agent_new.py | 6 ++- src/inmanta/app.py | 4 ++ src/inmanta/data/model.py | 5 +- src/inmanta/env.py | 2 +- src/inmanta/server/agentmanager.py | 46 +++++++++++++++---- src/inmanta/server/protocol.py | 13 +----- .../server/services/databaseservice.py | 17 ++++--- stubs/pyformance/__init__.pyi | 3 ++ tests/server/test_server_status.py | 10 +++- tests/test_influxdbreporting.py | 38 +++++++++++---- 11 files changed, 101 insertions(+), 49 deletions(-) diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 0aee49f4d3..94eeaf6b0a 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -101,7 +101,7 @@ src/inmanta/ast/type.py:0: error: No overload variant of "reversed" matches argu src/inmanta/ast/type.py:0: note: Possible overload variants: src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, Reversible[_T], /) -> reversed[_T] src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, SupportsLenAndGetItem[_T], /) -> reversed[_T] -src/inmanta/server/services/databaseservice.py:0: error: Incompatible return value type (got "dict[str, object]", expected "dict[str, 
BaseModel | UUID | bool | int | float | datetime | str | Sequence[BaseModel | UUID | bool | int | float | datetime | str] | Mapping[str, BaseModel | UUID | bool | int | float | datetime | str]]") [return-value] +src/inmanta/server/services/metricservice.py:0: error: Incompatible types in assignment (expression has type "float", variable has type "int") [assignment] src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_method__" [attr-defined] src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_mapping__" [attr-defined] src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__api_version__" [attr-defined] @@ -156,6 +156,7 @@ src/inmanta/ast/attribute.py:0: error: Incompatible types in assignment (express src/inmanta/ast/attribute.py:0: error: Missing type parameters for generic type "ResultVariable" [type-arg] src/inmanta/ast/attribute.py:0: error: Missing type parameters for generic type "ResultVariable" [type-arg] src/inmanta/util/__init__.py:0: error: Argument "key" to "sorted" has incompatible type "Callable[[T], S]"; expected "Callable[[T], SupportsDunderLT[Any] | SupportsDunderGT[Any]]" [arg-type] +src/inmanta/util/__init__.py:0: error: Returning Any from function declared to return "ScheduledTask | None" [no-any-return] src/inmanta/util/__init__.py:0: error: Returning Any from function declared to return "bool" [no-any-return] src/inmanta/util/__init__.py:0: error: Incompatible return value type (got "bool | Awaitable[bool]", expected "bool") [return-value] src/inmanta/resources.py:0: error: Function is missing a type annotation [no-untyped-def] @@ -236,6 +237,7 @@ src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-bo src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] +src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] src/inmanta/protocol/methods_v2.py:0: error: Cannot determine type of "RVID_OPTS" [has-type] src/inmanta/protocol/methods_v2.py:0: error: Cannot determine type of "RVID_OPTS" [has-type] src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] @@ -1101,6 +1103,7 @@ src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types in assignment (expression has type "set[str | None]", variable has type "set[str]") [assignment] src/inmanta/server/services/orchestrationservice.py:0: error: Argument "agents" to "_trigger_auto_deploy" of "OrchestrationService" has incompatible type "Collection[str]"; expected "Sequence[str] | None" [arg-type] src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible return value type (got "ReturnValueWithMeta[Sequence[DesiredStateVersion]]", expected "ReturnValue[list[DesiredStateVersion]]") [return-value] +src/inmanta/server/agentmanager.py:0: error: Value of type "dict[str, Any] | None" is not indexable [index] src/inmanta/server/agentmanager.py:0: error: Argument 1 to "ensure_agent_registered" of "AgentManager" has incompatible type "Environment | None"; expected "Environment" [arg-type] src/inmanta/agent/forking_executor.py:0: error: Argument 1 of "connection_made" is incompatible with supertype "BaseProtocol"; supertype defines the argument type as "BaseTransport" [override] 
src/inmanta/agent/forking_executor.py:0: note: This violates the Liskov substitution principle @@ -1115,7 +1118,6 @@ src/inmanta/server/services/paramservice.py:0: error: Unsupported operand types src/inmanta/server/services/paramservice.py:0: note: Left operand is of type "datetime | None" src/inmanta/server/services/paramservice.py:0: error: Unsupported operand types for > ("timedelta" and "datetime") [operator] src/inmanta/server/services/paramservice.py:0: note: Left operand is of type "datetime | timedelta" -src/inmanta/server/services/metricservice.py:0: error: Incompatible types in assignment (expression has type "float", variable has type "int") [assignment] src/inmanta/server/services/environment_metrics_service.py:0: error: Function is missing a return type annotation [no-untyped-def] src/inmanta/server/services/environment_metrics_service.py:0: error: "Sequence[EnvironmentMetricsGauge]" has no attribute "append" [attr-defined] src/inmanta/server/services/environment_metrics_service.py:0: error: Function is missing a return type annotation [no-untyped-def] diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index 9f02024184..bf13eadb08 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -25,7 +25,7 @@ from typing import Any, Callable, Coroutine, Optional, Union import inmanta.server.config as opt -from inmanta import config, const, protocol, data +from inmanta import config, const, data, protocol from inmanta.agent import config as cfg from inmanta.agent import executor, forking_executor from inmanta.agent.reporting import collect_report @@ -87,7 +87,8 @@ def __init__( self._set_deploy_and_repair_intervals() async def start(self) -> None: - # None of this very OO, this is a bit messy + # Make mypy happy + assert data.Resource._connection_pool is not None self._db_monitor = DatabaseMonitor( data.Resource._connection_pool, opt.db_name.get(), @@ -95,6 +96,7 @@ async def start(self) -> None: "scheduler", str(self.environment), ) + self._db_monitor.start() await super().start() diff --git a/src/inmanta/app.py b/src/inmanta/app.py index 535daf7286..57a2f64fdb 100644 --- a/src/inmanta/app.py +++ b/src/inmanta/app.py @@ -67,6 +67,7 @@ from inmanta.server import config as opt from inmanta.server.bootloader import InmantaBootloader from inmanta.server.services.databaseservice import initialize_database_connection_pool +from inmanta.server.services.metricservice import MetricsService from inmanta.signals import safe_shutdown, setup_signal_handlers from inmanta.util import get_compiler_version from inmanta.warnings import WarningsManager @@ -156,6 +157,9 @@ async def start() -> None: connection_pool_max_size=agent_config.scheduler_db_connection_pool_max_size.get(), connection_timeout=agent_config.scheduler_db_connection_timeout.get(), ) + # also report metrics if this is relevant + metrics_reporter = MetricsService() + metrics_reporter.start_metric_reporters() await a.start() setup_signal_handlers(a.stop) diff --git a/src/inmanta/data/model.py b/src/inmanta/data/model.py index 616e7e4153..e27f3db690 100644 --- a/src/inmanta/data/model.py +++ b/src/inmanta/data/model.py @@ -16,7 +16,6 @@ Contact: code@inmanta.com """ -import dataclasses import datetime import typing import urllib @@ -24,7 +23,7 @@ from collections import abc from collections.abc import Sequence from enum import Enum, StrEnum -from typing import ClassVar, NewType, Optional, Self, Union +from typing import ClassVar, NewType, Optional, Self, Union, Mapping import pydantic 
import pydantic.schema @@ -90,7 +89,7 @@ class SliceStatus(BaseModel): """ name: str - status: dict[str, ArgumentTypes | dict[str, ArgumentTypes]] + status: Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]] class FeatureStatus(BaseModel): diff --git a/src/inmanta/env.py b/src/inmanta/env.py index 9bc610b882..37e88b2d4b 100644 --- a/src/inmanta/env.py +++ b/src/inmanta/env.py @@ -1135,7 +1135,7 @@ def are_installed(self, requirements: req_list) -> bool: Return True iff the given requirements are installed in this environment. """ assert self.is_using_virtual_env() - return PythonWorkingSet.are_instadelled(requirements) + return PythonWorkingSet.are_installed(requirements) def install_for_config( self, diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 70cc5167d5..9ccfbb8055 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -40,8 +40,8 @@ from inmanta.agent import config as agent_cfg from inmanta.config import Config from inmanta.const import AGENT_SCHEDULER_ID, UNDEPLOYABLE_NAMES, AgentAction, AgentStatus -from inmanta.data import APILIMIT, InvalidSort, model -from inmanta.data.model import ResourceIdStr, DataBaseReport +from inmanta.data import APILIMIT, Environment, InvalidSort, model +from inmanta.data.model import DataBaseReport, ResourceIdStr from inmanta.protocol import encode_token, handle, methods, methods_v2 from inmanta.protocol.common import ReturnValue from inmanta.protocol.exceptions import BadRequest, Forbidden, NotFound, ShutdownInProgress @@ -170,22 +170,37 @@ def __init__(self, closesessionsonstart: bool = True, fact_back_off: Optional[in self.closesessionsonstart: bool = closesessionsonstart async def get_status(self) -> Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]]: + # The basic report + + out: dict[str, ArgumentTypes | Mapping[str, ArgumentTypes]] = { + "resource_facts": len(self._fact_resource_block_set), + "sessions": len(self.sessions), + } + + # Try to get more info from scheduler, but make sure not to timeout schedulers = self.get_all_schedulers() deadline = 0.9 * Server.GET_SERVER_STATUS_TIMEOUT async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UUID, DataBaseReport]: result = await asyncio.wait_for(session.client.get_db_status(), deadline) assert result.code == 200 + # Mypy can't help here, .... 
return (env, DataBaseReport(**result.result["data"])) + env_mapping = asyncio.create_task( + asyncio.wait_for(Environment.get_list(details=False, is_marked_for_deletion=False), deadline) + ) results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True) + try: + # This can timeout, but not likely + env_mapping_result = await env_mapping + uuid_to_name = {env.id: env.name for env in env_mapping_result} + except TimeoutError: + # default to uuid's + uuid_to_name = {} - total: DataBaseReport = reduce(lambda x,y: x+y,(x[1] for x in results if not isinstance(x, BaseException))) - - out: dict[str, ArgumentTypes | Mapping[str, ArgumentTypes]] = { - "resource_facts": len(self._fact_resource_block_set), - "sessions": len(self.sessions), - } + # Filter out timeouts + total: DataBaseReport = reduce(lambda x, y: x + y, (x[1] for x in results if not isinstance(x, BaseException))) def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: return { @@ -205,7 +220,9 @@ def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: if isinstance(result, BaseException): logging.debug("Failed to collect database status for scheduler", exc_info=True) else: - out[result[0]] = short_report(result[1]) + the_uuid = result[0] + name = uuid_to_name.get(the_uuid, str(the_uuid)) + out[name] = short_report(result[1]) return out @@ -1410,12 +1427,21 @@ async def _make_agent_config( name={opt.db_name.get()} username={opt.db_username.get()} password={opt.db_password.get()} + [scheduler] db-connection-pool-min-size={agent_cfg.scheduler_db_connection_pool_min_size.get()} db-connection-pool-max-size={agent_cfg.scheduler_db_connection_pool_max_size.get()} db-connection-timeout={agent_cfg.scheduler_db_connection_timeout.get()} - """ +[influxdb] +host = {opt.influxdb_host.get()} +port = {opt.influxdb_port.get()} +name = {opt.influxdb_name.get()} +username = {opt.influxdb_username.get()} +password = {opt.influxdb_password.get()} +interval = {opt.influxdb_interval.get()} +tags = {opt.influxdb_tags.get()} +""" return config async def _fork_inmanta( diff --git a/src/inmanta/server/protocol.py b/src/inmanta/server/protocol.py index 113a24b65f..675d6dc0ed 100644 --- a/src/inmanta/server/protocol.py +++ b/src/inmanta/server/protocol.py @@ -26,7 +26,7 @@ from collections import defaultdict from collections.abc import Sequence from datetime import timedelta -from typing import TYPE_CHECKING, Callable, Optional, Union, Mapping +from typing import TYPE_CHECKING, Callable, Mapping, Optional, Union from tornado import gen, queues, routing, web @@ -39,16 +39,7 @@ from inmanta.server import SLICE_SESSION_MANAGER, SLICE_TRANSPORT from inmanta.server import config as opt from inmanta.types import ArgumentTypes, JsonType -from inmanta.util import ( - CronSchedule, - CycleException, - IntervalSchedule, - ScheduledTask, - Scheduler, - TaskHandler, - TaskMethod, - stable_depth_first, -) +from inmanta.util import CronSchedule, CycleException, ScheduledTask, Scheduler, TaskHandler, TaskMethod, stable_depth_first if TYPE_CHECKING: from inmanta.server.extensions import Feature, FeatureManager diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index 8525d476c5..eb8ed8e2fa 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -16,12 +16,9 @@ Contact: code@inmanta.com """ -import asyncio -import dataclasses import logging import typing -import uuid -from 
typing import Optional, cast, Mapping +from typing import Mapping, Optional, cast import asyncpg from pyformance import gauge, global_registry @@ -32,7 +29,6 @@ from inmanta.server import SLICE_AGENT_MANAGER, SLICE_DATABASE from inmanta.server import config as opt from inmanta.server import protocol -from inmanta.server.server import Server from inmanta.types import ArgumentTypes from inmanta.util import Scheduler @@ -110,7 +106,7 @@ def start_monitor(self) -> None: CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count), ) - def stop_monitor(self): + def stop_monitor(self) -> None: suffix = f",component={self.component}" if self.environment: suffix += f",environment={self.environment}" @@ -146,10 +142,10 @@ def __init__(self) -> None: super().__init__(SLICE_DATABASE) self._pool: Optional[asyncpg.pool.Pool] = None self._db_monitor: Optional[DatabaseMonitor] = None - self._server_handler: Server | None = None + self._server_handler: protocol.Server | None = None self.agentmanager_service: "agentmanager.AgentManager" - async def prestart(self, server: Server) -> None: + async def prestart(self, server: protocol.Server) -> None: """ Called by the RestServer host prior to start, can be used to collect references to other server slices Dependencies are not up yet. @@ -168,6 +164,7 @@ async def start(self) -> None: self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False ) + assert self._server_handler is not None # Make mypy happy self.agentmanager_service = cast("agentmanager.AgentManager", self._server_handler.get_slice(SLICE_AGENT_MANAGER)) assert self._pool is not None # Make mypy happy @@ -176,8 +173,9 @@ async def start(self) -> None: async def stop(self) -> None: await super().stop() + if self._db_monitor is not None: + await self._db_monitor.stop() await self.disconnect_database() - await self._db_monitor.stop() self._pool = None def get_dependencies(self) -> list[str]: @@ -209,6 +207,7 @@ async def disconnect_database(self) -> None: async def get_status(self) -> Mapping[str, ArgumentTypes]: """Get the status of the database connection""" + assert self._db_monitor is not None # make mypy happy return (await self._db_monitor.get_status()).dict() async def _purge_agent_processes(self) -> None: diff --git a/stubs/pyformance/__init__.pyi b/stubs/pyformance/__init__.pyi index dbe8409fbe..ac76505079 100644 --- a/stubs/pyformance/__init__.pyi +++ b/stubs/pyformance/__init__.pyi @@ -5,6 +5,9 @@ from pyformance.meters import Gauge class MetricsRegistry: + + _gauges: dict[str, Gauge] + def __init__(self, clock): ... def dump_metrics(self)-> abc.Mapping[str, abc.Mapping[str, int|float|str]]: ... 
diff --git a/tests/server/test_server_status.py b/tests/server/test_server_status.py index 0904b9ed0a..5467bc9906 100644 --- a/tests/server/test_server_status.py +++ b/tests/server/test_server_status.py @@ -23,7 +23,7 @@ from inmanta.server.services.compilerservice import CompilerService -async def test_server_status(server, client, agent): +async def test_server_status(server, client, agent, environment): result = await client.get_server_status() assert result.code == 200 @@ -38,6 +38,14 @@ async def test_server_status(server, client, agent): assert len([x for x in status["slices"] if x["name"] == "core.database"]) == 1 assert db_status[0]["status"]["connected"] is True + scheduler_status = [x for x in status["slices"] if x["name"] == "core.scheduler_manager"] + assert len(scheduler_status) == 1 + + scheduler_stats = scheduler_status[0]["status"] + assert scheduler_stats["total"]["connected"] is True + # test environment is called dev + assert scheduler_stats["dev"]["connected"] is True + assert "features" in status assert len(status["features"]) > 0 diff --git a/tests/test_influxdbreporting.py b/tests/test_influxdbreporting.py index 0db3afa088..7ab90be255 100644 --- a/tests/test_influxdbreporting.py +++ b/tests/test_influxdbreporting.py @@ -26,7 +26,9 @@ from tornado.web import url from inmanta.reporter import AsyncReporter, InfluxReporter +from inmanta.server.config import influxdb_host, influxdb_interval, influxdb_port from inmanta.server.services.metricservice import CPUMicroBenchMark +from utils import retry_limited class QueryMockHandler(tornado.web.RequestHandler): @@ -36,13 +38,13 @@ def initialize(self, parent): def get(self, *args, **kwargs): self.parent.querycount += 1 try: - assert self.request.query_arguments["q"] == [b"CREATE DATABASE metrics"] + assert self.request.query_arguments["q"] == [b"CREATE DATABASE inmanta"] except Exception as e: # carry over failures self.parent.failure = e -influxlineprotocol = re.compile(r"\w+(,\w+=\w+)* (\w+=[\d.e+-]*)(,\w+=[\d.e+-]*)* \d+") +influxlineprotocol = re.compile(r"\w+(,\w+=[^, ]+)* (\w+=[\d.e+-]*)(,\w+=[\d.e+-]*)* \d+") class WriteMockHandler(tornado.web.RequestHandler): @@ -102,7 +104,7 @@ def influxdb(event_loop, free_socket): async def test_influxdb(influxdb): - rep = InfluxReporter(port=influxdb.port, tags={"mark": "X"}, autocreate_database=True) + rep = InfluxReporter(port=influxdb.port, tags={"mark": "X"}, autocreate_database=True, database="inmanta") with timer("test").time(): pass @@ -182,20 +184,36 @@ async def test_available_metrics(server): base_types = [base_type] types[key] = base_types - assert metrics["db.connected"]["value"] - assert "db.max_pool" in metrics - assert "db.open_connections" in metrics - assert "db.free_connections" in metrics + assert metrics["db.connected,component=server"]["value"] + assert "db.max_pool,component=server" in metrics + assert "db.open_connections,component=server" in metrics + assert "db.free_connections,component=server" in metrics assert "self.spec.cpu" in metrics # ensure it doesn't crash when the server is down await server.stop() metrics = global_registry().dump_metrics() - assert metrics["db.max_pool"]["value"] == 0 - assert not metrics["db.connected"]["value"] - async def test_safeness_if_server_down(): # ensure it works is there is no server global_registry().dump_metrics() + + +@pytest.fixture +async def server_config(server_config, influxdb): + influxdb_host.set("127.0.0.1") + influxdb_port.set(str(influxdb.port)) + influxdb_interval.set(str(1)) + + 
+@pytest.mark.parametrize("auto_start_agent", [True]) +async def test_influxdb_server_and_scheduler(server, influxdb, auto_start_agent, environment): + """Test that we receive influxdb information from the scheduler""" + + # Using this function signature makes the test easier to debug, but doesn't test the initialization + # async def test_influxdb_server_and_scheduler(server, agent, influxdb): + def has_scheduler_metrics(): + return any("metrics,key=db.pool_exhaustion_count,component=scheduler,environment=" in line for line in influxdb.lines) + + await retry_limited(has_scheduler_metrics, 10) From 06cb13bef6e1fef00b071786a61ff3ee39194382 Mon Sep 17 00:00:00 2001 From: wouter Date: Wed, 30 Oct 2024 14:38:59 +0100 Subject: [PATCH 06/11] added changelog --- changelogs/unreleased/8228-database_pool_report.yml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changelogs/unreleased/8228-database_pool_report.yml diff --git a/changelogs/unreleased/8228-database_pool_report.yml b/changelogs/unreleased/8228-database_pool_report.yml new file mode 100644 index 0000000000..19a6af8bbf --- /dev/null +++ b/changelogs/unreleased/8228-database_pool_report.yml @@ -0,0 +1,6 @@ +description: Update database pool status reporting for the new scheduler +issue-nr: 8228 +change-type: minor +destination-branches: [master] +sections: + minor-improvement: "{{description}}" From 188e67f54443918685e8a302563bf6ae38085983 Mon Sep 17 00:00:00 2001 From: wouter Date: Wed, 30 Oct 2024 15:08:14 +0100 Subject: [PATCH 07/11] cleanup --- src/inmanta/data/model.py | 2 +- src/inmanta/server/agentmanager.py | 10 ++++++ .../server/services/databaseservice.py | 35 ++----------------- 3 files changed, 14 insertions(+), 33 deletions(-) diff --git a/src/inmanta/data/model.py b/src/inmanta/data/model.py index e27f3db690..f94ad646ef 100644 --- a/src/inmanta/data/model.py +++ b/src/inmanta/data/model.py @@ -23,7 +23,7 @@ from collections import abc from collections.abc import Sequence from enum import Enum, StrEnum -from typing import ClassVar, NewType, Optional, Self, Union, Mapping +from typing import ClassVar, Mapping, NewType, Optional, Self, Union import pydantic import pydantic.schema diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 9ccfbb8055..f57a07d4d6 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -576,6 +576,10 @@ async def _expire_all_sessions_in_db(self) -> None: await data.AgentInstance.expire_all(now=datetime.now().astimezone(), connection=connection) await data.Agent.mark_all_as_non_primary(connection=connection) + async def _purge_agent_processes(self) -> None: + agent_processes_to_keep = opt.agent_processes_to_keep.get() + await data.AgentProcess.cleanup(nr_expired_records_to_keep=agent_processes_to_keep) + # Util async def _use_new_active_session_for_agent(self, tid: uuid.UUID, endpoint_name: str) -> Optional[protocol.Session]: """ @@ -1023,6 +1027,12 @@ async def prestart(self, server: protocol.Server) -> None: async def start(self) -> None: await super().start() self.add_background_task(self._start_agents()) + # Schedule cleanup agentprocess and agentinstance tables + agent_process_purge_interval = opt.agent_process_purge_interval.get() + if agent_process_purge_interval > 0: + self.schedule( + self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False + ) async def prestop(self) -> None: await super().prestop() diff --git a/src/inmanta/server/services/databaseservice.py
b/src/inmanta/server/services/databaseservice.py index eb8ed8e2fa..f2c7c5e796 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -17,8 +17,7 @@ """ import logging -import typing -from typing import Mapping, Optional, cast +from typing import Mapping, Optional import asyncpg from pyformance import gauge, global_registry @@ -26,7 +25,7 @@ from inmanta import data, util from inmanta.data.model import DataBaseReport -from inmanta.server import SLICE_AGENT_MANAGER, SLICE_DATABASE +from inmanta.server import SLICE_DATABASE from inmanta.server import config as opt from inmanta.server import protocol from inmanta.types import ArgumentTypes @@ -34,9 +33,6 @@ LOGGER = logging.getLogger(__name__) -if typing.TYPE_CHECKING: - from inmanta.server import agentmanager - class DatabaseMonitor: @@ -142,31 +138,11 @@ def __init__(self) -> None: super().__init__(SLICE_DATABASE) self._pool: Optional[asyncpg.pool.Pool] = None self._db_monitor: Optional[DatabaseMonitor] = None - self._server_handler: protocol.Server | None = None - self.agentmanager_service: "agentmanager.AgentManager" - - async def prestart(self, server: protocol.Server) -> None: - """ - Called by the RestServer host prior to start, can be used to collect references to other server slices - Dependencies are not up yet. - """ - self._server_handler = server - await super().prestart(server) async def start(self) -> None: await super().start() await self.connect_database() - # Schedule cleanup agentprocess and agentinstance tables - agent_process_purge_interval = opt.agent_process_purge_interval.get() - if agent_process_purge_interval > 0: - self.schedule( - self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False - ) - - assert self._server_handler is not None # Make mypy happy - self.agentmanager_service = cast("agentmanager.AgentManager", self._server_handler.get_slice(SLICE_AGENT_MANAGER)) - assert self._pool is not None # Make mypy happy self._db_monitor = DatabaseMonitor(self._pool, opt.db_name.get(), opt.db_host.get(), "server") self._db_monitor.start() @@ -207,14 +183,9 @@ async def disconnect_database(self) -> None: async def get_status(self) -> Mapping[str, ArgumentTypes]: """Get the status of the database connection""" - assert self._db_monitor is not None # make mypy happy + assert self._db_monitor is not None # make mypy happy return (await self._db_monitor.get_status()).dict() - async def _purge_agent_processes(self) -> None: - # Move to agent manager - agent_processes_to_keep = opt.agent_processes_to_keep.get() - await data.AgentProcess.cleanup(nr_expired_records_to_keep=agent_processes_to_keep) - async def initialize_database_connection_pool( database_host: str, From 02095e005d41315773646b9457631aa75ced2c03 Mon Sep 17 00:00:00 2001 From: wouter Date: Wed, 30 Oct 2024 15:12:10 +0100 Subject: [PATCH 08/11] added comments --- src/inmanta/server/agentmanager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index f57a07d4d6..ca65c9ff11 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -187,9 +187,11 @@ async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UU # Mypy can't help here, .... 
return (env, DataBaseReport(**result.result["data"])) + # Get env name mapping in parallel with next call env_mapping = asyncio.create_task( asyncio.wait_for(Environment.get_list(details=False, is_marked_for_deletion=False), deadline) ) + # Get the reports of all schedulers results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True) try: # This can timeout, but not likely @@ -221,6 +223,7 @@ def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: logging.debug("Failed to collect database status for scheduler", exc_info=True) else: the_uuid = result[0] + # Default name to uuid name = uuid_to_name.get(the_uuid, str(the_uuid)) out[name] = short_report(result[1]) @@ -237,7 +240,6 @@ async def prestart(self, server: protocol.Server) -> None: autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER) assert isinstance(autostarted_agent_manager, AutostartedAgentManager) self._autostarted_agent_manager = autostarted_agent_manager - presession = server.get_slice(SLICE_SESSION_MANAGER) assert isinstance(presession, SessionManager) presession.add_listener(self) From cc1afe38cf09b9a81d0dfc27d32e847104c4a5e8 Mon Sep 17 00:00:00 2001 From: Wouter De Borger Date: Thu, 31 Oct 2024 08:42:46 +0100 Subject: [PATCH 09/11] Update tests/test_influxdbreporting.py Co-authored-by: Hugo-Inmanta <98876549+Hugo-Inmanta@users.noreply.github.com> --- tests/test_influxdbreporting.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_influxdbreporting.py b/tests/test_influxdbreporting.py index 7ab90be255..6f5cbd984f 100644 --- a/tests/test_influxdbreporting.py +++ b/tests/test_influxdbreporting.py @@ -192,7 +192,6 @@ async def test_available_metrics(server): # ensure it doesn't crash when the server is down await server.stop() - metrics = global_registry().dump_metrics() async def test_safeness_if_server_down(): From 2f38e1a5bc0f2a3035e60fa1c76252e5413b9760 Mon Sep 17 00:00:00 2001 From: wouter Date: Thu, 31 Oct 2024 09:42:07 +0100 Subject: [PATCH 10/11] Fixes --- src/inmanta/data/model.py | 3 +- src/inmanta/server/agentmanager.py | 19 ++++---- .../server/services/databaseservice.py | 48 +++++++++++-------- 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/src/inmanta/data/model.py b/src/inmanta/data/model.py index f94ad646ef..76422d45a8 100644 --- a/src/inmanta/data/model.py +++ b/src/inmanta/data/model.py @@ -862,7 +862,6 @@ def has_source(self) -> bool: class DataBaseReport(BaseModel): - connected: bool database: str host: str @@ -879,7 +878,7 @@ def __add__(self, other: "DataBaseReport") -> "DataBaseReport": if other.host != self.host: return NotImplemented return DataBaseReport( - connected=self.connected, + connected=self.connected and other.connected, database=self.database, host=self.host, max_pool=self.max_pool + other.max_pool, diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index ca65c9ff11..d69e6eb096 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -201,8 +201,8 @@ async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UU # default to uuid's uuid_to_name = {} - # Filter out timeouts - total: DataBaseReport = reduce(lambda x, y: x + y, (x[1] for x in results if not isinstance(x, BaseException))) + # Filter out timeouts and other errors + good_reports = [x[1] for x in results if not isinstance(x, BaseException)] def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: return { 
@@ -213,7 +213,8 @@ def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: "pool_exhaustion_count": report.pool_exhaustion_count, } - if total: + if good_reports: + total: DataBaseReport = reduce(lambda x, y: x + y, good_reports) out["database"] = total.database out["host"] = total.host out["total"] = short_report(total) @@ -251,6 +252,12 @@ async def start(self) -> None: await self._expire_all_sessions_in_db() self.add_background_task(self._process_session_listener_actions()) + # Schedule cleanup agentprocess and agentinstance tables + agent_process_purge_interval = opt.agent_process_purge_interval.get() + if agent_process_purge_interval > 0: + self.schedule( + self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False + ) async def prestop(self) -> None: await super().prestop() @@ -1029,12 +1036,6 @@ async def prestart(self, server: protocol.Server) -> None: async def start(self) -> None: await super().start() self.add_background_task(self._start_agents()) - # Schedule cleanup agentprocess and agentinstance tables - agent_process_purge_interval = opt.agent_process_purge_interval.get() - if agent_process_purge_interval > 0: - self.schedule( - self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False - ) async def prestop(self) -> None: await super().prestop() diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index f2c7c5e796..42997d24e3 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -47,6 +47,13 @@ def __init__( self.component = component self.environment = environment + # InfluxDB tag suffix identifying this component (and environment, if set) + self.influxdb_suffix = f",component={self.component}" + if self.environment: + self.influxdb_suffix += f",environment={self.environment}" + + self.registered_guages: list[str] = [] + def start(self) -> None: self.start_monitor() @@ -74,44 +81,43 @@ async def get_status(self) -> DataBaseReport: pool_exhaustion_count=self._db_pool_watcher._exhausted_pool_events_count, ) + def _add_guage(self, name: str, the_gauge: CallbackGauge) -> None: + """Helper to register gauges and keep track of registrations""" + name = name + self.influxdb_suffix + gauge(name, the_gauge) + self.registered_guages.append(name) + def start_monitor(self) -> None: """Attach to monitoring system""" - suffix = f",component={self.component}" if self.environment: suffix += f",environment={self.environment}" - gauge( - "db.connected" + suffix, + self._add_guage( + "db.connected", CallbackGauge( callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0 ), ) - gauge( - "db.max_pool" + suffix, CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0) + self._add_guage( + "db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0) ) - gauge( - "db.open_connections" + suffix, + self._add_guage( + "db.open_connections", CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0), ) - gauge( - "db.free_connections" + suffix, + self._add_guage( + "db.free_connections", CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0), ) - gauge( - "db.pool_exhaustion_count" + suffix, + self._add_guage( + "db.pool_exhaustion_count", CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count), ) def
stop_monitor(self) -> None: - suffix = f",component={self.component}" if self.environment: suffix += f",environment={self.environment}" + """Disconnect from pyformance""" + for key in self.registered_guages: + global_registry()._gauges.pop(key, None) - global_registry()._gauges.pop("db.connected" + suffix, None) - global_registry()._gauges.pop("db.max_pool" + suffix, None) - global_registry()._gauges.pop("db.open_connections" + suffix, None) - global_registry()._gauges.pop("db.free_connections" + suffix, None) - global_registry()._gauges.pop("db.pool_exhaustion_count" + suffix, None) + self.registered_guages.clear() async def get_connection_status(self) -> bool: if self._pool is not None and not self._pool._closing and not self._pool._closed: From 9d72fd319d2eca630f7c3e0362533f8b1db03611 Mon Sep 17 00:00:00 2001 From: wouter Date: Thu, 31 Oct 2024 10:15:00 +0100 Subject: [PATCH 11/11] Fix typo --- .../server/services/databaseservice.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index 42997d24e3..705744e35b 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -52,7 +52,7 @@ def __init__( if self.environment: self.influxdb_suffix += f",environment={self.environment}" - self.registered_guages: list[str] = [] + self.registered_gauges: list[str] = [] def start(self) -> None: self.start_monitor() @@ -81,43 +81,43 @@ async def get_status(self) -> DataBaseReport: pool_exhaustion_count=self._db_pool_watcher._exhausted_pool_events_count, ) - def _add_guage(self, name: str, the_gauge: CallbackGauge) -> None: + def _add_gauge(self, name: str, the_gauge: CallbackGauge) -> None: """Helper to register gauges and keep track of registrations""" name = name + self.influxdb_suffix gauge(name, the_gauge) - self.registered_guages.append(name) + self.registered_gauges.append(name) def start_monitor(self) -> None: """Attach to monitoring system""" - self._add_guage( + self._add_gauge( "db.connected", CallbackGauge( callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0 ), ) - self._add_guage( + self._add_gauge( "db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0) ) - self._add_guage( + self._add_gauge( "db.open_connections", CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0), ) - self._add_guage( + self._add_gauge( "db.free_connections", CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0), ) - self._add_guage( + self._add_gauge( "db.pool_exhaustion_count", CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count), ) def stop_monitor(self) -> None: """Disconnect from pyformance""" - for key in self.registered_guages: + for key in self.registered_gauges: global_registry()._gauges.pop(key, None) - self.registered_guages.clear() + self.registered_gauges.clear() async def get_connection_status(self) -> bool: if self._pool is not None and not self._pool._closing and not self._pool._closed: