From e0b231092abdda5f079a29649ed002a27661262f Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Mon, 16 Dec 2024 14:17:57 -0500 Subject: [PATCH] feat: support directly stream logs from container to stdout in debug mode (#5408) --- .../impl/eventstream/eventstream_runtime.py | 105 ++---------------- openhands/runtime/impl/modal/modal_runtime.py | 53 +++++---- .../runtime/impl/runloop/runloop_runtime.py | 82 +++++--------- openhands/runtime/utils/log_streamer.py | 51 +++++++++ 4 files changed, 121 insertions(+), 170 deletions(-) create mode 100644 openhands/runtime/utils/log_streamer.py diff --git a/openhands/runtime/impl/eventstream/eventstream_runtime.py b/openhands/runtime/impl/eventstream/eventstream_runtime.py index 2dc99aa2aa36..becff94fb12b 100644 --- a/openhands/runtime/impl/eventstream/eventstream_runtime.py +++ b/openhands/runtime/impl/eventstream/eventstream_runtime.py @@ -43,6 +43,7 @@ from openhands.runtime.impl.eventstream.containers import remove_all_containers from openhands.runtime.plugins import PluginRequirement from openhands.runtime.utils import find_available_tcp_port +from openhands.runtime.utils.log_streamer import LogStreamer from openhands.runtime.utils.request import send_request from openhands.runtime.utils.runtime_build import build_runtime_image from openhands.utils.async_utils import call_sync_from_async @@ -58,68 +59,6 @@ def remove_all_runtime_containers(): atexit.register(remove_all_runtime_containers) -class LogBuffer: - """Synchronous buffer for Docker container logs. - - This class provides a thread-safe way to collect, store, and retrieve logs - from a Docker container. It uses a list to store log lines and provides methods - for appending, retrieving, and clearing logs. - """ - - def __init__(self, container: docker.models.containers.Container, logFn: Callable): - self.init_msg = 'Runtime client initialized.' - - self.buffer: list[str] = [] - self.lock = threading.Lock() - self._stop_event = threading.Event() - self.log_generator = container.logs(stream=True, follow=True) - self.log_stream_thread = threading.Thread(target=self.stream_logs) - self.log_stream_thread.daemon = True - self.log_stream_thread.start() - self.log = logFn - - def append(self, log_line: str): - with self.lock: - self.buffer.append(log_line) - - def get_and_clear(self) -> list[str]: - with self.lock: - logs = list(self.buffer) - self.buffer.clear() - return logs - - def stream_logs(self): - """Stream logs from the Docker container in a separate thread. - - This method runs in its own thread to handle the blocking - operation of reading log lines from the Docker SDK's synchronous generator. - """ - try: - for log_line in self.log_generator: - if self._stop_event.is_set(): - break - if log_line: - decoded_line = log_line.decode('utf-8').rstrip() - self.append(decoded_line) - except Exception as e: - self.log('error', f'Error streaming docker logs: {e}') - - def __del__(self): - if self.log_stream_thread.is_alive(): - self.log( - 'warn', - "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown.", - ) - self.close(timeout=5) - - def close(self, timeout: float = 5.0): - self._stop_event.set() - self.log_stream_thread.join(timeout) - # Close the log generator to release the file descriptor - if hasattr(self.log_generator, 'close'): - self.log_generator.close() - - class EventStreamRuntime(Runtime): """This runtime will subscribe the event stream. When receive an event, it will send the event to runtime-client which run inside the docker environment. @@ -186,7 +125,7 @@ def __init__( self.runtime_builder = DockerRuntimeBuilder(self.docker_client) # Buffer for container logs - self.log_buffer: LogBuffer | None = None + self.log_streamer: LogStreamer | None = None self.init_base_runtime( config, @@ -241,7 +180,7 @@ async def connect(self): f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}', ) - self.log_buffer = LogBuffer(self.container, self.log) + self.log_streamer = LogStreamer(self.container, self.log) if not self.attach_to_existing: self.log('info', f'Waiting for client to become ready at {self.api_url}...') @@ -407,27 +346,6 @@ def _attach_to_container(self): f'attached to container: {self.container_name} {self._container_port} {self.api_url}', ) - def _refresh_logs(self): - self.log('debug', 'Getting container logs...') - - assert ( - self.log_buffer is not None - ), 'Log buffer is expected to be initialized when container is started' - - logs = self.log_buffer.get_and_clear() - if logs: - formatted_logs = '\n'.join([f' |{log}' for log in logs]) - self.log( - 'debug', - '\n' - + '-' * 35 - + 'Container logs:' - + '-' * 35 - + f'\n{formatted_logs}' - + '\n' - + '-' * 80, - ) - @tenacity.retry( stop=tenacity.stop_after_delay(120) | stop_if_should_exit(), retry=tenacity.retry_if_exception_type( @@ -446,8 +364,7 @@ def _wait_until_alive(self): except docker.errors.NotFound: raise RuntimeNotFoundError(f'Container {self.container_name} not found.') - self._refresh_logs() - if not self.log_buffer: + if not self.log_streamer: raise RuntimeError('Runtime client is not ready.') with send_request( @@ -464,8 +381,8 @@ def close(self, rm_all_containers: bool | None = None): Parameters: - rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix """ - if self.log_buffer: - self.log_buffer.close() + if self.log_streamer: + self.log_streamer.close() if self.session: self.session.close() @@ -513,8 +430,6 @@ def run_action(self, action: Action) -> Observation: 'Action has been rejected by the user! Waiting for further user input.' ) - self._refresh_logs() - assert action.timeout is not None try: @@ -533,7 +448,7 @@ def run_action(self, action: Action) -> Observation: raise RuntimeError( f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s' ) - self._refresh_logs() + return obs def run(self, action: CmdRunAction) -> Observation: @@ -564,7 +479,6 @@ def copy_to( if not os.path.exists(host_src): raise FileNotFoundError(f'Source file {host_src} does not exist') - self._refresh_logs() try: if recursive: # For recursive copy, create a zip file @@ -609,14 +523,13 @@ def copy_to( self.log( 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}' ) - self._refresh_logs() def list_files(self, path: str | None = None) -> list[str]: """List files in the sandbox. If path is None, list files in the sandbox's initial working directory (e.g., /workspace). """ - self._refresh_logs() + try: data = {} if path is not None: @@ -637,7 +550,7 @@ def list_files(self, path: str | None = None) -> list[str]: def copy_from(self, path: str) -> Path: """Zip all files in the sandbox and return as a stream of bytes.""" - self._refresh_logs() + try: params = {'path': path} with send_request( diff --git a/openhands/runtime/impl/modal/modal_runtime.py b/openhands/runtime/impl/modal/modal_runtime.py index fb036a6d72be..026e7c0b53ef 100644 --- a/openhands/runtime/impl/modal/modal_runtime.py +++ b/openhands/runtime/impl/modal/modal_runtime.py @@ -12,7 +12,7 @@ from openhands.events import EventStream from openhands.runtime.impl.eventstream.eventstream_runtime import ( EventStreamRuntime, - LogBuffer, + LogStreamer, ) from openhands.runtime.plugins import PluginRequirement from openhands.runtime.utils.command import get_remote_startup_command @@ -32,24 +32,38 @@ def bytes_shim(string_generator) -> Generator[bytes, None, None]: yield line.encode('utf-8') -class ModalLogBuffer(LogBuffer): - """Synchronous buffer for Modal sandbox logs. +class ModalLogStreamer(LogStreamer): + """Streams Modal sandbox logs to stdout. - This class provides a thread-safe way to collect, store, and retrieve logs - from a Modal sandbox. It uses a list to store log lines and provides methods - for appending, retrieving, and clearing logs. + This class provides a way to stream logs from a Modal sandbox directly to stdout + through the provided logging function. """ - def __init__(self, sandbox: modal.Sandbox): - self.init_msg = 'Runtime client initialized.' - - self.buffer: list[str] = [] - self.lock = threading.Lock() + def __init__( + self, + sandbox: modal.Sandbox, + logFn: Callable, + ): + self.log = logFn self._stop_event = threading.Event() self.log_generator = bytes_shim(sandbox.stderr) - self.log_stream_thread = threading.Thread(target=self.stream_logs) - self.log_stream_thread.daemon = True - self.log_stream_thread.start() + + # Start the stdout streaming thread + self.stdout_thread = threading.Thread(target=self._stream_logs) + self.stdout_thread.daemon = True + self.stdout_thread.start() + + def _stream_logs(self): + """Stream logs from the Modal sandbox.""" + try: + for log_line in self.log_generator: + if self._stop_event.is_set(): + break + if log_line: + decoded_line = log_line.decode('utf-8').rstrip() + self.log('debug', f'[inside sandbox] {decoded_line}') + except Exception as e: + self.log('error', f'Error streaming modal logs: {e}') class ModalRuntime(EventStreamRuntime): @@ -109,7 +123,7 @@ def __init__( self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time # Buffer for container logs - self.log_buffer: LogBuffer | None = None + self.log_streamer: LogStreamer | None = None if self.config.sandbox.runtime_extra_deps: self.log( @@ -156,7 +170,7 @@ async def connect(self): self.send_status_message('STATUS$CONTAINER_STARTED') - self.log_buffer = ModalLogBuffer(self.sandbox) + self.log_streamer = ModalLogStreamer(self.sandbox, self.log) if self.sandbox is None: raise Exception('Sandbox not initialized') tunnel = self.sandbox.tunnels()[self.container_port] @@ -278,11 +292,8 @@ def _init_sandbox( def close(self): """Closes the ModalRuntime and associated objects.""" - # if self.temp_dir_handler: - # self.temp_dir_handler.__exit__(None, None, None) - - if self.log_buffer: - self.log_buffer.close() + if self.log_streamer: + self.log_streamer.close() if self.session: self.session.close() diff --git a/openhands/runtime/impl/runloop/runloop_runtime.py b/openhands/runtime/impl/runloop/runloop_runtime.py index 76f9b254fdcf..064aa104c372 100644 --- a/openhands/runtime/impl/runloop/runloop_runtime.py +++ b/openhands/runtime/impl/runloop/runloop_runtime.py @@ -12,49 +12,43 @@ from openhands.core.config import AppConfig from openhands.core.logger import openhands_logger as logger from openhands.events import EventStream -from openhands.runtime.impl.eventstream.eventstream_runtime import ( - EventStreamRuntime, - LogBuffer, -) +from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime from openhands.runtime.plugins import PluginRequirement from openhands.runtime.utils.command import get_remote_startup_command +from openhands.runtime.utils.log_streamer import LogStreamer from openhands.runtime.utils.request import send_request from openhands.utils.tenacity_stop import stop_if_should_exit CONTAINER_NAME_PREFIX = 'openhands-runtime-' -class RunloopLogBuffer(LogBuffer): - """Synchronous buffer for Runloop devbox logs. +class RunloopLogStreamer(LogStreamer): + """Streams Runloop devbox logs to stdout. - This class provides a thread-safe way to collect, store, and retrieve logs - from a Docker container. It uses a list to store log lines and provides methods - for appending, retrieving, and clearing logs. + This class provides a way to stream logs from a Runloop devbox directly to stdout + through the provided logging function. """ - def __init__(self, runloop_api_client: Runloop, devbox_id: str): - self.client_ready = False - self.init_msg = 'Runtime client initialized.' - - self.buffer: list[str] = [] - self.lock = threading.Lock() - self._stop_event = threading.Event() + def __init__( + self, + runloop_api_client: Runloop, + devbox_id: str, + logFn: Callable, + ): self.runloop_api_client = runloop_api_client self.devbox_id = devbox_id + self.log = logFn self.log_index = 0 - self.log_stream_thread = threading.Thread(target=self.stream_logs) - self.log_stream_thread.daemon = True - self.log_stream_thread.start() - - def stream_logs(self): - """Stream logs from the Docker container in a separate thread. + self._stop_event = threading.Event() - This method runs in its own thread to handle the blocking - operation of reading log lines from the Docker SDK's synchronous generator. - """ + # Start the stdout streaming thread + self.stdout_thread = threading.Thread(target=self._stream_logs) + self.stdout_thread.daemon = True + self.stdout_thread.start() + def _stream_logs(self): + """Stream logs from the Runloop devbox.""" try: - # TODO(Runloop) Replace with stream while True: raw_logs = self.runloop_api_client.devboxes.logs.list( self.devbox_id @@ -70,29 +64,11 @@ def stream_logs(self): break if logs: for log_line in logs: - self.append(log_line) - if self.init_msg in log_line: - self.client_ready = True + self.log('debug', f'[inside devbox] {log_line}') time.sleep(1) except Exception as e: - logger.error(f'Error streaming runloop logs: {e}') - - # NB: Match LogBuffer behavior on below methods - - def get_and_clear(self) -> list[str]: - with self.lock: - logs = list(self.buffer) - self.buffer.clear() - return logs - - def append(self, log_line: str): - with self.lock: - self.buffer.append(log_line) - - def close(self, timeout: float = 5.0): - self._stop_event.set() - self.log_stream_thread.join(timeout) + self.log('error', f'Error streaming runloop logs: {e}') class RunloopRuntime(EventStreamRuntime): @@ -132,7 +108,7 @@ def __init__( headless_mode, ) # Buffer for container logs - self.log_buffer: LogBuffer | None = None + self.log_streamer: LogStreamer | None = None self._vscode_url: str | None = None @tenacity.retry( @@ -224,7 +200,9 @@ async def connect(self): ) # Hook up logs - self.log_buffer = RunloopLogBuffer(self.runloop_api_client, self.devbox.id) + self.log_streamer = RunloopLogStreamer( + self.runloop_api_client, self.devbox.id, logger.info + ) self.api_url = tunnel.url logger.info(f'Container started. Server url: {self.api_url}') @@ -248,9 +226,7 @@ async def connect(self): reraise=(ConnectionRefusedError,), ) def _wait_until_alive(self): - # NB(Runloop): Remote logs are not guaranteed realtime, removing client_ready check from logs - self._refresh_logs() - if not self.log_buffer: + if not self.log_streamer: raise RuntimeError('Runtime client is not ready.') response = send_request( self.session, @@ -266,8 +242,8 @@ def _wait_until_alive(self): raise RuntimeError(msg) def close(self, rm_all_containers: bool | None = True): - if self.log_buffer: - self.log_buffer.close() + if self.log_streamer: + self.log_streamer.close() if self.session: self.session.close() diff --git a/openhands/runtime/utils/log_streamer.py b/openhands/runtime/utils/log_streamer.py new file mode 100644 index 000000000000..24a28b93f36c --- /dev/null +++ b/openhands/runtime/utils/log_streamer.py @@ -0,0 +1,51 @@ +import threading +from typing import Callable + +import docker + + +class LogStreamer: + """Streams Docker container logs to stdout. + + This class provides a way to stream logs from a Docker container directly to stdout + through the provided logging function. + """ + + def __init__( + self, + container: docker.models.containers.Container, + logFn: Callable, + ): + self.log = logFn + self.log_generator = container.logs(stream=True, follow=True) + self._stop_event = threading.Event() + + # Start the stdout streaming thread + self.stdout_thread = threading.Thread(target=self._stream_logs) + self.stdout_thread.daemon = True + self.stdout_thread.start() + + def _stream_logs(self): + """Stream logs from the Docker container to stdout.""" + try: + for log_line in self.log_generator: + if self._stop_event.is_set(): + break + if log_line: + decoded_line = log_line.decode('utf-8').rstrip() + self.log('debug', f'[inside container] {decoded_line}') + except Exception as e: + self.log('error', f'Error streaming docker logs to stdout: {e}') + + def __del__(self): + if self.stdout_thread and self.stdout_thread.is_alive(): + self.close(timeout=5) + + def close(self, timeout: float = 5.0): + """Clean shutdown of the log streaming.""" + self._stop_event.set() + if self.stdout_thread and self.stdout_thread.is_alive(): + self.stdout_thread.join(timeout) + # Close the log generator to release the file descriptor + if hasattr(self.log_generator, 'close'): + self.log_generator.close()