diff --git a/runtime/prompty/prompty/core.py b/runtime/prompty/prompty/core.py index f06bc8f..2704661 100644 --- a/runtime/prompty/prompty/core.py +++ b/runtime/prompty/prompty/core.py @@ -6,9 +6,9 @@ import json import abc from pathlib import Path +from .tracer import Tracer, trace, to_dict from pydantic import BaseModel, Field, FilePath -from typing import List, Literal, Dict, Callable, Set, TypeVar -from .tracer import trace +from typing import Iterator, List, Literal, Dict, Callable, Set class PropertySettings(BaseModel): @@ -449,3 +449,33 @@ def read(cls, string): "body": body, "frontmatter": fmatter, } + + +class PromptyStream(Iterator): + """PromptyStream class to iterate over LLM stream. + Necessary for Prompty to handle streaming data when tracing.""" + + def __init__(self, name: str, iterator: Iterator): + self.name = name + self.iterator = iterator + self.items: List[any] = [] + self.__name__ = "PromptyStream" + + def __iter__(self): + return self + + def __next__(self): + try: + # enumerate but add to list + o = self.iterator.__next__() + self.items.append(o) + return o + + except StopIteration: + # StopIteration is raised + # contents are exhausted + if len(self.items) > 0: + with Tracer.start(f"{self.name}.PromptyStream") as trace: + trace("items", [to_dict(s) for s in self.items]) + + raise StopIteration diff --git a/runtime/prompty/prompty/executors.py b/runtime/prompty/prompty/executors.py index 3e4f555..fd05820 100644 --- a/runtime/prompty/prompty/executors.py +++ b/runtime/prompty/prompty/executors.py @@ -1,8 +1,8 @@ import azure.identity -from .tracer import Trace -from openai import AzureOpenAI -from .core import Invoker, InvokerFactory, Prompty import importlib.metadata +from typing import Iterator +from openai import AzureOpenAI +from .core import Invoker, InvokerFactory, Prompty, PromptyStream VERSION = importlib.metadata.version("prompty") @@ -87,9 +87,8 @@ def invoke(self, data: any) -> any: elif self.api == "image": raise NotImplementedError("Azure OpenAI Image API is not implemented yet") - if hasattr(response, "usage") and response.usage: - Trace.add("completion_tokens", response.usage.completion_tokens) - Trace.add("prompt_tokens", response.usage.prompt_tokens) - Trace.add("total_tokens", response.usage.total_tokens) - - return response + # stream response + if isinstance(response, Iterator): + return PromptyStream("AzureOpenAIExecutor", response) + else: + return response diff --git a/runtime/prompty/prompty/processors.py b/runtime/prompty/prompty/processors.py index c54da01..a82b35e 100644 --- a/runtime/prompty/prompty/processors.py +++ b/runtime/prompty/prompty/processors.py @@ -1,10 +1,8 @@ -from .tracer import Trace -from openai import Stream from typing import Iterator from pydantic import BaseModel from openai.types.completion import Completion -from .core import Invoker, InvokerFactory, Prompty from openai.types.chat.chat_completion import ChatCompletion +from .core import Invoker, InvokerFactory, Prompty, PromptyStream from openai.types.create_embedding_response import CreateEmbeddingResponse @@ -66,9 +64,8 @@ def generator(): for chunk in data: if len(chunk.choices) == 1 and chunk.choices[0].delta.content != None: content = chunk.choices[0].delta.content - Trace.add("stream", content) yield content - return generator() + return PromptyStream("OpenAIProcessor", generator()) else: return data diff --git a/runtime/prompty/prompty/tracer.py b/runtime/prompty/prompty/tracer.py index 56e3b07..7bf0e25 100644 --- a/runtime/prompty/prompty/tracer.py +++ b/runtime/prompty/prompty/tracer.py @@ -1,92 +1,64 @@ -import abc +import os import json import inspect -import datetime +import contextlib +from pathlib import Path from numbers import Number -import os from datetime import datetime -from pathlib import Path from pydantic import BaseModel from functools import wraps, partial -from typing import Any, Callable, Dict, List - - -class Tracer(abc.ABC): - - @abc.abstractmethod - def start(self, name: str) -> None: - pass - - @abc.abstractmethod - def add(self, key: str, value: Any) -> None: - pass +from typing import Any, Callable, Dict, Iterator, List - @abc.abstractmethod - def end(self) -> None: - pass - -class Trace: - _tracers: Dict[str, Tracer] = {} +class Tracer: + _tracers: Dict[str, Callable[[str], Iterator[Callable[[str, Any], None]]]] = {} @classmethod - def add_tracer(cls, name: str, tracer: Tracer) -> None: + def add( + cls, name: str, tracer: Callable[[str], Iterator[Callable[[str, Any], None]]] + ) -> None: cls._tracers[name] = tracer - @classmethod - def start(cls, name: str) -> None: - for tracer in cls._tracers.values(): - tracer.start(name) - - @classmethod - def add(cls, name: str, value: Any) -> None: - for tracer in cls._tracers.values(): - tracer.add(name, value) - - @classmethod - def end(cls) -> None: - for tracer in cls._tracers.values(): - tracer.end() - @classmethod def clear(cls) -> None: cls._tracers = {} @classmethod - def register(cls, name: str): - def inner_wrapper(wrapped_class: Tracer) -> Callable: - cls._tracers[name] = wrapped_class() - return wrapped_class - - return inner_wrapper - - @classmethod - def to_dict(cls, obj: Any) -> Dict[str, Any]: - # simple json types - if isinstance(obj, str) or isinstance(obj, Number) or isinstance(obj, bool): - return obj - # datetime - elif isinstance(obj, datetime): - return obj.isoformat() - # safe Prompty obj serialization - elif type(obj).__name__ == "Prompty": - return obj.to_safe_dict() - # pydantic models have their own json serialization - elif isinstance(obj, BaseModel): - return obj.model_dump() - # recursive list and dict - elif isinstance(obj, list): - return [Trace.to_dict(item) for item in obj] - elif isinstance(obj, dict): - return { - k: v if isinstance(v, str) else Trace.to_dict(v) - for k, v in obj.items() - } - elif isinstance(obj, Path): - return str(obj) - # cast to string otherwise... - else: - return str(obj) + @contextlib.contextmanager + def start(cls, name: str) -> Iterator[Callable[[str, Any], None]]: + with contextlib.ExitStack() as stack: + traces = [ + stack.enter_context(tracer(name)) for tracer in cls._tracers.values() + ] + yield lambda key, value: [trace(key, value) for trace in traces] + + +def to_dict(obj: Any) -> Dict[str, Any]: + # simple json types + if isinstance(obj, str) or isinstance(obj, Number) or isinstance(obj, bool): + return obj + # datetime + elif isinstance(obj, datetime): + return obj.isoformat() + # safe Prompty obj serialization + elif type(obj).__name__ == "Prompty": + return obj.to_safe_dict() + # safe PromptyStream obj serialization + elif type(obj).__name__ == "PromptyStream": + return "PromptyStream" + # pydantic models have their own json serialization + elif isinstance(obj, BaseModel): + return obj.model_dump() + # recursive list and dict + elif isinstance(obj, list): + return [to_dict(item) for item in obj] + elif isinstance(obj, dict): + return {k: v if isinstance(v, str) else to_dict(v) for k, v in obj.items()} + elif isinstance(obj, Path): + return str(obj) + # cast to string otherwise... + else: + return str(obj) def _name(func: Callable, args): @@ -110,14 +82,14 @@ def _inputs(func: Callable, args, kwargs) -> dict: ba = inspect.signature(func).bind(*args, **kwargs) ba.apply_defaults() - inputs = {k: Trace.to_dict(v) for k, v in ba.arguments.items() if k != "self"} + inputs = {k: to_dict(v) for k, v in ba.arguments.items() if k != "self"} return inputs + def _results(result: Any) -> dict: - return { - "result": Trace.to_dict(result) if result is not None else "None", - } + return to_dict(result) if result is not None else "None" + def _trace_sync(func: Callable = None, *, description: str = None) -> Callable: description = description or "" @@ -125,107 +97,104 @@ def _trace_sync(func: Callable = None, *, description: str = None) -> Callable: @wraps(func) def wrapper(*args, **kwargs): name, signature = _name(func, args) - Trace.start(name) - Trace.add("signature", signature) - if description and description != "": - Trace.add("description", description) + with Tracer.start(name) as trace: + trace("signature", signature) + if description and description != "": + trace("description", description) - inputs = _inputs(func, args, kwargs) - Trace.add("inputs", inputs) + inputs = _inputs(func, args, kwargs) + trace("inputs", inputs) - result = func(*args, **kwargs) - Trace.add("result", _results(result)) + result = func(*args, **kwargs) + trace("result", _results(result)) - Trace.end() + return result - return result - return wrapper + def _trace_async(func: Callable = None, *, description: str = None) -> Callable: description = description or "" @wraps(func) async def wrapper(*args, **kwargs): name, signature = _name(func, args) - Trace.start(name) - Trace.add("signature", signature) - if description and description != "": - Trace.add("description", description) + with Tracer.start(name) as trace: + trace("signature", signature) + if description and description != "": + trace("description", description) - inputs = _inputs(func, args, kwargs) - Trace.add("inputs", inputs) + inputs = _inputs(func, args, kwargs) + trace("inputs", inputs) - result = await func(*args, **kwargs) - Trace.add("result", _results(result)) + result = await func(*args, **kwargs) + trace("result", _results(result)) - Trace.end() + return result - return result - return wrapper + def trace(func: Callable = None, *, description: str = None) -> Callable: if func is None: return partial(trace, description=description) - - wrapped_method = ( - _trace_async if inspect.iscoroutinefunction(func) else _trace_sync - ) - return wrapped_method(func, description=description) + wrapped_method = _trace_async if inspect.iscoroutinefunction(func) else _trace_sync + return wrapped_method(func, description=description) -class PromptyTracer(Tracer): - _stack: List[Dict[str, Any]] = [] - _name: str = None +class PromptyTracer: def __init__(self, output_dir: str = None) -> None: - super().__init__() if output_dir: - self.root = Path(output_dir).resolve().absolute() + self.output = Path(output_dir).resolve().absolute() else: - self.root = Path(Path(os.getcwd()) / ".runs").resolve().absolute() - - if not self.root.exists(): - self.root.mkdir(parents=True, exist_ok=True) - - def start(self, name: str) -> None: - self._stack.append({"name": name}) - # first entry frame - if self._name is None: - self._name = name - - def add(self, name: str, value: Any) -> None: - frame = self._stack[-1] - if name not in frame: - frame[name] = value - # multiple values creates list - else: - if isinstance(frame[name], list): - frame[name].append(value) + self.output = Path(Path(os.getcwd()) / ".runs").resolve().absolute() + + if not self.output.exists(): + self.output.mkdir(parents=True, exist_ok=True) + + self.stack: List[Dict[str, Any]] = [] + + @contextlib.contextmanager + def tracer(self, name: str) -> Iterator[Callable[[str, Any], None]]: + try: + self.stack.append({"name": name}) + frame = self.stack[-1] + + def add(key: str, value: Any) -> None: + if key not in frame: + frame[key] = value + # multiple values creates list + else: + if isinstance(frame[key], list): + frame[key].append(value) + else: + frame[key] = [frame[key], value] + + yield add + finally: + frame = self.stack.pop() + # if stack is empty, dump the frame + if len(self.stack) == 0: + trace_file = ( + self.output + / f"{frame['name']}.{datetime.now().strftime('%Y%m%d.%H%M%S')}.ptrace" + ) + + with open(trace_file, "w") as f: + json.dump(frame, f, indent=4) + # otherwise, append the frame to the parent else: - frame[name] = [frame[name], value] - - - def end(self) -> None: - # pop the current stack - frame = self._stack.pop() - - # if stack is empty, dump the frame - if len(self._stack) == 0: - self.flush(frame) - # otherwise, append the frame to the parent - else: - if "__frames" not in self._stack[-1]: - self._stack[-1]["__frames"] = [] - self._stack[-1]["__frames"].append(frame) - - def flush(self, frame: Dict[str, Any]) -> None: - - trace_file = ( - self.root / f"{self._name}.{datetime.now().strftime('%Y%m%d.%H%M%S')}.ptrace" - ) - - with open(trace_file, "w") as f: - json.dump(frame, f, indent=4) + if "__frames" not in self.stack[-1]: + self.stack[-1]["__frames"] = [] + self.stack[-1]["__frames"].append(frame) + + +@contextlib.contextmanager +def console_tracer(name: str) -> Iterator[Callable[[str, Any], None]]: + try: + print(f"Starting {name}") + yield lambda key, value: print(f"{key}:\n{json.dumps(value, indent=4)}") + finally: + print(f"Ending {name}") diff --git a/runtime/prompty/tests/__init__.py b/runtime/prompty/tests/__init__.py index fc08af2..98a00e0 100644 --- a/runtime/prompty/tests/__init__.py +++ b/runtime/prompty/tests/__init__.py @@ -1,5 +1,6 @@ import json from pathlib import Path +from prompty.core import PromptyStream from openai.types.chat import ChatCompletionChunk from prompty import Invoker, Prompty, InvokerFactory from openai.types.chat.chat_completion import ChatCompletion @@ -29,11 +30,12 @@ def invoke(self, data: any) -> any: if self.parameters.get("stream", False): items = json.loads(j) + def generator(): for i in range(1, len(items)): yield ChatCompletionChunk.model_validate(items[i]) - - return generator() + + return PromptyStream("FakeAzureExecutor", generator()) elif self.api == "chat": return ChatCompletion.model_validate_json(j) diff --git a/runtime/prompty/tests/test_common.py b/runtime/prompty/tests/test_common.py index d7207d1..3ed6989 100644 --- a/runtime/prompty/tests/test_common.py +++ b/runtime/prompty/tests/test_common.py @@ -1,8 +1,5 @@ import pytest import prompty -from pathlib import Path - -BASE_PATH = str(Path(__file__).absolute().parent.as_posix()) @pytest.mark.parametrize( diff --git a/runtime/prompty/tests/test_execute.py b/runtime/prompty/tests/test_execute.py index e94303c..ddce980 100644 --- a/runtime/prompty/tests/test_execute.py +++ b/runtime/prompty/tests/test_execute.py @@ -1,13 +1,8 @@ import pytest import prompty -from pathlib import Path - from prompty.tracer import trace -BASE_PATH = str(Path(__file__).absolute().parent.as_posix()) - - @pytest.mark.parametrize( "prompt", [ @@ -126,6 +121,7 @@ def test_function_calling(): ) print(result) + # need to add trace attribute to # materialize stream into the function # trace decorator diff --git a/runtime/prompty/tests/test_factory_invoker.py b/runtime/prompty/tests/test_factory_invoker.py index e51eebb..da517a2 100644 --- a/runtime/prompty/tests/test_factory_invoker.py +++ b/runtime/prompty/tests/test_factory_invoker.py @@ -1,8 +1,6 @@ -import os -from typing import Dict import pytest -from pathlib import Path import prompty +from pathlib import Path from prompty.core import InvokerFactory diff --git a/runtime/prompty/tests/test_path_exec.py b/runtime/prompty/tests/test_path_exec.py index ddae63a..2cbe1cc 100644 --- a/runtime/prompty/tests/test_path_exec.py +++ b/runtime/prompty/tests/test_path_exec.py @@ -1,4 +1,3 @@ -import pytest import prompty from pathlib import Path @@ -9,24 +8,30 @@ def test_prompty_config_local(): p = prompty.load(f"{BASE_PATH}/prompts/sub/sub/basic.prompty") assert p.model.configuration["type"] == "TEST_LOCAL" + def test_prompty_config_global(): p = prompty.load(f"{BASE_PATH}/prompts/sub/basic.prompty") assert p.model.configuration["type"] == "azure" def test_prompty_config_headless(): - p = prompty.headless("embedding", ["this is the first line", "this is the second line"]) + p = prompty.headless( + "embedding", ["this is the first line", "this is the second line"] + ) assert p.model.configuration["type"] == "FROM_CONTENT" + # make sure the prompty path is # relative to the current executing file def test_prompty_relative_local(): from .prompts.test import run + p = run() assert p.name == "Basic Prompt" def test_prompty_relative(): from .prompts.sub.sub.test import run + p = run() assert p.name == "Prompt with complex context" diff --git a/runtime/prompty/tests/test_tracing.py b/runtime/prompty/tests/test_tracing.py new file mode 100644 index 0000000..404c212 --- /dev/null +++ b/runtime/prompty/tests/test_tracing.py @@ -0,0 +1,146 @@ +import pytest +import prompty +from prompty.tracer import trace, Tracer, console_tracer, PromptyTracer + + +@pytest.fixture +def setup_tracing(): + Tracer.add("console", console_tracer) + json_tracer = PromptyTracer() + Tracer.add("console", json_tracer.tracer) + + +@pytest.mark.parametrize( + "prompt", + [ + "prompts/basic.prompty", + "prompts/context.prompty", + "prompts/groundedness.prompty", + "prompts/faithfulness.prompty", + "prompts/embedding.prompty", + ], +) +def test_basic_execution(prompt: str, setup_tracing): + result = prompty.execute(prompt) + print(result) + + +@trace +def get_customer(customerId): + return {"id": customerId, "firstName": "Sally", "lastName": "Davis"} + + +@trace +def get_context(search): + return [ + { + "id": "17", + "name": "RainGuard Hiking Jacket", + "price": 110, + "category": "Hiking Clothing", + "brand": "MountainStyle", + "description": "Introducing the MountainStyle RainGuard Hiking Jacket - the ultimate solution for weatherproof comfort during your outdoor undertakings! Designed with waterproof, breathable fabric, this jacket promises an outdoor experience that's as dry as it is comfortable. The rugged construction assures durability, while the adjustable hood provides a customizable fit against wind and rain. Featuring multiple pockets for safe, convenient storage and adjustable cuffs and hem, you can tailor the jacket to suit your needs on-the-go. And, don't worry about overheating during intense activities - it's equipped with ventilation zippers for increased airflow. Reflective details ensure visibility even during low-light conditions, making it perfect for evening treks. With its lightweight, packable design, carrying it inside your backpack requires minimal effort. With options for men and women, the RainGuard Hiking Jacket is perfect for hiking, camping, trekking and countless other outdoor adventures. Don't let the weather stand in your way - embrace the outdoors with MountainStyle RainGuard Hiking Jacket!", + }, + { + "id": "3", + "name": "Summit Breeze Jacket", + "price": 120, + "category": "Hiking Clothing", + "brand": "MountainStyle", + "description": "Discover the joy of hiking with MountainStyle's Summit Breeze Jacket. This lightweight jacket is your perfect companion for outdoor adventures. Sporting a trail-ready, windproof design and a water-resistant fabric, it's ready to withstand any weather. The breathable polyester material and adjustable cuffs keep you comfortable, whether you're ascending a mountain or strolling through a park. And its sleek black color adds style to function. The jacket features a full-zip front closure, adjustable hood, and secure zippered pockets. Experience the comfort of its inner lining and the convenience of its packable design. Crafted for night trekkers too, the jacket has reflective accents for enhanced visibility. Rugged yet chic, the Summit Breeze Jacket is more than a hiking essential, it's the gear that inspires you to reach new heights. Choose adventure, choose the Summit Breeze Jacket.", + }, + { + "id": "10", + "name": "TrailBlaze Hiking Pants", + "price": 75, + "category": "Hiking Clothing", + "brand": "MountainStyle", + "description": "Meet the TrailBlaze Hiking Pants from MountainStyle, the stylish khaki champions of the trails. These are not just pants; they're your passport to outdoor adventure. Crafted from high-quality nylon fabric, these dapper troopers are lightweight and fast-drying, with a water-resistant armor that laughs off light rain. Their breathable design whisks away sweat while their articulated knees grant you the flexibility of a mountain goat. Zippered pockets guard your essentials, making them a hiker's best ally. Designed with durability for all your trekking trials, these pants come with a comfortable, ergonomic fit that will make you forget you're wearing them. Sneak a peek, and you are sure to be tempted by the sleek allure that is the TrailBlaze Hiking Pants. Your outdoors wardrobe wouldn't be quite complete without them.", + }, + ] + + +@trace +def get_response(customerId, question, prompt): + customer = get_customer(customerId) + context = get_context(question) + + result = prompty.execute( + prompt, + inputs={"question": question, "customer": customer, "documentation": context}, + ) + return {"question": question, "answer": result, "context": context} + + +@trace +def test_context_flow(setup_tracing): + customerId = 1 + question = "tell me about your jackets" + prompt = "context.prompty" + + response = get_response(customerId, question, f"prompts/{prompt}") + print(response) + + +@trace +def evaluate(prompt, evalprompt, customerId, question): + response = get_response(customerId, question, prompt) + + result = prompty.execute( + evalprompt, + inputs=response, + ) + return result + + +@trace +def test_context_groundedness(setup_tracing): + result = evaluate( + "prompts/context.prompty", + "prompts/groundedness.prompty", + 1, + "tell me about your jackets", + ) + print(result) + + +@trace +def test_embedding_headless(setup_tracing): + p = prompty.headless( + api="embedding", + configuration={"type": "azure", "azure_deployment": "text-embedding-ada-002"}, + content="hello world", + ) + emb = prompty.execute(p) + print(emb) + + +@trace +def test_embeddings_headless(setup_tracing): + p = prompty.headless( + api="embedding", + configuration={"type": "azure", "azure_deployment": "text-embedding-ada-002"}, + content=["hello world", "goodbye world", "hello again"], + ) + emb = prompty.execute(p) + print(emb) + + +@trace +def test_function_calling(setup_tracing): + result = prompty.execute( + "prompts/functions.prompty", + ) + print(result) + + +# need to add trace attribute to +# materialize stream into the function +# trace decorator +@trace +def test_streaming(setup_tracing): + result = prompty.execute( + "prompts/streaming.prompty", + ) + for item in result: + print(item)