diff --git a/changelogs/unreleased/8228-database_pool_report.yml b/changelogs/unreleased/8228-database_pool_report.yml new file mode 100644 index 0000000000..19a6af8bbf --- /dev/null +++ b/changelogs/unreleased/8228-database_pool_report.yml @@ -0,0 +1,6 @@ +description: Update database pool status reporting for the new scheduler +issue-nr: 8228 +change-type: minor +destination-branches: [master] +sections: + minor-improvement: "{{description}}" diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 5c5632c6ca..1f3d59c2b6 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -101,7 +101,7 @@ src/inmanta/ast/type.py:0: error: No overload variant of "reversed" matches argu src/inmanta/ast/type.py:0: note: Possible overload variants: src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, Reversible[_T], /) -> reversed[_T] src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, SupportsLenAndGetItem[_T], /) -> reversed[_T] -src/inmanta/server/services/databaseservice.py:0: error: Incompatible return value type (got "dict[str, object]", expected "dict[str, BaseModel | UUID | bool | int | float | datetime | str | Sequence[BaseModel | UUID | bool | int | float | datetime | str] | Mapping[str, BaseModel | UUID | bool | int | float | datetime | str]]") [return-value] +src/inmanta/server/services/metricservice.py:0: error: Incompatible types in assignment (expression has type "float", variable has type "int") [assignment] src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_method__" [attr-defined] src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_mapping__" [attr-defined] src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__api_version__" [attr-defined] @@ -146,10 +146,6 @@ src/inmanta/execute/runtime.py:0: error: Argument 2 to "assign" of "AssignableNo src/inmanta/execute/runtime.py:0: error: Argument 1 to "AttributeException" has incompatible type "Instance"; expected "Locatable" 
[arg-type] src/inmanta/execute/runtime.py:0: error: Missing type parameters for generic type "ResultVariable" [type-arg] src/inmanta/execute/runtime.py:0: error: Argument 1 to "RuntimeException" has incompatible type "Instance"; expected "Locatable | None" [arg-type] -src/inmanta/data/model.py:0: error: Return type "str" of "__getstate__" incompatible with return type "dict[Any, Any]" in supertype "BaseModel" [override] -src/inmanta/data/model.py:0: error: Argument 1 of "__setstate__" is incompatible with supertype "BaseModel"; supertype defines the argument type as "dict[Any, Any]" [override] -src/inmanta/data/model.py:0: note: This violates the Liskov substitution principle -src/inmanta/data/model.py:0: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides src/inmanta/data/model.py:0: error: Function is missing a type annotation [no-untyped-def] src/inmanta/data/model.py:0: error: Unused "type: ignore" comment, use narrower [prop-decorator] instead of [misc] code [unused-ignore] src/inmanta/ast/attribute.py:0: error: Incompatible types in assignment (expression has type "property", variable has type "Type") [assignment] @@ -555,8 +551,11 @@ src/inmanta/data/__init__.py:0: error: Incompatible return value type (got "Reso src/inmanta/data/__init__.py:0: error: Argument 1 to "add_logs" of "ResourceAction" has incompatible type "list[dict[str, object]]"; expected "str | None" [arg-type] src/inmanta/data/__init__.py:0: error: "object" has no attribute "__iter__"; maybe "__dir__" or "__str__"? 
(not iterable) [attr-defined] src/inmanta/data/__init__.py:0: error: Argument "changes" to "ResourceAction" has incompatible type "dict[ResourceVersionIdStr, dict[str, object]] | None"; expected "dict[str, Any] | None" [arg-type] +src/inmanta/data/__init__.py:0: error: Function is missing a return type annotation [no-untyped-def] src/inmanta/data/__init__.py:0: error: Missing type parameters for generic type "dict" [type-arg] src/inmanta/data/__init__.py:0: error: Unsupported left operand type for + ("object") [operator] +src/inmanta/data/__init__.py:0: error: Function is missing a type annotation [no-untyped-def] +src/inmanta/data/__init__.py:0: error: Call to untyped function "convert_or_ignore" in typed context [no-untyped-call] src/inmanta/data/__init__.py:0: error: Incompatible return value type (got "Sequence[Resource]", expected "list[Resource]") [return-value] src/inmanta/data/__init__.py:0: error: Argument 1 to "ResourceIdStr" has incompatible type "object"; expected "str" [arg-type] src/inmanta/data/__init__.py:0: error: Enum index should be a string (actual index type "object") [misc] @@ -1102,6 +1101,7 @@ src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types in assignment (expression has type "set[str | None]", variable has type "set[str]") [assignment] src/inmanta/server/services/orchestrationservice.py:0: error: Argument "agents" to "_trigger_auto_deploy" of "OrchestrationService" has incompatible type "Collection[str]"; expected "Sequence[str] | None" [arg-type] src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible return value type (got "ReturnValueWithMeta[Sequence[DesiredStateVersion]]", expected "ReturnValue[list[DesiredStateVersion]]") [return-value] +src/inmanta/server/agentmanager.py:0: error: Argument 1 to "ensure_agent_registered" of "AgentManager" has incompatible type "Environment | None"; expected "Environment" 
[arg-type] src/inmanta/agent/forking_executor.py:0: error: Argument 1 of "connection_made" is incompatible with supertype "BaseProtocol"; supertype defines the argument type as "BaseTransport" [override] src/inmanta/agent/forking_executor.py:0: note: This violates the Liskov substitution principle src/inmanta/agent/forking_executor.py:0: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index f8f76b3ec7..95cc013d88 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -24,15 +24,17 @@ from concurrent.futures.thread import ThreadPoolExecutor from typing import Any, Callable, Coroutine, Optional, Union -from inmanta import config, const, protocol +import inmanta.server.config as opt +from inmanta import config, const, data, protocol from inmanta.agent import config as cfg from inmanta.agent import executor, forking_executor from inmanta.agent.reporting import collect_report from inmanta.const import AGENT_SCHEDULER_ID -from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr +from inmanta.data.model import AttributeStateChange, DataBaseReport, ResourceVersionIdStr from inmanta.deploy import scheduler from inmanta.deploy.work import TaskPriority from inmanta.protocol import SessionEndpoint, methods, methods_v2 +from inmanta.server.services.databaseservice import DatabaseMonitor from inmanta.types import Apireturn from inmanta.util import ( CronSchedule, @@ -84,6 +86,20 @@ def __init__( self._set_deploy_and_repair_intervals() + async def start(self) -> None: + # Make mypy happy + assert data.Resource._connection_pool is not None + self._db_monitor = DatabaseMonitor( + data.Resource._connection_pool, + opt.db_name.get(), + opt.db_host.get(), + "scheduler", + str(self.environment), + ) + self._db_monitor.start() + + await super().start() + def _set_deploy_and_repair_intervals(self) -> None: """ Fetch the 
settings related to automatic deploys and repairs from the config @@ -341,6 +357,20 @@ async def get_facts(self, env: uuid.UUID, agent: str, resource: dict[str, Any]) async def get_status(self) -> Apireturn: return 200, collect_report(self) + @protocol.handle(methods_v2.get_db_status) + async def get_db_status(self) -> DataBaseReport: + if self._db_monitor is None: + return DataBaseReport( + connected=False, + database="", + host="", + max_pool=0, + open_connections=0, + free_connections=0, + pool_exhaustion_count=0, + ) + return await self._db_monitor.get_status() + def check_storage(self) -> dict[str, str]: """ Check if the server storage is configured and ready to use. Ultimately, this is diff --git a/src/inmanta/app.py b/src/inmanta/app.py index 71ba313e0b..6f8a996913 100644 --- a/src/inmanta/app.py +++ b/src/inmanta/app.py @@ -67,6 +67,7 @@ from inmanta.server import config as opt from inmanta.server.bootloader import InmantaBootloader from inmanta.server.services.databaseservice import initialize_database_connection_pool +from inmanta.server.services.metricservice import MetricsService from inmanta.signals import safe_shutdown, setup_signal_handlers from inmanta.util import get_compiler_version from inmanta.warnings import WarningsManager @@ -156,6 +157,9 @@ async def start() -> None: connection_pool_max_size=agent_config.scheduler_db_connection_pool_max_size.get(), connection_timeout=agent_config.scheduler_db_connection_timeout.get(), ) + # also report metrics if this is relevant + metrics_reporter = MetricsService() + metrics_reporter.start_metric_reporters() await a.start() LOGGER.info("Agent with Resource scheduler starting now") diff --git a/src/inmanta/data/model.py b/src/inmanta/data/model.py index 8cfe469018..dde92768ef 100644 --- a/src/inmanta/data/model.py +++ b/src/inmanta/data/model.py @@ -24,7 +24,7 @@ from collections import abc from collections.abc import Sequence from enum import Enum, StrEnum -from typing import ClassVar, NewType, Optional, 
Self, Union +from typing import ClassVar, Mapping, NewType, Optional, Self, Union import pydantic import pydantic.schema @@ -90,7 +90,7 @@ class SliceStatus(BaseModel): """ name: str - status: dict[str, ArgumentTypes] + status: Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]] class FeatureStatus(BaseModel): @@ -870,3 +870,30 @@ def has_source(self) -> bool: LEGACY_PIP_DEFAULT = PipConfig(use_system_config=True) + + +class DataBaseReport(BaseModel): + connected: bool + database: str + host: str + max_pool: int + open_connections: int + free_connections: int + pool_exhaustion_count: int + + def __add__(self, other: "DataBaseReport") -> "DataBaseReport": + if not isinstance(other, DataBaseReport): + return NotImplemented + if other.database != self.database: + return NotImplemented + if other.host != self.host: + return NotImplemented + return DataBaseReport( + connected=self.connected and other.connected, + database=self.database, + host=self.host, + max_pool=self.max_pool + other.max_pool, + open_connections=self.open_connections + other.open_connections, + free_connections=self.free_connections + other.free_connections, + pool_exhaustion_count=self.pool_exhaustion_count + other.pool_exhaustion_count, + ) diff --git a/src/inmanta/protocol/methods_v2.py b/src/inmanta/protocol/methods_v2.py index 713e631338..8e90425ca8 100644 --- a/src/inmanta/protocol/methods_v2.py +++ b/src/inmanta/protocol/methods_v2.py @@ -24,7 +24,7 @@ from inmanta.const import AgentAction, ApiDocsFormat, Change, ClientType, ParameterSource, ResourceState from inmanta.data import model -from inmanta.data.model import LinkedDiscoveredResource, PipConfig, ResourceIdStr +from inmanta.data.model import DataBaseReport, LinkedDiscoveredResource, PipConfig, ResourceIdStr from inmanta.protocol import methods from inmanta.protocol.common import ReturnValue from inmanta.protocol.decorators import typedmethod @@ -496,6 +496,15 @@ def update_agent_map(agent_map: dict[str, str]) -> None: """ 
+@typedmethod( + path="/db_status", api=False, server_agent=True, enforce_auth=False, operation="POST", client_types=[], api_version=2 +) +def get_db_status() -> DataBaseReport: + """ + Get a report of the DB connection pool status + """ + + @typedmethod( path="/compiledata/", operation="GET", diff --git a/src/inmanta/reporter.py b/src/inmanta/reporter.py index 15bbab0bd1..ee470466f2 100644 --- a/src/inmanta/reporter.py +++ b/src/inmanta/reporter.py @@ -80,6 +80,8 @@ class InfluxReporter(AsyncReporter): """ InfluxDB reporter using native http api (based on https://influxdb.com/docs/v1.1/guides/writing_data.html) + + If metric name end with `,a=b` these will append as tags `a=b` """ def __init__( diff --git a/src/inmanta/server/__init__.py b/src/inmanta/server/__init__.py index 09335c22be..c9d0a0adb0 100644 --- a/src/inmanta/server/__init__.py +++ b/src/inmanta/server/__init__.py @@ -19,7 +19,7 @@ # flake8: noqa: F401 SLICE_SERVER = "core.server" -SLICE_AGENT_MANAGER = "core.agentmanager" +SLICE_AGENT_MANAGER = "core.scheduler_manager" SLICE_AUTOSTARTED_AGENT_MANAGER = "core.autostarted_agent_manager" SLICE_SESSION_MANAGER = "core.session" SLICE_DATABASE = "core.database" diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 39b3e6a009..f27fb4ed8f 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -27,6 +27,7 @@ from collections.abc import Iterable, Iterator, Mapping, Sequence, Set from datetime import datetime from enum import Enum +from functools import reduce from typing import Any, Optional, Union, cast from uuid import UUID @@ -38,9 +39,9 @@ from inmanta import const, data, tracing from inmanta.agent import config as agent_cfg from inmanta.config import Config -from inmanta.const import UNDEPLOYABLE_NAMES, AgentAction, AgentStatus -from inmanta.data import APILIMIT, InvalidSort, model -from inmanta.data.model import ResourceIdStr +from inmanta.const import AGENT_SCHEDULER_ID, 
UNDEPLOYABLE_NAMES, AgentAction, AgentStatus +from inmanta.data import APILIMIT, Environment, InvalidSort, model +from inmanta.data.model import DataBaseReport, ResourceIdStr from inmanta.protocol import encode_token, handle, methods, methods_v2 from inmanta.protocol.common import ReturnValue from inmanta.protocol.exceptions import BadRequest, Forbidden, NotFound, ShutdownInProgress @@ -168,12 +169,67 @@ def __init__(self, closesessionsonstart: bool = True, fact_back_off: Optional[in self.closesessionsonstart: bool = closesessionsonstart - async def get_status(self) -> dict[str, ArgumentTypes]: - return { + async def get_status(self) -> Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]]: + # The basic report + + out: dict[str, ArgumentTypes | Mapping[str, ArgumentTypes]] = { "resource_facts": len(self._fact_resource_block_set), "sessions": len(self.sessions), } + # Try to get more info from scheduler, but make sure not to timeout + schedulers = self.get_all_schedulers() + deadline = 0.9 * Server.GET_SERVER_STATUS_TIMEOUT + + async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UUID, DataBaseReport]: + result = await asyncio.wait_for(session.client.get_db_status(), deadline) + assert result.code == 200 + # Mypy can't help here, .... 
+ return (env, DataBaseReport(**result.result["data"])) + + # Get env name mapping in parallel with next call + env_mapping = asyncio.create_task( + asyncio.wait_for(Environment.get_list(details=False, is_marked_for_deletion=False), deadline) + ) + # Get the reports of all schedulers + results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True) + try: + # This can timeout, but not likely + env_mapping_result = await env_mapping + uuid_to_name = {env.id: env.name for env in env_mapping_result} + except TimeoutError: + # default to uuid's + uuid_to_name = {} + + # Filter out timeouts and other errors + good_reports = [x[1] for x in results if not isinstance(x, BaseException)] + + def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: + return { + "connected": report.connected, + "max_pool": report.max_pool, + "open_connections": report.open_connections, + "free_connections": report.free_connections, + "pool_exhaustion_count": report.pool_exhaustion_count, + } + + if good_reports: + total: DataBaseReport = reduce(lambda x, y: x + y, good_reports) + out["database"] = total.database + out["host"] = total.host + out["total"] = short_report(total) + + for result in results: + if isinstance(result, BaseException): + logging.debug("Failed to collect database status for scheduler", exc_info=True) + else: + the_uuid = result[0] + # Default name to uuid + name = uuid_to_name.get(the_uuid, str(the_uuid)) + out[name] = short_report(result[1]) + + return out + def get_dependencies(self) -> list[str]: return [SLICE_DATABASE, SLICE_SESSION_MANAGER] @@ -185,7 +241,6 @@ async def prestart(self, server: protocol.Server) -> None: autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER) assert isinstance(autostarted_agent_manager, AutostartedAgentManager) self._autostarted_agent_manager = autostarted_agent_manager - presession = server.get_slice(SLICE_SESSION_MANAGER) assert 
isinstance(presession, SessionManager) presession.add_listener(self) @@ -197,6 +252,12 @@ async def start(self) -> None: await self._expire_all_sessions_in_db() self.add_background_task(self._process_session_listener_actions()) + # Schedule cleanup agentprocess and agentinstance tables + agent_process_purge_interval = opt.agent_process_purge_interval.get() + if agent_process_purge_interval > 0: + self.schedule( + self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False + ) async def prestop(self) -> None: await super().prestop() @@ -204,6 +265,14 @@ async def prestop(self) -> None: async def stop(self) -> None: await super().stop() + def get_all_schedulers(self) -> list[tuple[uuid.UUID, protocol.Session]]: + # Linear scan, but every item should be a hit + return [ + (env_id, session) + for (env_id, agent_id), session in self.tid_endpoint_to_session.items() + if agent_id == AGENT_SCHEDULER_ID + ] + async def halt_agents(self, env: data.Environment, connection: Optional[asyncpg.connection.Connection] = None) -> None: """ Halts all agents for an environment. Persists prior paused state. 
@@ -513,6 +582,10 @@ async def _expire_all_sessions_in_db(self) -> None: await data.AgentInstance.expire_all(now=datetime.now().astimezone(), connection=connection) await data.Agent.mark_all_as_non_primary(connection=connection) + async def _purge_agent_processes(self) -> None: + agent_processes_to_keep = opt.agent_processes_to_keep.get() + await data.AgentProcess.cleanup(nr_expired_records_to_keep=agent_processes_to_keep) + # Util async def _use_new_active_session_for_agent(self, tid: uuid.UUID, endpoint_name: str) -> Optional[protocol.Session]: """ @@ -940,7 +1013,7 @@ def __init__(self) -> None: self._agent_procs: dict[UUID, subprocess.Process] = {} # env uuid -> subprocess.Process self.agent_lock = asyncio.Lock() # Prevent concurrent updates on _agent_procs - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: return {"processes": len(self._agent_procs)} async def prestart(self, server: protocol.Server) -> None: @@ -1282,12 +1355,21 @@ async def _make_agent_config( name={opt.db_name.get()} username={opt.db_username.get()} password={opt.db_password.get()} + [scheduler] db-connection-pool-min-size={agent_cfg.scheduler_db_connection_pool_min_size.get()} db-connection-pool-max-size={agent_cfg.scheduler_db_connection_pool_max_size.get()} db-connection-timeout={agent_cfg.scheduler_db_connection_timeout.get()} - """ +[influxdb] +host = {opt.influxdb_host.get()} +port = {opt.influxdb_port.get()} +name = {opt.influxdb_name.get()} +username = {opt.influxdb_username.get()} +password = {opt.influxdb_password.get()} +interval = {opt.influxdb_interval.get()} +tags = {opt.influxdb_tags.get()} +""" return config async def _fork_inmanta( diff --git a/src/inmanta/server/protocol.py b/src/inmanta/server/protocol.py index 9ad4968abd..675d6dc0ed 100644 --- a/src/inmanta/server/protocol.py +++ b/src/inmanta/server/protocol.py @@ -26,7 +26,7 @@ from collections import defaultdict from collections.abc import Sequence 
from datetime import timedelta -from typing import TYPE_CHECKING, Callable, Optional, Union +from typing import TYPE_CHECKING, Callable, Mapping, Optional, Union from tornado import gen, queues, routing, web @@ -39,16 +39,7 @@ from inmanta.server import SLICE_SESSION_MANAGER, SLICE_TRANSPORT from inmanta.server import config as opt from inmanta.types import ArgumentTypes, JsonType -from inmanta.util import ( - CronSchedule, - CycleException, - IntervalSchedule, - ScheduledTask, - Scheduler, - TaskHandler, - TaskMethod, - stable_depth_first, -) +from inmanta.util import CronSchedule, CycleException, ScheduledTask, Scheduler, TaskHandler, TaskMethod, stable_depth_first if TYPE_CHECKING: from inmanta.server.extensions import Feature, FeatureManager @@ -327,7 +318,7 @@ def schedule( :quiet_mode: Set to true to disable logging the recurring notification that the action is being called. Use this to avoid polluting the server log for very frequent actions. """ - self._sched.add_action(call, IntervalSchedule(interval, initial_delay), cancel_on_stop, quiet_mode) + self._sched.schedule(call, interval, initial_delay, cancel_on_stop, quiet_mode) def schedule_cron(self, call: TaskMethod, cron: str, cancel_on_stop: bool = True) -> None: """ @@ -399,7 +390,7 @@ def get_extension_statuses(cls, slices: list["ServerSlice"]) -> list[ExtensionSt result[ext_status.name] = ext_status return list(result.values()) - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]]: """ Get the status of this slice. 
""" @@ -620,7 +611,7 @@ async def stop(self) -> None: await super().stop() await self.server._transport.join() - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: def format_socket(sock: socket.socket) -> str: sname = sock.getsockname() return f"{sname[0]}:{sname[1]}" @@ -667,7 +658,7 @@ def __init__(self) -> None: # Listeners self.listeners: list[SessionListener] = [] - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: return {"hangtime": self.hangtime, "interval": self.interval, "sessions": len(self._sessions)} def add_listener(self, listener: SessionListener) -> None: diff --git a/src/inmanta/server/services/compilerservice.py b/src/inmanta/server/services/compilerservice.py index 7720ad01c2..49badc55ae 100644 --- a/src/inmanta/server/services/compilerservice.py +++ b/src/inmanta/server/services/compilerservice.py @@ -540,7 +540,7 @@ def __init__(self) -> None: self._queue_count_cache: int = 0 self._queue_count_cache_lock = asyncio.locks.Lock() - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: return {"task_queue": self._queue_count_cache, "listeners": len(self.async_listeners) + len(self.blocking_listeners)} def add_listener(self, listener: CompileStateListener) -> None: diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index 26921612db..705744e35b 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -17,51 +17,146 @@ """ import logging -from typing import Optional +from typing import Mapping, Optional import asyncpg -from pyformance import gauge +from pyformance import gauge, global_registry from pyformance.meters import CallbackGauge from inmanta import data, util +from inmanta.data.model import DataBaseReport from inmanta.server import 
SLICE_DATABASE from inmanta.server import config as opt from inmanta.server import protocol from inmanta.types import ArgumentTypes +from inmanta.util import Scheduler LOGGER = logging.getLogger(__name__) +class DatabaseMonitor: + + def __init__( + self, pool: asyncpg.pool.Pool, db_name: str, db_host: str, component: str, environment: str | None = None + ) -> None: + self._pool = pool + self._scheduler = Scheduler(f"Database monitor for {db_name}") + self._db_pool_watcher = util.ExhaustedPoolWatcher(self._pool) + self.dn_name = db_name + self.db_host = db_host + self.component = component + self.environment = environment + + # Influxdb part + self.infuxdb_suffix = f",component={self.component}" + if self.environment: + self.infuxdb_suffix += f",environment={self.environment}" + + self.registered_gauges: list[str] = [] + + def start(self) -> None: + self.start_monitor() + + # Schedule database pool exhaustion watch: + # Check for pool exhaustion every 200 ms + self._scheduler.schedule(self._check_database_pool_exhaustion, interval=0.2, cancel_on_stop=True, quiet_mode=True) + # Report pool exhaustion every 24h + self._scheduler.schedule(self._report_database_pool_exhaustion, interval=3_600 * 24, cancel_on_stop=True) + + async def stop(self) -> None: + self.stop_monitor() + await self._scheduler.stop() + + async def get_status(self) -> DataBaseReport: + """Get the status of the database connection""" + connected = await self.get_connection_status() + + return DataBaseReport( + connected=connected, + database=self.dn_name, + host=self.db_host, + max_pool=self._pool.get_max_size(), + open_connections=self._pool.get_size(), + free_connections=self._pool.get_idle_size(), + pool_exhaustion_count=self._db_pool_watcher._exhausted_pool_events_count, + ) + + def _add_gauge(self, name: str, the_gauge: CallbackGauge) -> None: + """Helper to register gauges and keep track of registrations""" + name = name + self.infuxdb_suffix + gauge(name, the_gauge) + 
self.registered_gauges.append(name) + + def start_monitor(self) -> None: + """Attach to monitoring system""" + + self._add_gauge( + "db.connected", + CallbackGauge( + callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0 + ), + ) + self._add_gauge( + "db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0) + ) + self._add_gauge( + "db.open_connections", + CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0), + ) + self._add_gauge( + "db.free_connections", + CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0), + ) + self._add_gauge( + "db.pool_exhaustion_count", + CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count), + ) + + def stop_monitor(self) -> None: + """Disconnect form pyformance""" + for key in self.registered_gauges: + global_registry()._gauges.pop(key, None) + + self.registered_gauges.clear() + + async def get_connection_status(self) -> bool: + if self._pool is not None and not self._pool._closing and not self._pool._closed: + try: + async with self._pool.acquire(timeout=10): + return True + except Exception: + LOGGER.exception("Connection to PostgreSQL failed") + return False + + async def _report_database_pool_exhaustion(self) -> None: + assert self._db_pool_watcher is not None # Make mypy happy + self._db_pool_watcher.report_and_reset(LOGGER) + + async def _check_database_pool_exhaustion(self) -> None: + assert self._db_pool_watcher is not None # Make mypy happy + self._db_pool_watcher.check_for_pool_exhaustion() + + class DatabaseService(protocol.ServerSlice): """Slice to initialize the database""" def __init__(self) -> None: super().__init__(SLICE_DATABASE) self._pool: Optional[asyncpg.pool.Pool] = None - self._db_pool_watcher: Optional[util.ExhaustedPoolWatcher] = None + self._db_monitor: Optional[DatabaseMonitor] = None async def start(self) -> 
None: await super().start() - self.start_monitor() await self.connect_database() - # Schedule cleanup agentprocess and agentinstance tables - agent_process_purge_interval = opt.agent_process_purge_interval.get() - if agent_process_purge_interval > 0: - self.schedule( - self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False - ) - assert self._pool is not None # Make mypy happy - self._db_pool_watcher = util.ExhaustedPoolWatcher(self._pool) - # Schedule database pool exhaustion watch: - # Check for pool exhaustion every 200 ms - self.schedule(self._check_database_pool_exhaustion, interval=0.2, cancel_on_stop=True, quiet_mode=True) - # Report pool exhaustion every 24h - self.schedule(self._report_database_pool_exhaustion, interval=3_600 * 24, cancel_on_stop=True) + self._db_monitor = DatabaseMonitor(self._pool, opt.db_name.get(), opt.db_host.get(), "server") + self._db_monitor.start() async def stop(self) -> None: await super().stop() + if self._db_monitor is not None: + await self._db_monitor.stop() await self.disconnect_database() self._pool = None @@ -92,55 +187,10 @@ async def disconnect_database(self) -> None: """Disconnect the database""" await data.disconnect() - async def get_status(self) -> dict[str, ArgumentTypes]: + async def get_status(self) -> Mapping[str, ArgumentTypes]: """Get the status of the database connection""" - connected = await self.get_connection_status() - status = { - "connected": connected, - "database": opt.db_name.get(), - "host": opt.db_host.get(), - } - if self._pool is not None: - status["max_pool"] = self._pool.get_max_size() - status["open_connections"] = self._pool.get_size() - status["free_connections"] = self._pool.get_idle_size() - - return status - - def start_monitor(self) -> None: - """Attach to monitoring system""" - gauge( - "db.connected", - CallbackGauge( - callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0 - ), - ) 
- gauge("db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0)) - gauge("db.open_connections", CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0)) - gauge( - "db.free_connections", CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0) - ) - - async def get_connection_status(self) -> bool: - if self._pool is not None and not self._pool._closing and not self._pool._closed: - try: - async with self._pool.acquire(timeout=10): - return True - except Exception: - LOGGER.exception("Connection to PostgreSQL failed") - return False - - async def _purge_agent_processes(self) -> None: - agent_processes_to_keep = opt.agent_processes_to_keep.get() - await data.AgentProcess.cleanup(nr_expired_records_to_keep=agent_processes_to_keep) - - async def _report_database_pool_exhaustion(self) -> None: - assert self._db_pool_watcher is not None # Make mypy happy - self._db_pool_watcher.report_and_reset(LOGGER) - - async def _check_database_pool_exhaustion(self) -> None: - assert self._db_pool_watcher is not None # Make mypy happy - self._db_pool_watcher.check_for_pool_exhaustion() + assert self._db_monitor is not None # make mypy happy + return (await self._db_monitor.get_status()).dict() async def initialize_database_connection_pool( diff --git a/src/inmanta/util/__init__.py b/src/inmanta/util/__init__.py index e3ea3662b4..cf534fa6d0 100644 --- a/src/inmanta/util/__init__.py +++ b/src/inmanta/util/__init__.py @@ -392,6 +392,25 @@ def action_function() -> None: self._scheduled[task_spec] = handle return task_spec + def schedule( + self, + call: TaskMethod, + interval: float = 60, + initial_delay: Optional[float] = None, + cancel_on_stop: bool = True, + quiet_mode: bool = False, + ) -> Optional[ScheduledTask]: + """ + Schedule a task repeatedly with a given interval. Tasks with the same call and the same schedule are considered the + same. 
Clients that wish to be able to delete tasks should make sure to use a unique `call` function. + + :param interval: The interval between executions of the task. + :param initial_delay: The delay to execute the task for the first time. If not set, interval is used. + :quiet_mode: Set to true to disable logging the recurring notification that the action is being called. Use this to + avoid polluting the server log for very frequent actions. + """ + return self.add_action(call, IntervalSchedule(interval, initial_delay), cancel_on_stop, quiet_mode) + @stable_api def remove(self, task: ScheduledTask) -> None: """ @@ -850,6 +869,7 @@ class ExhaustedPoolWatcher: def __init__(self, pool: asyncpg.pool.Pool) -> None: self._exhausted_pool_events_count: int = 0 + self._last_report: int = 0 self._pool: asyncpg.pool.Pool = pool def report_and_reset(self, logger: logging.Logger) -> None: @@ -857,9 +877,10 @@ def report_and_reset(self, logger: logging.Logger) -> None: Log how many exhausted pool events were recorded since the last time the counter was reset, if any, and reset the counter. 
""" - if self._exhausted_pool_events_count > 0: - logger.warning("Database pool was exhausted %d times in the past 24h.", self._exhausted_pool_events_count) - self._reset_counter() + since_last = self._exhausted_pool_events_count - self._last_report + if since_last > 0: + logger.warning("Database pool was exhausted %d times in the past 24h.", since_last) + self._last_report = self._exhausted_pool_events_count def check_for_pool_exhaustion(self) -> None: """ @@ -869,9 +890,6 @@ def check_for_pool_exhaustion(self) -> None: if pool_exhausted: self._exhausted_pool_events_count += 1 - def _reset_counter(self) -> None: - self._exhausted_pool_events_count = 0 - def remove_comment_part_from_specifier(to_clean: str) -> str: """ diff --git a/stubs/pyformance/__init__.pyi b/stubs/pyformance/__init__.pyi index dbe8409fbe..ac76505079 100644 --- a/stubs/pyformance/__init__.pyi +++ b/stubs/pyformance/__init__.pyi @@ -5,6 +5,9 @@ from pyformance.meters import Gauge class MetricsRegistry: + + _gauges: dict[str, Gauge] + def __init__(self, clock): ... def dump_metrics(self)-> abc.Mapping[str, abc.Mapping[str, int|float|str]]: ... 
diff --git a/tests/server/test_databaseservice.py b/tests/server/test_databaseservice.py index fa379b4992..1025677dea 100644 --- a/tests/server/test_databaseservice.py +++ b/tests/server/test_databaseservice.py @@ -74,7 +74,7 @@ def exhaustion_events_recorded() -> bool: """ Returns true if some database exhaustion events have been recorded """ - n_events: int = database_slice._db_pool_watcher._exhausted_pool_events_count + n_events: int = database_slice._db_monitor._db_pool_watcher._exhausted_pool_events_count return n_events > 0 with caplog.at_level(logging.WARNING, "inmanta.server.services.databaseservice"): @@ -91,7 +91,7 @@ def exhaustion_events_recorded() -> bool: await connection.close() # Call _report_database_pool_exhaustion manually (scheduled to run every 24h) - await database_slice._report_database_pool_exhaustion() + await database_slice._db_monitor._report_database_pool_exhaustion() log_contains( caplog, diff --git a/tests/server/test_server_status.py b/tests/server/test_server_status.py index a87096f427..5467bc9906 100644 --- a/tests/server/test_server_status.py +++ b/tests/server/test_server_status.py @@ -23,7 +23,7 @@ from inmanta.server.services.compilerservice import CompilerService -async def test_server_status(server, client): +async def test_server_status(server, client, agent, environment): result = await client.get_server_status() assert result.code == 200 @@ -38,6 +38,14 @@ async def test_server_status(server, client): assert len([x for x in status["slices"] if x["name"] == "core.database"]) == 1 assert db_status[0]["status"]["connected"] is True + scheduler_status = [x for x in status["slices"] if x["name"] == "core.scheduler_manager"] + assert len(scheduler_status) == 1 + + scheduler_stats = scheduler_status[0]["status"] + assert scheduler_stats["total"]["connected"] is True + # test environment is called dev + assert scheduler_stats["dev"]["connected"] is True + assert "features" in status assert len(status["features"]) > 0 diff --git 
a/tests/test_influxdbreporting.py b/tests/test_influxdbreporting.py index 0db3afa088..6f5cbd984f 100644 --- a/tests/test_influxdbreporting.py +++ b/tests/test_influxdbreporting.py @@ -26,7 +26,9 @@ from tornado.web import url from inmanta.reporter import AsyncReporter, InfluxReporter +from inmanta.server.config import influxdb_host, influxdb_interval, influxdb_port from inmanta.server.services.metricservice import CPUMicroBenchMark +from utils import retry_limited class QueryMockHandler(tornado.web.RequestHandler): @@ -36,13 +38,13 @@ def initialize(self, parent): def get(self, *args, **kwargs): self.parent.querycount += 1 try: - assert self.request.query_arguments["q"] == [b"CREATE DATABASE metrics"] + assert self.request.query_arguments["q"] == [b"CREATE DATABASE inmanta"] except Exception as e: # carry over failures self.parent.failure = e -influxlineprotocol = re.compile(r"\w+(,\w+=\w+)* (\w+=[\d.e+-]*)(,\w+=[\d.e+-]*)* \d+") +influxlineprotocol = re.compile(r"\w+(,\w+=[^, ]+)* (\w+=[\d.e+-]*)(,\w+=[\d.e+-]*)* \d+") class WriteMockHandler(tornado.web.RequestHandler): @@ -102,7 +104,7 @@ def influxdb(event_loop, free_socket): async def test_influxdb(influxdb): - rep = InfluxReporter(port=influxdb.port, tags={"mark": "X"}, autocreate_database=True) + rep = InfluxReporter(port=influxdb.port, tags={"mark": "X"}, autocreate_database=True, database="inmanta") with timer("test").time(): pass @@ -182,20 +184,35 @@ async def test_available_metrics(server): base_types = [base_type] types[key] = base_types - assert metrics["db.connected"]["value"] - assert "db.max_pool" in metrics - assert "db.open_connections" in metrics - assert "db.free_connections" in metrics + assert metrics["db.connected,component=server"]["value"] + assert "db.max_pool,component=server" in metrics + assert "db.open_connections,component=server" in metrics + assert "db.free_connections,component=server" in metrics assert "self.spec.cpu" in metrics # ensure it doesn't crash when the server is down 
await server.stop() - metrics = global_registry().dump_metrics() - - assert metrics["db.max_pool"]["value"] == 0 - assert not metrics["db.connected"]["value"] async def test_safeness_if_server_down(): # ensure it works is there is no server global_registry().dump_metrics() + + +@pytest.fixture +async def server_config(server_config, influxdb): + influxdb_host.set("127.0.0.1") + influxdb_port.set(str(influxdb.port)) + influxdb_interval.set(str(1)) + + +@pytest.mark.parametrize("auto_start_agent", [True]) +async def test_influxdb_server_and_scheduler(server, influxdb, auto_start_agent, environment): + """Test that we receive influxdb information from the scheduler""" + + # Using this function signature makes test easier to debug, but doesn't test the initialization + # async def test_influxdb_server_and_scheduler(server, agent, influxdb): + def has_scheduler_metrics(): + return any("metrics,key=db.pool_exhaustion_count,component=scheduler,environment=" in line for line in influxdb.lines) + + await retry_limited(has_scheduler_metrics, 10)