Skip to content

Commit

Permalink
improved processing, need testing and further refactoring...
Browse files Browse the repository at this point in the history
  • Loading branch information
mgineer85 committed Nov 3, 2024
1 parent aba3b2d commit 08d3cb7
Show file tree
Hide file tree
Showing 13 changed files with 447 additions and 120 deletions.
2 changes: 1 addition & 1 deletion node/app_minimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def main(args=None, run: bool = True):
container.start()

_shutterbutton_in = Button(pin=shutter_pin, bounce_time=0.04)
_shutterbutton_in.when_pressed = container.synced_acquisition_service.set_trigger_out
_shutterbutton_in.when_pressed = container.synced_acquisition_service.trigger_execute_job
logger.info(f"external trigger button on {_shutterbutton_in}")

try:
Expand Down
2 changes: 2 additions & 0 deletions node/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .services.baseservice import BaseService
from .services.config import appconfig
from .services.jobservice import JobService
from .services.loggingservice import LoggingService
from .services.sync_acquisition_service import SyncedAcquisitionService

Expand All @@ -13,6 +14,7 @@ class Container:
# container
logging_service = LoggingService(config=appconfig.logging)
synced_acquisition_service = SyncedAcquisitionService(config=appconfig.syncedacquisition)
job_service = JobService(config=appconfig.syncedacquisition, synced_acquisition_service=synced_acquisition_service)

def _service_list(self) -> list[BaseService]:
# list used to start/stop services. List sorted in the order of definition.
Expand Down
8 changes: 5 additions & 3 deletions node/routers/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

from fastapi import APIRouter

from . import acquisition, system
from . import camera, job, system

__all__ = [
"acquisition", # refers to the 'acquisition.py' file
"camera", # refers to the 'acquisition.py' file
"job", # refers to the 'acquisition.py' file
"system",
]

router = APIRouter(prefix="/api")
router.include_router(acquisition.router)
router.include_router(camera.router)
router.include_router(job.router)
router.include_router(system.router)
29 changes: 18 additions & 11 deletions node/routers/api/acquisition.py → node/routers/api/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@

from fastapi import APIRouter, HTTPException, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from ...container import container

logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/acquisition",
tags=["acquisition"],
prefix="/camera",
tags=["camera"],
)


class CameraConfig(BaseModel):
iso: str | None = None
shutter: str | None = None
# ... TODO. this shall be an object in acquisition service or camera


@router.get("/stream.mjpg")
def video_stream():
"""
Expand All @@ -31,16 +38,16 @@ def video_stream():
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, f"preview failed: {exc}") from exc


@router.get("/setup")
def setup_job(job_id, number_captures):
container.synced_acquisition_service.setup_job()
@router.get("/still")
def still_camera():
raise NotImplementedError


@router.get("/trigger")
def trigger_job(job_id):
container.synced_acquisition_service.set_trigger_out()
@router.get("/configure")
def configure_camera(camera_config: CameraConfig):
raise NotImplementedError


@router.get("/results")
def get_results(job_id):
pass
@router.get("/configure/reset")
def reset_camera_config():
raise NotImplementedError
38 changes: 38 additions & 0 deletions node/routers/api/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging

from fastapi import APIRouter, HTTPException, status

from ...container import container
from ...services.jobservice import JobItem, JobRequest

logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/job",
tags=["job"],
)


@router.post("/setup")
def setup_job(job_request: JobRequest) -> JobItem:
try:
return container.job_service.setup_job_request(jobrequest=job_request)
except ConnectionRefusedError as exc:
raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=f"Error setting up job: {exc}") from exc


@router.get("/trigger")
def trigger_job():
"""triggers a job that was setup before. this call needs to be sent to primary only and via GPIO the nodes will execute the job."""
container.job_service.trigger_execute_job()


@router.get("/list")
def get_jobs():
"""triggers a job that was setup before. this call needs to be sent to primary only and via GPIO the nodes will execute the job."""
return container.job_service.db_get_list_as_dict()


@router.get("/results/{job_id}")
def get_results(job_id: str) -> JobItem:
raise NotImplementedError
# return container.job_service.get_job_results(job_id=job_id)
57 changes: 49 additions & 8 deletions node/services/backends/cameras/abstractbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,28 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from threading import Barrier, BrokenBarrierError, Condition, Event
from pathlib import Path
from queue import Queue
from threading import Barrier, BrokenBarrierError, Condition

from ....utils.stoppablethread import StoppableThread

logger = logging.getLogger(__name__)


@dataclass
class BackendRequest:
pass
# nothing to align here until today... maybe here we could add later a skip-frame command or something...


@dataclass
class BackendItem:
# request: BackendRequest
filepath: Path = None
# metadata: dict = None


class StreamingOutput(io.BufferedIOBase):
"""Lores data class used for streaming.
Used in hardware accelerated MJPEGEncoder
Expand Down Expand Up @@ -37,11 +54,13 @@ class AbstractCameraBackend(ABC):
def __init__(self):
# declare common abstract props
self._nominal_framerate: int = None
self._capture: Event = None
self._capture_in_progress: bool = None
self._camera_thread: StoppableThread = None
self._align_thread: StoppableThread = None
self._barrier: Barrier = None
self._current_timestampset: TimestampSet = None
self._align_timestampset: TimestampSet = None
self._queue_in: Queue[BackendRequest] = None
self._queue_out: Queue[BackendItem] = None

