From 08d3cb72fe7e16f47075e474a983bf7c3a84b527 Mon Sep 17 00:00:00 2001 From: michael Date: Sun, 3 Nov 2024 10:06:04 +0100 Subject: [PATCH] improved processing, need testing and further refactoring... --- node/app_minimal.py | 2 +- node/container.py | 2 + node/routers/api/__init__.py | 8 +- .../routers/api/{acquisition.py => camera.py} | 29 +-- node/routers/api/job.py | 38 ++++ .../backends/cameras/abstractbackend.py | 57 +++++- node/services/backends/cameras/picamera2.py | 71 +++----- .../backends/cameras/virtualcamera.py | 86 +++++++-- node/services/config/models.py | 4 + node/services/jobservice.py | 171 ++++++++++++++++++ node/services/sync_acquisition_service.py | 95 ++++++---- node/web_spa/index.html | 2 +- pyproject.toml | 2 +- 13 files changed, 447 insertions(+), 120 deletions(-) rename node/routers/api/{acquisition.py => camera.py} (65%) create mode 100644 node/routers/api/job.py create mode 100644 node/services/jobservice.py diff --git a/node/app_minimal.py b/node/app_minimal.py index 44dfbde..fd1e056 100644 --- a/node/app_minimal.py +++ b/node/app_minimal.py @@ -46,7 +46,7 @@ def main(args=None, run: bool = True): container.start() _shutterbutton_in = Button(pin=shutter_pin, bounce_time=0.04) - _shutterbutton_in.when_pressed = container.synced_acquisition_service.set_trigger_out + _shutterbutton_in.when_pressed = container.synced_acquisition_service.trigger_execute_job logger.info(f"external trigger button on {_shutterbutton_in}") try: diff --git a/node/container.py b/node/container.py index 7b7b067..4869d0d 100644 --- a/node/container.py +++ b/node/container.py @@ -2,6 +2,7 @@ from .services.baseservice import BaseService from .services.config import appconfig +from .services.jobservice import JobService from .services.loggingservice import LoggingService from .services.sync_acquisition_service import SyncedAcquisitionService @@ -13,6 +14,7 @@ class Container: # container logging_service = LoggingService(config=appconfig.logging) synced_acquisition_service = SyncedAcquisitionService(config=appconfig.syncedacquisition) + job_service = JobService(config=appconfig.syncedacquisition, synced_acquisition_service=synced_acquisition_service) def _service_list(self) -> list[BaseService]: # list used to start/stop services. List sorted in the order of definition. diff --git a/node/routers/api/__init__.py b/node/routers/api/__init__.py index f5aac17..e90e8cb 100644 --- a/node/routers/api/__init__.py +++ b/node/routers/api/__init__.py @@ -2,13 +2,15 @@ from fastapi import APIRouter -from . import acquisition, system +from . import camera, job, system __all__ = [ - "acquisition", # refers to the 'acquisition.py' file + "camera", # refers to the 'acquisition.py' file + "job", # refers to the 'acquisition.py' file "system", ] router = APIRouter(prefix="/api") -router.include_router(acquisition.router) +router.include_router(camera.router) +router.include_router(job.router) router.include_router(system.router) diff --git a/node/routers/api/acquisition.py b/node/routers/api/camera.py similarity index 65% rename from node/routers/api/acquisition.py rename to node/routers/api/camera.py index b776f1e..fd8c63a 100644 --- a/node/routers/api/acquisition.py +++ b/node/routers/api/camera.py @@ -2,16 +2,23 @@ from fastapi import APIRouter, HTTPException, status from fastapi.responses import StreamingResponse +from pydantic import BaseModel from ...container import container logger = logging.getLogger(__name__) router = APIRouter( - prefix="/acquisition", - tags=["acquisition"], + prefix="/camera", + tags=["camera"], ) +class CameraConfig(BaseModel): + iso: str | None = None + shutter: str | None = None + # ... TODO. this shall be an object in acquisition service or camera + + @router.get("/stream.mjpg") def video_stream(): """ @@ -31,16 +38,16 @@ def video_stream(): raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, f"preview failed: {exc}") from exc -@router.get("/setup") -def setup_job(job_id, number_captures): - container.synced_acquisition_service.setup_job() +@router.get("/still") +def still_camera(): + raise NotImplementedError -@router.get("/trigger") -def trigger_job(job_id): - container.synced_acquisition_service.set_trigger_out() +@router.get("/configure") +def configure_camera(camera_config: CameraConfig): + raise NotImplementedError -@router.get("/results") -def get_results(job_id): - pass +@router.get("/configure/reset") +def reset_camera_config(): + raise NotImplementedError diff --git a/node/routers/api/job.py b/node/routers/api/job.py new file mode 100644 index 0000000..507112e --- /dev/null +++ b/node/routers/api/job.py @@ -0,0 +1,38 @@ +import logging + +from fastapi import APIRouter, HTTPException, status + +from ...container import container +from ...services.jobservice import JobItem, JobRequest + +logger = logging.getLogger(__name__) +router = APIRouter( + prefix="/job", + tags=["job"], +) + + +@router.post("/setup") +def setup_job(job_request: JobRequest) -> JobItem: + try: + return container.job_service.setup_job_request(jobrequest=job_request) + except ConnectionRefusedError as exc: + raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=f"Error setting up job: {exc}") from exc + + +@router.get("/trigger") +def trigger_job(): + """triggers a job that was setup before. this call needs to be sent to primary only and via GPIO the nodes will execute the job.""" + container.job_service.trigger_execute_job() + + +@router.get("/list") +def get_jobs(): + """triggers a job that was setup before. this call needs to be sent to primary only and via GPIO the nodes will execute the job.""" + return container.job_service.db_get_list_as_dict() + + +@router.get("/results/{job_id}") +def get_results(job_id: str) -> JobItem: + raise NotImplementedError + # return container.job_service.get_job_results(job_id=job_id) diff --git a/node/services/backends/cameras/abstractbackend.py b/node/services/backends/cameras/abstractbackend.py index 8b9e4b1..945130d 100644 --- a/node/services/backends/cameras/abstractbackend.py +++ b/node/services/backends/cameras/abstractbackend.py @@ -2,11 +2,28 @@ import logging from abc import ABC, abstractmethod from dataclasses import dataclass -from threading import Barrier, BrokenBarrierError, Condition, Event +from pathlib import Path +from queue import Queue +from threading import Barrier, BrokenBarrierError, Condition + +from ....utils.stoppablethread import StoppableThread logger = logging.getLogger(__name__) +@dataclass +class BackendRequest: + pass + # nothing to align here until today... maybe here we could add later a skip-frame command or something... + + +@dataclass +class BackendItem: + # request: BackendRequest + filepath: Path = None + # metadata: dict = None + + class StreamingOutput(io.BufferedIOBase): """Lores data class used for streaming. Used in hardware accelerated MJPEGEncoder @@ -37,11 +54,13 @@ class AbstractCameraBackend(ABC): def __init__(self): # declare common abstract props self._nominal_framerate: int = None - self._capture: Event = None - self._capture_in_progress: bool = None + self._camera_thread: StoppableThread = None + self._align_thread: StoppableThread = None self._barrier: Barrier = None self._current_timestampset: TimestampSet = None self._align_timestampset: TimestampSet = None + self._queue_in: Queue[BackendRequest] = None + self._queue_out: Queue[BackendItem] = None def __repr__(self): return f"{self.__class__}" @@ -66,22 +85,36 @@ def start(self, nominal_framerate: int = None): # init common abstract props self._nominal_framerate = nominal_framerate - self._capture = Event() - self._capture_in_progress = False self._barrier = Barrier(3, action=self.get_timestamps_to_align) self._current_timestampset = TimestampSet(None, None) self._align_timestampset = TimestampSet(None, None) + self._queue_in: Queue[BackendRequest] = Queue() + self._queue_out: Queue[BackendItem] = Queue() + + self._camera_thread = StoppableThread(name="_camera_thread", target=self._camera_fun, args=(), daemon=True) + self._camera_thread.start() + + self._align_thread = StoppableThread(name="_align_thread", target=self._align_fun, args=(), daemon=True) + self._align_thread.start() @abstractmethod def stop(self): logger.debug(f"{self.__module__} stop called") + if self._align_thread and self._align_thread.is_alive(): + self._align_thread.stop() + self._align_thread.join() + + if self._camera_thread and self._camera_thread.is_alive(): + self._camera_thread.stop() + self._camera_thread.join() + @abstractmethod def camera_alive(self) -> bool: - pass + camera_alive = self._camera_thread and self._camera_thread.is_alive() + align_alive = self._align_thread and self._align_thread.is_alive() - def do_capture(self, filename: str = None, number_frames: int = 1): - self._capture.set() + return camera_alive and align_alive def sync_tick(self, timestamp_ns: int): self._current_timestampset.reference = timestamp_ns @@ -101,3 +134,11 @@ def stop_stream(self): @abstractmethod def wait_for_lores_image(self): pass + + @abstractmethod + def _camera_fun(self): + pass + + @abstractmethod + def _align_fun(self): + pass diff --git a/node/services/backends/cameras/picamera2.py b/node/services/backends/cameras/picamera2.py index 91c1dcd..04e33ec 100644 --- a/node/services/backends/cameras/picamera2.py +++ b/node/services/backends/cameras/picamera2.py @@ -12,7 +12,7 @@ from ....utils.stoppablethread import StoppableThread from ...config.models import ConfigBackendPicamera2 -from .abstractbackend import AbstractCameraBackend, StreamingOutput +from .abstractbackend import AbstractCameraBackend, BackendItem, BackendRequest, StreamingOutput logger = logging.getLogger(__name__) @@ -29,12 +29,9 @@ def __init__(self, config: ConfigBackendPicamera2): # private props self._picamera2: Picamera2 = None self._nominal_framerate: float = None - self._camera_thread: StoppableThread = None self._processing_thread: StoppableThread = None - self._queue_processing: Queue = None + self._queue_processing: Queue[tuple[BackendRequest, object]] = None self._streaming_output: StreamingOutput = None - self._align_thread: StoppableThread = None - self._camera_timestamp_ns: int = None logger.info(f"global_camera_info {Picamera2.global_camera_info()}") @@ -44,8 +41,7 @@ def start(self, nominal_framerate: int = None): # initialize private props self._streaming_output: StreamingOutput = StreamingOutput() - self._queue_processing: Queue = Queue() - self._camera_timestamp_ns: int = None + self._queue_processing: Queue[tuple[BackendRequest, object]] = Queue() # https://github.com/raspberrypi/picamera2/issues/576 if self._picamera2: @@ -96,12 +92,6 @@ def start(self, nominal_framerate: int = None): self._processing_thread = StoppableThread(name="_processing_thread", target=self._processing_fun, args=(), daemon=True) self._processing_thread.start() - self._camera_thread = StoppableThread(name="_camera_thread", target=self._camera_fun, args=(), daemon=True) - self._camera_thread.start() - - self._align_thread = StoppableThread(name="_align_thread", target=self._align_fun, args=(), daemon=True) - self._align_thread.start() - logger.info(f"camera_config: {self._picamera2.camera_config}") logger.info(f"camera_controls: {self._picamera2.camera_controls}") logger.info(f"controls: {self._picamera2.controls}") @@ -119,26 +109,17 @@ def stop(self): self._picamera2.stop() self._picamera2.close() # need to close camera so it can be used by other processes also (or be started again) - if self._camera_thread and self._camera_thread.is_alive(): - self._camera_thread.stop() - self._camera_thread.join() - if self._processing_thread and self._processing_thread.is_alive(): self._processing_thread.stop() self._processing_thread.join() - if self._align_thread and self._align_thread.is_alive(): - self._align_thread.stop() - self._align_thread.join() - logger.debug(f"{self.__module__} stopped") def camera_alive(self) -> bool: - camera_alive = self._camera_thread and self._camera_thread.is_alive() + super_alive = super().camera_alive() processing_alive = self._processing_thread and self._processing_thread.is_alive() - align_alive = self._align_thread and self._align_thread.is_alive() - return camera_alive and processing_alive and align_alive + return super_alive and processing_alive def start_stream(self): self._picamera2.stop_recording() @@ -197,10 +178,6 @@ def _init_autofocus(self): logger.debug("autofocus set") - @staticmethod - def clamp(n, min_value, max_value): - return max(min_value, min(n, max_value)) - def recover(self): tms = time.time() try: @@ -221,8 +198,8 @@ def _align_fun(self): self.recover() while not current_thread().stopped(): - if self._capture_in_progress: - adjust_cycle_counter = 0 # keep counter 0 until something is in progress and wait X_CYCLES until adjustment is done afterwards + # if self._capture_in_progress: + # adjust_cycle_counter = 0 # keep counter 0 until something is in progress and wait X_CYCLES until adjustment is done afterwards try: self._barrier.wait() @@ -233,13 +210,6 @@ def _align_fun(self): timestamp_delta_ns = self._align_timestampset.camera - self._align_timestampset.reference # in ns - # if abs(timestamp_delta_ns) > (1.1e9 * nominal_frame_duration): - # logger.info("delta big, recovering...") - # adjust_cycle_counter = 0 - # self.recover() - # logger.info("finished recover") - # continue - if adjust_cycle_counter >= ADJUST_EVERY_X_CYCLE: adjust_cycle_counter = 0 adjust_amount_us = -timestamp_delta_ns / 1.0e3 @@ -272,15 +242,19 @@ def _camera_fun(self): logger.debug("starting _camera_fun") while not current_thread().stopped(): - if self._capture.is_set(): - self._capture.clear() - self._capture_in_progress = True + backendrequest = None + try: + backendrequest = self._queue_in.get_nowait() + except Empty: + pass # no actual job to process... + + if backendrequest: + # TODO: check if capable to keep up with the framerate or something get lost? for now we only use 1 frame so it's ok tms = time.time() - self._queue_processing.put(self._picamera2.capture_buffer("main", wait=2.0)) + buffer = self._picamera2.capture_buffer("main", wait=2.0) + self._queue_processing.put((backendrequest, buffer)) logger.info(f"queued up buffer to process image, time taken: {round((time.time() - tms)*1.0e3, 0)}ms") - self._capture_in_progress = False - else: try: picam_metadata = self._picamera2.capture_metadata(wait=2.0) @@ -298,12 +272,13 @@ def _camera_fun(self): logger.info("_camera_fun left") def _processing_fun(self): + # TODO: this might be better in multiprocessing or use some lib that is in c++ releasing the gil during processing... logger.debug("starting _processing_fun") buffer_to_proc = None while not current_thread().stopped(): try: - buffer_to_proc = self._queue_processing.get(block=True, timeout=1.0) + (backendrequest, buffer_to_proc) = self._queue_processing.get(block=True, timeout=1.0) logger.info("got img off queue, jpg proc start") except Empty: continue # just continue but allow .stopped to exit after 1.0 sec latest... @@ -319,4 +294,12 @@ def _processing_fun(self): image.save(filepath.with_suffix(".jpg"), quality=self._config.original_still_quality) logger.info(f"jpg compression finished, time taken: {round((time.time() - tms)*1.0e3, 0)}ms") + backenditem = BackendItem( + filepath=filepath, + ) + self._queue_out.put(backenditem) + logger.info(f"result item put on output queue: {backenditem}") + + self._queue_in.task_done() + logger.info("_processing_fun left") diff --git a/node/services/backends/cameras/virtualcamera.py b/node/services/backends/cameras/virtualcamera.py index bf5f659..990101f 100644 --- a/node/services/backends/cameras/virtualcamera.py +++ b/node/services/backends/cameras/virtualcamera.py @@ -1,12 +1,16 @@ import io import logging import time +from datetime import datetime +from pathlib import Path +from queue import Empty +from threading import BrokenBarrierError, current_thread import numpy from PIL import Image from ...config.models import ConfigBackendVirtualCamera -from .abstractbackend import AbstractCameraBackend +from .abstractbackend import AbstractCameraBackend, BackendItem logger = logging.getLogger(__name__) @@ -18,10 +22,10 @@ def __init__(self, config: ConfigBackendVirtualCamera): self._config = config # declarations - self._tick_tock_counter: int = None + # # initializiation - self._tick_tock_counter = 0 + # def start(self, nominal_framerate: int = None): super().start(nominal_framerate=nominal_framerate) @@ -29,8 +33,10 @@ def start(self, nominal_framerate: int = None): def stop(self): super().stop() - def camera_alive(self): - return True + def camera_alive(self) -> bool: + super_alive = super().camera_alive() + + return super_alive def start_stream(self): pass @@ -38,21 +44,69 @@ def start_stream(self): def stop_stream(self): pass - def wait_for_lores_image(self): - time.sleep(1.0 / self._nominal_framerate) - + def _produce_dummy_image(self): byte_io = io.BytesIO() - imarray = numpy.random.rand(200, 200, 3) * 255 + imarray = numpy.random.rand(250, 250, 3) * 255 random_image = Image.fromarray(imarray.astype("uint8"), "RGB") random_image.save(byte_io, format="JPEG", quality=50) return byte_io.getbuffer() - def do_capture(self, filename: str = None, number_frames: int = 1): - raise NotImplementedError("not yet supported by virtual camera backend") + def wait_for_lores_image(self): + time.sleep(1.0 / self._nominal_framerate) + + return self._produce_dummy_image() + + def _align_fun(self): + logger.debug("starting _align_fun") + + while not current_thread().stopped(): + try: + self._barrier.wait() + except BrokenBarrierError: + logger.debug("sync barrier broke") + break + + # simulate some processing and lower cpu use + time.sleep(0.01) + + logger.info("_align_fun left") + + def _camera_fun(self): + logger.debug("starting _camera_fun") + + while not current_thread().stopped(): + backendrequest = None + + try: + backendrequest = self._queue_in.get_nowait() + except Empty: + pass # no actual job to process... + + if backendrequest: + folder = Path("./tmp/") + filename = Path(f"img_{datetime.now().astimezone().strftime('%Y%m%d-%H%M%S-%f')}").with_suffix(".jpg") + filepath = folder / filename + logger.info(f"{filepath=}") + + with open(filepath, "wb") as f: + f.write(self.wait_for_lores_image()) + + backenditem = BackendItem( + filepath=filepath, + ) + self._queue_out.put(backenditem) + logger.info(f"result item put on output queue: {backenditem}") + + self._queue_in.task_done() + else: + time.sleep(1.0 / self._nominal_framerate) + self._current_timestampset.camera = time.monotonic_ns() + + try: + self._barrier.wait() + except BrokenBarrierError: + logger.debug("sync barrier broke") + break - def sync_tick(self, timestamp_ns: int): - self._tick_tock_counter += 1 - if self._tick_tock_counter > 10: - self._tick_tock_counter = 0 - logger.debug("tick") + logger.info("_camera_fun left") diff --git a/node/services/config/models.py b/node/services/config/models.py index caa9f9b..50e00a1 100644 --- a/node/services/config/models.py +++ b/node/services/config/models.py @@ -68,3 +68,7 @@ class ConfigSyncedAcquisition(BaseModel): camera_backends: GroupCameraBackend = Field(default=GroupCameraBackend()) io_backends: GroupIoBackend = Field(default=GroupIoBackend()) + + +class ConfigJob(BaseModel): + pass diff --git a/node/services/jobservice.py b/node/services/jobservice.py new file mode 100644 index 0000000..d723c6e --- /dev/null +++ b/node/services/jobservice.py @@ -0,0 +1,171 @@ +import logging +import os +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from queue import Empty +from threading import current_thread + +from ..utils.stoppablethread import StoppableThread +from .baseservice import BaseService +from .config.models import ConfigJob +from .sync_acquisition_service import AcqRequest, SyncedAcquisitionService + +logger = logging.getLogger(__name__) + +DATA_PATH = Path("./media") +# as from image source +PATH_ORIGINAL = DATA_PATH / "original" + +print(DATA_PATH) +print(PATH_ORIGINAL) + + +@dataclass +class JobRequest: + number_captures: int = 1 + + +@dataclass +class JobItem: + request: JobRequest + + id: uuid.UUID = field(default_factory=uuid.uuid4) + # urls: list[str] = field(default_factory=list) + filepaths: list[Path] = field(default_factory=list) + + @property + def is_finished(self) -> bool: + return self.request.number_captures == len(self.filepaths) # if more this is also considered as error! + + def asdict(self) -> dict: + out = { + prop: getattr(self, prop) + for prop in dir(self) + if ( + not prop.startswith("_") # no privates + and not callable(getattr(__class__, prop, None)) # no callables + and not isinstance(getattr(self, prop), Path) # no path instances (not json.serializable) + ) + } + return out + + +class JobService(BaseService): + def __init__(self, config: ConfigJob, synced_acquisition_service: SyncedAcquisitionService): + super().__init__() + + # init the arguments + self._config: ConfigJob = config + self._synced_acquisition_service: SyncedAcquisitionService = synced_acquisition_service + + # declare private props + self._db_jobs: list[JobItem] = [] + self._sync_thread: StoppableThread = None + self._current_job: JobItem = None + + # ensure data directories exist + os.makedirs(f"{PATH_ORIGINAL}", exist_ok=True) + + def start(self): + super().start() + + self._jobprocessor_thread = StoppableThread(name="_jobprocessor_thread", target=self._jobprocessor_fun, args=(), daemon=True) + self._jobprocessor_thread.start() + + logger.debug(f"{self.__module__} started") + + def stop(self): + super().stop() + + if self._jobprocessor_thread and self._jobprocessor_thread.is_alive(): + self._jobprocessor_thread.stop() + self._jobprocessor_thread.join() + + logger.debug(f"{self.__module__} stopped") + + def db_add_jobitem(self, job: JobItem): + self._db_jobs.insert(0, job) # insert at first position (prepend) + + def db_get_jobitem(self, id: uuid): + return self._db_jobs[-1] + + def db_update_jobitem(self, updated_item: JobItem): + for idx, item in enumerate(self._db_jobs): + if updated_item == item: + self._db_jobs[idx] = updated_item + + return self._db_jobs[idx] + + def db_del_jobitem(self, job: JobItem): + self._db_jobs.remove(job) + + def db_clear(self): + self._db_jobs.clear() + + def db_get_list_as_dict(self) -> list: + return [job.asdict() for job in self._db_jobs] + + def db_get_list(self) -> list[JobItem]: + return [job for job in self._db_jobs] + + def db_get_jobitem_by_id(self, job_id: str): + if not isinstance(job_id, str): + raise RuntimeError("job_id is wrong type") + + # https://stackoverflow.com/a/7125547 + job = next((x for x in self._db_jobs if x.id == job_id), None) + + if job is None: + logger.error(f"image {job_id} not found!") + raise FileNotFoundError(f"image {job_id} not found!") + + return job + + @property + def db_length(self) -> int: + return len(self._db_jobs) + + def setup_job_request(self, jobrequest: JobRequest) -> JobItem: + if self._current_job: + # if self._synced_acquisition_service._queue_in.qsize() > 0: + raise ConnectionRefusedError("there is already an unprocessed job! reset first to queue a new job or process it") + + self._current_job = JobItem(request=jobrequest) + self.db_add_jobitem(self._current_job) + # try to put job to queue for processing. + # add job to db and also put it into the queue to process + for i in range(self._current_job.request.number_captures): + acqrequest = AcqRequest(seq_no=i) + self._synced_acquisition_service._queue_in.put(acqrequest) + + return self._current_job + + def trigger_execute_job(self): + # TODO: all this should run only on primary device! it's not validated, the connector needs to ensure to call the right device currently. + # maybe config can be changed in future and so also the _tirgger_out_thread is not started on secondary nodes. + self._synced_acquisition_service.trigger_execute_job() + + def _jobprocessor_fun(self): + logger.info("_jobprocessor_fun started") + + while not current_thread().stopped(): + self._synced_acquisition_service._queue_in.join() + + try: + # active waiting! + acqitem = self._synced_acquisition_service._queue_out.get(block=True, timeout=1) + if not self._current_job: + # job ran just by trigger on standalone basis. # will be removed later once app is separated properly. + continue + self._current_job.filepaths.append(acqitem.filepath) + except Empty: + continue + + # update jobitem: + logger.info(self._current_job) + self.db_update_jobitem(self._current_job) + + self._current_job = None + + logger.info("_jobprocessor_fun left") diff --git a/node/services/sync_acquisition_service.py b/node/services/sync_acquisition_service.py index 3b3bfc3..a840ea1 100644 --- a/node/services/sync_acquisition_service.py +++ b/node/services/sync_acquisition_service.py @@ -1,14 +1,13 @@ import logging import time -from dataclasses import dataclass, field -from datetime import datetime +from dataclasses import dataclass from importlib import import_module from pathlib import Path -from queue import Full, Queue +from queue import Empty, Queue from threading import Event, current_thread from ..utils.stoppablethread import StoppableThread -from .backends.cameras.abstractbackend import AbstractCameraBackend +from .backends.cameras.abstractbackend import AbstractCameraBackend, BackendItem, BackendRequest from .backends.io.abstractbackend import AbstractIoBackend from .baseservice import BaseService from .config.models import ConfigSyncedAcquisition @@ -17,15 +16,16 @@ @dataclass -class CaptureJob: - id: str = field(default_factory=datetime.now) - number_captures: int = 1 +class AcqRequest: + seq_no: int + # nothing to align here until today... @dataclass -class CaptureJobResult: - id: str = None - filenames: list[Path] = field(default_factory=list) +class AcquisitionItem: + # request: AcqRequest + # backenditem: BackendItem + filepath: Path class SyncedAcquisitionService(BaseService): @@ -43,14 +43,14 @@ def __init__(self, config: ConfigSyncedAcquisition): self._trigger_in_thread: StoppableThread = None self._trigger_out_thread: StoppableThread = None self._supervisor_thread: StoppableThread = None - self._job: CaptureJob = None + self._flag_trigger_out: Event = None self._device_initialized_once: bool = False - self._queue_job: Queue = None # initialize private properties. self._flag_trigger_out: Event = Event() - self._queue_job: Queue = Queue(maxsize=1) + self._queue_in: Queue[AcqRequest] = Queue() + self._queue_out: Queue[AcquisitionItem] = Queue() def start(self): super().start() @@ -120,13 +120,9 @@ def gen_stream(self): yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + output_jpeg_bytes + b"\r\n\r\n") - def setup_job(self, job: CaptureJob): - try: - self._queue_job.put_nowait(job) - except Full as exc: - raise RuntimeError("there is already an unprocessed job! reset first to queue a new job or process it") from exc - - def set_trigger_out(self): + def trigger_execute_job(self): + # TODO: all this should run only on primary device! it's not validated, the connector needs to ensure to call the right device currently. + # maybe config can be changed in future and so also the _tirgger_out_thread is not started on secondary nodes. self._flag_trigger_out.set() def _device_start(self, derived_fps: int): @@ -244,25 +240,54 @@ def _sync_fun(self): def _trigger_in_fun(self): while not current_thread().stopped(): if self._gpio_backend._trigger_in_flag.wait(timeout=1.0): - # first clear to avoid endless loops - self._gpio_backend._trigger_in_flag.clear() + self._gpio_backend._trigger_in_flag.clear() # first clear to avoid endless loops logger.info("trigger_in received to start processing job") - # useful if mobile camera is without any interconnection to a concentrator that could setup a job - if self._config.allow_standalone_job: - logger.info("using default capture job") - self._job = CaptureJob() - if self._job: + # this is implementation for wigglecam_minimal to allow working without external job setup. + if self._queue_in.empty() and self._config.allow_standalone_job: + # useful if mobile camera is without any interconnection to a concentrator that could setup a job + self._queue_in.put(AcqRequest(seq_no=0)) + logger.info("default job was added to the input queue") + + # send down to backend the job in input queue + # the jobs have just to be in the queue, the backend is taking care about the correct timing - + # it might fail if it can not catch up with the framerate + while not current_thread().stopped(): + try: + acqrequest = self._queue_in.get_nowait() + logger.info(f"got acquisition request off the queue: {acqrequest}, passing to capture backend.") + backendrequest = BackendRequest() + self._camera_backend._queue_in.put(backendrequest) + except Empty: + logger.info("all capture jobs sent to backend...") + break # leave inner processing loop, continue listen to trigger in outer. + + # get back the jobs one by one + # TODO: maybe we don't need to wait later for join... + logger.info("waiting for job to finish") + self._camera_backend._queue_in.join() + logger.info("ok, continue") + + while not current_thread().stopped(): try: - self._camera_backend.do_capture(self._job.id, self._job.number_captures) - except Exception as exc: - logger.exception(exc) - logger.critical(f"error during capture: {exc}") - finally: - self._job = None - else: - logger.warning("capture request ignored because no job set!") + backenditem: BackendItem = self._camera_backend._queue_out.get_nowait() + acquisitionitem = AcquisitionItem( + filepath=backenditem.filepath, + ) + self._queue_out.put(acquisitionitem) + except Empty: + logger.info("all capture jobs received from backend...") + break # leave inner processing loop, continue listen to trigger in outer. + except TimeoutError: + logger.info("timed out waiting for job to finish :(") + break + + logger.info("finished queue_acq_input processing") + self._queue_in.task_done() + + logger.info("trigger_in finished, waiting for next job") + else: pass # flag not set, continue diff --git a/node/web_spa/index.html b/node/web_spa/index.html index 4b9775a..208db2c 100644 --- a/node/web_spa/index.html +++ b/node/web_spa/index.html @@ -14,7 +14,7 @@

Wigglecam-Api Frontend

The api is just a backend with no actual frontend. This is just a collection of links, that might be helpful:

diff --git a/pyproject.toml b/pyproject.toml index 7f89012..c489736 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ dependencies = [ "pillow>=10.0.0", "fastapi", "uvicorn", - "gpiod>=2.2.1", + "gpiod>=2.2.1; platform_system == 'Linux'", "opencv-python-headless", ]