Skip to content

Commit

Permalink
fix: return if the context was cancelled while polling for subscripti…
Browse files Browse the repository at this point in the history
…on items (#351)
  • Loading branch information
pratik151192 authored Sep 1, 2023
1 parent 561f045 commit f62abd0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
36 changes: 36 additions & 0 deletions momento/topic_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package momento_test

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -101,6 +102,41 @@ var _ = Describe("Pubsub", func() {
Expect(receivedValues).To(Equal(publishedValues))
})

It("Cancels the context immediataly after subscribing and asserts as such", func() {

sub, _ := sharedContext.TopicClient.Subscribe(sharedContext.Ctx, &TopicSubscribeRequest{
CacheName: sharedContext.CacheName,
TopicName: sharedContext.CollectionName,
})

// Create a new context with a timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

done := make(chan bool)

// Run Item function in a goroutine
go func() {
_, err := sub.Item(ctx)
if err == nil {
fmt.Println("Expected an error due to context cancellation, got nil")
}
close(done)
}()

// immediately cancel the context
cancel()

// Wait for either the Item function to return or the test to timeout
select {
case <-done:
// Test passed
case <-time.After(time.Second * 2):
Fail("Test timed out, likely due to infinite loop in Item function")
}

})

It("returns an error when trying to publish a nil topic value", func() {
Expect(
sharedContext.TopicClient.Publish(sharedContext.Ctx, &TopicPublishRequest{
Expand Down
23 changes: 22 additions & 1 deletion momento/topic_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,31 @@ type topicSubscription struct {

func (s *topicSubscription) Item(ctx context.Context) (TopicValue, error) {
for {
// Its totally possible a client just calls `cancel` on the `context` immediately after subscribing to an
// item, so we should check that here.
select {
case <-ctx.Done():
// Context has been canceled, return an error
return nil, ctx.Err()
default:
// Proceed as is
}

rawMsg := new(pb.XSubscriptionItem)
if err := s.grpcClient.RecvMsg(rawMsg); err != nil {
s.log.Error("stream disconnected, attempting to reconnect err:", fmt.Sprint(err))
s.attemptReconnect(ctx)

// Check if the context has been canceled before attempting to reconnect as the client
// might have given up on the context
select {
case <-ctx.Done():
// Context has been canceled, return an error
return nil, ctx.Err()
default:
// Attempt to reconnect
s.attemptReconnect(ctx)
}

// retry getting the latest item
continue
}
Expand Down

0 comments on commit f62abd0

Please sign in to comment.