Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support seamless switchover redis for sequencer coordinator #2526

Merged
merged 16 commits into from
Oct 17, 2024
Merged
22 changes: 15 additions & 7 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,20 +358,20 @@ func (c *SeqCoordinator) GetRemoteMsgCount() (arbutil.MessageIndex, error) {
return c.getRemoteMsgCountImpl(c.GetContext(), c.Client)
}

func (c *SeqCoordinator) wantsLockoutUpdate(ctx context.Context) error {
func (c *SeqCoordinator) wantsLockoutUpdate(ctx context.Context, client redis.UniversalClient) error {
c.wantsLockoutMutex.Lock()
defer c.wantsLockoutMutex.Unlock()
return c.wantsLockoutUpdateWithMutex(ctx)
return c.wantsLockoutUpdateWithMutex(ctx, client)
}

// Requires the caller hold the wantsLockoutMutex
func (c *SeqCoordinator) wantsLockoutUpdateWithMutex(ctx context.Context) error {
func (c *SeqCoordinator) wantsLockoutUpdateWithMutex(ctx context.Context, client redis.UniversalClient) error {
if c.avoidLockout > 0 {
return nil
}
myWantsLockoutKey := redisutil.WantsLockoutKeyFor(c.config.Url())
wantsLockoutUntil := time.Now().Add(c.config.LockoutDuration)
pipe := c.Client.TxPipeline()
pipe := client.TxPipeline()
initialDuration := c.config.LockoutDuration
if initialDuration < 2*time.Second {
initialDuration = 2 * time.Second
Expand Down Expand Up @@ -657,7 +657,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
// this could be just new messages we didn't get yet - even then, we should retry soon
log.Info("sequencer failed to become chosen", "err", err, "msgcount", localMsgCount)
// make sure we're marked as wanting the lockout
if err := c.wantsLockoutUpdate(ctx); err != nil {
if err := c.wantsLockoutUpdate(ctx, c.Client); err != nil {
log.Warn("failed to update wants lockout key", "err", err)
}
c.prevChosenSequencer = ""
Expand Down Expand Up @@ -685,7 +685,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
// update wanting the lockout
var wantsLockoutErr error
if synced && !c.AvoidingLockout() {
wantsLockoutErr = c.wantsLockoutUpdate(ctx)
wantsLockoutErr = c.wantsLockoutUpdate(ctx, c.Client)
} else {
wantsLockoutErr = c.wantsLockoutRelease(ctx)
}
Expand Down Expand Up @@ -789,6 +789,14 @@ func (c *SeqCoordinator) trySwitchingRedis(ctx context.Context) error {
err, "newRedisUrl", c.config.NewRedisUrl)
return err
}
err = c.wantsLockoutUpdate(ctx, c.Client)
if err != nil {
return err
}
err = c.wantsLockoutUpdate(ctx, newRedisCoordinator.Client)
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
c.prevRedisCoordinator = &c.RedisCoordinator
c.RedisCoordinator = *newRedisCoordinator
}
Expand Down Expand Up @@ -908,7 +916,7 @@ func (c *SeqCoordinator) SeekLockout(ctx context.Context) {
log.Info("seeking lockout", "myUrl", c.config.Url())
if c.sequencer.Synced() {
// Even if this errors we still internally marked ourselves as wanting the lockout
err := c.wantsLockoutUpdateWithMutex(ctx)
err := c.wantsLockoutUpdateWithMutex(ctx, c.Client)
if err != nil {
log.Warn("failed to set wants lockout key in redis after seeking lockout again", "err", err)
}
Expand Down
Loading