-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix "zombie" http output websocket connections #125
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -389,57 +389,112 @@ func (h *httpServerOutput) streamHandler(w http.ResponseWriter, r *http.Request) | |||||||||||
} | ||||||||||||
|
||||||||||||
func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { | ||||||||||||
var err error | ||||||||||||
defer func() { | ||||||||||||
if err != nil { | ||||||||||||
http.Error(w, "Bad request", http.StatusBadRequest) | ||||||||||||
h.log.Warn("Websocket request failed: %v\n", err) | ||||||||||||
return | ||||||||||||
} | ||||||||||||
}() | ||||||||||||
|
||||||||||||
upgrader := websocket.Upgrader{} | ||||||||||||
|
||||||||||||
var ws *websocket.Conn | ||||||||||||
if ws, err = upgrader.Upgrade(w, r, nil); err != nil { | ||||||||||||
return | ||||||||||||
} | ||||||||||||
defer ws.Close() | ||||||||||||
|
||||||||||||
ctx, done := h.shutSig.SoftStopCtx(r.Context()) | ||||||||||||
defer done() | ||||||||||||
|
||||||||||||
for !h.shutSig.IsSoftStopSignalled() { | ||||||||||||
var ts message.Transaction | ||||||||||||
var open bool | ||||||||||||
|
||||||||||||
select { | ||||||||||||
case ts, open = <-h.transactions: | ||||||||||||
if !open { | ||||||||||||
go h.TriggerCloseNow() | ||||||||||||
return | ||||||||||||
} | ||||||||||||
case <-r.Context().Done(): | ||||||||||||
return | ||||||||||||
case <-h.shutSig.SoftStopChan(): | ||||||||||||
return | ||||||||||||
} | ||||||||||||
|
||||||||||||
var werr error | ||||||||||||
for _, msg := range message.GetAllBytes(ts.Payload) { | ||||||||||||
if werr = ws.WriteMessage(websocket.BinaryMessage, msg); werr != nil { | ||||||||||||
break | ||||||||||||
} | ||||||||||||
h.mWSBatchSent.Incr(1) | ||||||||||||
h.mWSSent.Incr(int64(batch.MessageCollapsedCount(ts.Payload))) | ||||||||||||
} | ||||||||||||
if werr != nil { | ||||||||||||
h.mWSError.Incr(1) | ||||||||||||
} | ||||||||||||
_ = ts.Ack(ctx, werr) | ||||||||||||
} | ||||||||||||
var err error | ||||||||||||
defer func() { | ||||||||||||
if err != nil { | ||||||||||||
http.Error(w, "Bad request", http.StatusBadRequest) | ||||||||||||
h.log.Warn("WebSocket request failed: %v", err) | ||||||||||||
return | ||||||||||||
} | ||||||||||||
}() | ||||||||||||
|
||||||||||||
upgrader := websocket.Upgrader{} | ||||||||||||
|
||||||||||||
// Upgrade the HTTP connection to a WebSocket connection | ||||||||||||
ws, err := upgrader.Upgrade(w, r, nil) | ||||||||||||
if err != nil { | ||||||||||||
h.log.Warn("WebSocket upgrade failed: %v", err) | ||||||||||||
return | ||||||||||||
} | ||||||||||||
defer ws.Close() | ||||||||||||
|
||||||||||||
// Set up ping/pong handlers and deadlines | ||||||||||||
const ( | ||||||||||||
writeWait = 10 * time.Second | ||||||||||||
pongWait = 60 * time.Second | ||||||||||||
pingPeriod = (pongWait * 9) / 10 | ||||||||||||
) | ||||||||||||
|
||||||||||||
ws.SetReadLimit(512) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the question, on why we need both ping/pong and Read goroutine:
|
||||||||||||
if err := ws.SetReadDeadline(time.Now().Add(pongWait)); err != nil { | ||||||||||||
h.log.Warn("Failed to set read deadline: %v", err) | ||||||||||||
return | ||||||||||||
} | ||||||||||||
ws.SetPongHandler(func(string) error { | ||||||||||||
return ws.SetReadDeadline(time.Now().Add(pongWait)) | ||||||||||||
}) | ||||||||||||
Comment on lines
+419
to
+425
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it necessary to set the read deadline both before creating the pong handler and inside it? |
||||||||||||
|
||||||||||||
// Start a goroutine to read messages (to process control frames) | ||||||||||||
done := make(chan struct{}) | ||||||||||||
go func() { | ||||||||||||
defer close(done) | ||||||||||||
for { | ||||||||||||
_, _, err := ws.ReadMessage() | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not 100% clued up on websockets so excuse the ignorance but will doing a I'm concerned about a race condition where the payload we send using the below |
||||||||||||
if err != nil { | ||||||||||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { | ||||||||||||
h.log.Warn("WebSocket read error: %v", err) | ||||||||||||
} | ||||||||||||
break | ||||||||||||
} | ||||||||||||
} | ||||||||||||
}() | ||||||||||||
|
||||||||||||
// Start ticker to send ping messages to the client periodically | ||||||||||||
ticker := time.NewTicker(pingPeriod) | ||||||||||||
defer ticker.Stop() | ||||||||||||
|
||||||||||||
ctx, doneCtx := h.shutSig.SoftStopCtx(r.Context()) | ||||||||||||
defer doneCtx() | ||||||||||||
|
||||||||||||
for !h.shutSig.IsSoftStopSignalled() { | ||||||||||||
select { | ||||||||||||
case ts, open := <-h.transactions: | ||||||||||||
if !open { | ||||||||||||
// If the transactions channel is closed, trigger server shutdown | ||||||||||||
go h.TriggerCloseNow() | ||||||||||||
return | ||||||||||||
} | ||||||||||||
// Write messages to the client | ||||||||||||
var writeErr error | ||||||||||||
for _, msg := range message.GetAllBytes(ts.Payload) { | ||||||||||||
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | ||||||||||||
writeErr = err | ||||||||||||
break | ||||||||||||
} | ||||||||||||
Comment on lines
+460
to
+463
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as below comment:
Suggested change
|
||||||||||||
if writeErr = ws.WriteMessage(websocket.BinaryMessage, msg); writeErr != nil { | ||||||||||||
break | ||||||||||||
} | ||||||||||||
h.mWSBatchSent.Incr(1) | ||||||||||||
h.mWSSent.Incr(int64(batch.MessageCollapsedCount(ts.Payload))) | ||||||||||||
} | ||||||||||||
if writeErr != nil { | ||||||||||||
h.mWSError.Incr(1) | ||||||||||||
_ = ts.Ack(ctx, writeErr) | ||||||||||||
return // Exit the loop on write error | ||||||||||||
} | ||||||||||||
_ = ts.Ack(ctx, nil) | ||||||||||||
case <-ticker.C: | ||||||||||||
// Send a ping message to the client | ||||||||||||
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | ||||||||||||
h.log.Warn("Failed to set write deadline for ping: %v", err) | ||||||||||||
return | ||||||||||||
} | ||||||||||||
Comment on lines
+478
to
+481
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This
Suggested change
|
||||||||||||
if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { | ||||||||||||
h.log.Warn("WebSocket ping error: %v", err) | ||||||||||||
return | ||||||||||||
} | ||||||||||||
case <-done: | ||||||||||||
// The read goroutine has exited, indicating the client has disconnected | ||||||||||||
h.log.Debug("WebSocket client disconnected") | ||||||||||||
return | ||||||||||||
case <-ctx.Done(): | ||||||||||||
// The context has been canceled (e.g., server is shutting down) | ||||||||||||
return | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
|
||||||||||||
func (h *httpServerOutput) Consume(ts <-chan message.Transaction) error { | ||||||||||||
if h.transactions != nil { | ||||||||||||
return component.ErrAlreadyStarted | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those who read this may find it strange, so here is why:
By setting `pingPeriod` to 90% of `pongWait` (i.e., `(pongWait * 9) / 10`), we ensure that:
1. The server sends a ping before the `pongWait` deadline: if `pongWait` is 60 seconds, `pingPeriod` becomes 54 seconds.
2. There's a buffer period for the client to respond: the client has the remaining time (10% of `pongWait`) to respond with a pong before the read deadline (`pongWait`) expires.
3. Avoiding premature timeouts: if `pingPeriod` were equal to `pongWait`, the read deadline might expire before the client has a chance to respond to the ping. By setting `pingPeriod` slightly less than `pongWait`, we avoid this race condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This all makes sense! A small nit: we should consider adding these as configuration variables with the `advanced()` flag. I think these are sensible defaults in any case.