Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/pool reporting #8293

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
3 changes: 1 addition & 2 deletions src/inmanta/data/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,6 @@ def has_source(self) -> bool:


class DataBaseReport(BaseModel):

connected: bool
database: str
host: str
Expand All @@ -879,7 +878,7 @@ def __add__(self, other: "DataBaseReport") -> "DataBaseReport":
if other.host != self.host:
return NotImplemented
return DataBaseReport(
connected=self.connected,
connected=self.connected and other.connected,
database=self.database,
host=self.host,
max_pool=self.max_pool + other.max_pool,
Expand Down
19 changes: 10 additions & 9 deletions src/inmanta/server/agentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ async def get_report(env: uuid.UUID, session: protocol.Session) -> tuple[uuid.UU
# default to uuid's
uuid_to_name = {}

# Filter out timeouts
total: DataBaseReport = reduce(lambda x, y: x + y, (x[1] for x in results if not isinstance(x, BaseException)))
# Filter out timeouts and other errors
good_reports = [x[1] for x in results if not isinstance(x, BaseException)]

def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]:
return {
Expand All @@ -213,7 +213,8 @@ def short_report(report: DataBaseReport) -> Mapping[str, ArgumentTypes]:
"pool_exhaustion_count": report.pool_exhaustion_count,
}

if total:
if good_reports:
total: DataBaseReport = reduce(lambda x, y: x + y, good_reports)
out["database"] = total.database
out["host"] = total.host
out["total"] = short_report(total)
Expand Down Expand Up @@ -251,6 +252,12 @@ async def start(self) -> None:
await self._expire_all_sessions_in_db()

self.add_background_task(self._process_session_listener_actions())
# Schedule cleanup agentprocess and agentinstance tables
agent_process_purge_interval = opt.agent_process_purge_interval.get()
if agent_process_purge_interval > 0:
self.schedule(
self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False
)
Hugo-Inmanta marked this conversation as resolved.
Show resolved Hide resolved

async def prestop(self) -> None:
await super().prestop()
Expand Down Expand Up @@ -1029,12 +1036,6 @@ async def prestart(self, server: protocol.Server) -> None:
async def start(self) -> None:
await super().start()
self.add_background_task(self._start_agents())
# Schedule cleanup agentprocess and agentinstance tables
agent_process_purge_interval = opt.agent_process_purge_interval.get()
if agent_process_purge_interval > 0:
self.schedule(
self._purge_agent_processes, interval=agent_process_purge_interval, initial_delay=0, cancel_on_stop=False
)

async def prestop(self) -> None:
await super().prestop()
Expand Down
48 changes: 27 additions & 21 deletions src/inmanta/server/services/databaseservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ def __init__(
self.component = component
self.environment = environment

# Influxdb part
self.infuxdb_suffix = f",component={self.component}"
if self.environment:
self.infuxdb_suffix += f",environment={self.environment}"

self.registered_guages: list[str] = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.registered_guages: list[str] = []
self.registered_gauges: list[str] = []


def start(self) -> None:
self.start_monitor()

Expand Down Expand Up @@ -74,44 +81,43 @@ async def get_status(self) -> DataBaseReport:
pool_exhaustion_count=self._db_pool_watcher._exhausted_pool_events_count,
)

def _add_guage(self, name: str, the_gauge: CallbackGauge) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _add_guage(self, name: str, the_gauge: CallbackGauge) -> None:
def _add_gauge(self, name: str, the_gauge: CallbackGauge) -> None:

"""Helper to register gauges and keep track of registrations"""
name = name + self.infuxdb_suffix
gauge(name, the_gauge)
self.registered_guages.append(name)

def start_monitor(self) -> None:
"""Attach to monitoring system"""
suffix = f",component={self.component}"
if self.environment:
suffix += f",environment={self.environment}"

gauge(
"db.connected" + suffix,
self._add_guage(
"db.connected",
CallbackGauge(
callback=lambda: 1 if (self._pool is not None and not self._pool._closing and not self._pool._closed) else 0
),
)
gauge(
"db.max_pool" + suffix, CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0)
self._add_guage(
"db.max_pool", CallbackGauge(callback=lambda: self._pool.get_max_size() if self._pool is not None else 0)
)
gauge(
"db.open_connections" + suffix,
self._add_guage(
"db.open_connections",
CallbackGauge(callback=lambda: self._pool.get_size() if self._pool is not None else 0),
)
gauge(
"db.free_connections" + suffix,
self._add_guage(
"db.free_connections",
CallbackGauge(callback=lambda: self._pool.get_idle_size() if self._pool is not None else 0),
)
gauge(
"db.pool_exhaustion_count" + suffix,
self._add_guage(
"db.pool_exhaustion_count",
CallbackGauge(callback=lambda: self._db_pool_watcher._exhausted_pool_events_count),
)

def stop_monitor(self) -> None:
suffix = f",component={self.component}"
if self.environment:
suffix += f",environment={self.environment}"
"""Disconnect form pyformance"""
for key in self.registered_guages:
global_registry()._gauges.pop(key, None)

global_registry()._gauges.pop("db.connected" + suffix, None)
global_registry()._gauges.pop("db.max_pool" + suffix, None)
global_registry()._gauges.pop("db.open_connections" + suffix, None)
global_registry()._gauges.pop("db.free_connections" + suffix, None)
global_registry()._gauges.pop("db.pool_exhaustion_count" + suffix, None)
self.registered_guages.clear()

async def get_connection_status(self) -> bool:
if self._pool is not None and not self._pool._closing and not self._pool._closed:
Expand Down
1 change: 0 additions & 1 deletion tests/test_influxdbreporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ async def test_available_metrics(server):

# ensure it doesn't crash when the server is down
await server.stop()
metrics = global_registry().dump_metrics()


async def test_safeness_if_server_down():
Expand Down