Skip to content

Commit

Permalink
Fix sheet run
Browse files Browse the repository at this point in the history
  • Loading branch information
vegito22 committed Oct 24, 2024
1 parent 9a98298 commit 6fce5db
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 141 deletions.
18 changes: 18 additions & 0 deletions llmstack/apps/runner/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ def id(self):
return self.app_uuid


class SheetProcessorRunnerSource(PlaygroundAppRunnerSource):
type: AppRunnerSourceType = AppRunnerSourceType.SHEET
sheet_id: str

@property
def id(self):
return f"{self.provider_slug}/{self.processor_slug}"


class SheetStoreAppRunnerSource(StoreAppRunnerSource):
type: AppRunnerSourceType = AppRunnerSourceType.SHEET
sheet_id: str

@property
def id(self):
return self.slug


class AppRunnerStreamingResponseType(StrEnum):
ERRORS = "errors"
OUTPUT = "output"
Expand Down
58 changes: 58 additions & 0 deletions llmstack/sheets/apis.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import csv
import io
import json
Expand All @@ -13,6 +14,7 @@
from rest_framework.response import Response as DRFResponse
from rq import Callback

from llmstack.apps.runner.app_runner import AppRunnerRequest, SheetStoreAppRunnerSource
from llmstack.base.models import Profile
from llmstack.common.utils.sslr._client import LLM
from llmstack.jobs.adhoc import ProcessingJob
Expand Down Expand Up @@ -297,6 +299,62 @@ def download_run(self, request, sheet_uuid=None, run_id=None):
response["Content-Disposition"] = f'attachment; filename="sheet_{sheet_uuid}_{run_id}.csv"'
return response

def _run_until_complete(self, app_runner, input_data, session_id):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
response = app_runner.run_until_complete(
AppRunnerRequest(client_request_id=str(uuid.uuid4()), session_id=session_id, input=input_data), loop
)
return response.data.model_dump()

def _execute_processor_run_cell(self, request, provider_slug, processor_slug, sheet_id, input_data, config_data):
from llmstack.apps.apis import PlaygroundViewSet
from llmstack.apps.runner.app_runner import SheetProcessorRunnerSource

session_id = str(uuid.uuid4())
app_runner = PlaygroundViewSet().get_app_runner(
session_id,
source=SheetProcessorRunnerSource(
request_user_email=request.user.email,
request_user=request.user,
provider_slug=provider_slug,
processor_slug=processor_slug,
sheet_id=sheet_id,
),
request_user=request.user,
input_data=input_data,
config_data=config_data,
)
run_response = self._run_until_complete(app_runner, input_data, session_id)
if "output" in run_response and run_response.get("chunks", {}).get("processor"):
return {"output": run_response.get("chunks", {}).get("processor")}
elif "errors" in run_response:
return {"errors": run_response.get("errors")}
return {"errors": "Processor run failed."}

def _execute_app_run_cell(self, request, app_slug, input_data, sheet_id):
from llmstack.app_store.apis import AppStoreAppViewSet

session_id = str(uuid.uuid4())
app_runner = AppStoreAppViewSet().get_app_runner(
session_id=session_id,
app_slug=app_slug,
source=SheetStoreAppRunnerSource(
request_user_email=request.user.email,
request_user=request.user,
sheet_id=sheet_id,
slug=app_slug,
),
request_user=request.user,
)

run_response = self._run_until_complete(app_runner, input_data, session_id)
if "output" in run_response and "output" in run_response["output"]:
return {"output": run_response.get("output").get("output")}
elif "errors" in run_response:
return {"errors": run_response.get("errors")}
return {"errors": "App run failed."}

@action(detail=True, methods=["get"])
def list_runs(self, request, sheet_uuid=None):
profile = Profile.objects.get(user=request.user)
Expand Down
232 changes: 91 additions & 141 deletions llmstack/sheets/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import ast
import concurrent
import json
import logging
import re
import uuid
from threading import Lock
from typing import Any, Dict, List

Expand All @@ -15,10 +13,9 @@
from django_redis import get_redis_connection
from jsonpath_ng import parse

