From e0c60b338f752711b71a6c5d158b7d621f3fb263 Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Sun, 4 Aug 2024 19:44:33 -0400 Subject: [PATCH] fix existing ut --- v2/lockrenewer_test.go | 78 ++++++++++++++---------------------------- v2/processor_test.go | 8 +++-- 2 files changed, 32 insertions(+), 54 deletions(-) diff --git a/v2/lockrenewer_test.go b/v2/lockrenewer_test.go index 928bf463..3dc65a5b 100644 --- a/v2/lockrenewer_test.go +++ b/v2/lockrenewer_test.go @@ -64,7 +64,9 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) { err := settler.CompleteMessage(ctx, message, nil) g.Expect(err).To(Not(HaveOccurred())) })) - msg := &azservicebus.ReceivedMessage{} + msg := &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond) defer cancel() lr.Handle(ctx, settler, msg) @@ -90,8 +92,12 @@ func Test_RenewalHandlerStayIndependentPerMessage(t *testing.T) { // send 2 message with different context, cancel the 2nd context right away. // The 2nd message should not be renewed. // The 1st message should be renewed exactly twice - msg1 := &azservicebus.ReceivedMessage{} - msg2 := &azservicebus.ReceivedMessage{} + msg1 := &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } + msg2 := &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } ctx := context.Background() ctx1, cancel1 := context.WithCancel(ctx) ctx2, cancel2 := context.WithCancel(ctx) @@ -126,7 +132,9 @@ func Test_RenewPeriodically(t *testing.T) { message *azservicebus.ReceivedMessage) { time.Sleep(150 * time.Millisecond) })) - msg := &azservicebus.ReceivedMessage{} + msg := &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond) defer cancel() lr.Handle(ctx, settler, msg) @@ -146,7 +154,9 @@ func Test_NewLockRenewalHandler_RenewPeriodically(t *testing.T) { message *azservicebus.ReceivedMessage) { time.Sleep(150 * time.Millisecond) })) - msg := &azservicebus.ReceivedMessage{} + msg := &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond) defer cancel() lr.Handle(ctx, settler, msg) @@ -178,9 +188,11 @@ func Test_RenewPeriodically_Error(t *testing.T) { }, }, { - name: "stop periodic renewal on context canceled", + name: "stop periodic renewal on renewal context deadline exceeded", isRenewerCanceled: false, - settler: &fakeSBRenewLockSettler{Err: context.Canceled}, + settler: &fakeSBRenewLockSettler{ + Err: context.DeadlineExceeded, + }, verify: func(g Gomega, tc *testCase, metrics *processor.Informer) { g.Consistently( func(g Gomega) { @@ -194,7 +206,7 @@ func Test_RenewPeriodically_Error(t *testing.T) { }, }, { - name: "stop periodic renewal on context canceled", + name: "stop periodic renewal on msg context canceled", isRenewerCanceled: true, settler: &fakeSBRenewLockSettler{Err: context.Canceled}, verify: func(g Gomega, tc *testCase, metrics *processor.Informer) { @@ -274,7 +286,9 @@ func Test_RenewPeriodically_Error(t *testing.T) { break } })) - msg := &azservicebus.ReceivedMessage{} + msg := &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) if tc.isRenewerCanceled { cancel() @@ -299,48 +313,6 @@ func Test_RenewTimeoutOption(t *testing.T) { verify func(g Gomega, tc *testCase, metrics *processor.Informer) } testCases := []testCase{ - { - name: "should time out at the same time as the renewal interval if not set", - settler: &fakeSBRenewLockSettler{ - // set delay to be greater than interval to check for lockTimeout - Delay: time.Duration(100) * time.Millisecond, - // customized error to check renewal timeout config - Err: fmt.Errorf("renew timeout"), - }, - verify: func(g Gomega, tc *testCase, metrics *processor.Informer) { - // first renewal attempt at 0ms and finish at 50ms - // after interval 50ms, second renewal attempt start at 100ms and finish at 150ms - // eventually, the downstream handler completed at 180ms and message context was canceled - g.Eventually( - func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) }, - 180*time.Millisecond, - 20*time.Millisecond).Should(Succeed()) - g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled)) - // processor context healthy because we finished early - g.Expect(tc.processorCtx.Err()).To(BeNil()) - }, - }, - { - name: "should time out at the same time as the renewal interval if set to 0", - settler: &fakeSBRenewLockSettler{ - // set delay to be greater than interval to check for lockTimeout - Delay: time.Duration(100) * time.Millisecond, - // customized error to check renewal timeout config - Err: fmt.Errorf("renew timeout"), - }, - renewTimeout: to.Ptr(time.Duration(0)), - verify: func(g Gomega, tc *testCase, metrics *processor.Informer) { - // first renewal attempt at 0ms and finish at 50ms - // after interval 50ms, second renewal attempt start at 100ms and finish at 150ms - // eventually, the downstream handler completed at 180ms and message context was canceled - g.Eventually( - func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) }, - 180*time.Millisecond, - 20*time.Millisecond).Should(Succeed()) - g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled)) - g.Expect(tc.processorCtx.Err()).To(BeNil()) - }, - }, { name: "should time out sooner than renewal interval if set", settler: &fakeSBRenewLockSettler{ @@ -458,7 +430,9 @@ func Test_RenewTimeoutOption(t *testing.T) { break } })) - msg := &azservicebus.ReceivedMessage{} + msg := &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } ctx, cancel := context.WithTimeout(context.Background(), 210*time.Millisecond) tc.processorCtx = ctx if tc.isRenewerCanceled { diff --git a/v2/processor_test.go b/v2/processor_test.go index 9744d3de..a0a433d2 100644 --- a/v2/processor_test.go +++ b/v2/processor_test.go @@ -581,14 +581,18 @@ func TestProcessorStart_MultiProcessorWithNewRenewLockHandler(t *testing.T) { func messagesChannel(messageCount int) chan *azservicebus.ReceivedMessage { messages := make(chan *azservicebus.ReceivedMessage, messageCount) for i := 0; i < messageCount; i++ { - messages <- &azservicebus.ReceivedMessage{} + messages <- &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } } return messages } func enqueueCount(q chan *azservicebus.ReceivedMessage, messageCount int) { for i := 0; i < messageCount; i++ { - q <- &azservicebus.ReceivedMessage{} + q <- &azservicebus.ReceivedMessage{ + LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)), + } } }