Skip to content

Commit

Permalink
feat: support directly stream logs from container to stdout in debug …
Browse files Browse the repository at this point in the history
…mode (#5408)
  • Loading branch information
xingyaoww authored Dec 16, 2024
1 parent d6a2c4b commit e0b2310
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 170 deletions.
105 changes: 9 additions & 96 deletions openhands/runtime/impl/eventstream/eventstream_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
Expand All @@ -58,68 +59,6 @@ def remove_all_runtime_containers():
atexit.register(remove_all_runtime_containers)


class LogBuffer:
"""Synchronous buffer for Docker container logs.
This class provides a thread-safe way to collect, store, and retrieve logs
from a Docker container. It uses a list to store log lines and provides methods
for appending, retrieving, and clearing logs.
"""

def __init__(self, container: docker.models.containers.Container, logFn: Callable):
self.init_msg = 'Runtime client initialized.'

self.buffer: list[str] = []
self.lock = threading.Lock()
self._stop_event = threading.Event()
self.log_generator = container.logs(stream=True, follow=True)
self.log_stream_thread = threading.Thread(target=self.stream_logs)
self.log_stream_thread.daemon = True
self.log_stream_thread.start()
self.log = logFn

def append(self, log_line: str):
with self.lock:
self.buffer.append(log_line)

def get_and_clear(self) -> list[str]:
with self.lock:
logs = list(self.buffer)
self.buffer.clear()
return logs

def stream_logs(self):
"""Stream logs from the Docker container in a separate thread.
This method runs in its own thread to handle the blocking
operation of reading log lines from the Docker SDK's synchronous generator.
"""
try:
for log_line in self.log_generator:
if self._stop_event.is_set():
break
if log_line:
decoded_line = log_line.decode('utf-8').rstrip()
self.append(decoded_line)
except Exception as e:
self.log('error', f'Error streaming docker logs: {e}')

def __del__(self):
if self.log_stream_thread.is_alive():
self.log(
'warn',
"LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown.",
)
self.close(timeout=5)

def close(self, timeout: float = 5.0):
self._stop_event.set()
self.log_stream_thread.join(timeout)
# Close the log generator to release the file descriptor
if hasattr(self.log_generator, 'close'):
self.log_generator.close()


class EventStreamRuntime(Runtime):
"""This runtime will subscribe the event stream.
When receive an event, it will send the event to runtime-client which run inside the docker environment.
Expand Down Expand Up @@ -186,7 +125,7 @@ def __init__(
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)

# Buffer for container logs
self.log_buffer: LogBuffer | None = None
self.log_streamer: LogStreamer | None = None

self.init_base_runtime(
config,
Expand Down Expand Up @@ -241,7 +180,7 @@ async def connect(self):
f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
)

self.log_buffer = LogBuffer(self.container, self.log)
self.log_streamer = LogStreamer(self.container, self.log)

if not self.attach_to_existing:
self.log('info', f'Waiting for client to become ready at {self.api_url}...')
Expand Down Expand Up @@ -407,27 +346,6 @@ def _attach_to_container(self):
f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
)

def _refresh_logs(self):
self.log('debug', 'Getting container logs...')

assert (
self.log_buffer is not None
), 'Log buffer is expected to be initialized when container is started'

logs = self.log_buffer.get_and_clear()
if logs:
formatted_logs = '\n'.join([f' |{log}' for log in logs])
self.log(
'debug',
'\n'
+ '-' * 35
+ 'Container logs:'
+ '-' * 35
+ f'\n{formatted_logs}'
+ '\n'
+ '-' * 80,
)

@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
Expand All @@ -446,8 +364,7 @@ def _wait_until_alive(self):
except docker.errors.NotFound:
raise RuntimeNotFoundError(f'Container {self.container_name} not found.')

self._refresh_logs()
if not self.log_buffer:
if not self.log_streamer:
raise RuntimeError('Runtime client is not ready.')

with send_request(
Expand All @@ -464,8 +381,8 @@ def close(self, rm_all_containers: bool | None = None):
Parameters:
- rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
"""
if self.log_buffer:
self.log_buffer.close()
if self.log_streamer:
self.log_streamer.close()

if self.session:
self.session.close()
Expand Down Expand Up @@ -513,8 +430,6 @@ def run_action(self, action: Action) -> Observation:
'Action has been rejected by the user! Waiting for further user input.'
)

