Remove serverinfo log
bennybp committed May 6, 2024
1 parent ab80708 commit 6dffe19
Showing 11 changed files with 55 additions and 540 deletions.
@@ -0,0 +1,52 @@
"""Remove server stats log
Revision ID: 73b4838a6839
Revises: 75b80763e901
Create Date: 2024-05-06 10:54:44.383709
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "73b4838a6839"
down_revision = "75b80763e901"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_server_stats_log_timestamp", table_name="server_stats_log", postgresql_using="brin")
op.drop_table("server_stats_log")

op.execute("DELETE FROM internal_jobs WHERE name = 'update_server_stats'")
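# The deleted row is the recurring internal job that populated the table;
# its handler (serverinfo.update_server_stats) is removed in this commit.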
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"server_stats_log",
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column("timestamp", postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False),
sa.Column("collection_count", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("molecule_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("record_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("outputstore_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("access_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_total_size", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_table_size", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_index_size", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_table_information", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True),
sa.Column("error_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("service_queue_status", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True),
sa.Column("task_queue_status", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True),
sa.PrimaryKeyConstraint("id", name="server_stats_log_pkey"),
)
op.create_index(
"ix_server_stats_log_timestamp", "server_stats_log", ["timestamp"], unique=False, postgresql_using="brin"
)
# ### end Alembic commands ###
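For anyone applying this revision by hand, here is a minimal sketch using Alembic's programmatic API; the alembic.ini path is an assumption, and deployments may instead use their usual migration command:

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumed location of the Alembic config
command.upgrade(cfg, "73b4838a6839")  # apply this revision
# command.downgrade(cfg, "75b80763e901")  # would recreate server_stats_log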
34 changes: 0 additions & 34 deletions qcfractal/qcfractal/components/serverinfo/db_models.py
@@ -110,40 +110,6 @@ def model_dict(self, exclude: Optional[Iterable[str]] = None) -> Dict[str, Any]:
return d


class ServerStatsLogORM(BaseORM):
"""
Table for storing server statistics
Server statistics (storage size, row count, etc.) are periodically captured and
stored in this table.
"""

__tablename__ = "server_stats_log"

id = Column(Integer, primary_key=True)
timestamp = Column(TIMESTAMP(timezone=True), nullable=False, default=now_at_utc)

# Raw counts
collection_count = Column(Integer)
molecule_count = Column(BigInteger)
record_count = Column(BigInteger)
outputstore_count = Column(BigInteger)
access_count = Column(BigInteger)
error_count = Column(BigInteger)

# Task & service queue status
task_queue_status = Column(JSON)
service_queue_status = Column(JSON)

# Database
db_total_size = Column(BigInteger)
db_table_size = Column(BigInteger)
db_index_size = Column(BigInteger)
db_table_information = Column(JSON)

__table_args__ = (Index("ix_server_stats_log_timestamp", "timestamp", postgresql_using="brin"),)


class MessageOfTheDayORM(BaseORM):
"""
Table for storing the Message-of-the-Day
15 changes: 0 additions & 15 deletions qcfractal/qcfractal/components/serverinfo/routes.py
@@ -7,7 +7,6 @@
from qcportal.serverinfo import (
AccessLogSummaryFilters,
AccessLogQueryFilters,
ServerStatsQueryFilters,
ErrorLogQueryFilters,
DeleteBeforeDateBody,
)
@@ -68,20 +67,6 @@ def query_access_summary_v1(url_params: AccessLogSummaryFilters):
return storage_socket.serverinfo.query_access_summary(url_params)


@api_v1.route("/server_stats/query", methods=["POST"])
@wrap_route("READ")
def query_server_stats_v1(body_data: ServerStatsQueryFilters):
max_limit = current_app.config["QCFRACTAL_CONFIG"].api_limits.get_server_stats
body_data.limit = calculate_limit(max_limit, body_data.limit)
return storage_socket.serverinfo.query_server_stats(body_data)


@api_v1.route("/server_stats/bulkDelete", methods=["POST"])
@wrap_route("DELETE")
def delete_server_stats_v1(body_data: DeleteBeforeDateBody):
return storage_socket.serverinfo.delete_server_stats(before=body_data.before)


@api_v1.route("/server_errors/query", methods=["POST"])
@wrap_route("READ")
def query_error_log_v1(body_data: ErrorLogQueryFilters):
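For reference, a sketch of how a client might have called the removed query endpoint; the server URL and /api/v1 prefix are assumptions, the body fields mirror ServerStatsQueryFilters, and real deployments would also require authentication:

import requests

resp = requests.post(
    "https://qcfractal.example.com/api/v1/server_stats/query",
    json={"before": "2024-05-06T00:00:00", "limit": 100},
)
resp.raise_for_status()  # servers running this commit no longer register the route
stats = resp.json()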
195 changes: 2 additions & 193 deletions qcfractal/qcfractal/components/serverinfo/socket.py
@@ -11,25 +11,19 @@
from typing import TYPE_CHECKING

import requests
from sqlalchemy import and_, or_, func, text, select, delete
from sqlalchemy import and_, or_, func, select, delete
from sqlalchemy.orm import load_only

import qcfractal
from qcfractal.components.auth.db_models import UserIDMapSubquery
from qcfractal.components.dataset_db_models import BaseDatasetORM
from qcfractal.components.molecules.db_models import MoleculeORM
from qcfractal.components.record_db_models import BaseRecordORM, OutputStoreORM
from qcfractal.components.services.db_models import ServiceQueueORM
from qcfractal.components.tasks.db_models import TaskQueueORM
from qcfractal.db_socket.helpers import get_query_proj_options
from qcportal.serverinfo import (
AccessLogQueryFilters,
AccessLogSummaryFilters,
ErrorLogQueryFilters,
ServerStatsQueryFilters,
)
from qcportal.utils import now_at_utc
from .db_models import AccessLogORM, InternalErrorLogORM, ServerStatsLogORM, MessageOfTheDayORM, ServerStatsMetadataORM
from .db_models import AccessLogORM, InternalErrorLogORM, MessageOfTheDayORM, ServerStatsMetadataORM

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
@@ -54,7 +48,6 @@ class ServerInfoSocket:
def __init__(self, root_socket: SQLAlchemySocket):
self.root_socket = root_socket
self._logger = logging.getLogger(__name__)
self._server_stats_frequency = root_socket.qcf_config.statistics_frequency

self._geoip2_dir = root_socket.qcf_config.geoip2_dir
self._geoip2_file_path = os.path.join(self._geoip2_dir, root_socket.qcf_config.geoip2_filename)
@@ -79,40 +72,12 @@ def __init__(self, root_socket: SQLAlchemySocket):
"GeoIP2 package not found. To include locations in access logs, install the geoip2 package"
)

# Server stats job. Don't do it right at startup
self.add_internal_job_server_stats(self._server_stats_frequency)

# Updating the geolocation database file
self.add_internal_job_update_geoip2(0.0)

# Updating the access log with geolocation info. Don't do it right at startup
self.add_internal_job_geolocate_accesses(self._geolocate_accesses_frequency)

def add_internal_job_server_stats(self, delay: float, *, session: Optional[Session] = None):
"""
Adds an internal job to update the server statistics
Parameters
----------
delay
Schedule for this many seconds in the future
session
An existing SQLAlchemy session to use. If None, one will be created. If an existing session
is used, it will be flushed (but not committed) before returning from this function.
"""
with self.root_socket.optional_session(session) as session:
self.root_socket.internal_jobs.add(
"update_server_stats",
now_at_utc() + timedelta(seconds=delay),
"serverinfo.update_server_stats",
{},
user_id=None,
unique_name=True,
after_function="serverinfo.add_internal_job_server_stats",
after_function_kwargs={"delay": self._server_stats_frequency},
session=session,
)
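# Note: this job was self-rescheduling. after_function pointed back at this
# method, with after_function_kwargs supplying the statistics frequency as
# the next delay, so each completed run queued its successor
# (unique_name=True presumably prevented duplicate queue entries).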

def add_internal_job_update_geoip2(self, delay: float, *, session: Optional[Session] = None):
"""
Adds an internal job to update the geoip database
@@ -386,92 +351,6 @@ def save_error(self, error_data: Dict[str, Any], *, session: Optional[Session] =
session.flush()
return log.id

def update_server_stats(self, session: Session, job_progress: JobProgress) -> None:
"""
Obtains some statistics about the server and stores them in the database
Parameters
----------
session
An existing SQLAlchemy session to use. If None, one will be created. If an existing session
is used, it will be flushed (but not committed) before returning from this function.
"""

table_list = [BaseDatasetORM, MoleculeORM, BaseRecordORM, OutputStoreORM, AccessLogORM, InternalErrorLogORM]
db_name = self.root_socket.qcf_config.database.database_name

table_counts = {}
with self.root_socket.optional_session(session) as session:
# total size of the database
db_size = session.execute(text("SELECT pg_database_size(:dbname)"), {"dbname": db_name}).scalar()

# Count the number of rows in each table
for table in table_list:
table_name = table.__tablename__
table_counts[table_name] = session.execute(text(f"SELECT count(*) FROM {table_name}")).scalar()

table_info_sql = f"""
SELECT relname AS table_name
, c.reltuples::BIGINT AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r' AND relname NOT LIKE 'pg_%' AND relname NOT LIKE 'sql_%';
"""

table_info_result = session.execute(text(table_info_sql)).fetchall()

table_info_rows = [list(r) for r in table_info_result]
table_info = {
"columns": ["table_name", "row_estimate", "total_bytes", "index_bytes", "toast_bytes"],
"rows": table_info_rows,
}

# Task queue and Service queue status
task_query = (
session.query(BaseRecordORM.record_type, BaseRecordORM.status, func.count(TaskQueueORM.id))
.join(BaseRecordORM, BaseRecordORM.id == TaskQueueORM.record_id)
.group_by(BaseRecordORM.record_type, BaseRecordORM.status)
.all()
)
task_stats = {"columns": ["record_type", "status", "count"], "rows": [list(r) for r in task_query]}

service_query = (
session.query(BaseRecordORM.record_type, BaseRecordORM.status, func.count(ServiceQueueORM.id))
.join(BaseRecordORM, BaseRecordORM.id == ServiceQueueORM.record_id)
.group_by(BaseRecordORM.record_type, BaseRecordORM.status)
.all()
)
service_stats = {"columns": ["record_type", "status", "count"], "rows": [list(r) for r in service_query]}

# Calculate combined table info
table_size = 0
index_size = 0
for row in table_info_rows:
table_size += row[2] - row[3] - (row[4] or 0)
index_size += row[3]

# Build out final data
data = {
"collection_count": table_counts[BaseDatasetORM.__tablename__],
"molecule_count": table_counts[MoleculeORM.__tablename__],
"record_count": table_counts[BaseRecordORM.__tablename__],
"outputstore_count": table_counts[OutputStoreORM.__tablename__],
"access_count": table_counts[AccessLogORM.__tablename__],
"error_count": table_counts[InternalErrorLogORM.__tablename__],
"task_queue_status": task_stats,
"service_queue_status": service_stats,
"db_total_size": db_size,
"db_table_size": table_size,
"db_index_size": index_size,
"db_table_information": table_info,
}

log = ServerStatsLogORM(**data)
session.add(log)
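# A worked example of the size bookkeeping above, with hypothetical numbers for
# one table_info row (table_name, row_estimate, total_bytes, index_bytes, toast_bytes):
#   row = ["molecule", 1_000_000, 900_000_000, 150_000_000, 50_000_000]
#   heap = row[2] - row[3] - (row[4] or 0)  # 900M - 150M - 50M = 700_000_000
# db_table_size therefore counted heap bytes only, excluding indexes and TOAST;
# index bytes were totaled separately as db_index_size.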

def query_access_log(
self,
query_data: AccessLogQueryFilters,
@@ -697,54 +576,6 @@ def query_error_log(

return result_dicts

def query_server_stats(
self,
query_data: ServerStatsQueryFilters,
*,
session: Optional[Session] = None,
) -> List[Dict[str, Any]]:
"""
General query of server statistics
All search criteria are merged via 'and'; only entries that match all of
the criteria will be returned.
Parameters
----------
query_data
Fields/filters to query for
session
An existing SQLAlchemy session to use. If None, one will be created. If an existing session
is used, it will be flushed (but not committed) before returning from this function.
Returns
-------
:
A list of server statistic entries (as dictionaries) that were found in the database.
"""

and_query = []
if query_data.before:
and_query.append(ServerStatsLogORM.timestamp <= query_data.before)
if query_data.after:
and_query.append(ServerStatsLogORM.timestamp >= query_data.after)

with self.root_socket.optional_session(session, True) as session:
stmt = select(ServerStatsLogORM).filter(and_(True, *and_query))

if query_data.cursor is not None:
stmt = stmt.where(ServerStatsLogORM.id < query_data.cursor)

stmt = stmt.order_by(ServerStatsLogORM.id.desc())
stmt = stmt.limit(query_data.limit)
stmt = stmt.distinct(ServerStatsLogORM.id)
results = session.execute(stmt).scalars().all()

# TODO - could be done in sql query (with subquery?)
result_dicts = [x.model_dict() for x in sorted(results, key=lambda x: x.timestamp, reverse=True)]

return result_dicts
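# Sketch of paging through these results with the cursor semantics above
# (descending ids, id < cursor); the names here are illustrative only:
#   filters = ServerStatsQueryFilters(limit=100)
#   batch = socket.serverinfo.query_server_stats(filters)
#   while batch:
#       process(batch)
#       filters.cursor = min(row["id"] for row in batch)
#       batch = socket.serverinfo.query_server_stats(filters)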

def delete_access_logs(self, before: datetime, *, session: Optional[Session] = None) -> int:
"""
Deletes access logs that were created before a certain date & time
@@ -788,25 +619,3 @@ def delete_error_logs(self, before: datetime, *, session: Optional[Session] = No
stmt = delete(InternalErrorLogORM).where(InternalErrorLogORM.error_date < before)
r = session.execute(stmt)
return r.rowcount

def delete_server_stats(self, before: datetime, *, session: Optional[Session] = None) -> int:
"""
Deletes server statistics that were created before a certain date & time
Parameters
----------
before
Delete server stats before this time
session
An existing SQLAlchemy session to use. If None, one will be created. If an existing session
is used, it will be flushed (but not committed) before returning from this function.
Returns
-------
The number of deleted entries
"""

with self.root_socket.optional_session(session, False) as session:
stmt = delete(ServerStatsLogORM).where(ServerStatsLogORM.timestamp < before)
r = session.execute(stmt)
return r.rowcount