🐛 fix: do not count context errors as failure to renew a lock #212

Closed · wants to merge 2 commits
24 changes: 20 additions & 4 deletions v2/lockrenewer.go
@@ -9,9 +9,10 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-shuttle/v2/metrics/processor"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/Azure/go-shuttle/v2/metrics/processor"
)

// LockRenewer abstracts the servicebus receiver client to only expose lock renewal
@@ -26,25 +27,33 @@ type LockRenewalOptions struct {
// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
// Defaults to true.
CancelMessageContextOnStop *bool
// MetricRecorder allows passing a custom metric recorder to the LockRenewer.
// Defaults to the processor.Metric instance.
MetricRecorder processor.Recorder
}

// NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc {
interval := 10 * time.Second
cancelMessageContextOnStop := true
metricRecorder := processor.Metric
if options != nil {
if options.Interval != nil {
interval = *options.Interval
}
if options.CancelMessageContextOnStop != nil {
cancelMessageContextOnStop = *options.CancelMessageContextOnStop
}
if options.MetricRecorder != nil {
metricRecorder = options.MetricRecorder
}
}
return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
plr := &peekLockRenewer{
next: handler,
lockRenewer: lockRenewer,
renewalInterval: &interval,
metrics: metricRecorder,
cancelMessageCtxOnStop: cancelMessageContextOnStop,
stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking
}
@@ -74,6 +83,7 @@ type peekLockRenewer struct {
next Handler
lockRenewer LockRenewer
renewalInterval *time.Duration
metrics processor.Recorder
alive atomic.Bool
cancelMessageCtxOnStop bool
cancelMessageCtx func()
@@ -124,7 +134,13 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
err := plr.lockRenewer.RenewMessageLock(ctx, message, nil)
if err != nil {
log(ctx, fmt.Sprintf("failed to renew lock: %s", err))
processor.Metric.IncMessageLockRenewedFailure(message)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// If the error is a context error,
// stop and let the next loop iteration handle the exit.
plr.stop(ctx)
continue
}
plr.metrics.IncMessageLockRenewedFailure(message)
// The context is canceled when the message handler returns from the processor.
// This can happen if we already entered the interval case when the message processing completes.
// The best we can do is log and retry on the next tick. The sdk already retries operations on recoverable network errors.
@@ -140,14 +156,14 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
continue
}
span.AddEvent("message lock renewed", trace.WithAttributes(attribute.Int("count", count)))
processor.Metric.IncMessageLockRenewedSuccess(message)
plr.metrics.IncMessageLockRenewedSuccess(message)
case <-ctx.Done():
log(ctx, "context done: stopping periodic renewal")
span.AddEvent("context done: stopping message lock renewal")
err := ctx.Err()
if errors.Is(err, context.DeadlineExceeded) {
span.RecordError(err)
processor.Metric.IncMessageDeadlineReachedCount(message)
plr.metrics.IncMessageDeadlineReachedCount(message)
}
plr.stop(ctx)
case <-plr.stopped:
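For context, a minimal sketch of how a caller could opt into the new `MetricRecorder` option; it mirrors the wiring used in the test changes below. The helper name `newRenewalHandler`, the 30-second interval, and the caller-supplied receiver/handler are illustrative assumptions, not part of this PR.

```go
package sample

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/Azure/go-shuttle/v2"
	"github.com/Azure/go-shuttle/v2/metrics/processor"
)

// newRenewalHandler wires the lock-renewal middleware to its own metric
// registry instead of the package-level processor.Metric. The receiver and
// downstream handler come from the caller; the interval is arbitrary.
func newRenewalHandler(receiver shuttle.LockRenewer, next shuttle.Handler) shuttle.HandlerFunc {
	reg := processor.NewRegistry()
	reg.Init(prometheus.NewRegistry()) // register the counters on a dedicated Prometheus registerer

	interval := 30 * time.Second
	return shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{
		Interval:       &interval,
		MetricRecorder: reg, // *processor.Registry satisfies processor.Recorder
	}, next)
}
```

Leaving `MetricRecorder` unset keeps the previous behavior of recording on the package-level `processor.Metric`.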
46 changes: 36 additions & 10 deletions v2/lockrenewer_test.go
@@ -11,8 +11,10 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"

"github.com/Azure/go-shuttle/v2"
"github.com/Azure/go-shuttle/v2/metrics/processor"
)

type fakeSBLockRenewer struct {
@@ -150,24 +152,40 @@ func Test_RenewPeriodically_Error(t *testing.T) {
isRenewerCanceled bool
cancelCtxOnStop *bool
gotMessageCtx context.Context
verify func(g Gomega, tc *testCase)
verify func(g Gomega, tc *testCase, metrics *processor.Informer)
}
testCases := []testCase{
{
name: "continue periodic renewal on unknown error",
renewer: &fakeSBLockRenewer{Err: fmt.Errorf("unknown error")},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Eventually(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "stop periodic renewal on context canceled",
isRenewerCanceled: false,
renewer: &fakeSBLockRenewer{Err: context.Canceled},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) {
g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)),
"should not retry renewal after a context error")
g.Expect(metrics.GetMessageLockRenewedFailureCount()).To(Equal(float64(0)),
"should not record failure metric")
},
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "stop periodic renewal on context canceled",
isRenewerCanceled: true,
renewer: &fakeSBLockRenewer{Err: context.Canceled},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(0))) },
130*time.Millisecond,
@@ -177,7 +195,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
{
name: "stop periodic renewal on permanent error (lockLost)",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
@@ -187,7 +205,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
{
name: "cancel message context on stop by default",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
@@ -199,7 +217,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
name: "does not cancel message context on stop if disabled",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
cancelCtxOnStop: to.Ptr(false),
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) {
g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)))
@@ -212,7 +230,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
{
name: "continue periodic renewal on transient error (timeout)",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeTimeout}},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Eventually(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
140*time.Millisecond,
@@ -225,7 +243,15 @@ func Test_RenewPeriodically_Error(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
interval := 50 * time.Millisecond
lr := shuttle.NewLockRenewalHandler(tc.renewer, &shuttle.LockRenewalOptions{Interval: &interval, CancelMessageContextOnStop: tc.cancelCtxOnStop},
reg := processor.NewRegistry()
reg.Init(prometheus.NewRegistry())
informer := processor.NewInformerFor(reg)
lr := shuttle.NewLockRenewalHandler(tc.renewer,
&shuttle.LockRenewalOptions{
Interval: &interval,
CancelMessageContextOnStop: tc.cancelCtxOnStop,
MetricRecorder: reg,
},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
tc.gotMessageCtx = ctx
@@ -237,13 +263,13 @@ func Test_RenewPeriodically_Error(t *testing.T) {
}
}))
msg := &azservicebus.ReceivedMessage{}
ctx, cancel := context.WithTimeout(context.TODO(), 200*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
if tc.isRenewerCanceled {
cancel()
}
defer cancel()
lr.Handle(ctx, &fakeSettler{}, msg)
tc.verify(NewWithT(t), &tc)
tc.verify(NewWithT(t), &tc, informer)
})
}
}
14 changes: 11 additions & 3 deletions v2/metrics/processor/types.go
@@ -17,12 +17,13 @@ const (
)

var (
metricsRegistry = newRegistry()
metricsRegistry = NewRegistry()
// Metric exposes a Recorder interface to manipulate the Processor metrics.
Metric Recorder = metricsRegistry
)

func newRegistry() *Registry {
// NewRegistry creates a new Registry with initialized prometheus counter definitions
func NewRegistry() *Registry {
return &Registry{
MessageReceivedCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_received_total",
Expand Down Expand Up @@ -59,6 +60,7 @@ func getMessageTypeLabel(msg *azservicebus.ReceivedMessage) prom.Labels {
}
}

// Init registers the counters from the Registry on the prometheus.Registerer
func (m *Registry) Init(reg prom.Registerer) {
reg.MustRegister(
m.MessageReceivedCount,
@@ -68,6 +70,7 @@ func (m *Registry) Init(reg prom.Registerer) {
m.ConcurrentMessageCount)
}

// Registry provides the prometheus metrics for the message processor
type Registry struct {
MessageReceivedCount *prom.CounterVec
MessageHandledCount *prom.CounterVec
@@ -137,7 +140,12 @@ type Informer struct {

// NewInformer creates an Informer for the current registry
func NewInformer() *Informer {
return &Informer{registry: metricsRegistry}
return NewInformerFor(metricsRegistry)
}

// NewInformerFor creates an Informer for the provided Registry
func NewInformerFor(r *Registry) *Informer {
return &Informer{registry: r}
}

// GetMessageLockRenewedFailureCount retrieves the current value of the MessageLockRenewedFailureCount metric
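A short usage sketch of the newly exported constructors: build an isolated Registry, register it, and read counters back through an Informer. It assumes the getter signature shown in the tests, `(float64, error)`; the program itself is hypothetical.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/Azure/go-shuttle/v2/metrics/processor"
)

func main() {
	// Build an isolated registry rather than mutating the shared processor.Metric.
	reg := processor.NewRegistry()
	reg.Init(prometheus.NewRegistry())

	// An Informer bound to that registry reads the counters back, e.g. to
	// assert that context errors did not increment the failure counter.
	informer := processor.NewInformerFor(reg)
	failures, err := informer.GetMessageLockRenewedFailureCount()
	if err != nil {
		panic(err)
	}
	fmt.Printf("lock renewal failures: %.0f\n", failures)
}
```

This is the same pattern the updated lockrenewer test uses to verify that a context.Canceled renewal error leaves the failure counter at zero.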
11 changes: 8 additions & 3 deletions v2/metrics/processor/types_test.go
@@ -26,13 +26,18 @@ func (f *fakeRegistry) Unregister(c prometheus.Collector) bool {

func TestRegistry_Init(t *testing.T) {
g := NewWithT(t)
r := newRegistry()
r := NewRegistry()
fRegistry := &fakeRegistry{}
g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
g.Expect(fRegistry.collectors).To(HaveLen(5))
Metric.IncMessageReceived(10)
}

func TestNewInformerDefault(t *testing.T) {
i := NewInformer()
g := NewWithT(t)
g.Expect(i.registry).To(Equal(Metric))
}

func TestMetrics(t *testing.T) {
@@ -55,9 +60,9 @@ func TestMetrics(t *testing.T) {
},
} {
g := NewWithT(t)
r := newRegistry()
r := NewRegistry()
registerer := prometheus.NewRegistry()
informer := &Informer{registry: r}
informer := NewInformerFor(r)

// before init
count, err := informer.GetMessageLockRenewedFailureCount()