Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver/rangefeed: remove future package for server rangefeed #125782

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/rangefeed/buffered_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func newBufferedRegistration(
metrics *Metrics,
stream Stream,
unregisterFn func(),
cleanup func(registration),
) *bufferedRegistration {
br := &bufferedRegistration{
baseRegistration: baseRegistration{
Expand All @@ -91,6 +92,7 @@ func newBufferedRegistration(
withDiff: withDiff,
withFiltering: withFiltering,
withOmitRemote: withOmitRemote,
cleanup: cleanup,
unreg: unregisterFn,
},
metrics: metrics,
Expand Down Expand Up @@ -151,7 +153,7 @@ func (br *bufferedRegistration) publish(
// disconnect cancels the output loop context for the registration and passes an
// error to the output error stream for the registration.
// Safe to run multiple times, but subsequent errors would be discarded.
func (br *bufferedRegistration) disconnect(pErr *kvpb.Error) {
func (br *bufferedRegistration) disconnect(ctx context.Context, pErr *kvpb.Error) {
br.mu.Lock()
defer br.mu.Unlock()
if !br.mu.disconnected {
Expand All @@ -164,6 +166,7 @@ func (br *bufferedRegistration) disconnect(pErr *kvpb.Error) {
}
br.mu.disconnected = true
br.stream.Disconnect(pErr)
br.cleanup(br)
}
}

Expand Down Expand Up @@ -230,7 +233,7 @@ func (br *bufferedRegistration) runOutputLoop(ctx context.Context, _forStacks ro
ctx, br.mu.outputLoopCancelFn = context.WithCancel(ctx)
br.mu.Unlock()
err := br.outputLoop(ctx)
br.disconnect(kvpb.NewError(err))
br.disconnect(ctx, kvpb.NewError(err))
}

// drainAllocations should be done after registration is disconnected from
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ type Config struct {
// for low-volume system ranges, since the worker pool is small (default 2).
// Only has an effect when Scheduler is used.
Priority bool

// UnregisterFromReplica is a callback provided from the
// replica that this processor can call when shutting down to
// remove itself from the replica.
UnregisterFromReplica func(Processor)
}

// SetDefaults initializes unset fields in Config to values
Expand Down
60 changes: 51 additions & 9 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -26,7 +27,7 @@ type registration interface {
publish(ctx context.Context, event *kvpb.RangeFeedEvent, alloc *SharedBudgetAllocation)
// disconnect disconnects the registration with the provided error. Safe to
// run multiple times, but subsequent errors would be discarded.
disconnect(pErr *kvpb.Error)
disconnect(ctx context.Context, pErr *kvpb.Error)
// runOutputLoop runs the output loop for the registration. The output loop is
// meant to be run in a separate goroutine.
runOutputLoop(ctx context.Context, forStacks roachpb.RangeID)
Expand Down Expand Up @@ -57,20 +58,32 @@ type registration interface {
// getUnreg returns the unregisterFn call back of the registration. It should
// be called when being unregistered from processor.
getUnreg() func()

// shouldUnregister returns true if this registration should
// be unregistered.
shouldUnregister() bool
// setShouldUnregister sets shouldUnregister to the given
// value.
setShouldUnregister(bool)
}

// baseRegistration is a common base for all registration types. It is intended
// to be embedded in an actual registration struct.
type baseRegistration struct {
streamCtx context.Context
span roachpb.Span
withDiff bool
withFiltering bool
withOmitRemote bool
unreg func()
streamCtx context.Context
span roachpb.Span
withDiff bool
withFiltering bool
withOmitRemote bool
// TODO(ssd): This unreg can be removed when the LegacyProcess
// is removed.
unreg func()
cleanup func(registration)

catchUpTimestamp hlc.Timestamp // exclusive
id int64 // internal
keys interval.Range
shouldUnreg atomic.Bool
}

// ID implements interval.Interface.
Expand Down Expand Up @@ -115,6 +128,14 @@ func (r *baseRegistration) getUnreg() func() {
return r.unreg
}

func (r *baseRegistration) shouldUnregister() bool {
return r.shouldUnreg.Load()
}

func (r *baseRegistration) setShouldUnregister(b bool) {
r.shouldUnreg.Store(b)
}

func (r *baseRegistration) getWithDiff() bool {
return r.withDiff
}
Expand Down Expand Up @@ -348,7 +369,10 @@ func (reg *registry) Unregister(ctx context.Context, r registration) {
// https://github.com/cockroachdb/cockroach/issues/110634
func (reg *registry) DisconnectAllOnShutdown(ctx context.Context, pErr *kvpb.Error) {
reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len()))
reg.DisconnectWithErr(ctx, all, pErr)
reg.forOverlappingRegs(ctx, all, func(r registration) (bool, *kvpb.Error) {
r.drainAllocations(ctx)
return true /* disconned */, pErr
})
}

// Disconnect disconnects all registrations that overlap the specified span with
Expand Down Expand Up @@ -380,7 +404,7 @@ func (reg *registry) forOverlappingRegs(
r := i.(registration)
dis, pErr := fn(r)
if dis {
r.disconnect(pErr)
r.disconnect(ctx, pErr)
toDelete = append(toDelete, i)
}
return false
Expand All @@ -390,7 +414,10 @@ func (reg *registry) forOverlappingRegs(
} else {
reg.tree.DoMatching(matchFn, span.AsRange())
}
reg.remove(ctx, toDelete)
}

func (reg *registry) remove(ctx context.Context, toDelete []interval.Interface) {
if len(toDelete) == reg.tree.Len() {
reg.tree.Clear()
} else if len(toDelete) == 1 {
Expand All @@ -407,6 +434,21 @@ func (reg *registry) forOverlappingRegs(
}
}

func (reg *registry) unregisterMarkedRegistrations(ctx context.Context) {
var toDelete []interval.Interface
reg.tree.Do(func(i interval.Interface) (done bool) {
r := i.(registration)
if r.shouldUnregister() {
toDelete = append(toDelete, i)
}
return false
})
reg.remove(ctx, toDelete)
for _, i := range toDelete {
i.(registration).drainAllocations(ctx)
}
}

// waitForCaughtUp waits for all registrations overlapping the given span to
// completely process their internal buffers.
func (reg *registry) waitForCaughtUp(ctx context.Context, span roachpb.Span) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/registry_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func newTestRegistration(
NewMetrics(),
s,
func() {},
func(registration) {},
)
return &testRegistration{
bufferedRegistration: r,
Expand Down
34 changes: 17 additions & 17 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestRegistrationBasic(t *testing.T) {
go noCatchupReg.runOutputLoop(ctx, 0)
require.NoError(t, noCatchupReg.waitForCaughtUp(ctx))
require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.Events())
noCatchupReg.disconnect(nil)
noCatchupReg.disconnect(ctx, nil)

// Registration with catchup scan.
catchupReg := newTestRegistration(spBC, hlc.Timestamp{WallTime: 1},
Expand All @@ -56,7 +56,7 @@ func TestRegistrationBasic(t *testing.T) {
events := catchupReg.Events()
require.Equal(t, 5, len(events))
require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, events[3:])
catchupReg.disconnect(nil)
catchupReg.disconnect(ctx, nil)

// EXIT CONDITIONS
// External Disconnect.
Expand All @@ -67,7 +67,7 @@ func TestRegistrationBasic(t *testing.T) {
go disconnectReg.runOutputLoop(ctx, 0)
require.NoError(t, disconnectReg.waitForCaughtUp(ctx))
discErr := kvpb.NewError(fmt.Errorf("disconnection error"))
disconnectReg.disconnect(discErr)
disconnectReg.disconnect(ctx, discErr)
require.Equal(t, discErr.GoError(), disconnectReg.WaitForError(t))
require.Equal(t, 2, len(disconnectReg.Events()))

Expand All @@ -76,7 +76,7 @@ func TestRegistrationBasic(t *testing.T) {
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */)
disconnectEarlyReg.publish(ctx, ev1, nil /* alloc */)
disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */)
disconnectEarlyReg.disconnect(discErr)
disconnectEarlyReg.disconnect(ctx, discErr)
go disconnectEarlyReg.runOutputLoop(ctx, 0)
require.Equal(t, discErr.GoError(), disconnectEarlyReg.WaitForError(t))
require.Equal(t, 0, len(disconnectEarlyReg.Events()))
Expand Down Expand Up @@ -156,8 +156,8 @@ func TestRegistryWithOmitOrigin(t *testing.T) {
go rAC.runOutputLoop(ctx, 0)
go originFiltering.runOutputLoop(ctx, 0)

defer rAC.disconnect(nil)
defer originFiltering.disconnect(nil)
defer rAC.disconnect(ctx, nil)
defer originFiltering.disconnect(ctx, nil)

reg.Register(ctx, rAC.bufferedRegistration)
reg.Register(ctx, originFiltering.bufferedRegistration)
Expand Down Expand Up @@ -208,11 +208,11 @@ func TestRegistryBasic(t *testing.T) {
go rCD.runOutputLoop(ctx, 0)
go rAC.runOutputLoop(ctx, 0)
go rACFiltering.runOutputLoop(ctx, 0)
defer rAB.disconnect(nil)
defer rBC.disconnect(nil)
defer rCD.disconnect(nil)
defer rAC.disconnect(nil)
defer rACFiltering.disconnect(nil)
defer rAB.disconnect(ctx, nil)
defer rBC.disconnect(ctx, nil)
defer rCD.disconnect(ctx, nil)
defer rAC.disconnect(ctx, nil)
defer rACFiltering.disconnect(ctx, nil)

// Register 6 registrations.
reg.Register(ctx, rAB.bufferedRegistration)
Expand Down Expand Up @@ -355,35 +355,35 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) {
require.NoError(t, reg.waitForCaughtUp(ctx, all))
require.Equal(t, []*kvpb.RangeFeedEvent{ev}, r.Events())

r.disconnect(nil)
r.disconnect(ctx, nil)
}

func TestRegistrationString(t *testing.T) {
testCases := []struct {
r baseRegistration
r *baseRegistration
exp string
}{
{
r: baseRegistration{
r: &baseRegistration{
span: roachpb.Span{Key: roachpb.Key("a")},
},
exp: `[a @ 0,0+]`,
},
{
r: baseRegistration{span: roachpb.Span{
r: &baseRegistration{span: roachpb.Span{
Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
},
exp: `[{a-c} @ 0,0+]`,
},
{
r: baseRegistration{
r: &baseRegistration{
span: roachpb.Span{Key: roachpb.Key("d")},
catchUpTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
},
exp: `[d @ 0.000000010,1+]`,
},
{
r: baseRegistration{span: roachpb.Span{
r: &baseRegistration{span: roachpb.Span{
Key: roachpb.Key("d"), EndKey: roachpb.Key("z")},
catchUpTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
},
Expand Down
Loading
Loading