Skip to content

Commit

Permalink
feat: add prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
刘江波 committed Dec 20, 2024
1 parent dacd457 commit 3b1afa8
Show file tree
Hide file tree
Showing 17 changed files with 504 additions and 21 deletions.
4 changes: 3 additions & 1 deletion api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -427,4 +427,6 @@ CREATE_TIDB_SERVICE_JOB_ENABLED=false
# Maximum number of submitted thread count in a ThreadPool for parallel node execution
MAX_SUBMIT_COUNT=100
# Lockout duration in seconds
LOGIN_LOCKOUT_DURATION=86400
LOGIN_LOCKOUT_DURATION=86400

PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus_multiproc_dir
16 changes: 15 additions & 1 deletion api/configs/feature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class SecurityConfig(BaseSettings):
SECRET_KEY: str = Field(
description="Secret key for secure session cookie signing."
"Make sure you are changing this key for your deployment with a strong key."
"Generate a strong key using `openssl rand -base64 42` or set via the `SECRET_KEY` environment variable.",
"Generate a strong key using `openssl rand -base64 42` "
"or set via the `SECRET_KEY` environment variable.",
default="",
)

Expand Down Expand Up @@ -767,6 +768,18 @@ class LoginConfig(BaseSettings):
)


class PrometheusConfig(BaseSettings):
HISTOGRAM_BUCKETS_1MIN: list[float] = Field(
description="The buckets of Prometheus histogram under 1 minute",
default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60],
)

HISTOGRAM_BUCKETS_5MIN: list[float] = Field(
description="The buckets of Prometheus histogram under 5 minute",
default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60, 120, 180, 300],
)


class FeatureConfig(
# place the configs in alphabet order
AppExecutionConfig,
Expand Down Expand Up @@ -794,6 +807,7 @@ class FeatureConfig(
WorkflowNodeExecutionConfig,
WorkspaceConfig,
LoginConfig,
PrometheusConfig,
# hosted services config
HostedServiceConfig,
CeleryBeatConfig,
Expand Down
11 changes: 9 additions & 2 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,9 @@ def _process_stream_response(
conversation_id=self._conversation.id,
trace_manager=trace_manager,
)

self._workflow_time_it(
is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)
yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
Expand All @@ -421,6 +423,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
Expand All @@ -445,7 +450,9 @@ def _process_stream_response(
trace_manager=trace_manager,
exceptions_count=event.exceptions_count,
)

self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)
yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
Expand Down
6 changes: 6 additions & 0 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

# save workflow app log
self._save_workflow_app_log(workflow_run)
Expand All @@ -381,6 +384,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

# save workflow app log
self._save_workflow_app_log(workflow_run)
Expand Down
36 changes: 36 additions & 0 deletions api/core/app/task_pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from prometheus_client import Counter, Histogram

from configs import dify_config

app_request = Counter(
name="app_request",
documentation="The total count of APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_request_failed = Counter(
name="app_request_failed",
documentation="The failed count of APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_request_latency = Histogram(
name="app_request_latency",
documentation="The latency of APP requests",
unit="seconds",
labelnames=["app_id", "tenant_id", "username"],
buckets=dify_config.HISTOGRAM_BUCKETS_5MIN,
)
app_input_tokens = Counter(
name="app_input_tokens",
documentation="The input tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_output_tokens = Counter(
name="app_output_tokens",
documentation="The output tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_total_tokens = Counter(
name="app_total_tokens",
documentation="The total tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
51 changes: 51 additions & 0 deletions api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
MessageEndStreamResponse,
StreamResponse,
)
from core.app.task_pipeline import (
app_input_tokens,
app_output_tokens,
app_request,
app_request_failed,
app_request_latency,
app_total_tokens,
)
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
from core.model_manager import ModelInstance
Expand Down Expand Up @@ -251,6 +259,47 @@ def _wrapper_process_stream_response(
if publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id)

def _chat_time_it(self, is_success: bool) -> None:
"""
Record chat / completion / agent run metrics.
"""
app_id = self._app_config.app_id
tenant_id = self._app_config.tenant_id
username = self._conversation.from_account_name
app_request.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc()

if not is_success:
app_request_failed.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc()
return
app_request_latency.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).observe(self._message.provider_response_latency)
app_input_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.message_tokens)
app_output_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.answer_tokens)
app_total_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.message_tokens + self._message.answer_tokens)