def __repr__(self):
return f"{self.__class__}"
Expand All @@ -66,22 +85,36 @@ def start(self, nominal_framerate: int = None):

# init common abstract props
self._nominal_framerate = nominal_framerate
self._capture = Event()
self._capture_in_progress = False
self._barrier = Barrier(3, action=self.get_timestamps_to_align)
self._current_timestampset = TimestampSet(None, None)
self._align_timestampset = TimestampSet(None, None)
self._queue_in: Queue[BackendRequest] = Queue()
self._queue_out: Queue[BackendItem] = Queue()

self._camera_thread = StoppableThread(name="_camera_thread", target=self._camera_fun, args=(), daemon=True)
self._camera_thread.start()

self._align_thread = StoppableThread(name="_align_thread", target=self._align_fun, args=(), daemon=True)
self._align_thread.start()

@abstractmethod
def stop(self):
logger.debug(f"{self.__module__} stop called")

if self._align_thread and self._align_thread.is_alive():
self._align_thread.stop()
self._align_thread.join()

if self._camera_thread and self._camera_thread.is_alive():
self._camera_thread.stop()
self._camera_thread.join()

@abstractmethod
def camera_alive(self) -> bool:
pass
camera_alive = self._camera_thread and self._camera_thread.is_alive()
align_alive = self._align_thread and self._align_thread.is_alive()

def do_capture(self, filename: str = None, number_frames: int = 1):
self._capture.set()
return camera_alive and align_alive

def sync_tick(self, timestamp_ns: int):
self._current_timestampset.reference = timestamp_ns
Expand All @@ -101,3 +134,11 @@ def stop_stream(self):
@abstractmethod
def wait_for_lores_image(self):
pass

@abstractmethod
def _camera_fun(self):
pass

@abstractmethod
def _align_fun(self):
pass
71 changes: 27 additions & 44 deletions node/services/backends/cameras/picamera2.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from ....utils.stoppablethread import StoppableThread
from ...config.models import ConfigBackendPicamera2
from .abstractbackend import AbstractCameraBackend, StreamingOutput
from .abstractbackend import AbstractCameraBackend, BackendItem, BackendRequest, StreamingOutput

logger = logging.getLogger(__name__)

Expand All @@ -29,12 +29,9 @@ def __init__(self, config: ConfigBackendPicamera2):
# private props
self._picamera2: Picamera2 = None
self._nominal_framerate: float = None
self._camera_thread: StoppableThread = None
self._processing_thread: StoppableThread = None
self._queue_processing: Queue = None
self._queue_processing: Queue[tuple[BackendRequest, object]] = None
self._streaming_output: StreamingOutput = None
self._align_thread: StoppableThread = None
self._camera_timestamp_ns: int = None

logger.info(f"global_camera_info {Picamera2.global_camera_info()}")

Expand All @@ -44,8 +41,7 @@ def start(self, nominal_framerate: int = None):

# initialize private props
self._streaming_output: StreamingOutput = StreamingOutput()
self._queue_processing: Queue = Queue()
self._camera_timestamp_ns: int = None
self._queue_processing: Queue[tuple[BackendRequest, object]] = Queue()

# https://github.com/raspberrypi/picamera2/issues/576
if self._picamera2:
Expand Down Expand Up @@ -96,12 +92,6 @@ def start(self, nominal_framerate: int = None):
self._processing_thread = StoppableThread(name="_processing_thread", target=self._processing_fun, args=(), daemon=True)
self._processing_thread.start()

self._camera_thread = StoppableThread(name="_camera_thread", target=self._camera_fun, args=(), daemon=True)
self._camera_thread.start()

self._align_thread = StoppableThread(name="_align_thread", target=self._align_fun, args=(), daemon=True)
self._align_thread.start()

