diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 694c214149e..ce77c380cff 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -368,7 +368,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { require.Equal(t, emptyWorkflowShowResponse, output) // Delete the original workflow - _, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "delete", "--shards", opts.shardSubset) + _, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "delete", "--workflow", wf, "--shards", opts.shardSubset) require.NoError(t, err) output, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", wf, "--shards", opts.shardSubset) require.NoError(t, err) diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index c3dc22d21b4..a230bc30528 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -4708,9 +4708,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat shards, err := s.ts.GetShardNames(ctx, keyspace) if err != nil { - resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%v) failed: %v", req.Keyspace, err)) - err = nil - return resp, err + return nil, fmt.Errorf("TopologyServer.GetShardNames(%v) failed: %v", req.Keyspace, err) } resp.ResultsByShard = make(map[string]*vtctldatapb.ValidateShardResponse, len(shards)) @@ -4723,14 +4721,13 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat } if req.IncludeVschema { - results, err2 := s.ValidateVSchema(ctx, &vtctldatapb.ValidateVSchemaRequest{ + results, err := s.ValidateVSchema(ctx, &vtctldatapb.ValidateVSchemaRequest{ Keyspace: keyspace, Shards: shards, ExcludeTables: req.ExcludeTables, IncludeViews: req.IncludeViews, }) - if err2 != nil { - err = err2 + if err != nil { return nil, err } @@ -4739,7 +4736,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat for shard, shardResults := range resp.ResultsByShard { resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, shardResults.Results...) } - return resp, err + return resp, nil } } @@ -4748,51 +4745,37 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat var ( referenceSchema *tabletmanagerdatapb.SchemaDefinition referenceAlias *topodatapb.TabletAlias - m sync.Mutex - wg sync.WaitGroup + referenceMu sync.Mutex ) r := &tabletmanagerdatapb.GetSchemaRequest{ExcludeTables: req.ExcludeTables, IncludeViews: req.IncludeViews} - for _, shard := range shards[0:] { - wg.Add(1) - go func(shard string) { - defer wg.Done() - - si, err := s.ts.GetShard(ctx, keyspace, shard) - - m.Lock() - defer m.Unlock() + eg, egctx := errgroup.WithContext(ctx) + for _, shard := range shards { + eg.Go(func() error { + si, err := s.ts.GetShard(egctx, keyspace, shard) if err != nil { - errMessage := fmt.Sprintf("GetShard(%v, %v) failed: %v", keyspace, shard, err) - resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage) - resp.Results = append(resp.Results, errMessage) - return + return fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err) } - if !si.HasPrimary() { - if !req.SkipNoPrimary { - errMessage := fmt.Sprintf("no primary in shard %v/%v", keyspace, shard) - resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage) - resp.Results = append(resp.Results, errMessage) - } - return + if !si.HasPrimary() && !req.SkipNoPrimary { + return fmt.Errorf("no primary in shard %v/%v", keyspace, shard) } if referenceSchema == nil { + referenceMu.Lock() referenceAlias = si.PrimaryAlias - referenceSchema, err = schematools.GetSchema(ctx, s.ts, s.tmc, referenceAlias, r) + referenceSchema, err = schematools.GetSchema(egctx, s.ts, s.tmc, referenceAlias, r) + referenceMu.Unlock() if err != nil { - return + return fmt.Errorf("failed to get get schema on %s failed: %v", + topoproto.TabletAliasString(referenceAlias), err) } } - aliases, err := s.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard) + aliases, err := s.ts.FindAllTabletAliasesInShard(egctx, keyspace, shard) if err != nil { - errMessage := fmt.Sprintf("FindAllTabletAliasesInShard(%v, %v) failed: %v", keyspace, shard, err) - resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage) - resp.Results = append(resp.Results, errMessage) - return + return fmt.Errorf("FindAllTabletAliasesInShard(%v, %v) failed: %v", keyspace, shard, err) } aliasWg := sync.WaitGroup{} @@ -4805,7 +4788,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat aliasWg.Add(1) go func(alias *topodatapb.TabletAlias) { defer aliasWg.Done() - replicaSchema, err := schematools.GetSchema(ctx, s.ts, s.tmc, alias, r) + replicaSchema, err := schematools.GetSchema(egctx, s.ts, s.tmc, alias, r) if err != nil { aliasErrs.RecordError(fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", alias, req.ExcludeTables, req.IncludeViews, err)) return @@ -4817,18 +4800,18 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat aliasWg.Wait() if aliasErrs.HasErrors() { - for _, err := range aliasErrs.Errors { - errMessage := err.Error() - resp.ResultsByShard[shard].Results = append(resp.ResultsByShard[shard].Results, errMessage) - resp.Results = append(resp.Results, errMessage) - } + return aliasErrs.AggrError(vterrors.Aggregate) } - }(shard) + + return nil + }) } - wg.Wait() + if err := eg.Wait(); err != nil { + return nil, err + } - return resp, err + return resp, nil } // ValidateShard is part of the vtctlservicepb.VtctldServer interface.