Skip to content

Commit

Permalink
Merge pull request #410 from cheshire-cat-ai/develop
Browse files Browse the repository at this point in the history
Rabbithole: progress percentages + bug fixes on URL ingestion
  • Loading branch information
pieroit authored Aug 11, 2023
2 parents da658f6 + 85e2cf3 commit 486684f
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 46 deletions.
43 changes: 33 additions & 10 deletions core/cat/rabbit_hole.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import time
import math
import json
import mimetypes
from typing import List, Union
Expand Down Expand Up @@ -135,6 +136,7 @@ def ingest_file(
filename = file
else:
filename = file.filename

self.store_documents(docs=docs, source=filename)

def file_to_docs(
Expand Down Expand Up @@ -189,7 +191,7 @@ def file_to_docs(
source = file

# Make a request with a fake browser name
request = Request(file, headers={'User-Agent': "Magic Browser"})
request = Request(file, headers={"User-Agent": "Magic Browser"})

try:
# Get binary content of url
Expand All @@ -214,16 +216,36 @@ def file_to_docs(
mimetype=content_type,
source=source).from_data(data=file_bytes,
mime_type=content_type)

# Parser based on the mime type
parser = MimeTypeBasedParser(handlers=self.file_handlers)

# Parse the text
self.send_rabbit_thought("I'm parsing the content. Big content could require some minutes...")
text = parser.parse(blob)

self.send_rabbit_thought(f"Parsing completed. Now let's go with reading process...")
docs = self.split_text(text, chunk_size, chunk_overlap)
return docs

def send_rabbit_thought(self, thought):
"""Append a message to the notification list.
This method receive a string and create the message to append to the list of notifications.
Parameters
----------
thought : str
Text of the message to append to the notification list.
"""

self.cat.web_socket_notifications.append({
"error": False,
"type": "notification",
"content": thought,
"why": {},
})


def store_documents(self, docs: List[Document], source: str) -> None:
"""Add documents to the Cat's declarative memory.
Expand Down Expand Up @@ -255,7 +277,14 @@ def store_documents(self, docs: List[Document], source: str) -> None:
)

# classic embed
time_last_notification = time.time()
time_interval = 10 # a notification every 10 secs
for d, doc in enumerate(docs):
if time.time() - time_last_notification > time_interval:
time_last_notification = time.time()
perc_read = int( d / len(docs) * 100 )
self.send_rabbit_thought(f"Read {perc_read}% of {source}")

doc.metadata["source"] = source
doc.metadata["when"] = time.time()
doc = self.cat.mad_hatter.execute_hook(
Expand All @@ -279,14 +308,8 @@ def store_documents(self, docs: List[Document], source: str) -> None:
# notify client
finished_reading_message = f"Finished reading {source}, " \
f"I made {len(docs)} thoughts on it."
self.cat.web_socket_notifications.append(
{
"error": False,
"type": "notification",
"content": finished_reading_message,
"why": {},
}
)

self.send_rabbit_thought(finished_reading_message)

print(f"\n\nDone uploading {source}")

Expand Down
2 changes: 1 addition & 1 deletion core/cat/routes/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,4 @@ async def upsert_plugin_settings(
return {
"name": plugin_id,
"value": final_settings
}
}
7 changes: 6 additions & 1 deletion core/cat/routes/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,14 @@ async def upload_url(
"""Upload a url. Website content will be extracted and segmented into chunks.
Chunks will be then vectorized and stored into documents memory."""
# check that URL is valid

try:
# Send a HEAD request to the specified URL
response = requests.head(url)
response = requests.head(
url,
headers={"User-Agent": "Magic Browser"},
allow_redirects=True
)
status_code = response.status_code

if status_code == 200:
Expand Down
104 changes: 71 additions & 33 deletions core/cat/routes/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,69 +7,107 @@

router = APIRouter()

# This constant sets the interval (in seconds) at which the system checks for notifications.
NOTIFICATION_CHECK_INTERVAL = 1 # seconds


class ConnectionManager:
"""
Manages active WebSocket connections.
"""

def __init__(self):
# List to store all active WebSocket connections.
self.active_connections: list[WebSocket] = []

async def connect(self, websocket: WebSocket):
"""
Accept the incoming WebSocket connection and add it to the active connections list.
"""
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
"""
Remove the given WebSocket from the active connections list.
"""
self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
"""
Send a personal message (in JSON format) to the specified WebSocket.
"""
await websocket.send_json(message)

async def broadcast(self, message: str):
"""
Send a message to all active WebSocket connections.
"""
for connection in self.active_connections:
await connection.send_json(message)


manager = ConnectionManager()

# main loop via websocket
@router.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
ccat = websocket.app.state.ccat

await manager.connect(websocket)
async def receive_message(websocket: WebSocket, ccat: object):
"""
Continuously receive messages from the WebSocket and forward them to the `ccat` object for processing.
"""
while True:
user_message = await websocket.receive_json()

# Run the `ccat` object's method in a threadpool since it might be a CPU-bound operation.
cat_message = await run_in_threadpool(ccat, user_message)

async def receive_message():
while True:
# message received from specific user
user_message = await websocket.receive_json()
# Send the response message back to the user.
await manager.send_personal_message(cat_message, websocket)

# get response from the cat
cat_message = await run_in_threadpool(ccat, user_message)

# send output to specific user
await manager.send_personal_message(cat_message, websocket)
async def check_notification(websocket: WebSocket, ccat: object):
"""
Periodically check if there are any new notifications from the `ccat` object and send them to the user.
"""
while True:
if ccat.web_socket_notifications:
# extract from FIFO list websocket notification
notification = ccat.web_socket_notifications.pop(0)
await manager.send_personal_message(notification, websocket)

async def check_notification():
while True:
# chat notifications (i.e. finished uploading)
if len(ccat.web_socket_notifications) > 0:
# extract from FIFO list websocket notification
notification = ccat.web_socket_notifications.pop(0)
await manager.send_personal_message(notification, websocket)
# Sleep for the specified interval before checking for notifications again.
await asyncio.sleep(NOTIFICATION_CHECK_INTERVAL)

await asyncio.sleep(1) # wait for 1 seconds before checking again

@router.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
Endpoint to handle incoming WebSocket connections, process messages, and check for notifications.
"""

# Retrieve the `ccat` instance from the application's state.
ccat = websocket.app.state.ccat

# Add the new WebSocket connection to the manager.
await manager.connect(websocket)

try:
await asyncio.gather(receive_message(), check_notification())
# Process messages and check for notifications concurrently.
await asyncio.gather(
receive_message(websocket, ccat),
check_notification(websocket, ccat)
)
except WebSocketDisconnect:
manager.disconnect(websocket)
# Handle the event where the user disconnects their WebSocket.
log("WebSocket connection closed", "INFO")
except Exception as e:
# Log any unexpected errors and send an error message back to the user.
log(e, "ERROR")
traceback.print_exc()

# send error to specific user
await manager.send_personal_message(
{
"type": "error",
"name": type(e).__name__,
"description": str(e),
},
websocket
)
await manager.send_personal_message({
"type": "error",
"name": type(e).__name__,
"description": str(e),
}, websocket)
finally:
# Always ensure the WebSocket is removed from the manager, regardless of how the above block exits.
manager.disconnect(websocket)
2 changes: 1 addition & 1 deletion core/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "Cheshire-Cat"
description = "Open source and customizable AI architecture"
version = "1.0.0"
version = "1.0.1"
requires-python = ">=3.10"
license = { file="LICENSE" }
authors = [
Expand Down

0 comments on commit 486684f

Please sign in to comment.