Commit
added UT
karenychen committed Aug 5, 2024
1 parent e0c60b3 commit d73c081
Showing 1 changed file with 177 additions and 1 deletion.
v2/lockrenewer_test.go
@@ -381,7 +381,7 @@ func Test_RenewTimeoutOption(t *testing.T) {
},
},
{
name: "should exit after first lock renewal failure due to context errors",
name: "should exit after first lock renewal failure due to context deadline exceeded",
settler: &fakeSBRenewLockSettler{
// set delay to be greater than interval to check for lockTimeout
Delay: time.Duration(100) * time.Millisecond,
@@ -444,3 +444,179 @@ func Test_RenewTimeoutOption(t *testing.T) {
})
}
}

// Test_RenewRetry verifies how lock renewal failures are retried depending on
// the error returned, the message lock expiry, and the surrounding context deadlines.
func Test_RenewRetry(t *testing.T) {
type testCase struct {
name string
settler *fakeSBRenewLockSettler
isRenewerCanceled bool
renewTimeout *time.Duration
cancelCtxOnStop *bool
msgLockedUntil *time.Time
completeDelay time.Duration
processorCtx context.Context
gotMessageCtx context.Context
verify func(g Gomega, tc *testCase, metrics *processor.Informer)
}
testCases := []testCase{
{
name: "should not attempt to renew if lock is already expired",
settler: &fakeSBRenewLockSettler{
// set delay to be greater than interval to check for lockTimeout
Delay: time.Duration(100) * time.Millisecond,
Err: context.Canceled,
},
msgLockedUntil: to.Ptr(time.Now()),
renewTimeout: to.Ptr(60 * time.Millisecond),
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// assert that no renewal is ever attempted while the handler runs
g.Consistently(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(0))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
// processor context healthy because we finished early
g.Expect(tc.processorCtx.Err()).To(BeNil())
},
},
{
name: "should continue retry if error is context Canceled",
settler: &fakeSBRenewLockSettler{
// set delay to be greater than interval to check for lockTimeout
Delay: time.Duration(100) * time.Millisecond,
Err: context.Canceled,
},
msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
renewTimeout: to.Ptr(60 * time.Millisecond),
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// renewal times out every 60ms with context.Canceled error
// retry should continue until the downstream handler completes at 180ms
g.Eventually(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
// processor context healthy because we finished early
g.Expect(tc.processorCtx.Err()).To(BeNil())
},
},
{
name: "should continue retry until upstream context deadline exceeded",
settler: &fakeSBRenewLockSettler{
// set delay to be greater than interval to check for lockTimeout
Delay: time.Duration(100) * time.Millisecond,
Err: context.Canceled,
},
msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
renewTimeout: to.Ptr(50 * time.Millisecond),
completeDelay: 300 * time.Millisecond,
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// renewal times out every 50ms with context.Canceled error
// retry should continue until upstream context exceeds deadline at 210ms
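// assuming the first attempt fires after one 50ms interval and each timed-out
// attempt is retried immediately, attempts land at roughly 50/100/150/200ms,
// i.e. 4 attempts before the 210ms processor deadline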
g.Eventually(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(4))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.DeadlineExceeded))
// processor context has exceeded its deadline because the handler outlives the 210ms timeout
g.Expect(tc.processorCtx.Err()).To(Equal(context.DeadlineExceeded))
},
},
{
name: "should continue retry until lock expires",
settler: &fakeSBRenewLockSettler{
// set delay to be greater than interval to check for lockTimeout
Delay: time.Duration(100) * time.Millisecond,
Err: context.Canceled,
},
msgLockedUntil: to.Ptr(time.Now().Add(110 * time.Millisecond)),
renewTimeout: to.Ptr(50 * time.Millisecond),
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// renewal times out every 50ms with context.Canceled error
// retry should continue until lock expires at 110ms
g.Eventually(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
// processor context healthy because we finished early
g.Expect(tc.processorCtx.Err()).To(BeNil())
},
},
{
name: "should not retry if renew is successful",
settler: &fakeSBRenewLockSettler{},
msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
renewTimeout: to.Ptr(100 * time.Millisecond),
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// should renew 3 times before message completes at 180ms
g.Eventually(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
// processor context healthy because we finished early
g.Expect(tc.processorCtx.Err()).To(BeNil())
},
},
{
name: "should not retry if renew fails for an error other than context canceled",
settler: &fakeSBRenewLockSettler{
Err: context.DeadlineExceeded,
},
msgLockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
renewTimeout: to.Ptr(100 * time.Millisecond),
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// should attempt renewal only once because context.DeadlineExceeded is not retried
g.Eventually(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
// processor context healthy because we finished early
g.Expect(tc.processorCtx.Err()).To(BeNil())
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
interval := 50 * time.Millisecond
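// timeline shared by the cases above: renewal interval 50ms, handler completes
// after 180ms unless tc.completeDelay overrides it, processor context times out at 210ms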
reg := processor.NewRegistry()
reg.Init(prometheus.NewRegistry())
informer := processor.NewInformerFor(reg)
lr := shuttle.NewRenewLockHandler(
&shuttle.LockRenewalOptions{
Interval: &interval,
CancelMessageContextOnStop: tc.cancelCtxOnStop,
LockRenewalTimeout: tc.renewTimeout,
MetricRecorder: reg,
},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
tc.gotMessageCtx = ctx
completeDelay := 180 * time.Millisecond
if tc.completeDelay > 0 {
completeDelay = tc.completeDelay
}
select {
case <-time.After(completeDelay):
case <-ctx.Done():
}
}))
msg := &azservicebus.ReceivedMessage{
LockedUntil: tc.msgLockedUntil,
}
ctx, cancel := context.WithTimeout(context.Background(), 210*time.Millisecond)
tc.processorCtx = ctx
if tc.isRenewerCanceled {
cancel()
}
defer cancel()
lr.Handle(ctx, tc.settler, msg)
tc.verify(NewWithT(t), &tc, informer)
})
}
}
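
Note: fakeSBRenewLockSettler is defined elsewhere in the test package and is not part of this commit. A minimal sketch of the behaviour the cases above appear to rely on (a fixed Delay per renewal attempt, a fixed Err, and an atomic RenewCalled counter) might look like the following; the RenewMessageLock signature mirrors azservicebus, and ignoring the renewal context is an assumption, not taken from this diff.

// Sketch only (assumed shape of the test double used above); reuses the test
// file's imports: context, time, sync/atomic, azservicebus.
type fakeSBRenewLockSettler struct {
	Delay       time.Duration // how long each renewal attempt takes before returning
	Err         error         // error returned by every renewal attempt
	RenewCalled atomic.Int32  // number of renewal attempts observed
	// remaining shuttle.MessageSettler methods (Complete, Abandon, ...) omitted here
}

func (f *fakeSBRenewLockSettler) RenewMessageLock(_ context.Context,
	_ *azservicebus.ReceivedMessage, _ *azservicebus.RenewMessageLockOptions) error {
	f.RenewCalled.Add(1)
	if f.Delay > 0 {
		time.Sleep(f.Delay)
	}
	return f.Err
}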
