Skip to content

Commit

Permalink
Handle race condition in TestMoveTables(Un)sharded where vplayer can …
Browse files Browse the repository at this point in the history
…start in some runs and update mock support for it

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Dec 26, 2024
1 parent 837ddd4 commit 872bbcd
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 3 deletions.
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,8 @@ func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*quer

// VStream is part of the queryservice.QueryService interface
func (f *FakeQueryService) VStream(ctx context.Context, request *binlogdatapb.VStreamRequest, send func([]*binlogdatapb.VEvent) error) error {
panic("not implemented")
// This is called as part of vreplication unit tests, so we don't panic here.
return fmt.Errorf("VStream not implemented")
}

// VStreamRows is part of the QueryService interface.
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/constants/sidecar"
Expand Down Expand Up @@ -1783,6 +1785,14 @@ func addInvariants(dbClient *binlogplayer.MockDBClient, vreplID, sourceTabletUID
))
dbClient.AddInvariant(fmt.Sprintf(updatePickedSourceTablet, cell, sourceTabletUID, vreplID), &sqltypes.Result{})
dbClient.AddInvariant("update _vt.vreplication set state='Running', message='' where id=1", &sqltypes.Result{})
dbClient.AddInvariant(vreplication.SqlMaxAllowedPacket, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"@@max_allowed_packet",
"int64",
),
"65536",
))
dbClient.AddInvariant("update _vt.vreplication set message", &sqltypes.Result{})
}

func addMaterializeSettingsTablesToSchema(ms *vtctldatapb.MaterializeSettings, tenv *testEnv, venv *vtenv.Environment) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
settings.StopPos = pausePos
saveStop = false
}

log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vr.id, settings.StartPos, settings.StopPos, vr.source)
queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
}
Expand All @@ -142,7 +142,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
maxAllowedPacket := int64(vr.workflowConfig.RelayLogMaxSize)
// We explicitly do NOT want to batch this, we want to send it down the wire
// immediately so we use ExecuteFetch directly.
res, err := vr.dbClient.ExecuteFetch("select @@session.max_allowed_packet as max_allowed_packet", 1)
res, err := vr.dbClient.ExecuteFetch(SqlMaxAllowedPacket, 1)
if err != nil {
log.Errorf("Error getting max_allowed_packet, will use the relay_log_max_size value of %d bytes: %v", vr.workflowConfig.RelayLogMaxSize, err)
} else {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ const (
json_unquote(json_extract(action, '$.type'))=%a and vrepl_id=%a and table_name=%a`
sqlDeletePostCopyAction = `delete from _vt.post_copy_action where vrepl_id=%a and
table_name=%a and id=%a`
SqlMaxAllowedPacket = "select @@session.max_allowed_packet as max_allowed_packet"
)

// vreplicator provides the core logic to start vreplication streams
Expand Down

0 comments on commit 872bbcd

Please sign in to comment.