Issue/pool reporting #8293

Open
wants to merge 14 commits into
base: master
from
6 changes: 6 additions & 0 deletions changelogs/unreleased/8228-database_pool_report.yml
@@ -0,0 +1,6 @@
description: Update database pool status reporting for the new scheduler
issue-nr: 8228
change-type: minor
destination-branches: [master]
sections:
minor-improvement: "{{description}}"
Contributor

Does this deserve a change entry, considering that the new scheduler has not been publicly released? Or has it?

Contributor Author

It is a user-visible change, but perhaps I should add more detail?

Contributor

I mean the entire scheduler is new, so I would assume that to be one change entry rather than a separate one for each part it affects.

6 changes: 4 additions & 2 deletions mypy-baseline.txt
@@ -101,7 +101,7 @@ src/inmanta/ast/type.py:0: error: No overload variant of "reversed" matches argu
src/inmanta/ast/type.py:0: note: Possible overload variants:
src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, Reversible[_T], /) -> reversed[_T]
src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, SupportsLenAndGetItem[_T], /) -> reversed[_T]
src/inmanta/server/services/databaseservice.py:0: error: Incompatible return value type (got "dict[str, object]", expected "dict[str, BaseModel | UUID | bool | int | float | datetime | str | Sequence[BaseModel | UUID | bool | int | float | datetime | str] | Mapping[str, BaseModel | UUID | bool | int | float | datetime | str]]") [return-value]
src/inmanta/server/services/metricservice.py:0: error: Incompatible types in assignment (expression has type "float", variable has type "int") [assignment]
Contributor Author

pre-existing issue, moved

src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_method__" [attr-defined]
src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_mapping__" [attr-defined]
src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__api_version__" [attr-defined]
@@ -156,6 +156,7 @@ src/inmanta/ast/attribute.py:0: error: Incompatible types in assignment (express
src/inmanta/ast/attribute.py:0: error: Missing type parameters for generic type "ResultVariable" [type-arg]
src/inmanta/ast/attribute.py:0: error: Missing type parameters for generic type "ResultVariable" [type-arg]
src/inmanta/util/__init__.py:0: error: Argument "key" to "sorted" has incompatible type "Callable[[T], S]"; expected "Callable[[T], SupportsDunderLT[Any] | SupportsDunderGT[Any]]" [arg-type]
src/inmanta/util/__init__.py:0: error: Returning Any from function declared to return "ScheduledTask | None" [no-any-return]
src/inmanta/util/__init__.py:0: error: Returning Any from function declared to return "bool" [no-any-return]
src/inmanta/util/__init__.py:0: error: Incompatible return value type (got "bool | Awaitable[bool]", expected "bool") [return-value]
src/inmanta/resources.py:0: error: Function is missing a type annotation [no-untyped-def]
@@ -236,6 +237,7 @@ src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-bo
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body]
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body]
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body]
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body]
src/inmanta/protocol/methods_v2.py:0: error: Cannot determine type of "RVID_OPTS" [has-type]
src/inmanta/protocol/methods_v2.py:0: error: Cannot determine type of "RVID_OPTS" [has-type]
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body]
@@ -1101,6 +1103,7 @@ src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types
src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types in assignment (expression has type "set[str | None]", variable has type "set[str]") [assignment]
src/inmanta/server/services/orchestrationservice.py:0: error: Argument "agents" to "_trigger_auto_deploy" of "OrchestrationService" has incompatible type "Collection[str]"; expected "Sequence[str] | None" [arg-type]
src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible return value type (got "ReturnValueWithMeta[Sequence[DesiredStateVersion]]", expected "ReturnValue[list[DesiredStateVersion]]") [return-value]
src/inmanta/server/agentmanager.py:0: error: Value of type "dict[str, Any] | None" is not indexable [index]
Contributor Author

decoding of api results, difficult to avoid
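
One way to keep this kind of error out of the baseline might be to narrow the optional payload before indexing it. The following is only a minimal sketch: `Result` is a hypothetical stand-in with a `code` and an optional `result` dict, modelled loosely on the `result.result["data"]` access in `agentmanager.py`, and is not the real inmanta class.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    """Hypothetical stand-in for an API call result; not the inmanta class."""

    code: int
    result: Optional[dict[str, Any]] = None


def extract_data(result: Result) -> dict[str, Any]:
    # Narrow the Optional explicitly so a type checker accepts the indexing below.
    if result.code != 200 or result.result is None:
        raise ValueError(f"unexpected response: code={result.code}")
    data = result.result["data"]
    # The payload is untyped JSON, so verify its shape at runtime as well.
    if not isinstance(data, dict):
        raise ValueError("payload 'data' is not a mapping")
    return data
```

The explicit `None` check costs two lines but replaces an `assert` that disappears when Python runs with `-O`.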

src/inmanta/server/agentmanager.py:0: error: Argument 1 to "ensure_agent_registered" of "AgentManager" has incompatible type "Environment | None"; expected "Environment" [arg-type]
src/inmanta/agent/forking_executor.py:0: error: Argument 1 of "connection_made" is incompatible with supertype "BaseProtocol"; supertype defines the argument type as "BaseTransport" [override]
src/inmanta/agent/forking_executor.py:0: note: This violates the Liskov substitution principle
@@ -1115,7 +1118,6 @@ src/inmanta/server/services/paramservice.py:0: error: Unsupported operand types
src/inmanta/server/services/paramservice.py:0: note: Left operand is of type "datetime | None"
src/inmanta/server/services/paramservice.py:0: error: Unsupported operand types for > ("timedelta" and "datetime") [operator]
src/inmanta/server/services/paramservice.py:0: note: Left operand is of type "datetime | timedelta"
src/inmanta/server/services/metricservice.py:0: error: Incompatible types in assignment (expression has type "float", variable has type "int") [assignment]
src/inmanta/server/services/environment_metrics_service.py:0: error: Function is missing a return type annotation [no-untyped-def]
src/inmanta/server/services/environment_metrics_service.py:0: error: "Sequence[EnvironmentMetricsGauge]" has no attribute "append" [attr-defined]
src/inmanta/server/services/environment_metrics_service.py:0: error: Function is missing a return type annotation [no-untyped-def]
34 changes: 32 additions & 2 deletions src/inmanta/agent/agent_new.py
@@ -24,15 +24,17 @@
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Any, Callable, Coroutine, Optional, Union

from inmanta import config, const, protocol
import inmanta.server.config as opt
from inmanta import config, const, data, protocol
from inmanta.agent import config as cfg
from inmanta.agent import executor, forking_executor
from inmanta.agent.reporting import collect_report
from inmanta.const import AGENT_SCHEDULER_ID
from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr
from inmanta.data.model import AttributeStateChange, DataBaseReport, ResourceVersionIdStr
from inmanta.deploy import scheduler
from inmanta.deploy.work import TaskPriority
from inmanta.protocol import SessionEndpoint, methods, methods_v2
from inmanta.server.services.databaseservice import DatabaseMonitor
from inmanta.types import Apireturn
from inmanta.util import (
CronSchedule,
@@ -84,6 +86,20 @@ def __init__(

self._set_deploy_and_repair_intervals()

async def start(self) -> None:
    # Make mypy happy
    assert data.Resource._connection_pool is not None
    self._db_monitor = DatabaseMonitor(
        data.Resource._connection_pool,
        opt.db_name.get(),
        opt.db_host.get(),
        "scheduler",
        str(self.environment),
    )
    self._db_monitor.start()
Comment on lines +92 to +99
Contributor Author

Scheduler gets DB monitor as well

    await super().start()

def _set_deploy_and_repair_intervals(self) -> None:
"""
Fetch the settings related to automatic deploys and repairs from the config
@@ -323,6 +339,20 @@ async def get_facts(self, env: uuid.UUID, agent: str, resource: dict[str, Any])
async def get_status(self) -> Apireturn:
return 200, collect_report(self)

@protocol.handle(methods_v2.get_db_status)
async def get_db_status(self) -> DataBaseReport:
    if self._db_monitor is None:
        return DataBaseReport(
            connected=False,
            database="",
            host="",
            max_pool=0,
            open_connections=0,
            free_connections=0,
            pool_exhaustion_count=0,
        )
    return await self._db_monitor.get_status()
Comment on lines +361 to +372
Contributor Author

new status endpoint on the scheduler to get database status


def check_storage(self) -> dict[str, str]:
"""
Check if the server storage is configured and ready to use. Ultimately, this is
4 changes: 4 additions & 0 deletions src/inmanta/app.py
@@ -67,6 +67,7 @@
from inmanta.server import config as opt
from inmanta.server.bootloader import InmantaBootloader
from inmanta.server.services.databaseservice import initialize_database_connection_pool
from inmanta.server.services.metricservice import MetricsService
from inmanta.signals import safe_shutdown, setup_signal_handlers
from inmanta.util import get_compiler_version
from inmanta.warnings import WarningsManager
@@ -156,6 +157,9 @@ async def start() -> None:
connection_pool_max_size=agent_config.scheduler_db_connection_pool_max_size.get(),
connection_timeout=agent_config.scheduler_db_connection_timeout.get(),
)
# also report metrics if this is relevant
metrics_reporter = MetricsService()
metrics_reporter.start_metric_reporters()
Comment on lines +161 to +162
Contributor Author

scheduler will now also report to influxdb

await a.start()

setup_signal_handlers(a.stop)
31 changes: 29 additions & 2 deletions src/inmanta/data/model.py
@@ -23,7 +23,7 @@
from collections import abc
from collections.abc import Sequence
from enum import Enum, StrEnum
from typing import ClassVar, NewType, Optional, Self, Union
from typing import ClassVar, Mapping, NewType, Optional, Self, Union

import pydantic
import pydantic.schema
@@ -89,7 +89,7 @@ class SliceStatus(BaseModel):
"""

name: str
status: dict[str, ArgumentTypes]
status: Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]]
Contributor Author

This breaks the FE, but allows nicer presentation
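
For reference, a sketch of what the widened type permits. `Scalar` is a simplified stand-in for `ArgumentTypes` (assumed here to be plain scalars), and `flatten_status` is a hypothetical helper, not part of this PR, showing how a consumer that only understands the flat shape could cope with one level of nesting.

```python
from collections.abc import Mapping
from typing import Union

# Simplified stand-in for ArgumentTypes (assumption: scalars only).
Scalar = Union[str, int, float, bool]
Status = Mapping[str, Union[Scalar, Mapping[str, Scalar]]]


def flatten_status(status: Status, sep: str = ".") -> dict[str, Scalar]:
    """Flatten one level of nesting into dotted keys (hypothetical helper)."""
    flat: dict[str, Scalar] = {}
    for key, value in status.items():
        if isinstance(value, Mapping):
            # Nested entry: prefix each inner key with the outer key.
            for sub_key, sub_value in value.items():
                flat[f"{key}{sep}{sub_key}"] = sub_value
        else:
            flat[key] = value
    return flat


# Sample values are made up for illustration.
nested: Status = {
    "sessions": 2,
    "total": {"connected": True, "max_pool": 20},
}
print(flatten_status(nested))
```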



class FeatureStatus(BaseModel):
@@ -859,3 +859,30 @@ def has_source(self) -> bool:


LEGACY_PIP_DEFAULT = PipConfig(use_system_config=True)


class DataBaseReport(BaseModel):
    connected: bool
    database: str
    host: str
    max_pool: int
    open_connections: int
    free_connections: int
    pool_exhaustion_count: int

    def __add__(self, other: "DataBaseReport") -> "DataBaseReport":
        if not isinstance(other, DataBaseReport):
            return NotImplemented
        if other.database != self.database:
            return NotImplemented
        if other.host != self.host:
            return NotImplemented
        return DataBaseReport(
            connected=self.connected and other.connected,
            database=self.database,
            host=self.host,
            max_pool=self.max_pool + other.max_pool,
            open_connections=self.open_connections + other.open_connections,
            free_connections=self.free_connections + other.free_connections,
            pool_exhaustion_count=self.pool_exhaustion_count + other.pool_exhaustion_count,
        )
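
A minimal sketch of how `__add__` aggregates reports. It uses a frozen dataclass as a stand-in for the pydantic model so that it runs without inmanta installed. The stand-in class `Report` and the sample values (`"inmanta"`, `"localhost"`, the pool numbers) are assumptions made for the example.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Report:
    """Dataclass stand-in for DataBaseReport (assumption: same fields)."""

    connected: bool
    database: str
    host: str
    max_pool: int
    open_connections: int
    free_connections: int
    pool_exhaustion_count: int

    def __add__(self, other: object) -> "Report":
        # Only reports for the same database on the same host can be merged.
        if not isinstance(other, Report):
            return NotImplemented
        if (other.database, other.host) != (self.database, self.host):
            return NotImplemented
        return Report(
            # The aggregate counts as connected only if every part is connected.
            connected=self.connected and other.connected,
            database=self.database,
            host=self.host,
            max_pool=self.max_pool + other.max_pool,
            open_connections=self.open_connections + other.open_connections,
            free_connections=self.free_connections + other.free_connections,
            pool_exhaustion_count=self.pool_exhaustion_count + other.pool_exhaustion_count,
        )


reports = [
    Report(True, "inmanta", "localhost", 10, 3, 7, 0),
    Report(False, "inmanta", "localhost", 10, 5, 5, 2),
]
total = reduce(lambda x, y: x + y, reports)
print(total)
```

Because a mismatch in `database` or `host` returns `NotImplemented`, adding two such reports raises `TypeError`. A caller that reduces over many reports therefore relies on all of them pointing at the same database.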
11 changes: 10 additions & 1 deletion src/inmanta/protocol/methods_v2.py
@@ -24,7 +24,7 @@

from inmanta.const import AgentAction, ApiDocsFormat, Change, ClientType, ParameterSource, ResourceState
from inmanta.data import model
from inmanta.data.model import LinkedDiscoveredResource, PipConfig, ResourceIdStr
from inmanta.data.model import DataBaseReport, LinkedDiscoveredResource, PipConfig, ResourceIdStr
from inmanta.protocol import methods
from inmanta.protocol.common import ReturnValue
from inmanta.protocol.decorators import typedmethod
@@ -496,6 +496,15 @@ def update_agent_map(agent_map: dict[str, str]) -> None:
"""


@typedmethod(
    path="/db_status", api=False, server_agent=True, enforce_auth=False, operation="POST", client_types=[], api_version=2
)
def get_db_status() -> DataBaseReport:
    """
    Get a report of the DB connection pool status
    """


@typedmethod(
path="/compiledata/<id>",
operation="GET",
2 changes: 2 additions & 0 deletions src/inmanta/reporter.py
@@ -80,6 +80,8 @@ class InfluxReporter(AsyncReporter):
"""
InfluxDB reporter using native http api
(based on https://influxdb.com/docs/v1.1/guides/writing_data.html)

If a metric name ends with `,a=b`, that suffix is appended as the tag `a=b`
"""

def __init__(
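
The naming convention from the docstring can be illustrated with a small parser. This is a sketch of the convention only: `split_metric_name` is a hypothetical helper, not the code in `InfluxReporter`, and the metric name in the usage line is made up.

```python
def split_metric_name(name: str) -> tuple[str, dict[str, str]]:
    """Split 'metric,a=b,c=d' into ('metric', {'a': 'b', 'c': 'd'})."""
    base, *tag_parts = name.split(",")
    tags: dict[str, str] = {}
    for part in tag_parts:
        key, sep, value = part.partition("=")
        # Reject suffixes that are not of the form key=value.
        if not sep or not key:
            raise ValueError(f"malformed tag {part!r} in metric name {name!r}")
        tags[key] = value
    return base, tags


print(split_metric_name("db.open_connections,component=scheduler"))
```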
2 changes: 1 addition & 1 deletion src/inmanta/server/__init__.py
@@ -19,7 +19,7 @@
# flake8: noqa: F401

SLICE_SERVER = "core.server"
SLICE_AGENT_MANAGER = "core.agentmanager"
SLICE_AGENT_MANAGER = "core.scheduler_manager"
Contributor Author

rename for FE, hopefully OK

SLICE_AUTOSTARTED_AGENT_MANAGER = "core.autostarted_agent_manager"
SLICE_SESSION_MANAGER = "core.session"
SLICE_DATABASE = "core.database"
98 changes: 90 additions & 8 deletions src/inmanta/server/agentmanager.py
@@ -27,6 +27,7 @@
from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence, Set
from datetime import datetime
from enum import Enum
from functools import reduce
from typing import Any, Optional, Union, cast
from uuid import UUID

@@ -38,9 +39,9 @@
from inmanta import const, data, tracing
from inmanta.agent import config as agent_cfg
from inmanta.config import Config
from inmanta.const import UNDEPLOYABLE_NAMES, AgentAction, AgentStatus
from inmanta.data import APILIMIT, InvalidSort, model
from inmanta.data.model import ResourceIdStr
from inmanta.const import AGENT_SCHEDULER_ID, UNDEPLOYABLE_NAMES, AgentAction, AgentStatus
from inmanta.data import APILIMIT, Environment, InvalidSort, model
from inmanta.data.model import DataBaseReport, ResourceIdStr
from inmanta.protocol import encode_token, handle, methods, methods_v2
from inmanta.protocol.common import ReturnValue
from inmanta.protocol.exceptions import BadRequest, Forbidden, NotFound, ShutdownInProgress
@@ -168,12 +169,67 @@ def __init__(self, closesessionsonstart: bool = True, fact_back_off: Optional[in

self.closesessionsonstart: bool = closesessionsonstart

async def get_status(self) -> dict[str, ArgumentTypes]:
return {
async def get_status(self) -> Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]]:
Contributor Author

This method is very tricky, as it has to return within the timeout of the status endpoint.

I chose to make it report what it has, even if that makes the format somewhat inconsistent
(e.g. if the DB is down, the environments will be reported by UUID instead of name).
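
The approach described in this comment (collect whatever answers in time, skip the rest, and fall back to UUIDs when names are unavailable) can be sketched with plain asyncio. Everything here is illustrative: `fetch_report`, the delays, the environment names and the 0.1 s timeout are made up for the example and do not come from the PR.

```python
import asyncio
import uuid


async def fetch_report(delay: float, payload: dict[str, int]) -> dict[str, int]:
    """Hypothetical scheduler call that answers after `delay` seconds."""
    await asyncio.sleep(delay)
    return payload


async def collect(
    calls: dict[uuid.UUID, tuple[float, dict[str, int]]],
    names: dict[uuid.UUID, str],
    timeout: float,
) -> dict[str, dict[str, int]]:
    async def one(env: uuid.UUID, delay: float, payload: dict[str, int]) -> tuple[uuid.UUID, dict[str, int]]:
        # Bound every call so a slow scheduler cannot stall the whole report.
        return env, await asyncio.wait_for(fetch_report(delay, payload), timeout)

    results = await asyncio.gather(
        *(one(env, delay, payload) for env, (delay, payload) in calls.items()),
        return_exceptions=True,
    )
    out: dict[str, dict[str, int]] = {}
    for result in results:
        if isinstance(result, BaseException):
            continue  # timed out or failed: report what we have
        env, report = result
        # Fall back to the UUID when no name is known for the environment.
        out[names.get(env, str(env))] = report
    return out


fast, slow, unnamed = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
status = asyncio.run(
    collect(
        {
            fast: (0.0, {"open_connections": 1}),
            slow: (1.0, {"open_connections": 9}),
            unnamed: (0.0, {"open_connections": 2}),
        },
        names={fast: "dev", slow: "prod"},
        timeout=0.1,
    )
)
print(status)
```

The slow call is dropped after the timeout, and the environment without a known name shows up under its UUID, which is the inconsistency the comment accepts.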

    # The basic report

    out: dict[str, ArgumentTypes | Mapping[str, ArgumentTypes]] = {
        "resource_facts": len(self._fact_resource_block_set),
        "sessions": len(self.sessions),
    }

    # Try to get more info from the schedulers, but make sure not to time out
    schedulers = self.get_all_schedulers()
    deadline = 0.9 * Server.GET_SERVER_STATUS_TIMEOUT

    async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UUID, DataBaseReport]:
        result = await asyncio.wait_for(session.client.get_db_status(), deadline)
        assert result.code == 200
        # Mypy can't help here, ....
        return (env, DataBaseReport(**result.result["data"]))

    # Get env name mapping in parallel with next call
    env_mapping = asyncio.create_task(
        asyncio.wait_for(Environment.get_list(details=False, is_marked_for_deletion=False), deadline)
    )
    # Get the reports of all schedulers
    results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True)
    try:
        # This can time out, but that is unlikely
        env_mapping_result = await env_mapping
        uuid_to_name = {env.id: env.name for env in env_mapping_result}
    except TimeoutError:
        # Default to UUIDs
        uuid_to_name = {}

    # Filter out timeouts and other errors
    good_reports = [x[1] for x in results if not isinstance(x, BaseException)]

    def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]:
        return {
            "connected": report.connected,
            "max_pool": report.max_pool,
            "open_connections": report.open_connections,
            "free_connections": report.free_connections,
            "pool_exhaustion_count": report.pool_exhaustion_count,
        }

    if good_reports:
        total: DataBaseReport = reduce(lambda x, y: x + y, good_reports)
        out["database"] = total.database
        out["host"] = total.host
        out["total"] = short_report(total)

    for result in results:
        if isinstance(result, BaseException):
            # No exception is active outside an except block, so pass the instance explicitly
            logging.debug("Failed to collect database status for scheduler", exc_info=result)
        else:
            the_uuid = result[0]
            # Default name to uuid
            name = uuid_to_name.get(the_uuid, str(the_uuid))
            out[name] = short_report(result[1])

    return out

def get_dependencies(self) -> list[str]:
return [SLICE_DATABASE, SLICE_SESSION_MANAGER]

@@ -185,7 +241,6 @@ async def prestart(self, server: protocol.Server) -> None:
autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER)
assert isinstance(autostarted_agent_manager, AutostartedAgentManager)
self._autostarted_agent_manager = autostarted_agent_manager

presession = server.get_slice(SLICE_SESSION_MANAGER)
assert isinstance(presession, SessionManager)
presession.add_listener(self)
@@ -197,13 +252,27 @@ async def start(self) -> None:
await self._expire_all_sessions_in_db()

self.add_background_task(self._process_session_listener_actions())
# Schedule cleanup of the agentprocess and agentinstance tables
agent_process_purge_interval = opt.agent_process_purge_interval.get()
if agent_process_purge_interval > 0:
    self.schedule(
        self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False
    )

async def prestop(self) -> None:
await super().prestop()

async def stop(self) -> None:
await super().stop()

def get_all_schedulers(self) -> list[tuple[uuid.UUID, protocol.Session]]:
    # Linear scan, but every item should be a hit
    return [
        (env_id, session)
        for (env_id, agent_id), session in self.tid_endpoint_to_session.items()
        if agent_id == AGENT_SCHEDULER_ID
    ]
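
To illustrate this filter with plain data: sessions are keyed by `(environment id, endpoint name)`, and only the entries whose endpoint name equals the scheduler id are kept. The `AGENT_SCHEDULER_ID` value, the string sessions and the free function below are placeholders for the example, not the real inmanta values.

```python
import uuid

# Placeholder value; the real constant lives in inmanta.const.
AGENT_SCHEDULER_ID = "scheduler-placeholder"


def get_all_schedulers(
    sessions: dict[tuple[uuid.UUID, str], str],
) -> list[tuple[uuid.UUID, str]]:
    # Keep one (environment, session) pair per scheduler endpoint.
    return [
        (env_id, session)
        for (env_id, agent_id), session in sessions.items()
        if agent_id == AGENT_SCHEDULER_ID
    ]


env_a, env_b = uuid.uuid4(), uuid.uuid4()
sessions = {
    (env_a, AGENT_SCHEDULER_ID): "session-a",
    (env_b, AGENT_SCHEDULER_ID): "session-b",
    (env_b, "some-other-endpoint"): "session-c",
}
print(get_all_schedulers(sessions))
```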

async def halt_agents(self, env: data.Environment, connection: Optional[asyncpg.connection.Connection] = None) -> None:
"""
Halts all agents for an environment. Persists prior paused state.
@@ -516,6 +585,10 @@ async def _expire_all_sessions_in_db(self) -> None:
await data.AgentInstance.expire_all(now=datetime.now().astimezone(), connection=connection)
await data.Agent.mark_all_as_non_primary(connection=connection)

async def _purge_agent_processes(self) -> None:
    agent_processes_to_keep = opt.agent_processes_to_keep.get()
    await data.AgentProcess.cleanup(nr_expired_records_to_keep=agent_processes_to_keep)

# Util
async def _use_new_active_session_for_agent(self, tid: uuid.UUID, endpoint_name: str) -> Optional[protocol.Session]:
"""
@@ -943,7 +1016,7 @@ def __init__(self) -> None:
self._agent_procs: dict[UUID, subprocess.Process] = {} # env uuid -> subprocess.Process
self.agent_lock = asyncio.Lock() # Prevent concurrent updates on _agent_procs

async def get_status(self) -> dict[str, ArgumentTypes]:
async def get_status(self) -> Mapping[str, ArgumentTypes]:
return {"processes": len(self._agent_procs)}

async def prestart(self, server: protocol.Server) -> None:
@@ -1367,12 +1440,21 @@ async def _make_agent_config(
name={opt.db_name.get()}
username={opt.db_username.get()}
password={opt.db_password.get()}

[scheduler]
db-connection-pool-min-size={agent_cfg.scheduler_db_connection_pool_min_size.get()}
db-connection-pool-max-size={agent_cfg.scheduler_db_connection_pool_max_size.get()}
db-connection-timeout={agent_cfg.scheduler_db_connection_timeout.get()}

"""
[influxdb]
host = {opt.influxdb_host.get()}
port = {opt.influxdb_port.get()}
name = {opt.influxdb_name.get()}
username = {opt.influxdb_username.get()}
password = {opt.influxdb_password.get()}
interval = {opt.influxdb_interval.get()}
tags = {opt.influxdb_tags.get()}
"""
return config

async def _fork_inmanta(