self._refresh_logs()

assert action.timeout is not None

try:
Expand All @@ -533,7 +448,7 @@ def run_action(self, action: Action) -> Observation:
raise RuntimeError(
f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
)
self._refresh_logs()

return obs

def run(self, action: CmdRunAction) -> Observation:
Expand Down Expand Up @@ -564,7 +479,6 @@ def copy_to(
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')

self._refresh_logs()
try:
if recursive:
# For recursive copy, create a zip file
Expand Down Expand Up @@ -609,14 +523,13 @@ def copy_to(
self.log(
'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
)
self._refresh_logs()

def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox.
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
self._refresh_logs()

try:
data = {}
if path is not None:
Expand All @@ -637,7 +550,7 @@ def list_files(self, path: str | None = None) -> list[str]:

def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
self._refresh_logs()

try:
params = {'path': path}
with send_request(
Expand Down
53 changes: 32 additions & 21 deletions openhands/runtime/impl/modal/modal_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from openhands.events import EventStream
from openhands.runtime.impl.eventstream.eventstream_runtime import (
EventStreamRuntime,
LogBuffer,
LogStreamer,
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
Expand All @@ -32,24 +32,38 @@ def bytes_shim(string_generator) -> Generator[bytes, None, None]:
yield line.encode('utf-8')


class ModalLogBuffer(LogBuffer):
"""Synchronous buffer for Modal sandbox logs.
class ModalLogStreamer(LogStreamer):
"""Streams Modal sandbox logs to stdout.
This class provides a thread-safe way to collect, store, and retrieve logs
from a Modal sandbox. It uses a list to store log lines and provides methods
for appending, retrieving, and clearing logs.
This class provides a way to stream logs from a Modal sandbox directly to stdout
through the provided logging function.
"""

def __init__(self, sandbox: modal.Sandbox):
self.init_msg = 'Runtime client initialized.'

self.buffer: list[str] = []
self.lock = threading.Lock()
def __init__(
self,
sandbox: modal.Sandbox,
logFn: Callable,
):
self.log = logFn
self._stop_event = threading.Event()
self.log_generator = bytes_shim(sandbox.stderr)
self.log_stream_thread = threading.Thread(target=self.stream_logs)
self.log_stream_thread.daemon = True
self.log_stream_thread.start()

# Start the stdout streaming thread
self.stdout_thread = threading.Thread(target=self._stream_logs)
self.stdout_thread.daemon = True
self.stdout_thread.start()

def _stream_logs(self):
"""Stream logs from the Modal sandbox."""
try:
for log_line in self.log_generator:
if self._stop_event.is_set():
break
if log_line:
decoded_line = log_line.decode('utf-8').rstrip()
self.log('debug', f'[inside sandbox] {decoded_line}')
except Exception as e:
self.log('error', f'Error streaming modal logs: {e}')


class ModalRuntime(EventStreamRuntime):
Expand Down Expand Up @@ -109,7 +123,7 @@ def __init__(
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time

# Buffer for container logs
self.log_buffer: LogBuffer | None = None
self.log_streamer: LogStreamer | None = None

if self.config.sandbox.runtime_extra_deps:
self.log(
Expand Down Expand Up @@ -156,7 +170,7 @@ async def connect(self):

self.send_status_message('STATUS$CONTAINER_STARTED')

self.log_buffer = ModalLogBuffer(self.sandbox)
self.log_streamer = ModalLogStreamer(self.sandbox, self.log)
if self.sandbox is None:
raise Exception('Sandbox not initialized')
tunnel = self.sandbox.tunnels()[self.container_port]
Expand Down Expand Up @@ -278,11 +292,8 @@ def _init_sandbox(

def close(self):
"""Closes the ModalRuntime and associated objects."""
# if self.temp_dir_handler:
# self.temp_dir_handler.__exit__(None, None, None)

if self.log_buffer:
self.log_buffer.close()
if self.log_streamer:
self.log_streamer.close()

if self.session:
self.session.close()
Expand Down
Loading

0 comments on commit e0b2310

Please sign in to comment.