Skip to content

Commit

Permalink
added async tracing support, cleaned up tracing code
Browse files Browse the repository at this point in the history
  • Loading branch information
sethjuarez committed Jul 26, 2024
1 parent 0fa7cc9 commit 755e991
Showing 1 changed file with 67 additions and 33 deletions.
100 changes: 67 additions & 33 deletions runtime/prompty/prompty/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,58 +89,92 @@ def to_dict(cls, obj: Any) -> Dict[str, Any]:
return str(obj)


def trace(func: Callable = None, *, description: str = None) -> Callable:
if func is None:
return partial(trace, description=description)
def _name(func: Callable, args):
if hasattr(func, "__qualname__"):
signature = f"{func.__module__}.{func.__qualname__}"
else:
signature = f"{func.__module__}.{func.__name__}"

# core invoker gets special treatment
core_invoker = signature == "prompty.core.Invoker.__call__"
if core_invoker:
name = type(args[0]).__name__
signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke"
else:
name = func.__name__

return name, signature


def _inputs(func: Callable, args, kwargs) -> dict:
ba = inspect.signature(func).bind(*args, **kwargs)
ba.apply_defaults()

inputs = {k: Trace.to_dict(v) for k, v in ba.arguments.items() if k != "self"}

return inputs

def _results(result: Any) -> dict:
return {
"result": Trace.to_dict(result) if result is not None else "None",
}

def _trace_sync(func: Callable = None, *, description: str = None) -> Callable:
description = description or ""

@wraps(func)
def wrapper(*args, **kwargs):
if hasattr(func, "__qualname__"):
signature = f"{func.__module__}.{func.__qualname__}"
else:
signature = f"{func.__module__}.{func.__name__}"
name, signature = _name(func, args)
Trace.start(name)
Trace.add("signature", signature)
if description and description != "":
Trace.add("description", description)

# core invoker gets special treatment
core_invoker = signature == "prompty.core.Invoker.__call__"
if core_invoker:
name = type(args[0]).__name__
else:
name = func.__name__
inputs = _inputs(func, args, kwargs)
Trace.add("inputs", inputs)

Trace.start(name)
result = func(*args, **kwargs)
Trace.add("result", _results(result))

if core_invoker:
Trace.add(
"signature",
f"{args[0].__module__}.{args[0].__class__.__name__}.invoke",
)
else:
Trace.add("signature", signature)
Trace.end()

if len(description) > 0:
Trace.add("description", description)
return result

return wrapper

ba = inspect.signature(func).bind(*args, **kwargs)
ba.apply_defaults()
def _trace_async(func: Callable = None, *, description: str = None) -> Callable:
description = description or ""

inputs = {k: Trace.to_dict(v) for k, v in ba.arguments.items() if k != "self"}
@wraps(func)
async def wrapper(*args, **kwargs):
name, signature = _name(func, args)
Trace.start(name)
Trace.add("signature", signature)
if description and description != "":
Trace.add("description", description)

Trace.add("input", Trace.to_dict(inputs))
result = func(*args, **kwargs)
inputs = _inputs(func, args, kwargs)
Trace.add("inputs", inputs)

Trace.add(
"result",
Trace.to_dict(result) if result is not None else "None",
)
result = await func(*args, **kwargs)
Trace.add("result", _results(result))

Trace.end()

return result

return wrapper

def trace(func: Callable = None, *, description: str = None) -> Callable:
if func is None:
return partial(trace, description=description)

wrapped_method = (
_trace_async if inspect.iscoroutinefunction(func) else _trace_sync
)

return wrapped_method(func, description=description)


class PromptyTracer(Tracer):
_stack: List[Dict[str, Any]] = []
Expand Down

0 comments on commit 755e991

Please sign in to comment.