-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue/pool reporting #8293
base: master
Are you sure you want to change the base?
Issue/pool reporting #8293
Changes from 13 commits
6bc1daf
66670dc
4966ef2
d66600d
98e12fb
74ee0f9
06cb13b
188e67f
02095e0
cc1afe3
2f38e1a
4fbff56
9d72fd3
1861b0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
description: Update database pool status reporting for the new scheduler | ||
issue-nr: 8228 | ||
change-type: minor | ||
destination-branches: [master] | ||
sections: | ||
minor-improvement: "{{description}}" | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,7 +101,7 @@ src/inmanta/ast/type.py:0: error: No overload variant of "reversed" matches argu | |
src/inmanta/ast/type.py:0: note: Possible overload variants: | ||
src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, Reversible[_T], /) -> reversed[_T] | ||
src/inmanta/ast/type.py:0: note: def [_T] __new__(cls, SupportsLenAndGetItem[_T], /) -> reversed[_T] | ||
src/inmanta/server/services/databaseservice.py:0: error: Incompatible return value type (got "dict[str, object]", expected "dict[str, BaseModel | UUID | bool | int | float | datetime | str | Sequence[BaseModel | UUID | bool | int | float | datetime | str] | Mapping[str, BaseModel | UUID | bool | int | float | datetime | str]]") [return-value] | ||
src/inmanta/server/services/metricservice.py:0: error: Incompatible types in assignment (expression has type "float", variable has type "int") [assignment] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pre-existing issue, moved |
||
src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_method__" [attr-defined] | ||
src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__protocol_mapping__" [attr-defined] | ||
src/inmanta/protocol/decorators.py:0: error: "FuncT" has no attribute "__api_version__" [attr-defined] | ||
|
@@ -156,6 +156,7 @@ src/inmanta/ast/attribute.py:0: error: Incompatible types in assignment (express | |
src/inmanta/ast/attribute.py:0: error: Missing type parameters for generic type "ResultVariable" [type-arg] | ||
src/inmanta/ast/attribute.py:0: error: Missing type parameters for generic type "ResultVariable" [type-arg] | ||
src/inmanta/util/__init__.py:0: error: Argument "key" to "sorted" has incompatible type "Callable[[T], S]"; expected "Callable[[T], SupportsDunderLT[Any] | SupportsDunderGT[Any]]" [arg-type] | ||
src/inmanta/util/__init__.py:0: error: Returning Any from function declared to return "ScheduledTask | None" [no-any-return] | ||
src/inmanta/util/__init__.py:0: error: Returning Any from function declared to return "bool" [no-any-return] | ||
src/inmanta/util/__init__.py:0: error: Incompatible return value type (got "bool | Awaitable[bool]", expected "bool") [return-value] | ||
src/inmanta/resources.py:0: error: Function is missing a type annotation [no-untyped-def] | ||
|
@@ -236,6 +237,7 @@ src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-bo | |
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] | ||
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] | ||
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] | ||
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] | ||
src/inmanta/protocol/methods_v2.py:0: error: Cannot determine type of "RVID_OPTS" [has-type] | ||
src/inmanta/protocol/methods_v2.py:0: error: Cannot determine type of "RVID_OPTS" [has-type] | ||
src/inmanta/protocol/methods_v2.py:0: error: Missing return statement [empty-body] | ||
|
@@ -1101,6 +1103,7 @@ src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types | |
src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible types in assignment (expression has type "set[str | None]", variable has type "set[str]") [assignment] | ||
src/inmanta/server/services/orchestrationservice.py:0: error: Argument "agents" to "_trigger_auto_deploy" of "OrchestrationService" has incompatible type "Collection[str]"; expected "Sequence[str] | None" [arg-type] | ||
src/inmanta/server/services/orchestrationservice.py:0: error: Incompatible return value type (got "ReturnValueWithMeta[Sequence[DesiredStateVersion]]", expected "ReturnValue[list[DesiredStateVersion]]") [return-value] | ||
src/inmanta/server/agentmanager.py:0: error: Value of type "dict[str, Any] | None" is not indexable [index] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. decoding of api results, difficult to avoid |
||
src/inmanta/server/agentmanager.py:0: error: Argument 1 to "ensure_agent_registered" of "AgentManager" has incompatible type "Environment | None"; expected "Environment" [arg-type] | ||
src/inmanta/agent/forking_executor.py:0: error: Argument 1 of "connection_made" is incompatible with supertype "BaseProtocol"; supertype defines the argument type as "BaseTransport" [override] | ||
src/inmanta/agent/forking_executor.py:0: note: This violates the Liskov substitution principle | ||
|
@@ -1115,7 +1118,6 @@ src/inmanta/server/services/paramservice.py:0: error: Unsupported operand types | |
src/inmanta/server/services/paramservice.py:0: note: Left operand is of type "datetime | None" | ||
src/inmanta/server/services/paramservice.py:0: error: Unsupported operand types for > ("timedelta" and "datetime") [operator] | ||
src/inmanta/server/services/paramservice.py:0: note: Left operand is of type "datetime | timedelta" | ||
src/inmanta/server/services/metricservice.py:0: error: Incompatible types in assignment (expression has type "float", variable has type "int") [assignment] | ||
src/inmanta/server/services/environment_metrics_service.py:0: error: Function is missing a return type annotation [no-untyped-def] | ||
src/inmanta/server/services/environment_metrics_service.py:0: error: "Sequence[EnvironmentMetricsGauge]" has no attribute "append" [attr-defined] | ||
src/inmanta/server/services/environment_metrics_service.py:0: error: Function is missing a return type annotation [no-untyped-def] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,15 +24,17 @@ | |
from concurrent.futures.thread import ThreadPoolExecutor | ||
from typing import Any, Callable, Coroutine, Optional, Union | ||
|
||
from inmanta import config, const, protocol | ||
import inmanta.server.config as opt | ||
from inmanta import config, const, data, protocol | ||
from inmanta.agent import config as cfg | ||
from inmanta.agent import executor, forking_executor | ||
from inmanta.agent.reporting import collect_report | ||
from inmanta.const import AGENT_SCHEDULER_ID | ||
from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr | ||
from inmanta.data.model import AttributeStateChange, DataBaseReport, ResourceVersionIdStr | ||
from inmanta.deploy import scheduler | ||
from inmanta.deploy.work import TaskPriority | ||
from inmanta.protocol import SessionEndpoint, methods, methods_v2 | ||
from inmanta.server.services.databaseservice import DatabaseMonitor | ||
from inmanta.types import Apireturn | ||
from inmanta.util import ( | ||
CronSchedule, | ||
|
@@ -84,6 +86,20 @@ def __init__( | |
|
||
self._set_deploy_and_repair_intervals() | ||
|
||
async def start(self) -> None: | ||
# Make mypy happy | ||
assert data.Resource._connection_pool is not None | ||
self._db_monitor = DatabaseMonitor( | ||
data.Resource._connection_pool, | ||
opt.db_name.get(), | ||
opt.db_host.get(), | ||
"scheduler", | ||
str(self.environment), | ||
) | ||
self._db_monitor.start() | ||
Comment on lines
+92
to
+99
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Scheduler gets DB monitor as well |
||
|
||
await super().start() | ||
|
||
def _set_deploy_and_repair_intervals(self) -> None: | ||
""" | ||
Fetch the settings related to automatic deploys and repairs from the config | ||
|
@@ -323,6 +339,20 @@ async def get_facts(self, env: uuid.UUID, agent: str, resource: dict[str, Any]) | |
async def get_status(self) -> Apireturn: | ||
return 200, collect_report(self) | ||
|
||
@protocol.handle(methods_v2.get_db_status) | ||
async def get_db_status(self) -> DataBaseReport: | ||
if self._db_monitor is None: | ||
return DataBaseReport( | ||
connected=False, | ||
database="", | ||
host="", | ||
max_pool=0, | ||
open_connections=0, | ||
free_connections=0, | ||
pool_exhaustion_count=0, | ||
) | ||
return await self._db_monitor.get_status() | ||
Comment on lines
+361
to
+372
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. new status endpoint on the scheduler to get database status |
||
|
||
def check_storage(self) -> dict[str, str]: | ||
""" | ||
Check if the server storage is configured and ready to use. Ultimately, this is | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ | |
from inmanta.server import config as opt | ||
from inmanta.server.bootloader import InmantaBootloader | ||
from inmanta.server.services.databaseservice import initialize_database_connection_pool | ||
from inmanta.server.services.metricservice import MetricsService | ||
from inmanta.signals import safe_shutdown, setup_signal_handlers | ||
from inmanta.util import get_compiler_version | ||
from inmanta.warnings import WarningsManager | ||
|
@@ -156,6 +157,9 @@ async def start() -> None: | |
connection_pool_max_size=agent_config.scheduler_db_connection_pool_max_size.get(), | ||
connection_timeout=agent_config.scheduler_db_connection_timeout.get(), | ||
) | ||
# also report metrics if this is relevant | ||
metrics_reporter = MetricsService() | ||
metrics_reporter.start_metric_reporters() | ||
Comment on lines
+161
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. scheduler will now also report to influxdb |
||
await a.start() | ||
|
||
setup_signal_handlers(a.stop) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,7 +23,7 @@ | |
from collections import abc | ||
from collections.abc import Sequence | ||
from enum import Enum, StrEnum | ||
from typing import ClassVar, NewType, Optional, Self, Union | ||
from typing import ClassVar, Mapping, NewType, Optional, Self, Union | ||
|
||
import pydantic | ||
import pydantic.schema | ||
|
@@ -89,7 +89,7 @@ class SliceStatus(BaseModel): | |
""" | ||
|
||
name: str | ||
status: dict[str, ArgumentTypes] | ||
status: Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This breaks the FE, but allow nicer presentation |
||
|
||
|
||
class FeatureStatus(BaseModel): | ||
|
@@ -859,3 +859,30 @@ def has_source(self) -> bool: | |
|
||
|
||
LEGACY_PIP_DEFAULT = PipConfig(use_system_config=True) | ||
|
||
|
||
class DataBaseReport(BaseModel): | ||
connected: bool | ||
database: str | ||
host: str | ||
max_pool: int | ||
open_connections: int | ||
free_connections: int | ||
pool_exhaustion_count: int | ||
|
||
def __add__(self, other: "DataBaseReport") -> "DataBaseReport": | ||
if not isinstance(other, DataBaseReport): | ||
return NotImplemented | ||
if other.database != self.database: | ||
return NotImplemented | ||
if other.host != self.host: | ||
return NotImplemented | ||
return DataBaseReport( | ||
connected=self.connected and other.connected, | ||
database=self.database, | ||
host=self.host, | ||
max_pool=self.max_pool + other.max_pool, | ||
open_connections=self.open_connections + other.open_connections, | ||
free_connections=self.free_connections + other.free_connections, | ||
pool_exhaustion_count=self.pool_exhaustion_count + other.pool_exhaustion_count, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ | |
# flake8: noqa: F401 | ||
|
||
SLICE_SERVER = "core.server" | ||
SLICE_AGENT_MANAGER = "core.agentmanager" | ||
SLICE_AGENT_MANAGER = "core.scheduler_manager" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename for FE, hopefully OK |
||
SLICE_AUTOSTARTED_AGENT_MANAGER = "core.autostarted_agent_manager" | ||
SLICE_SESSION_MANAGER = "core.session" | ||
SLICE_DATABASE = "core.database" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence, Set | ||
from datetime import datetime | ||
from enum import Enum | ||
from functools import reduce | ||
from typing import Any, Optional, Union, cast | ||
from uuid import UUID | ||
|
||
|
@@ -38,9 +39,9 @@ | |
from inmanta import const, data, tracing | ||
from inmanta.agent import config as agent_cfg | ||
from inmanta.config import Config | ||
from inmanta.const import UNDEPLOYABLE_NAMES, AgentAction, AgentStatus | ||
from inmanta.data import APILIMIT, InvalidSort, model | ||
from inmanta.data.model import ResourceIdStr | ||
from inmanta.const import AGENT_SCHEDULER_ID, UNDEPLOYABLE_NAMES, AgentAction, AgentStatus | ||
from inmanta.data import APILIMIT, Environment, InvalidSort, model | ||
from inmanta.data.model import DataBaseReport, ResourceIdStr | ||
from inmanta.protocol import encode_token, handle, methods, methods_v2 | ||
from inmanta.protocol.common import ReturnValue | ||
from inmanta.protocol.exceptions import BadRequest, Forbidden, NotFound, ShutdownInProgress | ||
|
@@ -168,12 +169,67 @@ def __init__(self, closesessionsonstart: bool = True, fact_back_off: Optional[in | |
|
||
self.closesessionsonstart: bool = closesessionsonstart | ||
|
||
async def get_status(self) -> dict[str, ArgumentTypes]: | ||
return { | ||
async def get_status(self) -> Mapping[str, ArgumentTypes | Mapping[str, ArgumentTypes]]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is very tricky, as it has to return withing the timeout of the status endpoint. I chose to make it report what it has, even if that makes the format somewhat inconsistent. |
||
# The basic report | ||
|
||
out: dict[str, ArgumentTypes | Mapping[str, ArgumentTypes]] = { | ||
"resource_facts": len(self._fact_resource_block_set), | ||
"sessions": len(self.sessions), | ||
} | ||
|
||
# Try to get more info from scheduler, but make sure not to timeout | ||
schedulers = self.get_all_schedulers() | ||
deadline = 0.9 * Server.GET_SERVER_STATUS_TIMEOUT | ||
|
||
async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UUID, DataBaseReport]: | ||
result = await asyncio.wait_for(session.client.get_db_status(), deadline) | ||
assert result.code == 200 | ||
sanderr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Mypy can't help here, .... | ||
sanderr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return (env, DataBaseReport(**result.result["data"])) | ||
|
||
# Get env name mapping in parallel with next call | ||
env_mapping = asyncio.create_task( | ||
asyncio.wait_for(Environment.get_list(details=False, is_marked_for_deletion=False), deadline) | ||
) | ||
# Get the reports of all schedulers | ||
results = await asyncio.gather(*[get_report(env, scheduler) for env, scheduler in schedulers], return_exceptions=True) | ||
try: | ||
# This can timeout, but not likely | ||
env_mapping_result = await env_mapping | ||
uuid_to_name = {env.id: env.name for env in env_mapping_result} | ||
except TimeoutError: | ||
# default to uuid's | ||
uuid_to_name = {} | ||
|
||
# Filter out timeouts and other errors | ||
good_reports = [x[1] for x in results if not isinstance(x, BaseException)] | ||
|
||
def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]: | ||
return { | ||
"connected": report.connected, | ||
"max_pool": report.max_pool, | ||
"open_connections": report.open_connections, | ||
"free_connections": report.free_connections, | ||
"pool_exhaustion_count": report.pool_exhaustion_count, | ||
} | ||
|
||
if good_reports: | ||
total: DataBaseReport = reduce(lambda x, y: x + y, good_reports) | ||
out["database"] = total.database | ||
out["host"] = total.host | ||
out["total"] = short_report(total) | ||
|
||
for result in results: | ||
if isinstance(result, BaseException): | ||
logging.debug("Failed to collect database status for scheduler", exc_info=True) | ||
else: | ||
the_uuid = result[0] | ||
# Default name to uuid | ||
name = uuid_to_name.get(the_uuid, str(the_uuid)) | ||
out[name] = short_report(result[1]) | ||
|
||
return out | ||
|
||
def get_dependencies(self) -> list[str]: | ||
return [SLICE_DATABASE, SLICE_SESSION_MANAGER] | ||
|
||
|
@@ -185,7 +241,6 @@ async def prestart(self, server: protocol.Server) -> None: | |
autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER) | ||
assert isinstance(autostarted_agent_manager, AutostartedAgentManager) | ||
self._autostarted_agent_manager = autostarted_agent_manager | ||
|
||
presession = server.get_slice(SLICE_SESSION_MANAGER) | ||
assert isinstance(presession, SessionManager) | ||
presession.add_listener(self) | ||
|
@@ -197,13 +252,27 @@ async def start(self) -> None: | |
await self._expire_all_sessions_in_db() | ||
|
||
self.add_background_task(self._process_session_listener_actions()) | ||
# Schedule cleanup agentprocess and agentinstance tables | ||
agent_process_purge_interval = opt.agent_process_purge_interval.get() | ||
if agent_process_purge_interval > 0: | ||
self.schedule( | ||
self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False | ||
) | ||
Hugo-Inmanta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
async def prestop(self) -> None: | ||
await super().prestop() | ||
|
||
async def stop(self) -> None: | ||
await super().stop() | ||
|
||
def get_all_schedulers(self) -> list[tuple[uuid.UUID, protocol.Session]]: | ||
# Linear scan, but every item should be a hit | ||
return [ | ||
(env_id, session) | ||
for (env_id, agent_id), session in self.tid_endpoint_to_session.items() | ||
if agent_id == AGENT_SCHEDULER_ID | ||
] | ||
|
||
async def halt_agents(self, env: data.Environment, connection: Optional[asyncpg.connection.Connection] = None) -> None: | ||
""" | ||
Halts all agents for an environment. Persists prior paused state. | ||
|
@@ -516,6 +585,10 @@ async def _expire_all_sessions_in_db(self) -> None: | |
await data.AgentInstance.expire_all(now=datetime.now().astimezone(), connection=connection) | ||
await data.Agent.mark_all_as_non_primary(connection=connection) | ||
|
||
async def _purge_agent_processes(self) -> None: | ||
agent_processes_to_keep = opt.agent_processes_to_keep.get() | ||
await data.AgentProcess.cleanup(nr_expired_records_to_keep=agent_processes_to_keep) | ||
|
||
# Util | ||
async def _use_new_active_session_for_agent(self, tid: uuid.UUID, endpoint_name: str) -> Optional[protocol.Session]: | ||
""" | ||
|
@@ -943,7 +1016,7 @@ def __init__(self) -> None: | |
self._agent_procs: dict[UUID, subprocess.Process] = {} # env uuid -> subprocess.Process | ||
self.agent_lock = asyncio.Lock() # Prevent concurrent updates on _agent_procs | ||
|
||
async def get_status(self) -> dict[str, ArgumentTypes]: | ||
async def get_status(self) -> Mapping[str, ArgumentTypes]: | ||
return {"processes": len(self._agent_procs)} | ||
|
||
async def prestart(self, server: protocol.Server) -> None: | ||
|
@@ -1367,12 +1440,21 @@ async def _make_agent_config( | |
name={opt.db_name.get()} | ||
username={opt.db_username.get()} | ||
password={opt.db_password.get()} | ||
[scheduler] | ||
db-connection-pool-min-size={agent_cfg.scheduler_db_connection_pool_min_size.get()} | ||
db-connection-pool-max-size={agent_cfg.scheduler_db_connection_pool_max_size.get()} | ||
db-connection-timeout={agent_cfg.scheduler_db_connection_timeout.get()} | ||
""" | ||
[influxdb] | ||
host = {opt.influxdb_host.get()} | ||
port = {opt.influxdb_port.get()} | ||
name = {opt.influxdb_name.get()} | ||
username = {opt.influxdb_username.get()} | ||
password = {opt.influxdb_password.get()} | ||
interval = {opt.influxdb_interval.get()} | ||
tags = {opt.influxdb_tags.get()} | ||
""" | ||
return config | ||
|
||
async def _fork_inmanta( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this deserve a change entry, considering that the new scheduler has not been publicly released? Or has it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a user visible change, but perhaps I should add more detail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean the entire scheduler is new, so I would assume that to be one change entry rather than a separate one for each prt it affects.