Skip to content

Commit

Permalink
chore: update queue API
Browse files Browse the repository at this point in the history
  • Loading branch information
sutyum committed Oct 5, 2023
1 parent 25162cf commit 6437b4a
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 73 deletions.
8 changes: 5 additions & 3 deletions openoligo/api/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ async def set_completed_now(task_id: int):

async def set_failed_now(task_id: int):
"""Set the completed_at timestamp of a synthesis task."""
await update_task_status(task_id, TaskStatus.FAILED)
await update_task_status(task_id, TaskStatus.SYNTHESIS_FAILED)


async def set_task_in_progress(task_id: int):
"""Set the status of a synthesis task."""
await SynthesisTask.filter(id=task_id).update(status=TaskStatus.IN_PROGRESS)
await SynthesisTask.filter(id=task_id).update(status=TaskStatus.SYNTHESIS_IN_PROGRESS)


async def set_log_file(task_id: int, log_file: str):
Expand Down Expand Up @@ -79,5 +79,7 @@ async def create_new_reactant(name: str, accronym: str, volume: float) -> None:
async def get_next_task() -> Optional[SynthesisTask]:
"""Get the next synthesis task."""
return (
await SynthesisTask.filter(status=TaskStatus.QUEUED).order_by("-rank", "created_at").first()
await SynthesisTask.filter(status=TaskStatus.WAITING_IN_QUEUE)
.order_by("-rank", "created_at")
.first()
)
63 changes: 54 additions & 9 deletions openoligo/api/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""
Tortoise ORM Models for the OpenOligo API
"""
import json
import re
from dataclasses import dataclass
from enum import Enum
from typing import Optional

from pydantic import BaseModel
from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise.exceptions import ValidationError
Expand All @@ -23,10 +26,13 @@
class TaskStatus(str, Enum):
"""Status of a synthesis task."""

QUEUED = "queued"
IN_PROGRESS = "in_progress"
COMPLETE = "complete"
FAILED = "failed"
ATTEMPTING_TO_SEND = ("Attempting to Send to Instrument",)
WAITING_IN_QUEUE = ("Waiting in Queue",)
WAITING_TO_INITIATE = ("Waiting to Initiate Synthesis",)
SYNTHESIS_IN_PROGRESS = ("Synthesis in Progress",)
SYNTHESIS_COMPLETE = ("Synthesis Complete",)
SYNTHESIS_FAILED = ("Synthesis Failed",)
SYNTHESIS_CANCELLED = ("Synthesis Cancelled",)


class ReactantType(str, Enum):
Expand Down Expand Up @@ -132,17 +138,56 @@ class Nucleotide(Model):
)


# Some Code Duplication is Better than Over Engineering
class NucleotideModel(BaseModel):
"""NucleotideModel for the Nucleotide class"""

accronym: Optional[str]
backbone: Backbone
sugar: Sugar
base: Nucleobase

class Config:
json_encoders = {
"accronym": lambda accronym: accronym.value,
"backbone": lambda backbone: backbone.value,
"sugar": lambda sugar: sugar.value,
"base": lambda base: base.value,
}


# NucleotidesModel = list[NucleotideModel] # but needs to be a pydantic model
class NucleotidesModel(BaseModel):
"""NucleotidesModel for the Nucleotide class"""

__root__: list[NucleotideModel]

class Config:
json_encoders = {
"NucelotidesModel": lambda lst: [item.dict() for item in lst],
}


def json_to_seq(raw_json: str | bytes) -> NucleotidesModel:
"""Convert json to a Nucleotide[]"""
seq: NucleotidesModel = NucleotidesModel.parse_raw(raw_json)
return seq


def seq_to_json(seq: NucleotidesModel) -> str:
"""Convert a sequence(Nucleotide[]) to a json"""
return seq.json()


class SynthesisTask(Model):
"""A synthesis task in the queue."""

id = fields.IntField(pk=True, autoincrement=True, description="Synthesis ID")

sequence = fields.TextField(
validators=[ValidSeq()],
description="Sequence of the Nucliec Acid to synthesize",
)
sequence = fields.JSONField(decoder=json_to_seq, encoder=seq_to_json)

status = fields.CharEnumField(TaskStatus, default=TaskStatus.QUEUED)
solid_support = fields.CharEnumField(SolidSupport, default=SolidSupport.UNIVERSAL)
status = fields.CharEnumField(TaskStatus, default=TaskStatus.WAITING_IN_QUEUE)

rank = fields.IntField(
default=0,
Expand Down
8 changes: 7 additions & 1 deletion openoligo/hal/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
from enum import Enum
from typing import Iterator, Protocol

from openoligo.hal.platform import PLATFORM_TO_BOARD, MinimumCommonPinout, Platform, __platform__, rpi_board_pins
from openoligo.hal.platform import (
PLATFORM_TO_BOARD,
MinimumCommonPinout,
Platform,
__platform__,
rpi_board_pins,
)


class Board:
Expand Down
6 changes: 3 additions & 3 deletions openoligo/scripts/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from typing import Optional

from openoligo.api.db import db_init, get_db_url
from openoligo.api.helpers import get_next_task # set_failed_now,
from openoligo.api.helpers import (
get_next_task, # set_failed_now,
set_completed_now,
set_log_file,
set_started_now,
Expand Down Expand Up @@ -58,7 +58,7 @@ async def worker():
rl.change_log_file(name)

await set_started_now(task.id)
await update_task_status(task.id, TaskStatus.IN_PROGRESS)
await update_task_status(task.id, TaskStatus.SYNTHESIS_IN_PROGRESS)
logger.info("Starting task %d", task.id)

# Execute the task
Expand All @@ -67,7 +67,7 @@ async def worker():
inst.pressure_off()

await set_completed_now(task.id)
await update_task_status(task.id, TaskStatus.COMPLETE)
await update_task_status(task.id, TaskStatus.SYNTHESIS_COMPLETE)
logger.info("Task %d complete", task.id)


Expand Down
63 changes: 10 additions & 53 deletions openoligo/scripts/server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
"""
Script to start the REST API server for OpenOligo.
"""
import uuid
from typing import Optional

import requests
import uvicorn
from fastapi import Body, FastAPI, HTTPException, Query, status
from tortoise.exceptions import ValidationError
Expand All @@ -13,19 +11,20 @@

# from openoligo.api.models import Settings # pylint: disable=unused-import
from openoligo.api.models import ( # EssentialReagentsModel,; SequencePartReagentsModel,
InstrumentHealth,
NucleotidesModel,
Reactant,
ReactantModel,
ReactantType,
Settings,
SettingsModel,
SolidSupport,
SynthesisTask,
SynthesisTaskModel,
TaskStatus,
ValidSeq,
InstrumentHealth
)
from openoligo.hal.platform import __platform__
from openoligo.seq import SeqCategory
from openoligo.utils.logger import OligoLogger

ol = OligoLogger(name="server", rotates=True)
Expand Down Expand Up @@ -78,56 +77,12 @@
)


def get_public_ip() -> str:
"""Get the public IP address of the instrument."""
try:
return requests.get("https://api.ipify.org", timeout=1).text
except requests.exceptions.Timeout:
return ""


def get_mac() -> str:
"""Get the MAC address of the instrument."""
return f"{uuid.getnode():012x}"


async def service_discovery(register: bool):
"""
Register the service with the discovery service.
Inform the service discovery node about the service,
pass it our mac address, IP address and port
"""
mac = get_mac()
ip = get_public_ip() # pylint: disable=invalid-name
port = 9191

print(f"MAC address: {mac}, IP address: {ip}, Service port: {port} -> {register}")

# Call the service discovery node and register the service.
# response = requests.post(
# "http://service_discovery_node_endpoint",
# json={
# "mac_address": mac,
# "ip_address": ip,
# "port": port,
# },
# )

# Check if the service was registered successfully.
# if response.status_code == 200:
# print("Service registered successfully.")
# else:
# print(f"Failed to register service. Status code: {response.status_code}")


@app.on_event("startup")
async def startup_event():
"""Startup event for the FastAPI server."""
logger.info("Starting the API server...") # pragma: no cover
db_url = get_db_url(__platform__) # pragma: no cover
logger.info("Using database: '%s'", db_url) # pragma: no cover
await service_discovery(True)
await db_init(db_url)


Expand All @@ -140,7 +95,7 @@ async def shutdown_event():
@app.get("/health", status_code=200, tags=["Utilities"])
def get_health_status():
"""Health check."""
return { "status": InstrumentHealth.OPERATIONAL }
return {"status": InstrumentHealth.OPERATIONAL}


@app.post(
Expand All @@ -150,11 +105,13 @@ def get_health_status():
tags=["Synthesis Queue"],
)
async def add_a_task_to_synthesis_queue(
sequence: str, category: SeqCategory = SeqCategory.DNA, rank: int = 0
sequence: NucleotidesModel, solid_support: SolidSupport, rank: int = 0
):
"""Add a synthesis task to the synthesis task queue by providing a sequence and its category."""
try:
return await SynthesisTask.create(sequence=sequence, category=category, rank=rank)
task = await SynthesisTask.create(sequence=sequence, solid_support=solid_support, rank=rank)
# return { "status": task.status, "created_at": task.created_at }
return task
except ValidationError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
finally:
Expand All @@ -178,7 +135,7 @@ async def get_all_tasks_in_synthesis_queue(filter_by: Optional[TaskStatus] = Non
@app.delete("/queue", status_code=status.HTTP_200_OK, tags=["Synthesis Queue"])
async def clear_all_queued_tasks_in_task_queue():
"""Delete all tasks in the QUEUED state."""
return await SynthesisTask.filter(status=TaskStatus.QUEUED).delete()
return await SynthesisTask.filter(status=TaskStatus.WAITING_IN_QUEUE).delete()


@app.get("/queue/{task_id}", response_model=SynthesisTaskModel, tags=["Synthesis Queue"])
Expand All @@ -205,7 +162,7 @@ async def update_a_synthesis_task(
if task is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Sequence task not found")

if task.status != TaskStatus.QUEUED:
if task.status != TaskStatus.WAITING_IN_QUEUE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Sequence task not in QUEUED state"
)
Expand Down
4 changes: 1 addition & 3 deletions tests/api/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ async def test_synthesis_queue_validation(db):

@pytest.mark.asyncio
async def test_update_synthesis_queue_status(db):
synthesis_queue = await SynthesisTask.create(
sequence="ATCG", status=TaskStatus.QUEUED, rank=0
)
synthesis_queue = await SynthesisTask.create(sequence="ATCG", status=TaskStatus.QUEUED, rank=0)

await synthesis_queue.all().update(status=TaskStatus.IN_PROGRESS)
synthesis_queue_updated = await SynthesisTask.get(id=synthesis_queue.id)
Expand Down
2 changes: 1 addition & 1 deletion tests/api/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
def test_get_health():
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
assert response.json() == {"status": "Operational"}


def test_add_task_to_synthesis_queue(db):
Expand Down

0 comments on commit 6437b4a

Please sign in to comment.