from llmstack.app_store.apis import AppStoreAppViewSet
from llmstack.apps.apis import AppViewSet
from llmstack.common.utils.liquid import render_template
from llmstack.common.utils.utils import hydrate_input, retry_on_db_error
from llmstack.common.utils.liquid import hydrate_input, render_template
from llmstack.common.utils.utils import retry_on_db_error
from llmstack.sheets.apis import PromptlySheetViewSet
from llmstack.sheets.models import (
PromptlySheet,
PromptlySheetRunEntry,
Expand All @@ -44,6 +41,26 @@ def number_to_letters(num):
return letters


def convert_to_cell_type(cell_type, cell_value):
if cell_type == SheetCellType.OBJECT and isinstance(cell_value, str):
try:
result = json.loads(cell_value)
return result
except json.JSONDecodeError:
return {"data": cell_value}
if cell_type == SheetCellType.TEXT:
return str(cell_value)
return cell_value


def is_list_literal(str):
try:
result = json.loads(str)
return isinstance(result, list)
except Exception:
return False


@retry_on_db_error
def _execute_cell(
cell: SheetCell,
Expand All @@ -58,144 +75,84 @@ def _execute_cell(

async_to_sync(channel_layer.group_send)(run_id, {"type": "cell.updating", "cell": {"id": cell.cell_id}})

output = None
output_cells = []
formula_type = cell.formula.type
formula_data = cell.formula.data
spread_output = cell.spread_output
cell_output = ""
cell_error = None

if formula_type == SheetFormulaType.APP_RUN:
app_slug = formula_data.app_slug
input = hydrate_input(formula_data.input, input_values)
request = RequestFactory().post(path="")
request.data = {}
request.user = user

request = RequestFactory().post(
f"/api/store/apps/{app_slug}",
format="json",
)
request.data = {
"stream": False,
"input": input,
}
request.user = user

# Execute the app
response = async_to_sync(AppStoreAppViewSet().run_app_internal_async)(
slug=app_slug,
session_id=None,
request_uuid=str(uuid.uuid4()),
request=request,
)
output = response.get("output", "")
elif formula_type == SheetFormulaType.PROCESSOR_RUN:
api_provider_slug = formula_data.provider_slug
api_backend_slug = formula_data.processor_slug
processor_input = formula_data.input
processor_config = formula_data.config
processor_output_template = formula_data.output_template.get("jsonpath", "")

input = hydrate_input(processor_input, input_values)
config = hydrate_input(processor_config, input_values)

request = RequestFactory().post(
f"/api/playground/{api_provider_slug}_{api_backend_slug}/run",
format="json",
)
request.data = {
"stream": False,
"input": {
"input": input,
"config": config,
"api_provider_slug": api_provider_slug,
"api_backend_slug": api_backend_slug,
},
}
request.user = user

# Run the processor
response = async_to_sync(AppViewSet().run_playground_internal_async)(
session_id=None,
request_uuid=str(uuid.uuid4()),
request=request,
preview=False,
)
try:
if formula_type == SheetFormulaType.DATA_TRANSFORMER:
transformation_template = formula_data.transformation_template
cell_output = render_template(transformation_template, input_values) if transformation_template else ""
elif formula_type == SheetFormulaType.PROCESSOR_RUN:
# Run the processor
processor_run_response = PromptlySheetViewSet()._execute_processor_run_cell(
request=request,
provider_slug=formula_data.provider_slug,
processor_slug=formula_data.processor_slug,
sheet_id=str(sheet.uuid),
input_data=hydrate_input(formula_data.input, input_values),
config_data=hydrate_input(formula_data.config, input_values),
)

# Render the output template using response.output
if response.get("output"):
try:
processor_output = ast.literal_eval(response.get("output"))
jsonpath_output = [match.value for match in parse(processor_output_template).find(processor_output)]
# Render the output template
if processor_run_response.get("output"):
cell_output = processor_run_response.get("output")
jsonpath_output = [
match.value
for match in parse(formula_data.output_template.get("jsonpath", "")).find(
processor_run_response["output"]
)
]
if len(jsonpath_output) == 1:
jsonpath_output = jsonpath_output[0]

output = jsonpath_output if cell_type == SheetCellType.OBJECT else str(jsonpath_output)

except Exception as e:
logger.error(f"Error processing processor output: {e}")
async_to_sync(channel_layer.group_send)(
run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(e)}}
)
output = ""
elif response.get("errors"):
async_to_sync(channel_layer.group_send)(
run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(response.get("errors"))}}
cell_output = jsonpath_output[0] if len(jsonpath_output) == 1 else jsonpath_output
elif processor_run_response.get("errors"):
cell_error = str(processor_run_response.get("errors"))
elif formula_type == SheetFormulaType.APP_RUN:
app_run_response = PromptlySheetViewSet()._execute_app_run_cell(
request=request,
app_slug=formula_data.app_slug,
sheet_id=str(sheet.uuid),
input_data=hydrate_input(formula_data.input, input_values),
)
output = ""
else:
output = response.get("output", "")
elif formula_type == SheetFormulaType.DATA_TRANSFORMER:
transformation_template = formula_data.transformation_template
if transformation_template:
try:
output = render_template(transformation_template, input_values)
except Exception as e:
logger.error(f"Error applying transformation template: {e}")
async_to_sync(channel_layer.group_send)(
run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(e)}}
)
else:
output = ""
elif formula_type == SheetFormulaType.AGENT_RUN:
request = RequestFactory().post("/api/platform_apps/run", format="json")
request.data = {
"stream": False,
"app_data": formula_data.model_dump(),
"input": hydrate_input(formula_data.input, input_values),
}
request.user = user
# Run the agent
response = async_to_sync(AppViewSet().run_platform_app_internal_async)(
session_id=None,
request_uuid=str(uuid.uuid4()),
request=request,
preview=False,
)
# Render the output template using response.output
if response.get("output"):
try:
agent_output = response.get("output")
output = json.loads(agent_output) if cell_type == SheetCellType.OBJECT else str(agent_output)
except Exception as e:
logger.error(f"Error processing processor output: {e}")
async_to_sync(channel_layer.group_send)(
run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(e)}}
)
output = agent_output
elif response.get("errors"):
async_to_sync(channel_layer.group_send)(
run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(response.get("errors"))}}
if app_run_response.get("output"):
cell_output = app_run_response.get("output", "")
elif processor_run_response.get("errors"):
cell_error = str(app_run_response.get("errors"))
elif formula_type == SheetFormulaType.AGENT_RUN:
agent_run_response = PromptlySheetViewSet()._execute_app_run_cell(
request=request,
app_slug="sheet-agent",
sheet_id=str(sheet.uuid),
input_data=hydrate_input(formula_data.input, input_values),
)
output = ""
else:
output = response.get("output", "")

