Enable delayed microservices spawning based on dependencies #772

Open · wants to merge 4 commits into main

Changes from 2 commits
comps/cores/mega/micro_service.py (62 additions, 0 deletions)

@@ -3,14 +3,20 @@

import asyncio
import multiprocessing
import time
from typing import Any, List, Optional, Type

import requests

from ..proto.docarray import TextDoc
from .constants import ServiceRoleType, ServiceType
from .logger import CustomLogger
from .utils import check_ports_availability

opea_microservices = {}

logger = CustomLogger("micro-service")


class MicroService:
    """MicroService class to create a microservice."""

@@ -31,8 +37,61 @@ def __init__(
        provider: Optional[str] = None,
        provider_endpoint: Optional[str] = None,
        use_remote_service: Optional[bool] = False,
        methods: List[str] = ["POST"],
        llm_endpoint: Optional[str] = None,
    ):
        """Init the microservice."""

        # Make sure the LLM/microservice endpoints are ready
        logger.info(f"Init microservice - {name}")
        if llm_endpoint is not None:
Collaborator:

Instead of just waiting for the LLM microservice, should we make this more general and wait for all dependencies? Maybe traverse the DAG and make sure the next node is available first?

Contributor (author):

In fact it does wait for the other dependencies when they are not LLM related. This is what we observed when we tested this:

1. The microservice waits for the LLM to be up.
2. The megaservice waits for the microservice to be up.
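
For illustration only, and not part of this PR's diff: a more general take on the reviewer's suggestion could poll every upstream dependency, whatever its type, before the service starts. The `wait_for_dependencies` helper below is hypothetical; it assumes each dependency exposes an HTTP endpoint, probes with GET rather than POST for simplicity, and treats any HTTP response (even an error status) as "up", mirroring the loops in this diff.

```python
import logging
import time
from typing import Iterable

import requests

logger = logging.getLogger("micro-service")


def wait_for_dependencies(endpoints: Iterable[str], interval: float = 2.5, timeout: float = 300.0) -> None:
    """Block until every upstream endpoint answers an HTTP request, or raise after `timeout` seconds."""
    deadline = time.time() + timeout
    pending = list(endpoints)
    while pending and time.time() < deadline:
        still_waiting = []
        for url in pending:
            try:
                # Any response, even a 4xx/5xx, means the dependency is listening.
                requests.get(url, timeout=5)
                logger.info("Dependency ready: %s", url)
            except requests.exceptions.RequestException:
                still_waiting.append(url)
        pending = still_waiting
        if pending:
            time.sleep(interval)
    if pending:
        raise TimeoutError(f"Dependencies not ready after {timeout}s: {pending}")
```

Traversing the megaservice DAG and calling something like this on each predecessor's endpoint before spawning a node would generalize the LLM-only check in this change.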

            logger.info(f"LLM Endpoint requested for microservice @ {llm_endpoint}")
            success = False
            while success is False:
                try:
                    for item in methods:
                        # Do we want to cater for GET requests?
                        # If yes, how to make it clean?
                        if item.upper() == "POST":
                            response = requests.post(llm_endpoint)
                            if response.status_code == 200:
                                logger.info(f"LLM endpoint is ready - {llm_endpoint}")
                            else:
                                logger.info(f"LLM endpoint is ready - but error status code - {llm_endpoint}")
                            success = True
                except requests.exceptions.RequestException as e:
                    logger.info(f"Error: {e} - {llm_endpoint}")
                    time.sleep(2.5)
        else:
            url = f"{host}:{port}{endpoint}"
            if protocol.upper() == "HTTP":
                url = f"http://{url}"
            elif protocol.upper() == "HTTPS":
                url = f"https://{url}"
            # enable future support for gRPC?
            logger.info(f"Endpoint requested for microservice @ {url}")
            success = False
            while success is False:
                try:
                    for item in methods:
                        # Do we want to cater for GET requests?
                        # If yes, how to make it clean?
                        if item.upper() == "POST":
                            response = requests.post(url)
                            if response.status_code == 200:
                                logger.info(f"Microservice endpoint is ready - {url}")
                            else:
                                logger.info(f"Microservice endpoint is ready - but error status code - {url}")
                            success = True
                        else:
                            logger.info(f"Microservice endpoint method not supported - Skipping - {url}")
                            success = True
                except requests.exceptions.RequestException as e:
                    logger.info(f"Error: {e} - {url}")
                    time.sleep(2.5)

        self.name = f"{name}/{self.__class__.__name__}" if name else self.__class__.__name__
        self.service_role = service_role
        self.service_type = service_type
@@ -155,6 +214,7 @@ def register_microservice(
    provider: Optional[str] = None,
    provider_endpoint: Optional[str] = None,
    methods: List[str] = ["POST"],
    llm_endpoint: Optional[str] = None,
):
    def decorator(func):
        if name not in opea_microservices:
@@ -172,6 +232,8 @@ def decorator(func):
            output_datatype=output_datatype,
            provider=provider,
            provider_endpoint=provider_endpoint,
            methods=methods,
            llm_endpoint=llm_endpoint,
        )
        opea_microservices[name] = micro_service
        opea_microservices[name].app.router.add_api_route(endpoint, func, methods=methods)
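To make the author's point 2 above concrete, here is a rough sketch of the usual megaservice wiring (illustrative only: the host, port, and exact `comps` import paths are assumptions, not taken from this PR). Because no `llm_endpoint` is passed, `MicroService.__init__` falls into the `else` branch above and polls the microservice's own endpoint until it responds.

```python
# Hypothetical megaservice-side wiring; values and import paths are placeholders.
from comps import MicroService, ServiceOrchestrator
from comps.cores.mega.constants import ServiceType

megaservice = ServiceOrchestrator()

llm = MicroService(
    name="llm",
    host="0.0.0.0",                       # placeholder for the LLM microservice host
    port=9000,                            # placeholder for the LLM microservice port
    endpoint="/v1/chat/completions",
    use_remote_service=True,
    service_type=ServiceType.LLM,
)

# With this PR, constructing the MicroService above blocks until the remote
# endpoint answers, so the megaservice only wires up services that are ready.
megaservice.add(llm)
```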
comps/intent_detection/langchain/intent_detection.py (1 addition, 0 deletions)

@@ -15,6 +15,7 @@
    endpoint="/v1/chat/intent",
    host="0.0.0.0",
    port=9000,
    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
)
def llm_generate(input: LLMParamsDoc):
    llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
comps/llms/faq-generation/tgi/langchain/llm.py (1 addition, 0 deletions)

@@ -33,6 +33,7 @@ def post_process_text(text: str):
    endpoint="/v1/faqgen",
    host="0.0.0.0",
    port=9000,
    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
)
def llm_generate(input: LLMParamsDoc):
    if logflag:
comps/llms/summarization/tgi/langchain/llm.py (1 addition, 0 deletions)

@@ -32,6 +32,7 @@ def post_process_text(text: str):
    endpoint="/v1/chat/docsum",
    host="0.0.0.0",
    port=9000,
    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
)
def llm_generate(input: LLMParamsDoc):
    if logflag:
comps/llms/text-generation/tgi/llm.py (1 addition, 0 deletions)

@@ -40,6 +40,7 @@
    endpoint="/v1/chat/completions",
    host="0.0.0.0",
    port=9000,
    llm_endpoint=llm_endpoint,
)
@register_statistics(names=["opea_service@llm_tgi"])
async def llm_generate(input: Union[LLMParamsDoc, ChatCompletionRequest, SearchedDoc]):
comps/llms/text-generation/vllm/langchain/llm.py (1 addition, 0 deletions)

@@ -47,6 +47,7 @@ def post_process_text(text: str):
    endpoint="/v1/chat/completions",
    host="0.0.0.0",
    port=9000,
    llm_endpoint=llm_endpoint,
)
def llm_generate(input: Union[LLMParamsDoc, ChatCompletionRequest, SearchedDoc]):
    if logflag:
comps/llms/text-generation/vllm/llama_index/llm.py (1 addition, 0 deletions)

@@ -38,6 +38,7 @@ def post_process_text(text: str):
    endpoint="/v1/chat/completions",
    host="0.0.0.0",
    port=9000,
    llm_endpoint=os.getenv("vLLM_ENDPOINT", "http://localhost:8008"),
)
def llm_generate(input: LLMParamsDoc):
    if logflag:
comps/ragas/tgi/langchain/llm.py (1 addition, 0 deletions)

@@ -39,6 +39,7 @@
    port=9050,
    input_datatype=RAGASParams,
    output_datatype=RAGASScores,
    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
)
def llm_generate(input: RAGASParams):
    if logflag: