Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Managers claiming/returning should be marked as modified #851

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions qcfractal/qcfractal/components/tasks/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ def update_finished(
manager.failures += len(tasks_failures)
manager.rejected += len(tasks_rejected)

# Mark that we have heard from the manager
manager.modified_on = now_at_utc()

# Automatically reset ones that should be reset
if self.root_socket.qcf_config.auto_reset.enabled and to_be_reset:
self._logger.info(f"Auto resetting {len(to_be_reset)} records")
Expand Down Expand Up @@ -341,6 +344,9 @@ def claim_tasks(

manager.claimed += len(found)

# Mark that we have heard from the manager
manager.modified_on = now_at_utc()

self._logger.info(f"Manager {manager_name} has claimed {len(found)} new tasks")

return found
62 changes: 61 additions & 1 deletion qcfractal/qcfractal/components/tasks/test_socket_claim.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

from __future__ import annotations

from qcarchivetesting.testing_classes import QCATestingSnowflake
from typing import TYPE_CHECKING

import pytest

from qcarchivetesting.testing_classes import QCATestingSnowflake
from qcfractal.components.managers.db_models import ComputeManagerORM
from qcfractal.components.optimization.testing_helpers import load_test_data as load_opt_test_data
from qcfractal.components.record_db_models import BaseRecordORM
Expand All @@ -18,9 +18,11 @@
generate_task_key as generate_td_task_key,
)
from qcfractal.testing_helpers import run_service
from qcfractalcompute.compress import compress_result
from qcportal.exceptions import ComputeManagerError
from qcportal.managers import ManagerName
from qcportal.record_models import PriorityEnum
from qcportal.utils import now_at_utc

if TYPE_CHECKING:
from qcfractal.db_socket import SQLAlchemySocket
Expand All @@ -35,6 +37,64 @@
input_spec_7, molecule_7, result_data_7 = load_sp_test_data("sp_rdkit_benzene_energy")


def test_task_socket_update_manager_time(storage_socket: SQLAlchemySocket, session: Session):
# Test that claiming & returning updates the modified_on time of the manager
mname1 = ManagerName(cluster="test_cluster", hostname="a_host1", uuid="1234-5678-1234-5678")

mprog1 = {"qcengine": ["unknown"], "psi4": ["unknown"], "geometric": ["v3.0"]}

mid_1 = storage_socket.managers.activate(
name_data=mname1,
manager_version="v2.0",
username="bill",
programs=mprog1,
tags=["tag1"],
)

meta, id_1 = storage_socket.records.singlepoint.add(
[molecule_1], input_spec_1, "tag1", PriorityEnum.low, None, None, True
)

# claim up to two tasks
t0 = now_at_utc()
tasks = storage_socket.tasks.claim_tasks(mname1.fullname, mprog1, ["tag1"], 2)
t1 = now_at_utc()
assert len(tasks) == 1

# Check assignments
session.expire_all()

m1 = session.get(ComputeManagerORM, mid_1)
assert m1.claimed == 1
assert t0 < m1.modified_on < t1

# Claiming but getting nothing also updates the time
t0 = now_at_utc()
tasks0 = storage_socket.tasks.claim_tasks(mname1.fullname, mprog1, ["tag1"], 2)
t1 = now_at_utc()
assert len(tasks0) == 0

session.expire_all()
m1 = session.get(ComputeManagerORM, mid_1)
assert m1.claimed == 1
assert t0 < m1.modified_on < t1

# Return the task data
t0 = now_at_utc()
rmeta = storage_socket.tasks.update_finished(
mname1.fullname,
{
tasks[0]["id"]: compress_result(result_data_1.dict()),
},
)
t1 = now_at_utc()

assert rmeta.n_accepted == 1
session.expire_all()
m1 = session.get(ComputeManagerORM, mid_1)
assert t0 < m1.modified_on < t1


def test_task_socket_claim_mixed(storage_socket: SQLAlchemySocket, session: Session):
mname1 = ManagerName(cluster="test_cluster", hostname="a_host1", uuid="1234-5678-1234-5678")
mname2 = ManagerName(cluster="test_cluster", hostname="a_host2", uuid="2234-5678-1234-5678")
Expand Down