diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go index 2d62b017433..e63cd028d05 100644 --- a/go/vt/vttablet/tabletconntest/fakequeryservice.go +++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go @@ -702,7 +702,8 @@ func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*quer // VStream is part of the queryservice.QueryService interface func (f *FakeQueryService) VStream(ctx context.Context, request *binlogdatapb.VStreamRequest, send func([]*binlogdatapb.VEvent) error) error { - panic("not implemented") + // This is called as part of vreplication unit tests, so we don't panic here. + return fmt.Errorf("VStream not implemented") } // VStreamRows is part of the QueryService interface. diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 3f8bc85ac7f..06a6eb8b0a7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/constants/sidecar" @@ -1783,6 +1785,14 @@ func addInvariants(dbClient *binlogplayer.MockDBClient, vreplID, sourceTabletUID )) dbClient.AddInvariant(fmt.Sprintf(updatePickedSourceTablet, cell, sourceTabletUID, vreplID), &sqltypes.Result{}) dbClient.AddInvariant("update _vt.vreplication set state='Running', message='' where id=1", &sqltypes.Result{}) + dbClient.AddInvariant(vreplication.SqlMaxAllowedPacket, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "@@max_allowed_packet", + "int64", + ), + "65536", + )) + dbClient.AddInvariant("update _vt.vreplication set message", &sqltypes.Result{}) } func addMaterializeSettingsTablesToSchema(ms *vtctldatapb.MaterializeSettings, tenv *testEnv, venv *vtenv.Environment) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 98e36119622..96dcd9884f3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -125,7 +125,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map settings.StopPos = pausePos saveStop = false } - + log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vr.id, settings.StartPos, settings.StopPos, vr.source) queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { return vr.dbClient.ExecuteWithRetry(ctx, sql) } @@ -142,7 +142,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map maxAllowedPacket := int64(vr.workflowConfig.RelayLogMaxSize) // We explicitly do NOT want to batch this, we want to send it down the wire // immediately so we use ExecuteFetch directly. - res, err := vr.dbClient.ExecuteFetch("select @@session.max_allowed_packet as max_allowed_packet", 1) + res, err := vr.dbClient.ExecuteFetch(SqlMaxAllowedPacket, 1) if err != nil { log.Errorf("Error getting max_allowed_packet, will use the relay_log_max_size value of %d bytes: %v", vr.workflowConfig.RelayLogMaxSize, err) } else { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 42701288a44..76177b56b5b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -89,6 +89,7 @@ const ( json_unquote(json_extract(action, '$.type'))=%a and vrepl_id=%a and table_name=%a` sqlDeletePostCopyAction = `delete from _vt.post_copy_action where vrepl_id=%a and table_name=%a and id=%a` + SqlMaxAllowedPacket = "select @@session.max_allowed_packet as max_allowed_packet" ) // vreplicator provides the core logic to start vreplication streams