Skip to content

Commit

Permalink
Feateure/metrics tasks (#248)
Browse files Browse the repository at this point in the history
* feat: Add celery settings

* feat: fix celery settings

* feat: update lock file

* feat: add close metrics task

* feat: add custom queue to run the close metrics task

* feat: change default value of metrics custom queue to celery

* feat: make it a choice to run the tasks in celery or synchronous

* feat: remove apm from docker compose

* feat: add debug task
  • Loading branch information
helllllllder authored Jul 26, 2023
1 parent ef7ae13 commit 086d8fb
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 48 deletions.
5 changes: 5 additions & 0 deletions chats/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ("celery_app",)
6 changes: 6 additions & 0 deletions chats/apps/api/v1/external/rooms/viewsets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import IntegrityError
from django.utils import timezone
Expand All @@ -15,6 +16,7 @@
from chats.apps.dashboard.models import RoomMetrics
from chats.apps.rooms.models import Room
from chats.apps.rooms.views import (
close_room,
get_editable_custom_fields_room,
update_custom_fields,
update_flows_custom_fields,
Expand Down Expand Up @@ -67,6 +69,10 @@ def close(
instance.close(None, "agent")
serialized_data = RoomFlowSerializer(instance=instance)
instance.notify_queue("close")
if not settings.ACTIVATE_CALC_METRICS:
return Response(serialized_data.data, status=status.HTTP_200_OK)

close_room(str(instance.pk))
return Response(serialized_data.data, status=status.HTTP_200_OK)

def create(self, request, *args, **kwargs):
Expand Down
41 changes: 2 additions & 39 deletions chats/apps/api/v1/rooms/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
TransferRoomSerializer,
)
from chats.apps.dashboard.models import RoomMetrics
from chats.apps.msgs.models import Message
from chats.apps.rooms.models import Room
from chats.apps.rooms.views import (
close_room,
get_editable_custom_fields_room,
update_custom_fields,
update_flows_custom_fields,
Expand Down Expand Up @@ -161,44 +161,7 @@ def close(
if not settings.ACTIVATE_CALC_METRICS:
return Response(serialized_data.data, status=status.HTTP_200_OK)

messages_contact = (
Message.objects.filter(room=instance, contact__isnull=False)
.order_by("created_on")
.first()
)
messages_agent = (
Message.objects.filter(room=instance, user__isnull=False)
.order_by("created_on")
.first()
)

time_message_contact = 0
time_message_agent = 0

if messages_agent and messages_contact:
time_message_agent = messages_agent.created_on.timestamp()
time_message_contact = messages_contact.created_on.timestamp()
else:
time_message_agent = 0
time_message_contact = 0

difference_time = time_message_agent - time_message_contact

interation_time = (
Room.objects.filter(pk=instance.pk)
.aggregate(
avg_time=Sum(
F("ended_at") - F("created_on"),
)
)["avg_time"]
.total_seconds()
)

metric_room = RoomMetrics.objects.get_or_create(room=instance)[0]
metric_room.message_response_time = difference_time
metric_room.interaction_time = interation_time
metric_room.save()

close_room(str(instance.pk))
return Response(serialized_data.data, status=status.HTTP_200_OK)

def perform_create(self, serializer):
Expand Down
36 changes: 36 additions & 0 deletions chats/apps/dashboard/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from chats.apps.dashboard.models import RoomMetrics
from chats.apps.rooms.models import Room
from chats.celery import app


def generate_metrics(room: str):
room = Room.objects.get(pk=room)
messages_contact = (
room.messages.filter(contact__isnull=False).order_by("created_on").first()
)
messages_agent = (
room.messages.filter(user__isnull=False).order_by("created_on").first()
)

time_message_contact = 0
time_message_agent = 0

if messages_agent and messages_contact:
time_message_agent = messages_agent.created_on.timestamp()
time_message_contact = messages_contact.created_on.timestamp()
else:
time_message_agent = 0
time_message_contact = 0

difference_time = time_message_agent - time_message_contact
interaction_time = room.ended_at - room.created_on

metric_room = RoomMetrics.objects.get_or_create(room=room)[0]
metric_room.message_response_time = difference_time
metric_room.interaction_time = interaction_time.total_seconds()
metric_room.save()


@app.task(name="close_metrics")
def close_metrics(room: str):
generate_metrics(room)
12 changes: 12 additions & 0 deletions chats/apps/rooms/views.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
from rest_framework import status
from rest_framework.exceptions import APIException
from django.conf import settings
from django.db import transaction

from chats.apps.dashboard.tasks import close_metrics, generate_metrics
from chats.apps.api.v1.internal.rest_clients.flows_rest_client import FlowRESTClient
from chats.apps.rooms.models import Room


def close_room(room_pk: str):
if settings.USE_CELERY:
transaction.on_commit(
lambda: close_metrics.apply_async(
args=[room_pk], queue=settings.METRICS_CUSTOM_QUEUE
)
)
generate_metrics(room_pk)

def update_custom_fields(room: Room, custom_fields_update: dict):
room.custom_fields.update(custom_fields_update)
room.save()
Expand Down
22 changes: 22 additions & 0 deletions chats/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os

from celery import Celery

# Set the default Django settings module for the "celery" program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "chats.settings")

app = Celery("chats")

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace="CELERY" means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
print("Request: {0!r}".format(self.request))
25 changes: 20 additions & 5 deletions chats/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
"rest_framework",
"rest_framework.authtoken",
"corsheaders",
"django_celery_beat",
"django_celery_results",
]

MIDDLEWARE = [
Expand Down Expand Up @@ -105,24 +107,25 @@

WSGI_APPLICATION = "chats.wsgi.application"

# channels
ASGI_APPLICATION = "chats.asgi.application"

REDIS_URL = env.str("CHANNEL_LAYERS_REDIS", default="redis://localhost:6379/1")

# channels

CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
"CONFIG": {
"hosts": [
env.str("CHANNEL_LAYERS_REDIS", default="redis://127.0.0.1:6379/1")
],
"hosts": [REDIS_URL],
},
},
}

CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": env.str("CHANNEL_LAYERS_REDIS", default="redis://127.0.0.1:6379/1"),
"LOCATION": REDIS_URL,
"OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient"},
}
}
Expand Down Expand Up @@ -353,3 +356,15 @@

CHATS_FLOWS_TAG = env.str("CHATS_FLOWS_TAG", default="chats")
CHATS_CACHE_TIME = env.int("CHATS_CACHE_TIME", default=1 * 60 * 60)

# Celery

METRICS_CUSTOM_QUEUE = env("METRICS_CUSTOM_QUEUE", default="celery")

USE_CELERY = env.bool("USE_CELERY", default=False)
CELERY_BROKER_URL = env.str("CELERY_BROKER_URL", default=REDIS_URL)
CELERY_RESULT_BACKEND = env.str("CELERY_RESULT_BACKEND", default="django-db")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = TIME_ZONE
Loading

0 comments on commit 086d8fb

Please sign in to comment.