From f62abd00849b686cca41a1850242be5e41c11c1b Mon Sep 17 00:00:00 2001 From: pratik151192 Date: Fri, 1 Sep 2023 14:19:30 -0300 Subject: [PATCH] fix: return if the context was cancelled while polling for subscription items (#351) --- momento/topic_client_test.go | 36 +++++++++++++++++++++++++++++++++++ momento/topic_subscription.go | 23 +++++++++++++++++++++- 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/momento/topic_client_test.go b/momento/topic_client_test.go index 0737f6c6..e543e265 100644 --- a/momento/topic_client_test.go +++ b/momento/topic_client_test.go @@ -2,6 +2,7 @@ package momento_test import ( "context" + "fmt" "time" "github.com/google/uuid" @@ -101,6 +102,41 @@ var _ = Describe("Pubsub", func() { Expect(receivedValues).To(Equal(publishedValues)) }) + It("Cancels the context immediataly after subscribing and asserts as such", func() { + + sub, _ := sharedContext.TopicClient.Subscribe(sharedContext.Ctx, &TopicSubscribeRequest{ + CacheName: sharedContext.CacheName, + TopicName: sharedContext.CollectionName, + }) + + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + + done := make(chan bool) + + // Run Item function in a goroutine + go func() { + _, err := sub.Item(ctx) + if err == nil { + fmt.Println("Expected an error due to context cancellation, got nil") + } + close(done) + }() + + // immediately cancel the context + cancel() + + // Wait for either the Item function to return or the test to timeout + select { + case <-done: + // Test passed + case <-time.After(time.Second * 2): + Fail("Test timed out, likely due to infinite loop in Item function") + } + + }) + It("returns an error when trying to publish a nil topic value", func() { Expect( sharedContext.TopicClient.Publish(sharedContext.Ctx, &TopicPublishRequest{ diff --git a/momento/topic_subscription.go b/momento/topic_subscription.go index 4d146093..52a97978 100644 --- a/momento/topic_subscription.go +++ b/momento/topic_subscription.go @@ -29,10 +29,31 @@ type topicSubscription struct { func (s *topicSubscription) Item(ctx context.Context) (TopicValue, error) { for { + // Its totally possible a client just calls `cancel` on the `context` immediately after subscribing to an + // item, so we should check that here. + select { + case <-ctx.Done(): + // Context has been canceled, return an error + return nil, ctx.Err() + default: + // Proceed as is + } + rawMsg := new(pb.XSubscriptionItem) if err := s.grpcClient.RecvMsg(rawMsg); err != nil { s.log.Error("stream disconnected, attempting to reconnect err:", fmt.Sprint(err)) - s.attemptReconnect(ctx) + + // Check if the context has been canceled before attempting to reconnect as the client + // might have given up on the context + select { + case <-ctx.Done(): + // Context has been canceled, return an error + return nil, ctx.Err() + default: + // Attempt to reconnect + s.attemptReconnect(ctx) + } + // retry getting the latest item continue }