serialize instead of deepcopy for performance gains (#1136)
code cleanup changes (no performance benefits):
- [x] declares helper methods for working with Multipart requests
(_multipart.py)
- [x] declares explicit `Operation` dataclasses representing
run/feedback operations in the `tracing_queue` (_operations.py)
- [x] declares helper methods for creating/using `Operation` dataclasses
(_operations.py)
- [x] declares new `_batch_ingest_run_ops` and `_multipart_ingest_ops`
methods that take `List[Operation]` as input and call the respective
endpoints (client.py)
- [x] converts the background thread to batch-process `Operation` objects
via the `_x_ingest_ops` functions (_background_thread.py)
- [x] converts `batch_ingest_runs` and `multipart_ingest_runs` into
wrappers that create `Operation` objects and pass them along to the
`_x_ingest_ops` functions (client.py)

performance change:
- [x] converts `create_run` and `update_run` to create an `Operation`
dataclass instead of deepcopying the input dictionary for performance
gains 💪 (client.py) (see the sketch below)

interface changes:
- `multipart_ingest_runs` no longer accepts feedback (not a public
interface yet - shouldn't matter)
- `batch_ingest_runs`: no interface changes
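To make the performance change concrete, here is a minimal sketch of the serialize-instead-of-deepcopy idea. It is illustrative only: the field names, the `serialize_run_dict` helper, and the use of the standard-library `json` module are assumptions made for this sketch; the real `SerializedRunOperation` dataclass and serialization helpers live in `langsmith/_internal/_operations.py`, which is not shown in full below.

```python
# Illustrative sketch only; field names and helpers are assumptions,
# not the actual langsmith implementation.
import json
from dataclasses import dataclass
from typing import Literal, Optional
from uuid import UUID


def _dumps(obj: object) -> bytes:
    # Stand-in for the SDK's JSON serializer.
    return json.dumps(obj, default=str).encode("utf-8")


@dataclass
class SerializedRunOperation:
    operation: Literal["post", "patch"]  # create vs. update
    id: UUID
    trace_id: UUID
    body: bytes                     # run metadata, already JSON-encoded
    inputs: Optional[bytes] = None
    outputs: Optional[bytes] = None


def serialize_run_dict(
    operation: Literal["post", "patch"], run: dict
) -> SerializedRunOperation:
    # Serialize the (potentially huge) payload once, up front, instead of
    # deep-copying the whole run dict onto the tracing queue.
    payload = {k: v for k, v in run.items() if k not in ("inputs", "outputs")}
    return SerializedRunOperation(
        operation=operation,
        id=run["id"],
        trace_id=run.get("trace_id", run["id"]),
        body=_dumps(payload),
        inputs=_dumps(run["inputs"]) if run.get("inputs") is not None else None,
        outputs=_dumps(run["outputs"]) if run.get("outputs") is not None else None,
    )
```

Roughly, `create_run` / `update_run` can then build one of these objects and put it on the tracing queue, and the background thread ships the pre-encoded bytes as-is (as multipart parts or as a batched body), so large inputs/outputs are encoded exactly once rather than deep-copied and re-serialized later.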
efriis authored Oct 31, 2024
2 parents 3f0ef5e + a41bdbe commit b92aac8
Showing 12 changed files with 805 additions and 299 deletions.
1 change: 1 addition & 0 deletions python/.gitignore
@@ -1 +1,2 @@
out
profiles
9 changes: 9 additions & 0 deletions python/Makefile
@@ -13,6 +13,15 @@ benchmark-fast:
rm -f $(OUTPUT)
poetry run python -m bench -o $(OUTPUT) --fast

PROFILE_NAME ?= output

profile-background-thread:
mkdir -p profiles
poetry run python -m cProfile -o profiles/$(PROFILE_NAME).prof bench/create_run.py

view-profile:
poetry run snakeviz profiles/${PROFILE_NAME}.prof

tests:
env \
-u LANGCHAIN_PROJECT \
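With these targets, a profile can be captured and then inspected with, for example, `make profile-background-thread PROFILE_NAME=create_run` followed by `make view-profile PROFILE_NAME=create_run` (the `view-profile` target assumes `snakeviz` is available in the poetry environment).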
158 changes: 158 additions & 0 deletions python/bench/create_run.py
@@ -0,0 +1,158 @@
import logging
import statistics
import time
from queue import PriorityQueue
from typing import Dict
from unittest.mock import Mock
from uuid import uuid4

from langsmith._internal._background_thread import (
_tracing_thread_drain_queue,
_tracing_thread_handle_batch,
)
from langsmith.client import Client


def create_large_json(length: int) -> Dict:
"""Create a large JSON object for benchmarking purposes."""
large_array = [
{
"index": i,
"data": f"This is element number {i}",
"nested": {"id": i, "value": f"Nested value for element {i}"},
}
for i in range(length)
]

return {
"name": "Huge JSON",
"description": "This is a very large JSON object for benchmarking purposes.",
"array": large_array,
"metadata": {
"created_at": "2024-10-22T19:00:00Z",
"author": "Python Program",
"version": 1.0,
},
}


def create_run_data(run_id: str, json_size: int) -> Dict:
"""Create a single run data object."""
return {
"name": "Run Name",
"id": run_id,
"run_type": "chain",
"inputs": create_large_json(json_size),
"outputs": create_large_json(json_size),
"extra": {"extra_data": "value"},
"trace_id": "trace_id",
"dotted_order": "1.1",
"tags": ["tag1", "tag2"],
"session_name": "Session Name",
}


def mock_session() -> Mock:
"""Create a mock session object."""
mock_session = Mock()
mock_response = Mock()
mock_response.status_code = 202
mock_response.text = "Accepted"
mock_response.json.return_value = {"status": "success"}
mock_session.request.return_value = mock_response
return mock_session


def create_dummy_data(json_size, num_runs) -> list:
return [create_run_data(str(uuid4()), json_size) for _ in range(num_runs)]


def create_runs(runs: list, client: Client) -> None:
for run in runs:
client.create_run(**run)


def process_queue(client: Client) -> None:
if client.tracing_queue is None:
raise ValueError("Tracing queue is None")
while next_batch := _tracing_thread_drain_queue(
client.tracing_queue, limit=100, block=False
):
_tracing_thread_handle_batch(
client, client.tracing_queue, next_batch, use_multipart=True
)


def benchmark_run_creation(
*, num_runs: int, json_size: int, samples: int, benchmark_thread: bool
) -> Dict:
"""
Benchmark run creation with specified parameters.
Returns timing statistics.
"""
timings = []

if benchmark_thread:
client = Client(session=mock_session(), api_key="xxx", auto_batch_tracing=False)
client.tracing_queue = PriorityQueue()
else:
client = Client(session=mock_session(), api_key="xxx")

if client.tracing_queue is None:
raise ValueError("Tracing queue is None")

for _ in range(samples):
runs = create_dummy_data(json_size, num_runs)

start = time.perf_counter()

create_runs(runs, client)

# wait for client.tracing_queue to be empty
if benchmark_thread:
# reset the timer
start = time.perf_counter()
process_queue(client)
else:
client.tracing_queue.join()

elapsed = time.perf_counter() - start

del runs

timings.append(elapsed)

return {
"mean": statistics.mean(timings),
"median": statistics.median(timings),
"stdev": statistics.stdev(timings) if len(timings) > 1 else 0,
"min": min(timings),
"max": max(timings),
}


def test_benchmark_runs(
*, json_size: int, num_runs: int, samples: int, benchmark_thread: bool
):
"""
Run benchmarks with different combinations of parameters and report results.
"""
results = benchmark_run_creation(
num_runs=num_runs,
json_size=json_size,
samples=samples,
benchmark_thread=benchmark_thread,
)

print(f"\nBenchmark Results for {num_runs} runs with JSON size {json_size}:")
print(f"Mean time: {results['mean']:.4f} seconds")
print(f"Median time: {results['median']:.4f} seconds")
print(f"Std Dev: {results['stdev']:.4f} seconds")
print(f"Min time: {results['min']:.4f} seconds")
print(f"Max time: {results['max']:.4f} seconds")
print(f"Throughput: {num_runs / results['mean']:.2f} runs/second")


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
test_benchmark_runs(json_size=5000, num_runs=1000, samples=1, benchmark_thread=True)
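For reference, the parameters in the `__main__` block can be varied; with `benchmark_thread=False` the script instead times the normal path, where `auto_batch_tracing` runs the real background thread and the timer waits on `client.tracing_queue.join()`. The values below are arbitrary examples:

```python
# Arbitrary example values; benchmark_thread=False exercises the
# auto_batch_tracing background thread instead of the manual process_queue().
test_benchmark_runs(json_size=1000, num_runs=500, samples=3, benchmark_thread=False)
```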
55 changes: 43 additions & 12 deletions python/langsmith/_internal/_background_thread.py
@@ -1,15 +1,16 @@
from __future__ import annotations

import functools
import logging
import sys
import threading
import weakref
from dataclasses import dataclass, field
from queue import Empty, Queue
from typing import (
TYPE_CHECKING,
Any,
List,
Union,
cast,
)

from langsmith import schemas as ls_schemas
@@ -18,14 +19,19 @@
_AUTO_SCALE_UP_NTHREADS_LIMIT,
_AUTO_SCALE_UP_QSIZE_TRIGGER,
)
from langsmith._internal._operations import (
SerializedFeedbackOperation,
SerializedRunOperation,
combine_serialized_queue_operations,
)

if TYPE_CHECKING:
from langsmith.client import Client

logger = logging.getLogger("langsmith.client")


@dataclass(order=True)
@functools.total_ordering
class TracingQueueItem:
"""An item in the tracing queue.
@@ -36,8 +42,29 @@ class TracingQueueItem:
"""

priority: str
action: str
item: Any = field(compare=False)
item: Union[SerializedRunOperation, SerializedFeedbackOperation]

__slots__ = ("priority", "item")

def __init__(
self,
priority: str,
item: Union[SerializedRunOperation, SerializedFeedbackOperation],
) -> None:
self.priority = priority
self.item = item

def __lt__(self, other: TracingQueueItem) -> bool:
return (self.priority, self.item.__class__) < (
other.priority,
other.item.__class__,
)

def __eq__(self, other: object) -> bool:
return isinstance(other, TracingQueueItem) and (
self.priority,
self.item.__class__,
) == (other.priority, other.item.__class__)


def _tracing_thread_drain_queue(
@@ -67,16 +94,20 @@ def _tracing_thread_handle_batch(
batch: List[TracingQueueItem],
use_multipart: bool,
) -> None:
create = [it.item for it in batch if it.action == "create"]
update = [it.item for it in batch if it.action == "update"]
feedback = [it.item for it in batch if it.action == "feedback"]
try:
ops = combine_serialized_queue_operations([item.item for item in batch])
if use_multipart:
client.multipart_ingest(
create=create, update=update, feedback=feedback, pre_sampled=True
)
client._multipart_ingest_ops(ops)
else:
client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
if any(isinstance(op, SerializedFeedbackOperation) for op in ops):
logger.warning(
"Feedback operations are not supported in non-multipart mode"
)
ops = [
op for op in ops if not isinstance(op, SerializedFeedbackOperation)
]
client._batch_ingest_run_ops(cast(List[SerializedRunOperation], ops))

except Exception:
logger.error("Error in tracing queue", exc_info=True)
# exceptions are logged elsewhere, but we need to make sure the
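As a side note on the queue item change: dropping the separate `action` field and comparing on `(priority, item.__class__)` is what keeps `TracingQueueItem` usable in a `PriorityQueue`. A rough illustration, using a placeholder object in place of a real operation (the actual `SerializedRunOperation` constructor lives in `_operations.py` and is not shown in this diff):

```python
from queue import PriorityQueue

from langsmith._internal._background_thread import TracingQueueItem


class _PlaceholderOp:
    """Stand-in for SerializedRunOperation, which is not shown here."""


q: "PriorityQueue[TracingQueueItem]" = PriorityQueue()
q.put(TracingQueueItem("2024-10-31T00:00:02Z", _PlaceholderOp()))
q.put(TracingQueueItem("2024-10-31T00:00:01Z", _PlaceholderOp()))

# Items are ordered by (priority, item.__class__), so the lexicographically
# smaller priority string is drained first.
assert q.get().priority == "2024-10-31T00:00:01Z"
```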
27 changes: 27 additions & 0 deletions python/langsmith/_internal/_multipart.py
@@ -0,0 +1,27 @@
from __future__ import annotations

from typing import Dict, Iterable, Tuple

MultipartPart = Tuple[str, Tuple[None, bytes, str, Dict[str, str]]]


class MultipartPartsAndContext:
parts: list[MultipartPart]
context: str

__slots__ = ("parts", "context")

def __init__(self, parts: list[MultipartPart], context: str) -> None:
self.parts = parts
self.context = context


def join_multipart_parts_and_context(
parts_and_contexts: Iterable[MultipartPartsAndContext],
) -> MultipartPartsAndContext:
acc_parts: list[MultipartPart] = []
acc_context: list[str] = []
for parts_and_context in parts_and_contexts:
acc_parts.extend(parts_and_context.parts)
acc_context.append(parts_and_context.context)
return MultipartPartsAndContext(acc_parts, "; ".join(acc_context))
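For what it's worth, the join helper above simply concatenates the parts lists and joins the context strings with `"; "`; a small usage sketch with made-up part names and contexts:

```python
# Made-up part names/contexts, purely to show the join behaviour.
a = MultipartPartsAndContext(
    parts=[("post.run-a", (None, b"{}", "application/json", {}))],
    context="trace=run-a",
)
b = MultipartPartsAndContext(
    parts=[("patch.run-b", (None, b"{}", "application/json", {}))],
    context="trace=run-b",
)
joined = join_multipart_parts_and_context([a, b])
assert joined.parts == a.parts + b.parts
assert joined.context == "trace=run-a; trace=run-b"
```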
(remaining changed files not shown)
