Skip to content

Commit

Permalink
fix existing ut
Browse files Browse the repository at this point in the history
  • Loading branch information
karenychen committed Aug 4, 2024
1 parent ca3a330 commit e0c60b3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 54 deletions.
78 changes: 26 additions & 52 deletions v2/lockrenewer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) {
err := settler.CompleteMessage(ctx, message, nil)
g.Expect(err).To(Not(HaveOccurred()))
}))
msg := &azservicebus.ReceivedMessage{}
msg := &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
defer cancel()
lr.Handle(ctx, settler, msg)
Expand All @@ -90,8 +92,12 @@ func Test_RenewalHandlerStayIndependentPerMessage(t *testing.T) {
// send 2 message with different context, cancel the 2nd context right away.
// The 2nd message should not be renewed.
// The 1st message should be renewed exactly twice
msg1 := &azservicebus.ReceivedMessage{}
msg2 := &azservicebus.ReceivedMessage{}
msg1 := &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
msg2 := &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
ctx := context.Background()
ctx1, cancel1 := context.WithCancel(ctx)
ctx2, cancel2 := context.WithCancel(ctx)
Expand Down Expand Up @@ -126,7 +132,9 @@ func Test_RenewPeriodically(t *testing.T) {
message *azservicebus.ReceivedMessage) {
time.Sleep(150 * time.Millisecond)
}))
msg := &azservicebus.ReceivedMessage{}
msg := &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
defer cancel()
lr.Handle(ctx, settler, msg)
Expand All @@ -146,7 +154,9 @@ func Test_NewLockRenewalHandler_RenewPeriodically(t *testing.T) {
message *azservicebus.ReceivedMessage) {
time.Sleep(150 * time.Millisecond)
}))
msg := &azservicebus.ReceivedMessage{}
msg := &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
defer cancel()
lr.Handle(ctx, settler, msg)
Expand Down Expand Up @@ -178,9 +188,11 @@ func Test_RenewPeriodically_Error(t *testing.T) {
},
},
{
name: "stop periodic renewal on context canceled",
name: "stop periodic renewal on renewal context deadline exceeded",
isRenewerCanceled: false,
settler: &fakeSBRenewLockSettler{Err: context.Canceled},
settler: &fakeSBRenewLockSettler{
Err: context.DeadlineExceeded,
},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) {
Expand All @@ -194,7 +206,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
},
},
{
name: "stop periodic renewal on context canceled",
name: "stop periodic renewal on msg context canceled",
isRenewerCanceled: true,
settler: &fakeSBRenewLockSettler{Err: context.Canceled},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
Expand Down Expand Up @@ -274,7 +286,9 @@ func Test_RenewPeriodically_Error(t *testing.T) {
break
}
}))
msg := &azservicebus.ReceivedMessage{}
msg := &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
if tc.isRenewerCanceled {
cancel()
Expand All @@ -299,48 +313,6 @@ func Test_RenewTimeoutOption(t *testing.T) {
verify func(g Gomega, tc *testCase, metrics *processor.Informer)
}
testCases := []testCase{
{
name: "should time out at the same time as the renewal interval if not set",
settler: &fakeSBRenewLockSettler{
// set delay to be greater than interval to check for lockTimeout
Delay: time.Duration(100) * time.Millisecond,
// customized error to check renewal timeout config
Err: fmt.Errorf("renew timeout"),
},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// first renewal attempt at 0ms and finish at 50ms
// after interval 50ms, second renewal attempt start at 100ms and finish at 150ms
// eventually, the downstream handler completed at 180ms and message context was canceled
g.Eventually(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
// processor context healthy because we finished early
g.Expect(tc.processorCtx.Err()).To(BeNil())
},
},
{
name: "should time out at the same time as the renewal interval if set to 0",
settler: &fakeSBRenewLockSettler{
// set delay to be greater than interval to check for lockTimeout
Delay: time.Duration(100) * time.Millisecond,
// customized error to check renewal timeout config
Err: fmt.Errorf("renew timeout"),
},
renewTimeout: to.Ptr(time.Duration(0)),
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
// first renewal attempt at 0ms and finish at 50ms
// after interval 50ms, second renewal attempt start at 100ms and finish at 150ms
// eventually, the downstream handler completed at 180ms and message context was canceled
g.Eventually(
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) },
180*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
g.Expect(tc.processorCtx.Err()).To(BeNil())
},
},
{
name: "should time out sooner than renewal interval if set",
settler: &fakeSBRenewLockSettler{
Expand Down Expand Up @@ -458,7 +430,9 @@ func Test_RenewTimeoutOption(t *testing.T) {
break
}
}))
msg := &azservicebus.ReceivedMessage{}
msg := &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
ctx, cancel := context.WithTimeout(context.Background(), 210*time.Millisecond)
tc.processorCtx = ctx
if tc.isRenewerCanceled {
Expand Down
8 changes: 6 additions & 2 deletions v2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,14 +581,18 @@ func TestProcessorStart_MultiProcessorWithNewRenewLockHandler(t *testing.T) {
func messagesChannel(messageCount int) chan *azservicebus.ReceivedMessage {
messages := make(chan *azservicebus.ReceivedMessage, messageCount)
for i := 0; i < messageCount; i++ {
messages <- &azservicebus.ReceivedMessage{}
messages <- &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
}
return messages
}

func enqueueCount(q chan *azservicebus.ReceivedMessage, messageCount int) {
for i := 0; i < messageCount; i++ {
q <- &azservicebus.ReceivedMessage{}
q <- &azservicebus.ReceivedMessage{
LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
}
}
}

Expand Down

0 comments on commit e0c60b3

Please sign in to comment.