logger.info(f"camera_config: {self._picamera2.camera_config}")
logger.info(f"camera_controls: {self._picamera2.camera_controls}")
logger.info(f"controls: {self._picamera2.controls}")
Expand All @@ -119,26 +109,17 @@ def stop(self):
self._picamera2.stop()
self._picamera2.close() # need to close camera so it can be used by other processes also (or be started again)

if self._camera_thread and self._camera_thread.is_alive():
self._camera_thread.stop()
self._camera_thread.join()

if self._processing_thread and self._processing_thread.is_alive():
self._processing_thread.stop()
self._processing_thread.join()

if self._align_thread and self._align_thread.is_alive():
self._align_thread.stop()
self._align_thread.join()

logger.debug(f"{self.__module__} stopped")

def camera_alive(self) -> bool:
camera_alive = self._camera_thread and self._camera_thread.is_alive()
super_alive = super().camera_alive()
processing_alive = self._processing_thread and self._processing_thread.is_alive()
align_alive = self._align_thread and self._align_thread.is_alive()

return camera_alive and processing_alive and align_alive
return super_alive and processing_alive

def start_stream(self):
self._picamera2.stop_recording()
Expand Down Expand Up @@ -197,10 +178,6 @@ def _init_autofocus(self):

logger.debug("autofocus set")

@staticmethod
def clamp(n, min_value, max_value):
return max(min_value, min(n, max_value))

def recover(self):
tms = time.time()
try:
Expand All @@ -221,8 +198,8 @@ def _align_fun(self):
self.recover()

while not current_thread().stopped():
if self._capture_in_progress:
adjust_cycle_counter = 0 # keep counter 0 until something is in progress and wait X_CYCLES until adjustment is done afterwards
# if self._capture_in_progress:
# adjust_cycle_counter = 0 # keep counter 0 until something is in progress and wait X_CYCLES until adjustment is done afterwards

try:
self._barrier.wait()
Expand All @@ -233,13 +210,6 @@ def _align_fun(self):

timestamp_delta_ns = self._align_timestampset.camera - self._align_timestampset.reference # in ns

# if abs(timestamp_delta_ns) > (1.1e9 * nominal_frame_duration):
# logger.info("delta big, recovering...")
# adjust_cycle_counter = 0
# self.recover()
# logger.info("finished recover")
# continue

if adjust_cycle_counter >= ADJUST_EVERY_X_CYCLE:
adjust_cycle_counter = 0
adjust_amount_us = -timestamp_delta_ns / 1.0e3
Expand Down Expand Up @@ -272,15 +242,19 @@ def _camera_fun(self):
logger.debug("starting _camera_fun")

while not current_thread().stopped():
if self._capture.is_set():
self._capture.clear()
self._capture_in_progress = True
backendrequest = None

try:
backendrequest = self._queue_in.get_nowait()
except Empty:
pass # no actual job to process...

if backendrequest:
# TODO: check if capable to keep up with the framerate or something get lost? for now we only use 1 frame so it's ok
tms = time.time()
self._queue_processing.put(self._picamera2.capture_buffer("main", wait=2.0))
buffer = self._picamera2.capture_buffer("main", wait=2.0)
self._queue_processing.put((backendrequest, buffer))
logger.info(f"queued up buffer to process image, time taken: {round((time.time() - tms)*1.0e3, 0)}ms")
self._capture_in_progress = False

else:
try:
picam_metadata = self._picamera2.capture_metadata(wait=2.0)
Expand All @@ -298,12 +272,13 @@ def _camera_fun(self):
logger.info("_camera_fun left")

def _processing_fun(self):
# TODO: this might be better in multiprocessing or use some lib that is in c++ releasing the gil during processing...
logger.debug("starting _processing_fun")
buffer_to_proc = None

while not current_thread().stopped():
try:
buffer_to_proc = self._queue_processing.get(block=True, timeout=1.0)
(backendrequest, buffer_to_proc) = self._queue_processing.get(block=True, timeout=1.0)
logger.info("got img off queue, jpg proc start")
except Empty:
continue # just continue but allow .stopped to exit after 1.0 sec latest...
Expand All @@ -319,4 +294,12 @@ def _processing_fun(self):
image.save(filepath.with_suffix(".jpg"), quality=self._config.original_still_quality)
logger.info(f"jpg compression finished, time taken: {round((time.time() - tms)*1.0e3, 0)}ms")

backenditem = BackendItem(
filepath=filepath,
)
self._queue_out.put(backenditem)
logger.info(f"result item put on output queue: {backenditem}")

self._queue_in.task_done()

logger.info("_processing_fun left")
Loading

0 comments on commit 08d3cb7

Please sign in to comment.