add upsert examples multipart endpoint #1209

Draft
wants to merge 10 commits into base: main
120 changes: 120 additions & 0 deletions python/bench/upload_examples_bench.py
@@ -0,0 +1,120 @@
import statistics

Check notice (GitHub Actions / benchmark) on line 1 in python/bench/upload_examples_bench.py — Benchmark results:

create_5_000_run_trees: Mean +- std dev: 619 ms +- 45 ms
create_10_000_run_trees: Mean +- std dev: 1.19 sec +- 0.06 sec
create_20_000_run_trees: Mean +- std dev: 1.18 sec +- 0.06 sec
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 705 us +- 10 us
dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.2 ms +- 0.3 ms
dumps_class_nested_py_leaf_100x200: Mean +- std dev: 104 ms +- 2 ms
dumps_dataclass_nested_50x100: Mean +- std dev: 25.6 ms +- 0.3 ms
dumps_pydantic_nested_50x100: Mean +- std dev: 65.7 ms +- 15.5 ms (WARNING: result may be unstable; the standard deviation is 24% of the mean)
dumps_pydanticv1_nested_50x100: Mean +- std dev: 219 ms +- 29 ms (WARNING: result may be unstable; the standard deviation is 13% of the mean)

Check notice (GitHub Actions / benchmark) on line 1 in python/bench/upload_examples_bench.py — Comparison against main:

| Benchmark                                     | main     | changes                |
| --------------------------------------------- | -------- | ---------------------- |
| create_20_000_run_trees                       | 1.20 sec | 1.18 sec: 1.02x faster |
| create_10_000_run_trees                       | 1.21 sec | 1.19 sec: 1.02x faster |
| dumps_class_nested_py_leaf_50x100             | 25.3 ms  | 25.2 ms: 1.00x faster  |
| dumps_class_nested_py_branch_and_leaf_200x400 | 698 us   | 705 us: 1.01x slower   |
| Geometric mean                                | (ref)    | 1.01x faster           |

Benchmark hidden because not significant (5): dumps_pydantic_nested_50x100, dumps_pydanticv1_nested_50x100, create_5_000_run_trees, dumps_dataclass_nested_50x100, dumps_class_nested_py_leaf_100x200

import time
from typing import Dict
from uuid import uuid4

from langsmith import Client
from langsmith.schemas import DataType, ExampleUpsertWithAttachments

def create_large_json(length: int) -> Dict:
    """Create a large JSON object for benchmarking purposes."""
    large_array = [
        {
            "index": i,
            "data": f"This is element number {i}",
            "nested": {"id": i, "value": f"Nested value for element {i}"},
        }
        for i in range(length)
    ]

    return {
        "name": "Huge JSON" + str(uuid4()),
        "description": "This is a very large JSON object for benchmarking purposes.",
        "array": large_array,
        "metadata": {
            "created_at": "2024-10-22T19:00:00Z",
            "author": "Python Program",
            "version": 1.0,
        },
    }


def create_example_data(
    dataset_id: str, json_size: int
) -> ExampleUpsertWithAttachments:
    """Create a single example upsert payload."""
    return ExampleUpsertWithAttachments(
        dataset_id=dataset_id,
        inputs=create_large_json(json_size),
        outputs=create_large_json(json_size),
    )

DATASET_NAME = "upsert_llm_evaluator_benchmark_dataset"


def benchmark_example_uploading(
    num_examples: int, json_size: int, samples: int = 1
) -> Dict:
    """Benchmark example uploading with the given parameters.

    Returns timing statistics for the old and new upload methods.
    """
    multipart_timings, old_timings = [], []

    for _ in range(samples):
        client = Client(api_url="https://dev.api.smith.langchain.com")

        if client.has_dataset(dataset_name=DATASET_NAME):
            client.delete_dataset(dataset_name=DATASET_NAME)

        dataset = client.create_dataset(
            DATASET_NAME,
            description="Test dataset for multipart example upload",
            data_type=DataType.kv,
        )
        examples = [
            create_example_data(dataset.id, json_size) for _ in range(num_examples)
        ]

        # Old method
        old_start = time.perf_counter()
        inputs = [e.inputs for e in examples]
        outputs = [e.outputs for e in examples]
        # The create_examples endpoint fails above 20 MB, so this will crash
        # with json_size > ~100.
        client.create_examples(inputs=inputs, outputs=outputs, dataset_id=dataset.id)
        old_elapsed = time.perf_counter() - old_start

        # New method
        multipart_start = time.perf_counter()
        client.upsert_examples_multipart(upserts=examples)
        multipart_elapsed = time.perf_counter() - multipart_start

        multipart_timings.append(multipart_elapsed)
        old_timings.append(old_elapsed)

    return {
        "old": {
            "mean": statistics.mean(old_timings),
            "median": statistics.median(old_timings),
            "stdev": statistics.stdev(old_timings) if len(old_timings) > 1 else 0,
            "min": min(old_timings),
            "max": max(old_timings),
        },
        "new": {
            "mean": statistics.mean(multipart_timings),
            "median": statistics.median(multipart_timings),
            "stdev": statistics.stdev(multipart_timings)
            if len(multipart_timings) > 1
            else 0,
            "min": min(multipart_timings),
            "max": max(multipart_timings),
        },
    }

json_size = 1000
num_examples = 1000


def main(json_size: int, num_examples: int):
    """Run the benchmark and report results."""
    results = benchmark_example_uploading(
        num_examples=num_examples, json_size=json_size
    )

    print(
        f"\nBenchmark Results for {num_examples} examples with JSON size {json_size}:"
    )
    print("-" * 60)
    print(f"{'Metric':<15} {'Old Method':>20} {'New Method':>20}")
    print("-" * 60)

    metrics = ["mean", "median", "stdev", "min", "max"]
    for metric in metrics:
        print(
            f"{metric:<15} "
            f"{results['old'][metric]:>20.4f} "
            f"{results['new'][metric]:>20.4f}"
        )

    print("-" * 60)
    print(
        f"{'Throughput':<15} "
        f"{num_examples / results['old']['mean']:>20.2f} "
        f"{num_examples / results['new']['mean']:>20.2f}"
    )
    print("(examples/second)")


if __name__ == "__main__":
    main(json_size, num_examples)
130 changes: 130 additions & 0 deletions python/langsmith/client.py
@@ -82,6 +82,7 @@
    _SIZE_LIMIT_BYTES,
)
from langsmith._internal._multipart import (
    MultipartPart,
    MultipartPartsAndContext,
    join_multipart_parts_and_context,
)
@@ -3369,6 +3370,135 @@ def create_example_from_run(
created_at=created_at,
)

def upsert_examples_multipart(
    self,
    *,
    upserts: Optional[List[ls_schemas.ExampleUpsertWithAttachments]] = None,
) -> ls_schemas.UpsertExamplesResponse:
    """Upsert examples."""
    # Server-side capability check, currently disabled in this draft:
    # if not (self.info.instance_flags or {}).get(
    #     "examples_multipart_enabled", False
    # ):
    #     raise ValueError(
    #         "Your LangSmith version does not allow using the multipart examples "
    #         "endpoint, please update to the latest version."
    #     )
    if upserts is None:
        upserts = []
    parts: List[MultipartPart] = []

    for example in upserts:
        if example.id is not None:
            example_id = str(example.id)
        else:
            example_id = str(uuid.uuid4())

        example_body = {
            "dataset_id": example.dataset_id,
            "created_at": example.created_at,
        }
        if example.metadata is not None:
            example_body["metadata"] = example.metadata
        if example.split is not None:
            example_body["split"] = example.split
        valb = _dumps_json(example_body)

        parts.append(
            (
                f"{example_id}",
                (None, valb, "application/json", {}),
            )
        )

        inputsb = _dumps_json(example.inputs)
        parts.append(
            (
                f"{example_id}.inputs",
                (None, inputsb, "application/json", {}),
            )
        )

        if example.outputs:
            outputsb = _dumps_json(example.outputs)
            parts.append(
                (
                    f"{example_id}.outputs",
                    (None, outputsb, "application/json", {}),
                )
            )

        if example.attachments:
            for name, attachment in example.attachments.items():
                if isinstance(attachment, tuple):
                    mime_type, data = attachment
                else:
                    mime_type, data = attachment.mime_type, attachment.data
                parts.append(
                    (
                        f"{example_id}.attachment.{name}",
                        (None, data, f"{mime_type}; length={len(data)}", {}),
                    )
                )

    encoder = rqtb_multipart.MultipartEncoder(parts, boundary=BOUNDARY)
    if encoder.len <= 20_000_000:  # ~20 MB
        data = encoder.to_string()
    else:
        data = encoder

    response = self.request_with_retries(
        "POST",
        "/v1/platform/examples/multipart",
        request_kwargs={
            "data": data,
            "headers": {
                **self._headers,
                "Content-Type": encoder.content_type,
            },
        },
    )
    ls_utils.raise_for_status_with_text(response)
    return response.json()

def create_examples(
    self,
    *,
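For reviewers, a minimal usage sketch of the new method, assuming a LangSmith API key is configured in the environment; the dataset name and payloads are illustrative, and attachments are passed as (mime_type, bytes) tuples, which is the tuple branch handled above.

from uuid import uuid4

from langsmith import Client
from langsmith.schemas import ExampleUpsertWithAttachments

client = Client()  # reads LANGSMITH_API_KEY / LANGSMITH_ENDPOINT from the environment

# Throwaway dataset to upsert into (name is illustrative).
dataset = client.create_dataset("multipart-upsert-demo-" + str(uuid4())[:8])

upserts = [
    ExampleUpsertWithAttachments(
        dataset_id=dataset.id,
        inputs={"question": "What is shown in the image?"},
        outputs={"answer": "A bar chart of monthly revenue."},
        # Attachment payloads are kept tiny here; real ones would be file bytes.
        attachments={"chart": ("image/png", b"fake-png-bytes")},
    )
]

response = client.upsert_examples_multipart(upserts=upserts)
print(response["count"], response["example_ids"])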
26 changes: 20 additions & 6 deletions python/langsmith/schemas.py
@@ -89,6 +89,12 @@ class ExampleCreate(ExampleBase):
split: Optional[Union[str, List[str]]] = None


class ExampleUpsertWithAttachments(ExampleCreate):
    """Example upsert with attachments."""

    attachments: Optional[Attachments] = None


class Example(ExampleBase):
"""Example model."""

@@ -125,12 +131,6 @@ def url(self) -> Optional[str]:
return None


class ExampleSearch(ExampleBase):
"""Example returned via search."""

id: UUID


class ExampleUpdate(BaseModel):
"""Update class for Example."""

@@ -145,6 +145,10 @@ class Config:

frozen = True

class ExampleUpdateWithAttachments(ExampleUpdate):
    """Example update with attachments."""

    id: UUID
    attachments: Optional[Attachments] = None

class DataType(str, Enum):
"""Enum for dataset data types."""
@@ -695,6 +699,8 @@ class LangSmithInfo(BaseModel):
    license_expiration_time: Optional[datetime] = None
    """The time the license will expire."""
    batch_ingest_config: Optional[BatchIngestConfig] = None
    instance_flags: Optional[Dict[str, Any]] = None
    """The instance flags."""


Example.update_forward_refs()
@@ -980,3 +986,11 @@ class UsageMetadata(TypedDict):

Does *not* need to sum to full output token count. Does *not* need to have all keys.
"""

class UpsertExamplesResponse(TypedDict):
    """Response object returned from the upsert_examples_multipart method."""

    count: int
    """The number of examples that were upserted."""
    example_ids: List[str]
    """The ids of the examples that were upserted."""