Skip to content

Commit

Permalink
refac: newLookupVindex func to reduce code duplicacy
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Dec 21, 2024
1 parent 4e8a539 commit 94dfd64
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 62 deletions.
39 changes: 25 additions & 14 deletions go/vt/vtctl/workflow/lookup_vindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,19 @@ type lookupVindex struct {
parser *sqlparser.Parser
}

// newLookupVindex creates a new lookupVindex instance which is responsible
// for performing actions related to lookup vindexes.
func newLookupVindex(ws *Server) *lookupVindex {
return &lookupVindex{
ts: ws.ts,
tmc: ws.tmc,
logger: ws.Logger(),
parser: ws.SQLParser(),
}
}

// prepareCreate performs the preparatory steps for creating a Lookup Vindex.
func (l *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (
func (lv *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (
ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) {
var (
// sourceVSchemaTable is the table info present in the vschema.
Expand All @@ -65,7 +76,7 @@ func (l *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace str
)

// Validate input vindex.
vindex, vInfo, err := l.validateAndGetVindex(specs)
vindex, vInfo, err := lv.validateAndGetVindex(specs)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -80,7 +91,7 @@ func (l *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace str
return nil, nil, nil, nil, err
}

sourceVSchema, targetVSchema, err = l.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace)
sourceVSchema, targetVSchema, err = lv.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -102,7 +113,7 @@ func (l *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace str
}

// Validate against source schema.
sourceShards, err := l.ts.GetServingShards(ctx, keyspace)
sourceShards, err := lv.ts.GetServingShards(ctx, keyspace)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -113,7 +124,7 @@ func (l *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace str
}

req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{vInfo.sourceTableName}}
tableSchema, err := schematools.GetSchema(ctx, l.ts, l.tmc, onesource.PrimaryAlias, req)
tableSchema, err := schematools.GetSchema(ctx, lv.ts, lv.tmc, onesource.PrimaryAlias, req)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -124,7 +135,7 @@ func (l *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace str
}

// Generate "create table" statement.
createDDL, err = l.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex)
createDDL, err = lv.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down Expand Up @@ -202,7 +213,7 @@ func (l *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace str
if targetChanged {
cancelFunc = func() error {
// Restore the original target vschema.
return l.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema)
return lv.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema)
}
}

Expand Down Expand Up @@ -241,7 +252,7 @@ type vindexInfo struct {
}

// validateAndGetVindex validates and extracts vindex configuration
func (l *lookupVindex) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) {
func (lv *lookupVindex) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) {
if specs == nil {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided")
}
Expand All @@ -256,7 +267,7 @@ func (l *lookupVindex) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschema
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type)
}

targetKeyspace, targetTableName, err := l.parser.ParseTable(vindex.Params["table"])
targetKeyspace, targetTableName, err := lv.parser.ParseTable(vindex.Params["table"])
if err != nil || targetKeyspace == "" {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"vindex table name (%s) must be in the form <keyspace>.<table>", vindex.Params["table"])
Expand Down Expand Up @@ -313,8 +324,8 @@ func (l *lookupVindex) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschema
}, nil
}

func (l *lookupVindex) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) {
sourceVSchema, err = l.ts.GetVSchema(ctx, sourceKeyspace)
func (lv *lookupVindex) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) {
sourceVSchema, err = lv.ts.GetVSchema(ctx, sourceKeyspace)
if err != nil {
return nil, nil, err
}
Expand All @@ -326,7 +337,7 @@ func (l *lookupVindex) getTargetAndSourceVSchema(ctx context.Context, sourceKeys
if sourceKeyspace == targetKeyspace {
targetVSchema = sourceVSchema
} else {
targetVSchema, err = l.ts.GetVSchema(ctx, targetKeyspace)
targetVSchema, err = lv.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -374,7 +385,7 @@ func getSourceTable(specs *vschemapb.Keyspace, targetTableName string, fromCols
return sourceTable, sourceTableName, nil
}

func (l *lookupVindex) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) {
func (lv *lookupVindex) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) {
lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n")
if len(lines) < 3 {
// Should never happen.
Expand Down Expand Up @@ -412,7 +423,7 @@ func (l *lookupVindex) generateCreateDDLStatement(tableSchema *tabletmanagerdata
createDDL := strings.Join(modified, "\n")

// Confirm that our DDL is valid before we create anything.
if _, err := l.parser.ParseStrictDDL(createDDL); err != nil {
if _, err := lv.parser.ParseStrictDDL(createDDL); err != nil {
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s",
err, createDDL)
}
Expand Down
49 changes: 7 additions & 42 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,12 +1515,7 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
setStartingVschema()
}()
}
lv := &lookupVindex{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
lv := newLookupVindex(env.ws)
outms, _, _, cancelFunc, err := lv.prepareCreate(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false)
if tcase.err != "" {
require.Error(t, err)
Expand Down Expand Up @@ -1769,12 +1764,7 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) {
t.Fatal(err)
}

lv := &lookupVindex{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
lv := newLookupVindex(env.ws)
_, got, _, _, err := lv.prepareCreate(ctx, "workflow", ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, tcase.out) {
Expand Down Expand Up @@ -2011,12 +2001,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) {
t.Fatal(err)
}

lv := &lookupVindex{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
lv := newLookupVindex(env.ws)
_, _, got, cancelFunc, err := lv.prepareCreate(ctx, "workflow", ms.SourceKeyspace, specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
Expand Down Expand Up @@ -2139,12 +2124,7 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) {
t.Fatal(err)
}

lv := &lookupVindex{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
lv := newLookupVindex(env.ws)
_, got, _, _, err := lv.prepareCreate(ctx, "keyspace", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, want) {
Expand Down Expand Up @@ -2271,12 +2251,7 @@ func TestCreateCustomizedVindex(t *testing.T) {
t.Fatal(err)
}

lv := &lookupVindex{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
lv := newLookupVindex(env.ws)
_, got, _, _, err := lv.prepareCreate(ctx, "workflow", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, want) {
Expand Down Expand Up @@ -2395,12 +2370,7 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) {
t.Fatal(err)
}

lv := &lookupVindex{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
lv := newLookupVindex(env.ws)
ms, ks, _, _, err := lv.prepareCreate(ctx, "workflow", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(wantKs, ks) {
Expand Down Expand Up @@ -2481,12 +2451,7 @@ func TestStopAfterCopyFlag(t *testing.T) {
t.Fatal(err)
}

lv := &lookupVindex{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
lv := newLookupVindex(env.ws)
ms1, _, _, _, err := lv.prepareCreate(ctx, "workflow", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
require.Equal(t, ms1.StopAfterCopy, true)
Expand Down
8 changes: 2 additions & 6 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,8 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup
span.Annotate("cells", req.Cells)
span.Annotate("tablet_types", req.TabletTypes)

lv := &lookupVindex{
ts: s.ts,
tmc: s.tmc,
logger: s.Logger(),
parser: s.SQLParser(),
}
lv := newLookupVindex(s)

ms, sourceVSchema, targetVSchema, cancelFunc, err := lv.prepareCreate(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner)
if err != nil {
return nil, err
Expand Down

0 comments on commit 94dfd64

Please sign in to comment.