if spread_output and (
cell_type == SheetCellType.OBJECT
or (isinstance(output, str) and output.startswith("[") and output.endswith("]"))
):
try:
processed_output = output if cell_type == SheetCellType.OBJECT else ast.literal_eval(output)
if agent_run_response.get("output"):
cell_output = agent_run_response.get("output", "")
elif processor_run_response.get("errors"):
cell_error = str(agent_run_response.get("errors"))
except Exception as e:
logger.exception("Error in running formula")
cell_error = str(e)

if isinstance(processed_output, list) and all(isinstance(item, list) for item in processed_output):
if cell_error:
# Error occurred
async_to_sync(channel_layer.group_send)(
run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": cell_error}}
)
cell.value = ""
output_cells = [cell]
else:
if spread_output and (isinstance(cell_output, list) or is_list_literal(cell_output)):
processed_output = json.loads(cell_output) if is_list_literal(cell_output) else cell_output
if all(isinstance(item, list) for item in processed_output):
output_cells = [
SheetCell(
row=cell.row + i,
Expand All @@ -207,7 +164,7 @@ def _execute_cell(
for i, row in enumerate(processed_output)
for j, item in enumerate(row)
]
elif isinstance(processed_output, list):
else:
output_cells = [
SheetCell(
row=cell.row + i,
Expand All @@ -216,16 +173,9 @@ def _execute_cell(
)
for i, item in enumerate(processed_output)
]
else:
cell.value = processed_output if isinstance(processed_output, str) else output
output_cells = [cell]
except Exception as e:
logger.error(f"Error spreading cell output: {e}")
cell.value = str(output)
else:
cell.value = convert_to_cell_type(cell_type, cell_output)
output_cells = [cell]
else:
cell.value = output
output_cells = [cell]

total_rows = sheet.data.get("total_rows", 0)
total_cols = sheet.data.get("total_cols", 0)
Expand Down

0 comments on commit 6fce5db

Please sign in to comment.