From 6fce5dbf41390c81b5c1821791aaa5df0e9380d5 Mon Sep 17 00:00:00 2001 From: Vignesh Aigal Date: Thu, 24 Oct 2024 14:44:04 -0700 Subject: [PATCH] Fix sheet run --- llmstack/apps/runner/app_runner.py | 18 +++ llmstack/sheets/apis.py | 58 ++++++++ llmstack/sheets/tasks.py | 232 +++++++++++------------------ 3 files changed, 167 insertions(+), 141 deletions(-) diff --git a/llmstack/apps/runner/app_runner.py b/llmstack/apps/runner/app_runner.py index 9623e5a8900..1ed14e9f415 100644 --- a/llmstack/apps/runner/app_runner.py +++ b/llmstack/apps/runner/app_runner.py @@ -155,6 +155,24 @@ def id(self): return self.app_uuid +class SheetProcessorRunnerSource(PlaygroundAppRunnerSource): + type: AppRunnerSourceType = AppRunnerSourceType.SHEET + sheet_id: str + + @property + def id(self): + return f"{self.provider_slug}/{self.processor_slug}" + + +class SheetStoreAppRunnerSource(StoreAppRunnerSource): + type: AppRunnerSourceType = AppRunnerSourceType.SHEET + sheet_id: str + + @property + def id(self): + return self.slug + + class AppRunnerStreamingResponseType(StrEnum): ERRORS = "errors" OUTPUT = "output" diff --git a/llmstack/sheets/apis.py b/llmstack/sheets/apis.py index fb19c6e49c1..132f865a58b 100644 --- a/llmstack/sheets/apis.py +++ b/llmstack/sheets/apis.py @@ -1,3 +1,4 @@ +import asyncio import csv import io import json @@ -13,6 +14,7 @@ from rest_framework.response import Response as DRFResponse from rq import Callback +from llmstack.apps.runner.app_runner import AppRunnerRequest, SheetStoreAppRunnerSource from llmstack.base.models import Profile from llmstack.common.utils.sslr._client import LLM from llmstack.jobs.adhoc import ProcessingJob @@ -297,6 +299,62 @@ def download_run(self, request, sheet_uuid=None, run_id=None): response["Content-Disposition"] = f'attachment; filename="sheet_{sheet_uuid}_{run_id}.csv"' return response + def _run_until_complete(self, app_runner, input_data, session_id): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + response 
= app_runner.run_until_complete( + AppRunnerRequest(client_request_id=str(uuid.uuid4()), session_id=session_id, input=input_data), loop + ) + return response.data.model_dump() + + def _execute_processor_run_cell(self, request, provider_slug, processor_slug, sheet_id, input_data, config_data): + from llmstack.apps.apis import PlaygroundViewSet + from llmstack.apps.runner.app_runner import SheetProcessorRunnerSource + + session_id = str(uuid.uuid4()) + app_runner = PlaygroundViewSet().get_app_runner( + session_id, + source=SheetProcessorRunnerSource( + request_user_email=request.user.email, + request_user=request.user, + provider_slug=provider_slug, + processor_slug=processor_slug, + sheet_id=sheet_id, + ), + request_user=request.user, + input_data=input_data, + config_data=config_data, + ) + run_response = self._run_until_complete(app_runner, input_data, session_id) + if "output" in run_response and run_response.get("chunks", {}).get("processor"): + return {"output": run_response.get("chunks", {}).get("processor")} + elif "errors" in run_response: + return {"errors": run_response.get("errors")} + return {"errors": "Processor run failed."} + + def _execute_app_run_cell(self, request, app_slug, input_data, sheet_id): + from llmstack.app_store.apis import AppStoreAppViewSet + + session_id = str(uuid.uuid4()) + app_runner = AppStoreAppViewSet().get_app_runner( + session_id=session_id, + app_slug=app_slug, + source=SheetStoreAppRunnerSource( + request_user_email=request.user.email, + request_user=request.user, + sheet_id=sheet_id, + slug=app_slug, + ), + request_user=request.user, + ) + + run_response = self._run_until_complete(app_runner, input_data, session_id) + if "output" in run_response and "output" in run_response["output"]: + return {"output": run_response.get("output").get("output")} + elif "errors" in run_response: + return {"errors": run_response.get("errors")} + return {"errors": "App run failed."} + @action(detail=True, methods=["get"]) def list_runs(self, 
request, sheet_uuid=None): profile = Profile.objects.get(user=request.user) diff --git a/llmstack/sheets/tasks.py b/llmstack/sheets/tasks.py index 997e07f9d32..8b8923c2542 100644 --- a/llmstack/sheets/tasks.py +++ b/llmstack/sheets/tasks.py @@ -1,9 +1,7 @@ -import ast import concurrent import json import logging import re -import uuid from threading import Lock from typing import Any, Dict, List @@ -15,10 +13,9 @@ from django_redis import get_redis_connection from jsonpath_ng import parse -from llmstack.app_store.apis import AppStoreAppViewSet -from llmstack.apps.apis import AppViewSet -from llmstack.common.utils.liquid import render_template -from llmstack.common.utils.utils import hydrate_input, retry_on_db_error +from llmstack.common.utils.liquid import hydrate_input, render_template +from llmstack.common.utils.utils import retry_on_db_error +from llmstack.sheets.apis import PromptlySheetViewSet from llmstack.sheets.models import ( PromptlySheet, PromptlySheetRunEntry, @@ -44,6 +41,26 @@ def number_to_letters(num): return letters +def convert_to_cell_type(cell_type, cell_value): + if cell_type == SheetCellType.OBJECT and isinstance(cell_value, str): + try: + result = json.loads(cell_value) + return result + except json.JSONDecodeError: + return {"data": cell_value} + if cell_type == SheetCellType.TEXT: + return str(cell_value) + return cell_value + + +def is_list_literal(str): + try: + result = json.loads(str) + return isinstance(result, list) + except Exception: + return False + + @retry_on_db_error def _execute_cell( cell: SheetCell, @@ -58,144 +75,84 @@ def _execute_cell( async_to_sync(channel_layer.group_send)(run_id, {"type": "cell.updating", "cell": {"id": cell.cell_id}}) - output = None output_cells = [] formula_type = cell.formula.type formula_data = cell.formula.data spread_output = cell.spread_output + cell_output = "" + cell_error = None - if formula_type == SheetFormulaType.APP_RUN: - app_slug = formula_data.app_slug - input = 
hydrate_input(formula_data.input, input_values) + request = RequestFactory().post(path="") + request.data = {} + request.user = user - request = RequestFactory().post( - f"/api/store/apps/{app_slug}", - format="json", - ) - request.data = { - "stream": False, - "input": input, - } - request.user = user - - # Execute the app - response = async_to_sync(AppStoreAppViewSet().run_app_internal_async)( - slug=app_slug, - session_id=None, - request_uuid=str(uuid.uuid4()), - request=request, - ) - output = response.get("output", "") - elif formula_type == SheetFormulaType.PROCESSOR_RUN: - api_provider_slug = formula_data.provider_slug - api_backend_slug = formula_data.processor_slug - processor_input = formula_data.input - processor_config = formula_data.config - processor_output_template = formula_data.output_template.get("jsonpath", "") - - input = hydrate_input(processor_input, input_values) - config = hydrate_input(processor_config, input_values) - - request = RequestFactory().post( - f"/api/playground/{api_provider_slug}_{api_backend_slug}/run", - format="json", - ) - request.data = { - "stream": False, - "input": { - "input": input, - "config": config, - "api_provider_slug": api_provider_slug, - "api_backend_slug": api_backend_slug, - }, - } - request.user = user - - # Run the processor - response = async_to_sync(AppViewSet().run_playground_internal_async)( - session_id=None, - request_uuid=str(uuid.uuid4()), - request=request, - preview=False, - ) + try: + if formula_type == SheetFormulaType.DATA_TRANSFORMER: + transformation_template = formula_data.transformation_template + cell_output = render_template(transformation_template, input_values) if transformation_template else "" + elif formula_type == SheetFormulaType.PROCESSOR_RUN: + # Run the processor + processor_run_response = PromptlySheetViewSet()._execute_processor_run_cell( + request=request, + provider_slug=formula_data.provider_slug, + processor_slug=formula_data.processor_slug, + sheet_id=str(sheet.uuid), + 
input_data=hydrate_input(formula_data.input, input_values), + config_data=hydrate_input(formula_data.config, input_values), + ) - # Render the output template using response.output - if response.get("output"): - try: - processor_output = ast.literal_eval(response.get("output")) - jsonpath_output = [match.value for match in parse(processor_output_template).find(processor_output)] + # Render the output template + if processor_run_response.get("output"): + cell_output = processor_run_response.get("output") + jsonpath_output = [ + match.value + for match in parse(formula_data.output_template.get("jsonpath", "")).find( + processor_run_response["output"] + ) + ] if len(jsonpath_output) == 1: jsonpath_output = jsonpath_output[0] - output = jsonpath_output if cell_type == SheetCellType.OBJECT else str(jsonpath_output) - - except Exception as e: - logger.error(f"Error processing processor output: {e}") - async_to_sync(channel_layer.group_send)( - run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(e)}} - ) - output = "" - elif response.get("errors"): - async_to_sync(channel_layer.group_send)( - run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(response.get("errors"))}} + cell_output = jsonpath_output + elif processor_run_response.get("errors"): + cell_error = str(processor_run_response.get("errors")) + elif formula_type == SheetFormulaType.APP_RUN: + app_run_response = PromptlySheetViewSet()._execute_app_run_cell( + request=request, + app_slug=formula_data.app_slug, + sheet_id=str(sheet.uuid), + input_data=hydrate_input(formula_data.input, input_values), ) - output = "" - else: - output = response.get("output", "") - elif formula_type == SheetFormulaType.DATA_TRANSFORMER: - transformation_template = formula_data.transformation_template - if transformation_template: - try: - output = render_template(transformation_template, input_values) - except Exception as e: -
logger.error(f"Error applying transformation template: {e}") - async_to_sync(channel_layer.group_send)( - run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(e)}} - ) - else: - output = "" - elif formula_type == SheetFormulaType.AGENT_RUN: - request = RequestFactory().post("/api/platform_apps/run", format="json") - request.data = { - "stream": False, - "app_data": formula_data.model_dump(), - "input": hydrate_input(formula_data.input, input_values), - } - request.user = user - # Run the agent - response = async_to_sync(AppViewSet().run_platform_app_internal_async)( - session_id=None, - request_uuid=str(uuid.uuid4()), - request=request, - preview=False, - ) - # Render the output template using response.output - if response.get("output"): - try: - agent_output = response.get("output") - output = json.loads(agent_output) if cell_type == SheetCellType.OBJECT else str(agent_output) - except Exception as e: - logger.error(f"Error processing processor output: {e}") - async_to_sync(channel_layer.group_send)( - run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(e)}} - ) - output = agent_output - elif response.get("errors"): - async_to_sync(channel_layer.group_send)( - run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": str(response.get("errors"))}} + if app_run_response.get("output"): + cell_output = app_run_response.get("output", "") + elif app_run_response.get("errors"): + cell_error = str(app_run_response.get("errors")) + elif formula_type == SheetFormulaType.AGENT_RUN: + agent_run_response = PromptlySheetViewSet()._execute_app_run_cell( + request=request, + app_slug="sheet-agent", + sheet_id=str(sheet.uuid), + input_data=hydrate_input(formula_data.input, input_values), ) - output = "" - else: - output = response.get("output", "") - - if spread_output and ( - cell_type == SheetCellType.OBJECT - or (isinstance(output, str) and output.startswith("[") and output.endswith("]")) - ): - try: - processed_output =
output if cell_type == SheetCellType.OBJECT else ast.literal_eval(output) + if agent_run_response.get("output"): + cell_output = agent_run_response.get("output", "") + elif agent_run_response.get("errors"): + cell_error = str(agent_run_response.get("errors")) + except Exception as e: + logger.exception("Error in running formula") + cell_error = str(e) - if isinstance(processed_output, list) and all(isinstance(item, list) for item in processed_output): + if cell_error: + # Error occurred + async_to_sync(channel_layer.group_send)( + run_id, {"type": "cell.error", "cell": {"id": cell.cell_id, "error": cell_error}} + ) + cell.value = "" + output_cells = [cell] + else: + if spread_output and (isinstance(cell_output, list) or is_list_literal(cell_output)): + processed_output = json.loads(cell_output) if is_list_literal(cell_output) else cell_output + if all(isinstance(item, list) for item in processed_output): output_cells = [ SheetCell( row=cell.row + i, @@ -207,7 +164,7 @@ def _execute_cell( for i, row in enumerate(processed_output) for j, item in enumerate(row) ] - elif isinstance(processed_output, list): + else: output_cells = [ SheetCell( row=cell.row + i, @@ -216,16 +173,9 @@ def _execute_cell( ) for i, item in enumerate(processed_output) ] - else: - cell.value = processed_output if isinstance(processed_output, str) else output - output_cells = [cell] - except Exception as e: - logger.error(f"Error spreading cell output: {e}") - cell.value = str(output) + else: + cell.value = convert_to_cell_type(cell_type, cell_output) output_cells = [cell] - else: - cell.value = output - output_cells = [cell] total_rows = sheet.data.get("total_rows", 0) total_cols = sheet.data.get("total_cols", 0)