Skip to content

Commit

Permalink
modify task operator to run parallel tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Aug 25, 2024
1 parent dc36f34 commit cd5644a
Showing 1 changed file with 32 additions and 39 deletions.
71 changes: 32 additions & 39 deletions bindings/ceylon/ceylon/task/task_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from loguru import logger
from pydantic import BaseModel
from pydantic import Field
import asyncio


class SubTask(BaseModel):
Expand Down Expand Up @@ -93,14 +94,12 @@ def _create_dependency_graph(self) -> nx.DiGraph:
def validate_sub_tasks(self) -> bool:
subtask_names = set(self.subtasks.keys())

# Check if all dependencies are present
for subtask in self.subtasks.values():
if not subtask.depends_on.issubset(subtask_names):
missing_deps = subtask.depends_on - subtask_names
logger.info(f"Subtask '{subtask.name}' has missing dependencies: {missing_deps}")
return False

# Check for circular dependencies
try:
self._validate_dependencies()
except ValueError as e:
Expand All @@ -113,12 +112,13 @@ def get_execution_order(self) -> List[str]:
graph = self._create_dependency_graph()
return list(nx.topological_sort(graph))

def get_next_subtask(self) -> Optional[Tuple[str, SubTask]]:
def get_ready_subtasks(self) -> List[Tuple[str, SubTask]]:
ready_subtasks = []
for subtask_name in self.execution_order:
subtask = self.subtasks[subtask_name]
if all(self.subtasks[dep].completed for dep in subtask.depends_on):
return (subtask_name, subtask)
return None
if not subtask.completed and all(self.subtasks[dep].completed for dep in subtask.depends_on):
ready_subtasks.append((subtask_name, subtask))
return ready_subtasks

def update_subtask_status(self, subtask_name: str, result: str):
if subtask_name not in self.subtasks:
Expand All @@ -128,9 +128,6 @@ def update_subtask_status(self, subtask_name: str, result: str):
if result is not None:
subtask.complete(result)

if subtask_name in self.execution_order:
self.execution_order.remove(subtask_name)

def update_subtask_executor(self, subtask_name: str, executor: str) -> SubTask:
if subtask_name not in self.subtasks:
raise ValueError(f"Subtask {subtask_name} not found")
Expand Down Expand Up @@ -197,41 +194,35 @@ class TaskResult(BaseModel):
final_answer: str


if __name__ == "__main__":
def execute_task(task: Task) -> None:
while True:
# Get the next subtask
next_subtask: Optional[tuple[str, SubTask]] = task.get_next_subtask()
async def execute_subtask(subtask_name: str, subtask: SubTask) -> str:
print(f"Executing: {subtask}")
# Simulate subtask execution with a delay
await asyncio.sleep(1)
return f"Success: {subtask_name}"

# If there are no more subtasks, break the loop
if next_subtask is None:
break

subtask_name, subtask = next_subtask
print(f"Executing: {subtask}")
async def execute_task(task: Task) -> None:
while not task.is_completed():
ready_subtasks = task.get_ready_subtasks()
print(f"Ready subtasks: {ready_subtasks}")

# Here you would actually execute the subtask
# For this example, we'll simulate execution with a simple print statement
print(f"Simulating execution of {subtask_name}")
if not ready_subtasks:
await asyncio.sleep(0.1)
continue

# Simulate a result (in a real scenario, this would be the outcome of the subtask execution)
result = "Success"
# Execute ready subtasks in parallel
subtask_coroutines = [execute_subtask(name, subtask) for name, subtask in ready_subtasks]
results = await asyncio.gather(*subtask_coroutines)

# Update the subtask status
# Update task status
for (subtask_name, _), result in zip(ready_subtasks, results):
task.update_subtask_status(subtask_name, result)
print(f"Completed: {subtask_name}")

# Check if the entire task is completed
if task.is_completed():
print("All subtasks completed successfully!")
break

# Final check to see if all subtasks were completed
if task.is_completed():
print("Task execution completed successfully!")
else:
print("Task execution incomplete. Some subtasks may have failed.")
print("All subtasks completed successfully!")


if __name__ == "__main__":
# Create a task with initial subtasks
web_app = Task.create_task("Build Web App", "Create a simple web application",
subtasks=[
Expand All @@ -242,6 +233,9 @@ def execute_task(task: Task) -> None:
SubTask(name="testing", description="Perform unit and integration tests",
depends_on={"backend", "frontend"},
required_specialty="Knowledge about testing tools"),
SubTask(name="qa_test_cases", description="Perform unit and integration tests",
depends_on={"backend", "frontend"},
required_specialty="Knowledge about testing tools"),
SubTask(name="frontend", description="Develop the frontend UI",
depends_on={"setup", "backend"},
required_specialty="Knowledge about frontend tools"),
Expand All @@ -255,24 +249,23 @@ def execute_task(task: Task) -> None:
depends_on={"deployment"},
required_specialty="Knowledge about delivery tools"),
SubTask(name="qa", description="Perform quality assurance",
depends_on={"testing"},
depends_on={"testing", "qa_test_cases"},
required_specialty="Knowledge about testing tools")
])

# Execute the task
print("Execution order:", [web_app.subtasks[task_id].name for task_id in web_app.get_execution_order()])

if web_app.validate_sub_tasks():
print("Subtasks are valid")

print("\nExecuting task:")
execute_task(task=web_app)
asyncio.run(execute_task(task=web_app))

print("\nFinal task status:")
print(web_app)
else:
print("Subtasks are invalid")

# Serialization example
# Serialization example
print("\nSerialized Task:")
print(web_app.model_dump_json(indent=2))

0 comments on commit cd5644a

Please sign in to comment.