diff --git a/v2/lockrenewer_test.go b/v2/lockrenewer_test.go
index 3dc65a5..a41285a 100644
--- a/v2/lockrenewer_test.go
+++ b/v2/lockrenewer_test.go
@@ -381,7 +381,7 @@ func Test_RenewTimeoutOption(t *testing.T) {
 			},
 		},
 		{
-			name: "should exit after first lock renewal failure due to context errors",
+			name: "should exit after first lock renewal failure due to context deadline exceeded",
 			settler: &fakeSBRenewLockSettler{
 				// set delay to be greater than interval to check for lockTimeout
 				Delay: time.Duration(100) * time.Millisecond,
@@ -444,3 +444,179 @@ func Test_RenewTimeoutOption(t *testing.T) {
 		})
 	}
 }
+
+func Test_RenewRetry(t *testing.T) {
+	type testCase struct {
+		name              string
+		settler           *fakeSBRenewLockSettler
+		isRenewerCanceled bool
+		renewTimeout      *time.Duration
+		cancelCtxOnStop   *bool
+		msgLockedUntil    *time.Time
+		completeDelay     time.Duration
+		processorCtx      context.Context
+		gotMessageCtx     context.Context
+		verify            func(g Gomega, tc *testCase, metrics *processor.Informer)
+	}
+	testCases := []testCase{
+		{
+			name: "should not attempt to renew if lock is already expired",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.Canceled,
+			},
+			msgLockedUntil: to.Ptr(time.Now()),
+			renewTimeout:   to.Ptr(60 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(0))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should continue retry if error is context Canceled",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.Canceled,
+			},
+			msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+			renewTimeout:   to.Ptr(60 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// renewal times out every 60ms with context.Canceled error
+				// retry should continue until the downstream handler completes at 180ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should continue retry until upstream context deadline exceeded",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.Canceled,
+			},
+			msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+			renewTimeout:   to.Ptr(50 * time.Millisecond),
+			completeDelay:  300 * time.Millisecond,
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// renewal times out every 50ms with context.Canceled error
+				// retry should continue until upstream context exceeds deadline at 210ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(4))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.DeadlineExceeded))
+				// processor context hit its 210ms deadline before the handler finished
+				g.Expect(tc.processorCtx.Err()).To(Equal(context.DeadlineExceeded))
+			},
+		},
+		{
+			name: "should continue retry until lock expires",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.Canceled,
+			},
+			msgLockedUntil: to.Ptr(time.Now().Add(110 * time.Millisecond)),
+			renewTimeout:   to.Ptr(50 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// renewal times out every 50ms with context.Canceled error
+				// retry should continue until lock expires at 110ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name:           "should not retry if renew is successful",
+			settler:        &fakeSBRenewLockSettler{},
+			msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+			renewTimeout:   to.Ptr(100 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// should renew 3 times before message completes at 180ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should not retry if renew fails for an error other than context canceled",
+			settler: &fakeSBRenewLockSettler{
+				Err: context.DeadlineExceeded,
+			},
+			msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+			renewTimeout:   to.Ptr(100 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// renewal fails with a non-retryable error, so only one attempt is made
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			interval := 50 * time.Millisecond
+			reg := processor.NewRegistry()
+			reg.Init(prometheus.NewRegistry())
+			informer := processor.NewInformerFor(reg)
+			lr := shuttle.NewRenewLockHandler(
+				&shuttle.LockRenewalOptions{
+					Interval:                   &interval,
+					CancelMessageContextOnStop: tc.cancelCtxOnStop,
+					LockRenewalTimeout:         tc.renewTimeout,
+					MetricRecorder:             reg,
+				},
+				shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
+					message *azservicebus.ReceivedMessage) {
+					tc.gotMessageCtx = ctx
+					completeDelay := 180 * time.Millisecond
+					if tc.completeDelay > 0 {
+						completeDelay = tc.completeDelay
+					}
+					select {
+					case <-time.After(completeDelay):
+						break
+					case <-ctx.Done():
+						break
+					}
+				}))
+			msg := &azservicebus.ReceivedMessage{
+				LockedUntil: tc.msgLockedUntil,
+			}
+			ctx, cancel := context.WithTimeout(context.Background(), 210*time.Millisecond)
+			tc.processorCtx = ctx
+			if tc.isRenewerCanceled {
+				cancel()
+			}
+			defer cancel()
+			lr.Handle(ctx, tc.settler, msg)
+			tc.verify(NewWithT(t), &tc, informer)
+		})
+	}
+}
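
These cases lean on the `fakeSBRenewLockSettler` test double that is defined earlier in `lockrenewer_test.go`, so it does not appear in this diff. For reviewers, below is a minimal sketch of the behavior the table assumes. Only the `Delay`, `Err`, and `RenewCalled` fields come from the cases above; the package name, import set, and method body are assumptions, and the real double also implements the remaining `shuttle.MessageSettler` settlement methods, which are omitted here.

```go
package shuttle_test

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// fakeSBRenewLockSettler (sketch): fields mirror what the cases above set and assert on.
type fakeSBRenewLockSettler struct {
	Delay       time.Duration // how long each renewal attempt blocks
	Err         error         // error reported for the attempt (nil means renewal succeeds)
	RenewCalled atomic.Int32  // attempts observed, asserted via RenewCalled.Load()
}

// RenewMessageLock counts the attempt, blocks until Delay elapses or the per-attempt
// context (bounded by LockRenewalTimeout) ends, then returns the configured Err.
// With Delay greater than LockRenewalTimeout every attempt ends at the timeout,
// which is what drives the retry counts asserted in the table above.
func (f *fakeSBRenewLockSettler) RenewMessageLock(ctx context.Context,
	_ *azservicebus.ReceivedMessage, _ *azservicebus.RenewMessageLockOptions) error {
	f.RenewCalled.Add(1)
	select {
	case <-time.After(f.Delay):
	case <-ctx.Done():
	}
	return f.Err
}
```

Returning the configured `Err` regardless of whether the delay or the per-attempt timeout fired is what lets the table distinguish the retryable case (`context.Canceled`) from the non-retryable one (`context.DeadlineExceeded`).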