Skip to content

Commit

Permalink
fix: memory leak inside session reducer (#95)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Jan 9, 2024
1 parent 133ef26 commit 5424b35
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions pkg/sessionreducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1
}

// add the task to the tasks list
key := task.uniqueKey()
rtm.tasks[key] = task
rtm.tasks[task.uniqueKey()] = task

rtm.rw.Unlock()

Expand Down Expand Up @@ -120,7 +119,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1
close(task.doneCh)
// delete the task from the tasks list
rtm.rw.Lock()
delete(rtm.tasks, key)
delete(rtm.tasks, task.uniqueKey())
rtm.rw.Unlock()
}()

Expand Down Expand Up @@ -295,11 +294,11 @@ func (rtm *sessionReduceTaskManager) WaitAll() {
close(rtm.responseCh)
}

func generateKey(keyedWindows *v1.KeyedWindow) string {
func generateKey(keyedWindow *v1.KeyedWindow) string {
return fmt.Sprintf("%d:%d:%s",
keyedWindows.GetStart().AsTime().UnixMilli(),
keyedWindows.GetEnd().AsTime().UnixMilli(),
strings.Join(keyedWindows.GetKeys(), delimiter))
keyedWindow.GetStart().AsTime().UnixMilli(),
keyedWindow.GetEnd().AsTime().UnixMilli(),
strings.Join(keyedWindow.GetKeys(), delimiter))
}

func buildDatum(payload *v1.SessionReduceRequest_Payload) Datum {
Expand Down

0 comments on commit 5424b35

Please sign in to comment.