Skip to content

Commit

Permalink
Merge pull request #444 from aurelio-labs/vittorio/is_synced-method
Browse files Browse the repository at this point in the history
feat: is_synced method for Pinecone Index
  • Loading branch information
jamescalam authored Oct 4, 2024
2 parents ce3155d + 23ecf40 commit b2dcb6d
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 12 deletions.
8 changes: 7 additions & 1 deletion docs/source/route_layer/sync.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,10 @@ You can try this yourself by running the following:
rl = RouteLayer(encoder=encoder, routes=routes, index=pc_index)
When initializing the `PineconeIndex` object, we can specify the `sync` parameter.
When initializing the `PineconeIndex` object, we can specify the `sync` parameter.

Checking for Synchronization
----------------------------

To verify whether the local and remote instances are synchronized, you can use the `is_synced` method. This method checks whether the routes, utterances, function schemas, and associated metadata in the local instance match those stored in the remote index.
Note that if the `sync` flag is not set (e.g. for indexes other than Pinecone), the method raises an error. If the index supports the sync feature and everything aligns, it returns `True`, indicating that the local and remote instances are synchronized; otherwise, it returns `False`.
13 changes: 13 additions & 0 deletions semantic_router/index/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ def delete_index(self):
"""
raise NotImplementedError("This method should be implemented by subclasses.")

def is_synced(
    self,
    local_route_names: List[str],
    local_utterances_list: List[str],
    local_function_schemas_list: List[Dict[str, Any]],
    local_metadata_list: List[Dict[str, Any]],
) -> bool:
    """
    Report whether the local routes match what the remote index holds.

    This base implementation is a placeholder: it does not inspect any of
    its arguments and always raises. Subclasses are expected to override it.

    :param local_route_names: Route names held locally.
    :param local_utterances_list: Utterances held locally.
    :param local_function_schemas_list: Function schemas held locally.
    :param local_metadata_list: Metadata held locally.
    :raises NotImplementedError: Always, in this base implementation.
    """
    message = "This method should be implemented by subclasses."
    raise NotImplementedError(message)

def _sync_index(
self,
local_route_names: List[str],
Expand Down
80 changes: 69 additions & 11 deletions semantic_router/index/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,14 @@ async def _init_async_index(self, force_create: bool = False):
logger.warning("Index could not be initialized.")
self.host = index_stats["host"] if index_stats else None

def _sync_index(
def _format_routes_dict_for_sync(
self,
local_route_names: List[str],
local_utterances_list: List[str],
local_function_schemas_list: List[Dict[str, Any]],
local_metadata_list: List[Dict[str, Any]],
dimensions: int,
) -> Tuple[List, List, Dict]:
if self.index is None:
self.dimensions = self.dimensions or dimensions
self.index = self._init_index(force_create=True)

remote_routes = self.get_routes()

# Create remote dictionary for storing utterances and metadata
remote_routes: List[Tuple],
) -> Tuple[Dict, Dict]:
remote_dict: Dict[str, Dict[str, Any]] = {
route: {
"utterances": set(),
Expand All @@ -241,7 +234,6 @@ def _sync_index(
for route, utterance, function_schemas, metadata in remote_routes:
remote_dict[route]["utterances"].add(utterance)

# Create local dictionary for storing utterances and metadata
local_dict: Dict[str, Dict[str, Any]] = {}
for route, utterance, function_schemas, metadata in zip(
local_route_names,
Expand All @@ -259,6 +251,72 @@ def _sync_index(
local_dict[route]["function_schemas"] = function_schemas
local_dict[route]["metadata"] = metadata

return local_dict, remote_dict

def is_synced(
    self,
    local_route_names: List[str],
    local_utterances_list: List[str],
    local_function_schemas_list: List[Dict[str, Any]],
    local_metadata_list: List[Dict[str, Any]],
) -> bool:
    """
    Compare the local routes with the routes returned by ``get_routes``.

    Both sides are converted to per-route dictionaries by
    ``_format_routes_dict_for_sync`` and logged at info level. The two
    sides count as synchronized only when, for every route seen on either
    side, the utterances, function schemas and metadata are all equal.

    :param local_route_names: Route names held locally.
    :param local_utterances_list: Utterances held locally.
    :param local_function_schemas_list: Function schemas held locally.
    :param local_metadata_list: Metadata held locally.
    :return: ``True`` when no route differs, ``False`` otherwise.
    """
    fetched_routes = self.get_routes()

    local_dict, remote_dict = self._format_routes_dict_for_sync(
        local_route_names,
        local_utterances_list,
        local_function_schemas_list,
        local_metadata_list,
        fetched_routes,
    )
    logger.info(f"LOCAL: {local_dict}")
    logger.info(f"REMOTE: {remote_dict}")

    def _route_differs(name: str) -> bool:
        # A route missing on one side is compared against empty defaults.
        here = local_dict.get(name, {})
        there = remote_dict.get(name, {})

        if here.get("utterances", set()) != there.get("utterances", set()):
            return True

        # Falsy function schemas (e.g. None) are treated as an empty dict.
        schemas_here = here.get("function_schemas", {}) or {}
        schemas_there = there.get("function_schemas", {}) or {}
        if schemas_here != schemas_there:
            return True

        # Metadata is compared as-is, without the falsy normalisation above.
        return here.get("metadata", {}) != there.get("metadata", {})

    every_route = set(remote_dict.keys()).union(local_dict.keys())
    # any() stops at the first mismatching route.
    return not any(_route_differs(name) for name in every_route)

def _sync_index(
self,
local_route_names: List[str],
local_utterances_list: List[str],
local_function_schemas_list: List[Dict[str, Any]],
local_metadata_list: List[Dict[str, Any]],
dimensions: int,
) -> Tuple[List, List, Dict]:
if self.index is None:
self.dimensions = self.dimensions or dimensions
self.index = self._init_index(force_create=True)

remote_routes = self.get_routes()

local_dict, remote_dict = self._format_routes_dict_for_sync(
local_route_names,
local_utterances_list,
local_function_schemas_list,
local_metadata_list,
remote_routes,
)

all_routes = set(remote_dict.keys()).union(local_dict.keys())

routes_to_add = []
Expand Down
11 changes: 11 additions & 0 deletions semantic_router/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,17 @@ def _add_routes(self, routes: List[Route]):
logger.error(f"Failed to add routes to the index: {e}")
raise Exception("Indexing error occurred") from e

def is_synced(self) -> bool:
    """
    Ask the index whether this layer's routes match the remote index.

    The route names, utterances, function schemas and metadata are pulled
    from ``self.routes`` via ``_extract_routes_details`` and handed to the
    index's own ``is_synced`` method, whose result is returned unchanged.

    :return: Whatever ``self.index.is_synced`` returns.
    :raises ValueError: If ``self.index.sync`` is falsy.
    """
    if self.index.sync:
        details = self._extract_routes_details(self.routes, include_metadata=True)
        names, utterances, schemas, metadata = details
        return self.index.is_synced(names, utterances, schemas, metadata)

    raise ValueError("Index is not set to sync with remote index.")

def _add_and_sync_routes(self, routes: List[Route]):
# create embeddings for all routes and sync at startup with remote ones based on sync setting
local_route_names, local_utterances, local_function_schemas, local_metadata = (
Expand Down

0 comments on commit b2dcb6d

Please sign in to comment.