From f748e6ae87e061b2fd300e26418e93ed895362a5 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Tue, 9 Jan 2024 11:36:40 +0530 Subject: [PATCH] fix: memory leak inside session reducer (#95) Signed-off-by: Yashash H L Signed-off-by: a3hadi --- pkg/sessionreducer/task_manager.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/sessionreducer/task_manager.go b/pkg/sessionreducer/task_manager.go index 2abd58c6..e40df8b6 100644 --- a/pkg/sessionreducer/task_manager.go +++ b/pkg/sessionreducer/task_manager.go @@ -89,8 +89,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 } // add the task to the tasks list - key := task.uniqueKey() - rtm.tasks[key] = task + rtm.tasks[task.uniqueKey()] = task rtm.rw.Unlock() @@ -120,7 +119,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 close(task.doneCh) // delete the task from the tasks list rtm.rw.Lock() - delete(rtm.tasks, key) + delete(rtm.tasks, task.uniqueKey()) rtm.rw.Unlock() }() @@ -295,11 +294,11 @@ func (rtm *sessionReduceTaskManager) WaitAll() { close(rtm.responseCh) } -func generateKey(keyedWindows *v1.KeyedWindow) string { +func generateKey(keyedWindow *v1.KeyedWindow) string { return fmt.Sprintf("%d:%d:%s", - keyedWindows.GetStart().AsTime().UnixMilli(), - keyedWindows.GetEnd().AsTime().UnixMilli(), - strings.Join(keyedWindows.GetKeys(), delimiter)) + keyedWindow.GetStart().AsTime().UnixMilli(), + keyedWindow.GetEnd().AsTime().UnixMilli(), + strings.Join(keyedWindow.GetKeys(), delimiter)) } func buildDatum(payload *v1.SessionReduceRequest_Payload) Datum {