From 3b1afa89cfcefb0fdee1d3c604fd6dcd44168488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E6=B1=9F=E6=B3=A2?= Date: Thu, 12 Dec 2024 22:13:40 +0800 Subject: [PATCH] feat: add prometheus metrics --- api/.env.example | 4 +- api/configs/feature/__init__.py | 16 ++- .../advanced_chat/generate_task_pipeline.py | 11 +- .../apps/workflow/generate_task_pipeline.py | 6 + api/core/app/task_pipeline/__init__.py | 36 ++++++ .../easy_ui_based_generate_task_pipeline.py | 51 ++++++++ .../task_pipeline/workflow_cycle_manage.py | 36 ++++++ api/core/model_manager.py | 49 +++++++- .../callbacks/metrics_callback.py | 118 ++++++++++++++++++ .../model_providers/__base/ai_model.py | 1 + .../__base/large_language_model.py | 13 +- api/core/tools/tool/tool.py | 34 ++++- api/docker/entrypoint.sh | 1 + api/extensions/ext_app_metrics.py | 88 ++++++++++++- api/extensions/ext_storage.py | 42 +++++++ api/poetry.lock | 18 ++- api/pyproject.toml | 1 + 17 files changed, 504 insertions(+), 21 deletions(-) create mode 100644 api/core/model_runtime/callbacks/metrics_callback.py diff --git a/api/.env.example b/api/.env.example index 071a200e680278..1f25a671841b5f 100644 --- a/api/.env.example +++ b/api/.env.example @@ -427,4 +427,6 @@ CREATE_TIDB_SERVICE_JOB_ENABLED=false # Maximum number of submitted thread count in a ThreadPool for parallel node execution MAX_SUBMIT_COUNT=100 # Lockout duration in seconds -LOGIN_LOCKOUT_DURATION=86400 \ No newline at end of file +LOGIN_LOCKOUT_DURATION=86400 + +PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus_multiproc_dir diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 73f8a95989baaf..9cdd3664bee614 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -23,7 +23,8 @@ class SecurityConfig(BaseSettings): SECRET_KEY: str = Field( description="Secret key for secure session cookie signing." "Make sure you are changing this key for your deployment with a strong key." - "Generate a strong key using `openssl rand -base64 42` or set via the `SECRET_KEY` environment variable.", + "Generate a strong key using `openssl rand -base64 42` " + "or set via the `SECRET_KEY` environment variable.", default="", ) @@ -767,6 +768,18 @@ class LoginConfig(BaseSettings): ) +class PrometheusConfig(BaseSettings): + HISTOGRAM_BUCKETS_1MIN: list[float] = Field( + description="The buckets of Prometheus histogram under 1 minute", + default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60], + ) + + HISTOGRAM_BUCKETS_5MIN: list[float] = Field( + description="The buckets of Prometheus histogram under 5 minute", + default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60, 120, 180, 300], + ) + + class FeatureConfig( # place the configs in alphabet order AppExecutionConfig, @@ -794,6 +807,7 @@ class FeatureConfig( WorkflowNodeExecutionConfig, WorkspaceConfig, LoginConfig, + PrometheusConfig, # hosted services config HostedServiceConfig, CeleryBeatConfig, diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index ce0e95962772ad..62efac54192499 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -398,7 +398,9 @@ def _process_stream_response( conversation_id=self._conversation.id, trace_manager=trace_manager, ) - + self._workflow_time_it( + is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run + ) yield self._workflow_finish_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run ) @@ -421,6 +423,9 @@ def _process_stream_response( conversation_id=None, trace_manager=trace_manager, ) + self._workflow_time_it( + is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run + ) yield self._workflow_finish_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run @@ -445,7 +450,9 @@ def _process_stream_response( trace_manager=trace_manager, exceptions_count=event.exceptions_count, ) - + self._workflow_time_it( + is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run + ) yield self._workflow_finish_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run ) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 79e5e2bcb96d60..97cdf77c38f189 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -357,6 +357,9 @@ def _process_stream_response( conversation_id=None, trace_manager=trace_manager, ) + self._workflow_time_it( + is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run + ) # save workflow app log self._save_workflow_app_log(workflow_run) @@ -381,6 +384,9 @@ def _process_stream_response( conversation_id=None, trace_manager=trace_manager, ) + self._workflow_time_it( + is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run + ) # save workflow app log self._save_workflow_app_log(workflow_run) diff --git a/api/core/app/task_pipeline/__init__.py b/api/core/app/task_pipeline/__init__.py index e69de29bb2d1d6..e4307ec31133d1 100644 --- a/api/core/app/task_pipeline/__init__.py +++ b/api/core/app/task_pipeline/__init__.py @@ -0,0 +1,36 @@ +from prometheus_client import Counter, Histogram + +from configs import dify_config + +app_request = Counter( + name="app_request", + documentation="The total count of APP requests", + labelnames=["app_id", "tenant_id", "username"], +) +app_request_failed = Counter( + name="app_request_failed", + documentation="The failed count of APP requests", + labelnames=["app_id", "tenant_id", "username"], +) +app_request_latency = Histogram( + name="app_request_latency", + documentation="The latency of APP requests", + unit="seconds", + labelnames=["app_id", "tenant_id", "username"], + buckets=dify_config.HISTOGRAM_BUCKETS_5MIN, +) +app_input_tokens = Counter( + name="app_input_tokens", + documentation="The input tokens cost by APP requests", + labelnames=["app_id", "tenant_id", "username"], +) +app_output_tokens = Counter( + name="app_output_tokens", + documentation="The output tokens cost by APP requests", + labelnames=["app_id", "tenant_id", "username"], +) +app_total_tokens = Counter( + name="app_total_tokens", + documentation="The total tokens cost by APP requests", + labelnames=["app_id", "tenant_id", "username"], +) diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index 917649f34e769c..6aef5cea482059 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -39,6 +39,14 @@ MessageEndStreamResponse, StreamResponse, ) +from core.app.task_pipeline import ( + app_input_tokens, + app_output_tokens, + app_request, + app_request_failed, + app_request_latency, + app_total_tokens, +) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.message_cycle_manage import MessageCycleManage from core.model_manager import ModelInstance @@ -251,6 +259,47 @@ def _wrapper_process_stream_response( if publisher: yield MessageAudioEndStreamResponse(audio="", task_id=task_id) + def _chat_time_it(self, is_success: bool) -> None: + """ + Record chat / completion / agent run metrics. + """ + app_id = self._app_config.app_id + tenant_id = self._app_config.tenant_id + username = self._conversation.from_account_name + app_request.labels( + app_id=app_id, + tenant_id=tenant_id, + username=username, + ).inc() + + if not is_success: + app_request_failed.labels( + app_id=app_id, + tenant_id=tenant_id, + username=username, + ).inc() + return + app_request_latency.labels( + app_id=app_id, + tenant_id=tenant_id, + username=username, + ).observe(self._message.provider_response_latency) + app_input_tokens.labels( + app_id=app_id, + tenant_id=tenant_id, + username=username, + ).inc(self._message.message_tokens) + app_output_tokens.labels( + app_id=app_id, + tenant_id=tenant_id, + username=username, + ).inc(self._message.answer_tokens) + app_total_tokens.labels( + app_id=app_id, + tenant_id=tenant_id, + username=username, + ).inc(self._message.message_tokens + self._message.answer_tokens) + def _process_stream_response( self, publisher: AppGeneratorTTSPublisher, trace_manager: Optional[TraceQueueManager] = None ) -> Generator[StreamResponse, None, None]: @@ -265,6 +314,7 @@ def _process_stream_response( if isinstance(event, QueueErrorEvent): err = self._handle_error(event, self._message) + self._chat_time_it(is_success=False) yield self._error_to_stream_response(err) break elif isinstance(event, QueueStopEvent | QueueMessageEndEvent): @@ -283,6 +333,7 @@ def _process_stream_response( # Save message self._save_message(trace_manager) + self._chat_time_it(is_success=True) yield self._message_end_to_stream_response() elif isinstance(event, QueueRetrieverResourcesEvent): diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index e2fa12b1cddd70..666c4c2cc870fc 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -35,6 +35,14 @@ WorkflowStartStreamResponse, WorkflowTaskState, ) +from core.app.task_pipeline import ( + app_input_tokens, + app_output_tokens, + app_request, + app_request_failed, + app_request_latency, + app_total_tokens, +) from core.file import FILE_MODEL_IDENTITY, File from core.model_runtime.utils.encoders import jsonable_encoder from core.ops.entities.trace_entity import TraceTaskName @@ -42,6 +50,7 @@ from core.tools.tool_manager import ToolManager from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.enums import SystemVariableKey +from core.workflow.graph_engine import GraphRuntimeState from core.workflow.nodes import NodeType from core.workflow.nodes.tool.entities import ToolNodeData from core.workflow.workflow_entry import WorkflowEntry @@ -119,6 +128,33 @@ def _handle_workflow_run_start(self) -> WorkflowRun: return workflow_run + def _workflow_time_it( + self, is_success: bool, graph_runtime_state: GraphRuntimeState, workflow_run: WorkflowRun + ) -> None: + """ + Record advanced-chat / workflow run metrics. + """ + app_id = workflow_run.app_id + tenant_id = workflow_run.tenant_id + username = self._user.name + app_request.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc() + + if not is_success: + app_request_failed.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc() + return + app_request_latency.labels(app_id=app_id, tenant_id=tenant_id, username=username).observe( + workflow_run.elapsed_time + ) + app_input_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc( + graph_runtime_state.llm_usage.prompt_tokens + ) + app_output_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc( + graph_runtime_state.llm_usage.completion_tokens + ) + app_total_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc( + graph_runtime_state.llm_usage.total_tokens + ) + def _handle_workflow_run_success( self, workflow_run: WorkflowRun, diff --git a/api/core/model_manager.py b/api/core/model_manager.py index 1986688551b601..997cedf64f2145 100644 --- a/api/core/model_manager.py +++ b/api/core/model_manager.py @@ -2,6 +2,8 @@ from collections.abc import Callable, Generator, Iterable, Sequence from typing import IO, Any, Optional, Union, cast +from prometheus_client import Counter, Histogram + from configs import dify_config from core.entities.embedding_type import EmbeddingInputType from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle @@ -26,6 +28,25 @@ logger = logging.getLogger(__name__) +model_request_total_counter = Counter( + name="model_request_total_counter", + documentation="The total count of model requests", + labelnames=["model_type", "provider", "model", "method"], +) +model_request_failed_counter = Counter( + name="model_request_failed_counter", + documentation="The failed count of model requests", + labelnames=["model_type", "provider", "model", "method"], +) +model_request_latency = Histogram( + name="model_request_latency", + documentation="The latency of model requests. For the LLM model, it just indicate " + "the TTFT (a.k.a. Time To First Token).", + unit="seconds", + labelnames=["model_type", "provider", "model", "method"], + buckets=dify_config.HISTOGRAM_BUCKETS_1MIN, +) + class ModelInstance: """ @@ -298,6 +319,30 @@ def invoke_tts(self, content_text: str, tenant_id: str, voice: str, user: Option voice=voice, ) + def _invoke_with_timeit(self, function: Callable[..., Any], *args, **kwargs): + with model_request_latency.labels( + model_type=self.model_type_instance.model_type.value, + provider=self.provider, + model=self.model, + method=function.__name__ if hasattr(function, "__name__") else "unknown", + ).time(): + model_request_total_counter.labels( + model_type=self.model_type_instance.model_type.value, + provider=self.provider, + model=self.model, + method=function.__name__ if hasattr(function, "__name__") else "unknown", + ).inc() + try: + return function(*args, **kwargs) + except Exception as e: + model_request_failed_counter.labels( + model_type=self.model_type_instance.model_type.value, + provider=self.provider, + model=self.model, + method=function.__name__ if hasattr(function, "__name__") else "unknown", + ).inc() + raise e + def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs): """ Round-robin invoke @@ -307,7 +352,7 @@ def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs): :return: """ if not self.load_balancing_manager: - return function(*args, **kwargs) + return self._invoke_with_timeit(function, *args, **kwargs) last_exception = None while True: @@ -321,7 +366,7 @@ def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs): try: if "credentials" in kwargs: del kwargs["credentials"] - return function(*args, **kwargs, credentials=lb_config.credentials) + return self._invoke_with_timeit(function, *args, **kwargs, credentials=lb_config.credentials) except InvokeRateLimitError as e: # expire in 60 seconds self.load_balancing_manager.cooldown(lb_config, expire=60) diff --git a/api/core/model_runtime/callbacks/metrics_callback.py b/api/core/model_runtime/callbacks/metrics_callback.py new file mode 100644 index 00000000000000..4c9a3716bcdecb --- /dev/null +++ b/api/core/model_runtime/callbacks/metrics_callback.py @@ -0,0 +1,118 @@ +from typing import Optional + +from prometheus_client import Counter, Histogram + +from configs import dify_config +from core.model_runtime.callbacks.base_callback import Callback +from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk +from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool +from core.model_runtime.model_providers.__base.ai_model import AIModel + +llm_model_request_total_counter = Counter( + name="llm_model_request_total_counter", + documentation="The total count of LLM model requests", + labelnames=["model_type", "model"], +) +llm_model_request_failed_counter = Counter( + name="llm_model_request_failed_counter", + documentation="The failed count of LLM model requests", + labelnames=["model_type", "model"], +) +llm_model_request_first_chunk_latency = Histogram( + name="llm_model_request_first_chunk_latency", + documentation="The first chunk latency of LLM model requests", + unit="seconds", + labelnames=["model_type", "model"], + buckets=dify_config.HISTOGRAM_BUCKETS_1MIN, +) +llm_model_request_following_chunk_latency = Histogram( + name="llm_model_request_following_chunk_latency", + documentation="The following chunk latency of LLM model requests", + unit="seconds", + labelnames=["model_type", "model"], + buckets=Histogram.DEFAULT_BUCKETS, +) +llm_model_request_entire_latency = Histogram( + name="llm_model_request_entire_latency", + documentation="The entire latency of LLM model requests", + unit="seconds", + labelnames=["model_type", "model"], + buckets=dify_config.HISTOGRAM_BUCKETS_5MIN, +) + + +class MetricsCallback(Callback): + first_chunk: bool = True + + def on_before_invoke( + self, + llm_instance: AIModel, + model: str, + credentials: dict, + prompt_messages: list[PromptMessage], + model_parameters: dict, + tools: Optional[list[PromptMessageTool]] = None, + stop: Optional[list[str]] = None, + stream: bool = True, + user: Optional[str] = None, + ) -> None: + llm_model_request_total_counter.labels(model_type=llm_instance.model_type.value, model=model).inc() + + def on_new_chunk( + self, + llm_instance: AIModel, + chunk: LLMResultChunk, + model: str, + credentials: dict, + prompt_messages: list[PromptMessage], + model_parameters: dict, + tools: Optional[list[PromptMessageTool]] = None, + stop: Optional[list[str]] = None, + stream: bool = True, + user: Optional[str] = None, + ): + # Skip the last one. The last one indicate the entire usage. + if chunk.delta.finish_reason is not None: + return + + if self.first_chunk: + llm_model_request_first_chunk_latency.labels(model_type=llm_instance.model_type.value, model=model).observe( + chunk.delta.usage.latency + ) + self.first_chunk = False + else: + llm_model_request_following_chunk_latency.labels( + model_type=llm_instance.model_type.value, model=model + ).observe(chunk.delta.usage.latency) + + def on_after_invoke( + self, + llm_instance: AIModel, + result: LLMResult, + model: str, + credentials: dict, + prompt_messages: list[PromptMessage], + model_parameters: dict, + tools: Optional[list[PromptMessageTool]] = None, + stop: Optional[list[str]] = None, + stream: bool = True, + user: Optional[str] = None, + ) -> None: + llm_model_request_entire_latency.labels(model_type=llm_instance.model_type.value, model=model).observe( + result.usage.latency + ) + + def on_invoke_error( + self, + llm_instance: AIModel, + ex: Exception, + model: str, + credentials: dict, + prompt_messages: list[PromptMessage], + model_parameters: dict, + tools: Optional[list[PromptMessageTool]] = None, + stop: Optional[list[str]] = None, + stream: bool = True, + user: Optional[str] = None, + ) -> None: + llm_model_request_failed_counter.labels(model_type=llm_instance.model_type.value, model=model).inc() diff --git a/api/core/model_runtime/model_providers/__base/ai_model.py b/api/core/model_runtime/model_providers/__base/ai_model.py index 79a1d28ebe637e..78f01be668dbc8 100644 --- a/api/core/model_runtime/model_providers/__base/ai_model.py +++ b/api/core/model_runtime/model_providers/__base/ai_model.py @@ -31,6 +31,7 @@ class AIModel(ABC): model_type: ModelType model_schemas: Optional[list[AIModelEntity]] = None started_at: float = 0 + last_chunked_at: float = 0 # pydantic configs model_config = ConfigDict(protected_namespaces=()) diff --git a/api/core/model_runtime/model_providers/__base/large_language_model.py b/api/core/model_runtime/model_providers/__base/large_language_model.py index 8faeffa872b40f..e30ffa1d89344c 100644 --- a/api/core/model_runtime/model_providers/__base/large_language_model.py +++ b/api/core/model_runtime/model_providers/__base/large_language_model.py @@ -10,6 +10,7 @@ from configs import dify_config from core.model_runtime.callbacks.base_callback import Callback from core.model_runtime.callbacks.logging_callback import LoggingCallback +from core.model_runtime.callbacks.metrics_callback import MetricsCallback from core.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage from core.model_runtime.entities.message_entities import ( AssistantPromptMessage, @@ -74,8 +75,10 @@ def invoke( model_parameters = self._validate_and_filter_model_parameters(model, model_parameters, credentials) self.started_at = time.perf_counter() + self.last_chunked_at = self.started_at callbacks = callbacks or [] + callbacks.append(MetricsCallback()) if dify_config.DEBUG: callbacks.append(LoggingCallback()) @@ -429,6 +432,14 @@ def _invoke_result_generator( for chunk in result: yield chunk + if chunk.delta.usage: + usage = chunk.delta.usage + else: + chunk.delta.usage = LLMUsage.empty_usage() + now = time.perf_counter() + chunk.delta.usage.latency = now - self.last_chunked_at + self.last_chunked_at = now + self._trigger_new_chunk_callbacks( chunk=chunk, model=model, @@ -444,8 +455,6 @@ def _invoke_result_generator( prompt_message.content += chunk.delta.message.content real_model = chunk.model - if chunk.delta.usage: - usage = chunk.delta.usage if chunk.system_fingerprint: system_fingerprint = chunk.system_fingerprint diff --git a/api/core/tools/tool/tool.py b/api/core/tools/tool/tool.py index 8d4045038171a6..3ce58c34497dd2 100644 --- a/api/core/tools/tool/tool.py +++ b/api/core/tools/tool/tool.py @@ -4,9 +4,11 @@ from enum import Enum, StrEnum from typing import TYPE_CHECKING, Any, Optional, Union +from prometheus_client import Counter, Histogram from pydantic import BaseModel, ConfigDict, field_validator from pydantic_core.core_schema import ValidationInfo +from configs import dify_config from core.app.entities.app_invoke_entities import InvokeFrom from core.tools.entities.tool_entities import ( ToolDescription, @@ -21,9 +23,26 @@ ) from core.tools.tool_file_manager import ToolFileManager + if TYPE_CHECKING: from core.file.models import File +tool_request_total_counter = Counter( + name="tool_request_total_counter", documentation="The total count of tool requests", labelnames=["provider", "tool"] +) +tool_request_failed_counter = Counter( + name="tool_request_failed_counter", + documentation="The failed count of tool requests", + labelnames=["provider", "tool"], +) +tool_request_latency = Histogram( + name="tool_request_latency", + documentation="The latency of tool requests", + unit="seconds", + labelnames=["provider", "tool"], + buckets=dify_config.HISTOGRAM_BUCKETS_5MIN, +) + class Tool(BaseModel, ABC): identity: Optional[ToolIdentity] = None @@ -206,10 +225,17 @@ def invoke(self, user_id: str, tool_parameters: Mapping[str, Any]) -> list[ToolI # try parse tool parameters into the correct type tool_parameters = self._transform_tool_parameters_type(tool_parameters) - result = self._invoke( - user_id=user_id, - tool_parameters=tool_parameters, - ) + result = [] + with tool_request_latency.labels(provider=self.identity.provider, tool=self.identity.name).time(): + tool_request_total_counter.labels(provider=self.identity.provider, tool=self.identity.name).inc() + try: + result = self._invoke( + user_id=user_id, + tool_parameters=tool_parameters, + ) + except Exception as e: + tool_request_failed_counter.labels(provider=self.identity.provider, tool=self.identity.name).inc() + raise e if not isinstance(result, list): result = [result] diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index 881263171fa145..07f96a952e051e 100755 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -26,6 +26,7 @@ if [[ "${MODE}" == "worker" ]]; then elif [[ "${MODE}" == "beat" ]]; then exec celery -A app.celery beat --loglevel ${LOG_LEVEL} else + mkdir "${PROMETHEUS_MULTIPROC_DIR}" if [[ "${DEBUG}" == "true" ]]; then exec flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug else diff --git a/api/extensions/ext_app_metrics.py b/api/extensions/ext_app_metrics.py index de1cdfeb984e86..c865cc4a088f14 100644 --- a/api/extensions/ext_app_metrics.py +++ b/api/extensions/ext_app_metrics.py @@ -3,9 +3,58 @@ import threading from flask import Response +from prometheus_client import ( + CollectorRegistry, + Counter, + Gauge, + make_wsgi_app, + multiprocess, +) +from sqlalchemy import text +from werkzeug.middleware.dispatcher import DispatcherMiddleware from configs import dify_config from dify_app import DifyApp +from extensions.ext_database import db +from extensions.ext_redis import redis_client +from extensions.ext_storage import storage + +health_check_total_counter = Counter(name="health_check_total_counter", documentation="The count of health check") +redis_checked_counter = Counter( + name="redis_checked_counter", documentation="The count of Redis has been checked as health" +) +db_checked_counter = Counter(name="db_checked_counter", documentation="The count of DB has been checked as health") +storage_checked_counter = Counter( + name="storage_checked_counter", documentation="The count of storage has been checked as health" +) +redis_used_memory_bytes = Gauge( + name="redis_used_memory_bytes", documentation="The used bytes of memory in Redis", multiprocess_mode="livesum" +) +redis_total_memory_bytes = Gauge( + name="redis_total_memory_bytes", documentation="The total bytes of memory in Redis", multiprocess_mode="livesum" +) + +db_pool_total_size = Gauge( + name="db_pool_total_size", + documentation="The total size of db pool", + multiprocess_mode="livesum", +) +db_pool_checkout_size = Gauge( + name="db_pool_checkout_size", documentation="The checkout size of db pool", multiprocess_mode="livesum" +) +db_pool_overflow_size = Gauge( + name="db_pool_overflow_size", documentation="The overflow size of db pool", multiprocess_mode="livesum" +) + + +# Using multiprocess collector for registry +def _make_metrics_app(): + if os.getenv("PROMETHEUS_MULTIPROC_DIR", "") != "": + registry = CollectorRegistry() + multiprocess.MultiProcessCollector(registry) + return make_wsgi_app(registry=registry) + else: + return make_wsgi_app() def init_app(app: DifyApp): @@ -18,11 +67,32 @@ def after_request(response): @app.route("/health") def health(): - return Response( - json.dumps({"pid": os.getpid(), "status": "ok", "version": dify_config.CURRENT_VERSION}), - status=200, - content_type="application/json", - ) + try: + health_check_key = "dify.health_check" + redis_client.set(health_check_key, 1) + redis_client.get(health_check_key) + redis_checked_counter.inc() + + info = redis_client.info() + redis_used_memory_bytes.set(info["used_memory"]) + redis_total_memory_bytes.set(info["maxmemory"] if info["maxmemory"] != 0 else info["total_system_memory"]) + + db.session.execute(text("SELECT 1")) + db_checked_counter.inc() + + storage.save(health_check_key, b"test") + storage.load(health_check_key) + storage_checked_counter.inc() + + return Response( + json.dumps({"pid": os.getpid(), "status": "ok", "version": dify_config.CURRENT_VERSION}), + status=200, + content_type="application/json", + ) + finally: + # 最后才增加计数,保证在健康时,也不会出现 health_check_total_counter > *_checked_counter 的情况, + # 保证 max(*_checked_counter / health_check_total_counter, 1) == 1,以便于监控系统判断是否有异常 + health_check_total_counter.inc() @app.route("/threads") def threads(): @@ -54,7 +124,7 @@ def pool_stat(): from extensions.ext_database import db engine = db.engine - return { + stat = { "pid": os.getpid(), "pool_size": engine.pool.size(), "checked_in_connections": engine.pool.checkedin(), @@ -63,3 +133,9 @@ def pool_stat(): "connection_timeout": engine.pool.timeout(), "recycle_time": db.engine.pool._recycle, } + db_pool_total_size.set(stat["pool_size"]) + db_pool_checkout_size.set(stat["checked_out_connections"]) + db_pool_overflow_size.set(stat["overflow_connections"]) + return stat + + app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {"/metrics": _make_metrics_app()}) diff --git a/api/extensions/ext_storage.py b/api/extensions/ext_storage.py index 42422263c4dd03..be9f423eec93b8 100644 --- a/api/extensions/ext_storage.py +++ b/api/extensions/ext_storage.py @@ -1,8 +1,10 @@ import logging from collections.abc import Callable, Generator +from functools import wraps from typing import Union from flask import Flask +from prometheus_client import Counter, Histogram from configs import dify_config from dify_app import DifyApp @@ -11,6 +13,39 @@ logger = logging.getLogger(__name__) +storage_request_latency = Histogram( + name="storage_request_latency", + documentation="The latency of storage requests", + unit="seconds", + labelnames=["method", "provider"], +) + +storage_request_total_counter = Counter( + name="storage_request_total_counter", + documentation="The total count of storage requests", + labelnames=["method", "provider"], +) + +storage_request_failed_counter = Counter( + name="storage_request_failed_counter", + documentation="The failed count of storage requests", + labelnames=["method", "provider"], +) + + +def timeit(func): + @wraps(func) + def decorator(*args, **kwargs): + with storage_request_latency.labels(method=func.__name__, provider=dify_config.STORAGE_TYPE).time(): + storage_request_total_counter.labels(method=func.__name__, provider=dify_config.STORAGE_TYPE).inc() + try: + return func(*args, **kwargs) + except Exception as e: + storage_request_failed_counter.labels(method=func.__name__, provider=dify_config.STORAGE_TYPE).inc() + raise e + + return decorator + class Storage: def init_app(self, app: Flask): @@ -72,6 +107,7 @@ def get_storage_factory(storage_type: str) -> Callable[[], BaseStorage]: case _: raise ValueError(f"unsupported storage type {storage_type}") + @timeit def save(self, filename, data): try: self.storage_runner.save(filename, data) @@ -79,6 +115,7 @@ def save(self, filename, data): logger.exception(f"Failed to save file {filename}") raise e + @timeit def load(self, filename: str, /, *, stream: bool = False) -> Union[bytes, Generator]: try: if stream: @@ -89,6 +126,7 @@ def load(self, filename: str, /, *, stream: bool = False) -> Union[bytes, Genera logger.exception(f"Failed to load file {filename}") raise e + @timeit def load_once(self, filename: str) -> bytes: try: return self.storage_runner.load_once(filename) @@ -96,6 +134,7 @@ def load_once(self, filename: str) -> bytes: logger.exception(f"Failed to load_once file {filename}") raise e + @timeit def load_stream(self, filename: str) -> Generator: try: return self.storage_runner.load_stream(filename) @@ -103,6 +142,7 @@ def load_stream(self, filename: str) -> Generator: logger.exception(f"Failed to load_stream file {filename}") raise e + @timeit def download(self, filename, target_filepath): try: self.storage_runner.download(filename, target_filepath) @@ -110,6 +150,7 @@ def download(self, filename, target_filepath): logger.exception(f"Failed to download file {filename}") raise e + @timeit def exists(self, filename): try: return self.storage_runner.exists(filename) @@ -117,6 +158,7 @@ def exists(self, filename): logger.exception(f"Failed to check file exists {filename}") raise e + @timeit def delete(self, filename): try: return self.storage_runner.delete(filename) diff --git a/api/poetry.lock b/api/poetry.lock index 35fda9b36fa42a..a89b7f8c5da761 100644 --- a/api/poetry.lock +++ b/api/poetry.lock @@ -1993,7 +1993,6 @@ files = [ {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:761817a3377ef15ac23cd7834715081791d4ec77f9297ee694ca1ee9c2c7e5eb"}, {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3c672a53c0fb4725a29c303be906d3c1fa99c32f58abe008a82705f9ee96f40b"}, {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:4ac4c9f37eba52cb6fbeaf5b59c152ea976726b865bd4cf87883a7e7006cc543"}, - {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:60eb32934076fa07e4316b7b2742fa52cbb190b42c2df2863dbc4230a0a9b385"}, {file = "cryptography-44.0.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:ed3534eb1090483c96178fcb0f8893719d96d5274dfde98aa6add34614e97c8e"}, {file = "cryptography-44.0.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:f3f6fdfa89ee2d9d496e2c087cebef9d4fcbb0ad63c40e821b39f74bf48d9c5e"}, {file = "cryptography-44.0.0-cp37-abi3-win32.whl", hash = "sha256:eb33480f1bad5b78233b0ad3e1b0be21e8ef1da745d8d2aecbb20671658b9053"}, @@ -2004,7 +2003,6 @@ files = [ {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:c5eb858beed7835e5ad1faba59e865109f3e52b3783b9ac21e7e47dc5554e289"}, {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f53c2c87e0fb4b0c00fa9571082a057e37690a8f12233306161c8f4b819960b7"}, {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:9e6fc8a08e116fb7c7dd1f040074c9d7b51d74a8ea40d4df2fc7aa08b76b9e6c"}, - {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:9abcc2e083cbe8dde89124a47e5e53ec38751f0d7dfd36801008f316a127d7ba"}, {file = "cryptography-44.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:d2436114e46b36d00f8b72ff57e598978b37399d2786fd39793c36c6d5cb1c64"}, {file = "cryptography-44.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a01956ddfa0a6790d594f5b34fc1bfa6098aca434696a03cfdbe469b8ed79285"}, {file = "cryptography-44.0.0-cp39-abi3-win32.whl", hash = "sha256:eca27345e1214d1b9f9490d200f9db5a874479be914199194e746c893788d417"}, @@ -6852,6 +6850,20 @@ files = [ [package.extras] dev = ["certifi", "pytest (>=8.1.1)"] +[[package]] +name = "prometheus-client" +version = "0.21.1" +description = "Python client for the Prometheus monitoring system." +optional = false +python-versions = ">=3.8" +files = [ + {file = "prometheus_client-0.21.1-py3-none-any.whl", hash = "sha256:594b45c410d6f4f8888940fe80b5cc2521b305a1fafe1c58609ef715a001f301"}, + {file = "prometheus_client-0.21.1.tar.gz", hash = "sha256:252505a722ac04b0456be05c05f75f45d760c2911ffc45f2a06bcaed9f3ae3fb"}, +] + +[package.extras] +twisted = ["twisted"] + [[package]] name = "prompt-toolkit" version = "3.0.48" @@ -11095,4 +11107,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = ">=3.11,<3.13" -content-hash = "14476bf95504a4df4b8d5a5c6608c6aa3dae7499d27d1e41ef39d761cc7c693d" +content-hash = "740e20c643f9bc492cfc5cb8760d4100fdd90af84644027995e7778da416b3bd" diff --git a/api/pyproject.toml b/api/pyproject.toml index da9eabecf55ccf..e6ae9e27203dbb 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -60,6 +60,7 @@ oci = "~2.135.1" openai = "~1.52.0" openpyxl = "~3.1.5" pandas = { version = "~2.2.2", extras = ["performance", "excel"] } +prometheus-client = ">=0.5.0,<1.0.0" psycopg2-binary = "~2.9.6" pycryptodome = "3.19.1" pydantic = "~2.9.2"