Skip to content

Commit

Permalink
fix: return if the context was cancelled while polling for subscripti…
Browse files Browse the repository at this point in the history
…on items
  • Loading branch information
pratik151192 committed Sep 1, 2023
1 parent 561f045 commit d032012
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion momento/topic_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,31 @@ type topicSubscription struct {

func (s *topicSubscription) Item(ctx context.Context) (TopicValue, error) {
for {
// Its totally possible a client just calls `cancel` on the `context` immediately after subscribing to an
// item, so we should check that here.
select {
case <-ctx.Done():
// Context has been canceled, return an error
return nil, ctx.Err()
default:
// Proceed as is
}

rawMsg := new(pb.XSubscriptionItem)
if err := s.grpcClient.RecvMsg(rawMsg); err != nil {
s.log.Error("stream disconnected, attempting to reconnect err:", fmt.Sprint(err))
s.attemptReconnect(ctx)

// Check if the context has been canceled before attempting to reconnect as the client
// might have given up on the context
select {
case <-ctx.Done():
// Context has been canceled, return an error
return nil, ctx.Err()
default:
// Attempt to reconnect
s.attemptReconnect(ctx)
}

// retry getting the latest item
continue
}
Expand Down

0 comments on commit d032012

Please sign in to comment.