Skip to content

Commit

Permalink
feat: Make state client versioning the default, remove option (#1938)
Browse files Browse the repository at this point in the history
#### Summary

Fixes cloudquery/cloudquery-issues#2435

---
  • Loading branch information
erezrokah authored Oct 18, 2024
1 parent d9e6f47 commit f105651
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 25 deletions.
22 changes: 7 additions & 15 deletions internal/clients/state/v3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,8 @@ type versionedValue struct {
version uint64
}

func VersionedTable(name string) *schema.Table {
t := table(name)
t.Columns = append(t.Columns, schema.Column{
// Not defined as PrimaryKey to enable single keys if the destination supports PKs
Name: versionColumn,
Type: arrow.PrimitiveTypes.Uint64,
})
return t
}

func NewClient(ctx context.Context, conn *grpc.ClientConn, tableName string) (*Client, error) {
return NewClientWithTable(ctx, conn, table(tableName))
}

func NewClientWithTable(ctx context.Context, conn *grpc.ClientConn, table *schema.Table) (*Client, error) {
table := Table(tableName)
c := &Client{
conn: conn,
client: pb.NewPluginClient(conn),
Expand Down Expand Up @@ -215,7 +202,7 @@ func (c *Client) Close() error {
return nil
}

func table(name string) *schema.Table {
func Table(name string) *schema.Table {
return &schema.Table{
Name: name,
Columns: []schema.Column{
Expand All @@ -228,6 +215,11 @@ func table(name string) *schema.Table {
Name: valueColumn,
Type: arrow.BinaryTypes.String,
},
{
// Not defined as PrimaryKey to enable single keys if the destination supports PKs
Name: versionColumn,
Type: arrow.PrimitiveTypes.Uint64,
},
},
}
}
6 changes: 3 additions & 3 deletions serve/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ func TestStateOverwriteGetLatest(t *testing.T) {
t.Fatal(err)
}

table := state.VersionedTable("test_no_pk")
table := state.Table("test_no_pk")
// Remove PKs
for i := range table.Columns {
table.Columns[i].PrimaryKey = false
}

stateClient, err := state.NewClientWithTable(ctx, conn, table)
stateClient, err := state.NewClient(ctx, conn, table.Name)
if err != nil {
t.Fatal(err)
}
Expand All @@ -138,7 +138,7 @@ func TestStateOverwriteGetLatest(t *testing.T) {
}
}

stateClient, err = state.NewClientWithTable(ctx, conn, table)
stateClient, err = state.NewClient(ctx, conn, table.Name)
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 2 additions & 7 deletions state/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ type Client interface {
Close() error
}

type ClientOptions struct {
Versioned bool
}
type ClientOptions struct{}

type ConnectionOptions struct {
MaxMsgSizeInBytes int
Expand All @@ -33,7 +31,7 @@ func NewClient(ctx context.Context, conn *grpc.ClientConn, tableName string) (Cl
return NewClientWithOptions(ctx, conn, tableName, ClientOptions{})
}

func NewClientWithOptions(ctx context.Context, conn *grpc.ClientConn, tableName string, opts ClientOptions) (Client, error) {
func NewClientWithOptions(ctx context.Context, conn *grpc.ClientConn, tableName string, _ ClientOptions) (Client, error) {
discoveryClient := pbDiscovery.NewDiscoveryClient(conn)
versions, err := discoveryClient.GetVersions(ctx, &pbDiscovery.GetVersions_Request{})
if err != nil {
Expand All @@ -43,9 +41,6 @@ func NewClientWithOptions(ctx context.Context, conn *grpc.ClientConn, tableName
return nil, fmt.Errorf("please upgrade your state backend plugin. state supporting version 3 plugin has %v", versions.Versions)
}

if opts.Versioned {
return stateV3.NewClientWithTable(ctx, conn, stateV3.VersionedTable(tableName))
}
return stateV3.NewClient(ctx, conn, tableName)
}

Expand Down

0 comments on commit f105651

Please sign in to comment.