def _process_stream_response(
self, publisher: AppGeneratorTTSPublisher, trace_manager: Optional[TraceQueueManager] = None
) -> Generator[StreamResponse, None, None]:
Expand All @@ -265,6 +314,7 @@ def _process_stream_response(

if isinstance(event, QueueErrorEvent):
err = self._handle_error(event, self._message)
self._chat_time_it(is_success=False)
yield self._error_to_stream_response(err)
break
elif isinstance(event, QueueStopEvent | QueueMessageEndEvent):
Expand All @@ -283,6 +333,7 @@ def _process_stream_response(

# Save message
self._save_message(trace_manager)
self._chat_time_it(is_success=True)

yield self._message_end_to_stream_response()
elif isinstance(event, QueueRetrieverResourcesEvent):
Expand Down
36 changes: 36 additions & 0 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,22 @@
WorkflowStartStreamResponse,
WorkflowTaskState,
)
from core.app.task_pipeline import (
app_input_tokens,
app_output_tokens,
app_request,
app_request_failed,
app_request_latency,
app_total_tokens,
)
from core.file import FILE_MODEL_IDENTITY, File
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.tools.tool_manager import ToolManager
from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine import GraphRuntimeState
from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.workflow_entry import WorkflowEntry
Expand Down Expand Up @@ -119,6 +128,33 @@ def _handle_workflow_run_start(self) -> WorkflowRun:

return workflow_run

def _workflow_time_it(
self, is_success: bool, graph_runtime_state: GraphRuntimeState, workflow_run: WorkflowRun
) -> None:
"""
Record advanced-chat / workflow run metrics.
"""
app_id = workflow_run.app_id
tenant_id = workflow_run.tenant_id
username = self._user.name
app_request.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc()

if not is_success:
app_request_failed.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc()
return
app_request_latency.labels(app_id=app_id, tenant_id=tenant_id, username=username).observe(
workflow_run.elapsed_time
)
app_input_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.prompt_tokens
)
app_output_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.completion_tokens
)
app_total_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.total_tokens
)

def _handle_workflow_run_success(
self,
workflow_run: WorkflowRun,
Expand Down
49 changes: 47 additions & 2 deletions api/core/model_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from collections.abc import Callable, Generator, Iterable, Sequence
from typing import IO, Any, Optional, Union, cast

from prometheus_client import Counter, Histogram

from configs import dify_config
from core.entities.embedding_type import EmbeddingInputType
from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
Expand All @@ -26,6 +28,25 @@

logger = logging.getLogger(__name__)

model_request_total_counter = Counter(
name="model_request_total_counter",
documentation="The total count of model requests",
labelnames=["model_type", "provider", "model", "method"],
)
model_request_failed_counter = Counter(
name="model_request_failed_counter",
documentation="The failed count of model requests",
labelnames=["model_type", "provider", "model", "method"],
)
model_request_latency = Histogram(
name="model_request_latency",
documentation="The latency of model requests. For the LLM model, it just indicate "
"the TTFT (a.k.a. Time To First Token).",
unit="seconds",
labelnames=["model_type", "provider", "model", "method"],
buckets=dify_config.HISTOGRAM_BUCKETS_1MIN,
)


class ModelInstance:
"""
Expand Down Expand Up @@ -298,6 +319,30 @@ def invoke_tts(self, content_text: str, tenant_id: str, voice: str, user: Option
voice=voice,
)

def _invoke_with_timeit(self, function: Callable[..., Any], *args, **kwargs):
with model_request_latency.labels(
model_type=self.model_type_instance.model_type.value,
provider=self.provider,
model=self.model,
method=function.__name__ if hasattr(function, "__name__") else "unknown",
).time():
model_request_total_counter.labels(
model_type=self.model_type_instance.model_type.value,
provider=self.provider,
model=self.model,
method=function.__name__ if hasattr(function, "__name__") else "unknown",
).inc()
try:
return function(*args, **kwargs)
except Exception as e:
model_request_failed_counter.labels(
model_type=self.model_type_instance.model_type.value,
provider=self.provider,
model=self.model,
method=function.__name__ if hasattr(function, "__name__") else "unknown",
).inc()
raise e

def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs):
"""
Round-robin invoke
Expand All @@ -307,7 +352,7 @@ def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs):
:return:
"""
if not self.load_balancing_manager:
return function(*args, **kwargs)
return self._invoke_with_timeit(function, *args, **kwargs)

last_exception = None
while True:
Expand All @@ -321,7 +366,7 @@ def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs):
try:
if "credentials" in kwargs:
del kwargs["credentials"]
return function(*args, **kwargs, credentials=lb_config.credentials)
return self._invoke_with_timeit(function, *args, **kwargs, credentials=lb_config.credentials)
except InvokeRateLimitError as e:
# expire in 60 seconds
self.load_balancing_manager.cooldown(lb_config, expire=60)
Expand Down
Loading

0 comments on commit 3b1afa8

Please sign in to comment.