Skip to content

Commit

Permalink
Improve and align ValidateSchemaKeyspace
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 27, 2024
1 parent 98526d1 commit b73f2ef
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 46 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
require.Equal(t, emptyWorkflowShowResponse, output)

// Delete the original workflow
_, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "delete", "--shards", opts.shardSubset)
_, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "delete", "--workflow", wf, "--shards", opts.shardSubset)
require.NoError(t, err)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", wf, "--shards", opts.shardSubset)
require.NoError(t, err)
Expand Down
73 changes: 28 additions & 45 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4708,9 +4708,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat

shards, err := s.ts.GetShardNames(ctx, keyspace)
if err != nil {
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%v) failed: %v", req.Keyspace, err))
err = nil
return resp, err
return nil, fmt.Errorf("TopologyServer.GetShardNames(%v) failed: %v", req.Keyspace, err)
}

resp.ResultsByShard = make(map[string]*vtctldatapb.ValidateShardResponse, len(shards))
Expand All @@ -4723,14 +4721,13 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat
}

if req.IncludeVschema {
results, err2 := s.ValidateVSchema(ctx, &vtctldatapb.ValidateVSchemaRequest{
results, err := s.ValidateVSchema(ctx, &vtctldatapb.ValidateVSchemaRequest{
Keyspace: keyspace,
Shards: shards,
ExcludeTables: req.ExcludeTables,
IncludeViews: req.IncludeViews,
})
if err2 != nil {
err = err2
if err != nil {
return nil, err
}

Expand All @@ -4739,7 +4736,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat
for shard, shardResults := range resp.ResultsByShard {
resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, shardResults.Results...)
}
return resp, err
return resp, nil
}
}

Expand All @@ -4748,51 +4745,37 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat
var (
referenceSchema *tabletmanagerdatapb.SchemaDefinition
referenceAlias *topodatapb.TabletAlias
m sync.Mutex
wg sync.WaitGroup
referenceMu sync.Mutex
)

r := &tabletmanagerdatapb.GetSchemaRequest{ExcludeTables: req.ExcludeTables, IncludeViews: req.IncludeViews}
for _, shard := range shards[0:] {
wg.Add(1)
go func(shard string) {
defer wg.Done()

si, err := s.ts.GetShard(ctx, keyspace, shard)

m.Lock()
defer m.Unlock()
eg, egctx := errgroup.WithContext(ctx)
for _, shard := range shards {
eg.Go(func() error {
si, err := s.ts.GetShard(egctx, keyspace, shard)

if err != nil {
errMessage := fmt.Sprintf("GetShard(%v, %v) failed: %v", keyspace, shard, err)
resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage)
resp.Results = append(resp.Results, errMessage)
return
return fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err)
}

if !si.HasPrimary() {
if !req.SkipNoPrimary {
errMessage := fmt.Sprintf("no primary in shard %v/%v", keyspace, shard)
resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage)
resp.Results = append(resp.Results, errMessage)
}
return
if !si.HasPrimary() && !req.SkipNoPrimary {
return fmt.Errorf("no primary in shard %v/%v", keyspace, shard)
}

if referenceSchema == nil {
referenceMu.Lock()
referenceAlias = si.PrimaryAlias
referenceSchema, err = schematools.GetSchema(ctx, s.ts, s.tmc, referenceAlias, r)
referenceSchema, err = schematools.GetSchema(egctx, s.ts, s.tmc, referenceAlias, r)
referenceMu.Unlock()
if err != nil {
return
return fmt.Errorf("failed to get get schema on %s failed: %v",
topoproto.TabletAliasString(referenceAlias), err)
}
}

aliases, err := s.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard)
aliases, err := s.ts.FindAllTabletAliasesInShard(egctx, keyspace, shard)
if err != nil {
errMessage := fmt.Sprintf("FindAllTabletAliasesInShard(%v, %v) failed: %v", keyspace, shard, err)
resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage)
resp.Results = append(resp.Results, errMessage)
return
return fmt.Errorf("FindAllTabletAliasesInShard(%v, %v) failed: %v", keyspace, shard, err)
}

aliasWg := sync.WaitGroup{}
Expand All @@ -4805,7 +4788,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat
aliasWg.Add(1)
go func(alias *topodatapb.TabletAlias) {
defer aliasWg.Done()
replicaSchema, err := schematools.GetSchema(ctx, s.ts, s.tmc, alias, r)
replicaSchema, err := schematools.GetSchema(egctx, s.ts, s.tmc, alias, r)
if err != nil {
aliasErrs.RecordError(fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", alias, req.ExcludeTables, req.IncludeViews, err))
return
Expand All @@ -4817,18 +4800,18 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat
aliasWg.Wait()

if aliasErrs.HasErrors() {
for _, err := range aliasErrs.Errors {
errMessage := err.Error()
resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage)
resp.Results = append(resp.Results, errMessage)
}
return aliasErrs.AggrError(vterrors.Aggregate)
}
}(shard)

return nil
})
}

wg.Wait()
if err := eg.Wait(); err != nil {
return nil, err
}

return resp, err
return resp, nil
}

// ValidateShard is part of the vtctlservicepb.VtctldServer interface.
Expand Down

0 comments on commit b73f2ef

Please sign in to comment.