-
Notifications
You must be signed in to change notification settings - Fork 14
/
app-agnext.py
103 lines (77 loc) · 3.32 KB
/
app-agnext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import streamlit as st
import asyncio
import logging
import os
import json
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
from magentic_one_helper import MagenticOneHelper
# Seed the session-state keys this page relies on, writing a default only
# when the key is absent so any value already stored is left untouched.
_SESSION_DEFAULTS = (
    ('instructions', ""),
    ('running', False),
    ("final_answer", None),
)
for _state_key, _state_default in _SESSION_DEFAULTS:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Page header.
st.title("MagenticOne Workflow Runner")
st.caption("This app runs a MagenticOne workflow based on the instructions provided.")

# Free-text task description (pre-filled with a sample task) and the button
# whose click starts a run.
instructions = st.text_input(
    "Enter your instructions:",
    value="generate code for 'Hello World' in Python",
)
run_button = st.button("Run Agents", type="primary")
def display_log_message(log_entry):
    """Render a single log entry in the Streamlit page.

    Args:
        log_entry: Mapping describing one log event. The keys read here are
            "type" and "timestamp", plus "source"/"message" for
            "OrchestrationEvent" entries and "prompt_tokens"/
            "completion_tokens" for "LLMCallEvent" entries.

    An "OrchestrationEvent" is shown as a collapsed expander, an
    "LLMCallEvent" as a caption with its token counts, and anything else as
    an "Invalid log entry format." caption.
    """
    entry_type = log_entry.get("type")
    raw_timestamp = log_entry.get("timestamp")

    # A missing or malformed timestamp must not abort rendering: fall back to
    # the raw value so the entry (or the invalid-format notice) still shows.
    try:
        timestamp = datetime.fromisoformat(raw_timestamp).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError):
        timestamp = str(raw_timestamp)

    if entry_type == "OrchestrationEvent":
        source = log_entry.get("source", "unknown")
        with st.expander(f"From {source} @ {timestamp}", expanded=False):
            st.write(log_entry.get("message", ""))
    elif entry_type == "LLMCallEvent":
        prompt_tokens = log_entry.get("prompt_tokens")
        completion_tokens = log_entry.get("completion_tokens")
        st.caption(
            f'{timestamp} LLM Call [prompt_tokens: {prompt_tokens}, '
            f'completion_tokens: {completion_tokens}]'
        )
    else:
        st.caption("Invalid log entry format.")
async def main(task, logs_dir="./logs"):
    """Run one MagenticOne task and render its log stream in the page.

    Args:
        task: Instruction text passed to ``MagenticOneHelper.run_task``.
        logs_dir: Directory handed to ``MagenticOneHelper``; created if it
            does not exist yet.

    The outcome is stored in ``st.session_state["final_answer"]``: the value
    returned by ``get_final_answer()``, or ``None`` (with a warning shown on
    the page) when no answer is available.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(logs_dir, exist_ok=True)

    # Initialize MagenticOne
    helper = MagenticOneHelper(logs_dir=logs_dir)
    await helper.initialize()
    print("MagenticOne initialized.")

    # Start the task in the background so its logs can be consumed while it runs.
    task_future = asyncio.create_task(helper.run_task(task))

    with st.container(border=True):
        # Stream and render log entries as they arrive.
        async for log_entry in helper.stream_logs():
            display_log_message(log_entry=log_entry)

    # Wait for the task to complete; this also surfaces any exception it raised.
    await task_future

    final_answer = helper.get_final_answer()

    if final_answer is not None:
        print(f"Final answer: {final_answer}")
        st.session_state["final_answer"] = final_answer
    else:
        print("No final answer found in logs.")
        st.session_state["final_answer"] = None
        st.warning("No final answer found in logs.")
# Start a run only when the button was clicked and the instruction text is non-empty.
if run_button and instructions:
    st.session_state['instructions'] = instructions
    st.session_state['running'] = True
    st.write("Instructions:", st.session_state['instructions'])

    try:
        with st.spinner("Running the workflow..."):
            asyncio.run(main(st.session_state['instructions']))
    finally:
        # Clear the flag on success and on failure so it never stays stuck at True.
        st.session_state['running'] = False

    # main() stores its outcome in session state; report it to the user.
    final_answer = st.session_state["final_answer"]
    if final_answer:
        st.success("Task completed successfully.")
        st.write("## Final answer:")
        st.write(final_answer)
    else:
        st.error("Task failed.")
        st.write("Final answer not found.")