Skip to content

Commit

Permalink
Remove compute manager log
Browse files Browse the repository at this point in the history
  • Loading branch information
bennybp committed May 6, 2024
1 parent 0642482 commit ab80708
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 138 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Remove compute manager log
Revision ID: 75b80763e901
Revises: f31c7897345f
Create Date: 2024-05-06 10:08:30.711531
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "75b80763e901"
down_revision = "f31c7897345f"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_compute_manager_log_manager_id", table_name="compute_manager_log")
op.drop_table("compute_manager_log")
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"compute_manager_log",
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column("manager_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("timestamp", postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False),
sa.Column("successes", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("claimed", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("failures", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("total_cpu_hours", sa.DOUBLE_PRECISION(precision=53), autoincrement=False, nullable=False),
sa.Column("active_tasks", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("active_cores", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("active_memory", sa.DOUBLE_PRECISION(precision=53), autoincrement=False, nullable=False),
sa.Column("rejected", sa.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["manager_id"], ["compute_manager.id"], name="compute_manager_log_manager_id_fkey", ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name="compute_manager_log_pkey"),
)
op.create_index("ix_compute_manager_log_manager_id", "compute_manager_log", ["manager_id"], unique=False)
# ### end Alembic commands ###
35 changes: 0 additions & 35 deletions qcfractal/qcfractal/components/managers/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,6 @@
from qcportal.utils import now_at_utc


class ComputeManagerLogORM(BaseORM):
"""
Table for storing manager logs
This contains information about a manager at a particular point in time. This table
is periodically appended to, with updated information about a manager.
"""

__tablename__ = "compute_manager_log"

id = Column(Integer, primary_key=True)
manager_id = Column(Integer, ForeignKey("compute_manager.id", ondelete="cascade"), nullable=False)

timestamp = Column(TIMESTAMP(timezone=True), default=now_at_utc, nullable=False)

claimed = Column(Integer, nullable=False)
successes = Column(Integer, nullable=False)
failures = Column(Integer, nullable=False)
rejected = Column(Integer, nullable=False)

active_tasks = Column(Integer, nullable=False, default=0)
active_cores = Column(Integer, nullable=False, default=0)
active_memory = Column(Float, nullable=False, default=0.0)
total_cpu_hours = Column(Float, nullable=False, default=0.0)

__table_args__ = (Index("ix_compute_manager_log_manager_id", "manager_id"),)


class ComputeManagerORM(BaseORM):
"""
Table for storing information about active and inactive compute managers
Expand Down Expand Up @@ -81,13 +53,6 @@ class ComputeManagerORM(BaseORM):
manager_version = Column(String, nullable=False)
programs = Column(JSON, nullable=False)

log = relationship(
ComputeManagerLogORM,
order_by=ComputeManagerLogORM.timestamp.desc(),
cascade="all, delete-orphan",
passive_deletes=True,
)

__table_args__ = (
Index("ix_compute_manager_status", "status"),
Index("ix_compute_manager_modified_on", "modified_on", postgresql_using="brin"),
Expand Down
45 changes: 3 additions & 42 deletions qcfractal/qcfractal/components/managers/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from qcportal.exceptions import MissingDataError, ComputeManagerError
from qcportal.managers import ManagerStatusEnum, ManagerName, ManagerQueryFilters
from qcportal.utils import now_at_utc
from .db_models import ComputeManagerLogORM, ComputeManagerORM
from .db_models import ComputeManagerORM

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
Expand Down Expand Up @@ -60,26 +60,6 @@ def add_internal_job_check_heartbeats(self, delay: float, *, session: Optional[S
session=session,
)

@staticmethod
def save_snapshot(orm: ComputeManagerORM):
"""
Saves the statistics of a manager to its log entries
"""

log_orm = ComputeManagerLogORM(
claimed=orm.claimed,
successes=orm.successes,
failures=orm.failures,
rejected=orm.rejected,
active_tasks=orm.active_tasks,
active_cores=orm.active_cores,
active_memory=orm.active_memory,
total_cpu_hours=orm.total_cpu_hours,
timestamp=orm.modified_on,
)

orm.log.append(log_orm)

def activate(
self,
name_data: ManagerName,
Expand Down Expand Up @@ -141,13 +121,12 @@ def update_resource_stats(
session: Optional[Session] = None,
):
"""
Updates the resources available/in use by a manager, and saves it to its log entries
Updates the resources available/in use by a manager
"""

with self.root_socket.optional_session(session) as session:
stmt = (
select(ComputeManagerORM)
.options(selectinload(ComputeManagerORM.log))
.where(ComputeManagerORM.name == name)
.with_for_update(skip_locked=False)
)
Expand All @@ -164,8 +143,6 @@ def update_resource_stats(
manager.total_cpu_hours = total_cpu_hours
manager.modified_on = now_at_utc()

self.save_snapshot(manager)

def deactivate(
self,
name: Optional[Iterable[str]] = None,
Expand Down Expand Up @@ -347,20 +324,4 @@ def check_manager_heartbeats(self, session: Session, job_progress: JobProgress)
dead_managers = self.deactivate(modified_before=dt, reason="missing heartbeat", session=session)

if dead_managers:
self._logger.info(f"Deactivated {len(dead_managers)} managers due to missing heartbeats")

####################################################
# Some stuff to be retrieved for managers
####################################################

def get_log(self, name: str, *, session: Optional[Session] = None) -> List[Dict[str, Any]]:
stmt = select(ComputeManagerORM)
stmt = stmt.options(defer("*"), lazyload("*"))
stmt = stmt.options(joinedload(ComputeManagerORM.log).options(undefer("*")))
stmt = stmt.where(ComputeManagerORM.name == name)

with self.root_socket.optional_session(session) as session:
rec = session.execute(stmt).unique().scalar_one_or_none()
if rec is None:
raise MissingDataError(f"Cannot find manager {name}")
return [x.model_dict() for x in rec.log]
self._logger.info(f"Deactivated {len(dead_managers)} managers due to missing heartbeats")
2 changes: 0 additions & 2 deletions qcfractal/qcfractal/components/managers/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def test_manager_client_get(snowflake: QCATestingSnowflake):
assert manager[1].modified_on > time_0
assert manager[1].created_on < time_1
assert manager[1].modified_on < time_1
assert manager[1].log is not None

assert manager[0].name == name2
assert manager[0].tags == ["tag1"]
Expand All @@ -61,7 +60,6 @@ def test_manager_client_get(snowflake: QCATestingSnowflake):
assert manager[0].modified_on > time_1
assert manager[0].created_on < time_2
assert manager[0].modified_on < time_2
assert manager[0].log is not None

assert manager[2].id == manager[1].id
assert manager[3].id == manager[0].id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def test_manager_client_query(queryable_managers_client: PortalClient):
query_res = queryable_managers_client.query_managers(manager_id=[managers[0].id, managers[1].id])
query_res_l = list(query_res)
assert len(query_res_l) == 2
assert all(x.log is not None for x in query_res_l)


def test_manager_client_query_empty_iter(queryable_managers_client: PortalClient):
Expand Down
19 changes: 0 additions & 19 deletions qcfractal/qcfractal/components/managers/test_manager_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,13 @@ def test_manager_mclient_heartbeat(snowflake: QCATestingSnowflake):

client = snowflake.client()
manager = client.get_managers(name1)
assert len(manager.log) == 0

# Now do a heartbeat
mclient1.heartbeat(total_cpu_hours=5.678, active_tasks=3, active_cores=10, active_memory=3.45)

time_2 = now_at_utc()

manager = client.get_managers(name1)
assert len(manager.log) == 1

# Was the data stored in the manager
assert manager.total_cpu_hours == 5.678
Expand All @@ -253,14 +251,6 @@ def test_manager_mclient_heartbeat(snowflake: QCATestingSnowflake):
assert manager.modified_on > time_1
assert manager.modified_on < time_2

# and the log
log = manager.log[0]
assert log.total_cpu_hours == 5.678
assert log.active_tasks == 3
assert log.active_cores == 10
assert log.active_memory == 3.45
assert log.timestamp == manager.modified_on

# Now do another heartbeat
mclient1.heartbeat(
total_cpu_hours=2 * 5.678,
Expand All @@ -272,7 +262,6 @@ def test_manager_mclient_heartbeat(snowflake: QCATestingSnowflake):
time_3 = now_at_utc()

manager = client.get_managers(name1)
assert len(manager.log) == 2

# Was the data stored in the manager
assert manager.total_cpu_hours == 2 * 5.678
Expand All @@ -282,14 +271,6 @@ def test_manager_mclient_heartbeat(snowflake: QCATestingSnowflake):
assert manager.modified_on > time_2
assert manager.modified_on < time_3

# and the log
log = manager.log[0]
assert log.total_cpu_hours == 2 * 5.678
assert log.active_tasks == 2 * 3
assert log.active_cores == 2 * 10
assert log.active_memory == 2 * 3.45
assert log.timestamp == manager.modified_on


def test_manager_mclient_heartbeat_deactivated(snowflake: QCATestingSnowflake):
mname1 = ManagerName(cluster="test_cluster", hostname="a_host", uuid="1234-5678-1234-5678")
Expand Down
1 change: 0 additions & 1 deletion qcportal/qcportal/managers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from .models import (
ManagerStatusEnum,
ManagerName,
ComputeManagerLogEntry,
ComputeManager,
ManagerActivationBody,
ManagerUpdateBody,
Expand Down
38 changes: 0 additions & 38 deletions qcportal/qcportal/managers/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,6 @@ def __str__(self):
return self.fullname


class ComputeManagerLogEntry(BaseModel):
class Config:
extra = Extra.forbid

id: int
manager_id: int

claimed: int
successes: int
failures: int
rejected: int

total_cpu_hours: float
active_tasks: int
active_cores: int
active_memory: float

timestamp: datetime


class ComputeManager(BaseModel):
class Config:
extra = Extra.forbid
Expand Down Expand Up @@ -103,31 +83,13 @@ class Config:
manager_version: str
programs: Dict[str, List[str]]

log_: Optional[List[ComputeManagerLogEntry]] = None

_client: Any = PrivateAttr(None)
_base_url: Optional[str] = PrivateAttr(None)

def propagate_client(self, client):
self._client = client
self._base_url = f"api/v1/managers/{self.name}"

def _fetch_log(self):
if self._client is None:
raise RuntimeError("This manager object is not connected to a client")

self.log_ = self._client.make_request(
"get",
f"{self._base_url}/log",
List[ComputeManagerLogEntry],
)

@property
def log(self):
if self.log_ is None:
self._fetch_log()
return self.log_


class ManagerActivationBody(RestModelBase):
name_data: ManagerName = Field(..., description="Name information about this manager")
Expand Down

0 comments on commit ab80708

Please sign in to comment.