From 1d437cc78af21f0e5957ec5c5550b4b6de3953e2 Mon Sep 17 00:00:00 2001
From: "Foong, Khang Sheong"
Date: Wed, 9 Oct 2024 10:59:38 +0800
Subject: [PATCH] feature: enable delayed microservice spawning based on
 dependencies #764

---
 comps/cores/mega/micro_service.py             | 61 ++++++++++++++++++-
 .../langchain/intent_detection.py             |  1 +
 .../llms/faq-generation/tgi/langchain/llm.py  |  1 +
 comps/llms/summarization/tgi/langchain/llm.py |  1 +
 comps/llms/text-generation/tgi/llm.py         |  1 +
 .../text-generation/vllm/langchain/llm.py     |  1 +
 .../text-generation/vllm/llama_index/llm.py   |  1 +
 comps/ragas/tgi/langchain/llm.py              |  1 +
 8 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/comps/cores/mega/micro_service.py b/comps/cores/mega/micro_service.py
index 9d707fa688..0f161b05b9 100644
--- a/comps/cores/mega/micro_service.py
+++ b/comps/cores/mega/micro_service.py
@@ -3,15 +3,18 @@

 import asyncio
 import multiprocessing
+import requests
+import time
 from typing import Any, List, Optional, Type

 from ..proto.docarray import TextDoc
 from .constants import ServiceRoleType, ServiceType
 from .utils import check_ports_availability
+from .logger import CustomLogger

 opea_microservices = {}

-
+logger = CustomLogger("micro-service")

 class MicroService:
     """MicroService class to create a microservice."""
@@ -31,8 +34,61 @@ def __init__(
         provider: Optional[str] = None,
         provider_endpoint: Optional[str] = None,
         use_remote_service: Optional[bool] = False,
+        methods: List[str] = ["POST"],
+        llm_endpoint: Optional[str] = None,
     ):
         """Init the microservice."""
+
+        # Make sure the LLM / microservice endpoint is ready before spawning
+        logger.info(f"Init microservice - {name}")
+        if llm_endpoint is not None:
+            logger.info(f"LLM endpoint requested for microservice @ {llm_endpoint}")
+            success = False
+            while success is False:
+                try:
+                    for item in methods:
+                        # Do we want to cater for GET requests?
+                        # If yes, how do we keep it clean?
+                        if item.upper() == "POST":
+                            response = requests.post(llm_endpoint)
+                            if response.status_code == 200:
+                                logger.info(f"LLM endpoint is ready - {llm_endpoint}")
+                            else:
+                                logger.info(f"LLM endpoint responded with status {response.status_code} - treating it as ready - {llm_endpoint}")
+                            success = True
+
+                except requests.exceptions.RequestException as e:
+                    logger.info(f"Error: {e} - {llm_endpoint}")
+                    time.sleep(2.5)
+        else:
+            url = f"{host}:{port}{endpoint}"
+            if protocol.upper() == "HTTP":
+                url = f"http://{url}"
+            elif protocol.upper() == "HTTPS":
+                url = f"https://{url}"
+            # TODO: enable future support for gRPC?
+
+            logger.info(f"Endpoint requested for microservice @ {url}")
+            success = False
+            while success is False:
+                try:
+                    for item in methods:
+                        # Do we want to cater for GET requests?
+                        # If yes, how do we keep it clean?
+                        if item.upper() == "POST":
+                            response = requests.post(url)
+                            if response.status_code == 200:
+                                logger.info(f"Microservice endpoint is ready - {url}")
+                            else:
+                                logger.info(f"Microservice endpoint responded with status {response.status_code} - treating it as ready - {url}")
+                            success = True
+                        else:
+                            logger.info(f"Microservice endpoint method not supported - skipping - {url}")
+                            success = True
+                except requests.exceptions.RequestException as e:
+                    logger.info(f"Error: {e} - {url}")
+                    time.sleep(2.5)
+
         self.name = f"{name}/{self.__class__.__name__}" if name else self.__class__.__name__
         self.service_role = service_role
         self.service_type = service_type
@@ -155,6 +211,7 @@ def register_microservice(
     provider: Optional[str] = None,
     provider_endpoint: Optional[str] = None,
     methods: List[str] = ["POST"],
+    llm_endpoint: Optional[str] = None,
 ):
     def decorator(func):
         if name not in opea_microservices:
@@ -172,6 +229,8 @@ def decorator(func):
                 output_datatype=output_datatype,
                 provider=provider,
                 provider_endpoint=provider_endpoint,
+                methods=methods,
+                llm_endpoint=llm_endpoint,
             )
             opea_microservices[name] = micro_service
         opea_microservices[name].app.router.add_api_route(endpoint, func, methods=methods)
diff --git a/comps/intent_detection/langchain/intent_detection.py b/comps/intent_detection/langchain/intent_detection.py
index 3abd6974ca..2bcd184de1 100644
--- a/comps/intent_detection/langchain/intent_detection.py
+++ b/comps/intent_detection/langchain/intent_detection.py
@@ -15,6 +15,7 @@
     endpoint="/v1/chat/intent",
     host="0.0.0.0",
     port=9000,
+    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
 )
 def llm_generate(input: LLMParamsDoc):
     llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
diff --git a/comps/llms/faq-generation/tgi/langchain/llm.py b/comps/llms/faq-generation/tgi/langchain/llm.py
index 4d54438c55..4a33731409 100644
--- a/comps/llms/faq-generation/tgi/langchain/llm.py
+++ b/comps/llms/faq-generation/tgi/langchain/llm.py
@@ -33,6 +33,7 @@ def post_process_text(text: str):
     endpoint="/v1/faqgen",
     host="0.0.0.0",
     port=9000,
+    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
 )
 def llm_generate(input: LLMParamsDoc):
     if logflag:
diff --git a/comps/llms/summarization/tgi/langchain/llm.py b/comps/llms/summarization/tgi/langchain/llm.py
index ff380ce167..85f10c1b4e 100644
--- a/comps/llms/summarization/tgi/langchain/llm.py
+++ b/comps/llms/summarization/tgi/langchain/llm.py
@@ -32,6 +32,7 @@ def post_process_text(text: str):
     endpoint="/v1/chat/docsum",
     host="0.0.0.0",
     port=9000,
+    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
 )
 def llm_generate(input: LLMParamsDoc):
     if logflag:
diff --git a/comps/llms/text-generation/tgi/llm.py b/comps/llms/text-generation/tgi/llm.py
index d96518296e..43b57eb819 100644
--- a/comps/llms/text-generation/tgi/llm.py
+++ b/comps/llms/text-generation/tgi/llm.py
@@ -40,6 +40,7 @@
     endpoint="/v1/chat/completions",
     host="0.0.0.0",
     port=9000,
+    llm_endpoint=llm_endpoint,
 )
 @register_statistics(names=["opea_service@llm_tgi"])
 async def llm_generate(input: Union[LLMParamsDoc, ChatCompletionRequest, SearchedDoc]):
diff --git a/comps/llms/text-generation/vllm/langchain/llm.py b/comps/llms/text-generation/vllm/langchain/llm.py
index 7160bdb9fc..cb8a5ceed9 100644
--- a/comps/llms/text-generation/vllm/langchain/llm.py
+++ b/comps/llms/text-generation/vllm/langchain/llm.py
@@ -47,6 +47,7 @@ def post_process_text(text: str):
     endpoint="/v1/chat/completions",
     host="0.0.0.0",
     port=9000,
+    llm_endpoint=llm_endpoint,
 )
 def llm_generate(input: Union[LLMParamsDoc, ChatCompletionRequest, SearchedDoc]):
     if logflag:
diff --git a/comps/llms/text-generation/vllm/llama_index/llm.py b/comps/llms/text-generation/vllm/llama_index/llm.py
index 4c3957bae7..5df13f07f7 100644
--- a/comps/llms/text-generation/vllm/llama_index/llm.py
+++ b/comps/llms/text-generation/vllm/llama_index/llm.py
@@ -38,6 +38,7 @@ def post_process_text(text: str):
     endpoint="/v1/chat/completions",
     host="0.0.0.0",
     port=9000,
+    llm_endpoint=os.getenv("vLLM_ENDPOINT", "http://localhost:8008"),
 )
 def llm_generate(input: LLMParamsDoc):
     if logflag:
diff --git a/comps/ragas/tgi/langchain/llm.py b/comps/ragas/tgi/langchain/llm.py
index 0b67164a4e..0d2deb5ec5 100644
--- a/comps/ragas/tgi/langchain/llm.py
+++ b/comps/ragas/tgi/langchain/llm.py
@@ -39,6 +39,7 @@
     port=9050,
     input_datatype=RAGASParams,
     output_datatype=RAGASScores,
+    llm_endpoint=os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080"),
 )
 def llm_generate(input: RAGASParams):
     if logflag:
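
Note: the readiness polling added to MicroService.__init__ above is duplicated for the LLM and microservice branches, and a methods list without "POST" would leave the first loop spinning with no sleep. Below is a minimal sketch of how the same wait-for-dependency behaviour could be factored into a single helper; the name wait_for_endpoint and its interval_s/timeout_s parameters are illustrative assumptions, not part of this patch or the existing OPEA API.

# Sketch only: a possible refactor of the readiness polling in this patch.
# wait_for_endpoint, interval_s and timeout_s are hypothetical names, not
# part of this patch or the GenAIComps codebase.
import time
from typing import List, Optional

import requests


def wait_for_endpoint(url: str, methods: List[str] = ["POST"], interval_s: float = 2.5, timeout_s: Optional[float] = None) -> bool:
    """Poll url until it returns any HTTP response, or until timeout_s expires."""
    start = time.monotonic()
    while True:
        try:
            for method in methods:
                # Any response, even a non-200 status, means the dependency is up.
                response = requests.request(method.upper(), url, timeout=5)
                print(f"{url} answered {method.upper()} with status {response.status_code}")
            return True
        except requests.exceptions.RequestException as exc:
            print(f"Waiting for {url}: {exc}")
        if timeout_s is not None and time.monotonic() - start > timeout_s:
            return False
        time.sleep(interval_s)


# Example usage (hypothetical endpoint):
# ready = wait_for_endpoint("http://localhost:8080", methods=["POST"], timeout_s=300.0)

Wired into MicroService.__init__, such a helper could replace both while loops with one call per dependency and would also give callers an explicit timeout instead of waiting forever.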