Skip to content

Commit

Permalink
[#21515][Go SDK] Update go protobuf package to new version (#32045)
Browse files Browse the repository at this point in the history
  • Loading branch information
imvtsl authored Aug 7, 2024
1 parent 81ad4fe commit 828717a
Show file tree
Hide file tree
Showing 37 changed files with 72 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)).
* Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)).

## Breaking Changes

Expand Down
2 changes: 1 addition & 1 deletion sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ require (
github.com/docker/go-connections v0.5.0
github.com/dustin/go-humanize v1.0.1
github.com/go-sql-driver/mysql v1.8.1
github.com/golang/protobuf v1.5.4 // TODO(danoliveira): Fully replace this with google.golang.org/protobuf
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/johannesboyne/gofakes3 v0.0.0-20221110173912-32fb85c5aed6
Expand Down Expand Up @@ -88,6 +87,7 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/cmd/beamctl/cmd/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -53,6 +52,6 @@ func infoFn(cmd *cobra.Command, args []string) error {
return err
}

cmd.Print(proto.MarshalTextString(info.GetInfo()))
cmd.Print(info.GetInfo().String())
return nil
}
2 changes: 1 addition & 1 deletion sdks/go/container/boot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

func TestEnsureEndpointsSet_AllSet(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions sdks/go/container/tools/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/jsonpb"
google_pb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/protobuf/encoding/protojson"
google_pb "google.golang.org/protobuf/types/known/structpb"
)

// ProvisionInfo returns the runtime provisioning info for the worker.
Expand Down Expand Up @@ -65,7 +65,8 @@ func OptionsToProto(v any) (*google_pb.Struct, error) {
// JSONToProto converts JSON-encoded pipeline options to a proto struct.
func JSONToProto(data string) (*google_pb.Struct, error) {
var out google_pb.Struct
if err := jsonpb.UnmarshalString(string(data), &out); err != nil {

if err := protojson.Unmarshal([]byte(data), &out); err != nil {
return nil, err
}
return &out, nil
Expand All @@ -85,5 +86,9 @@ func ProtoToJSON(opt *google_pb.Struct) (string, error) {
if opt == nil {
return "{}", nil
}
return (&jsonpb.Marshaler{}).MarshalToString(opt)
bytes, err := protojson.Marshal(opt)
if err != nil {
return "", err
}
return string(bytes), err
}
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/protobuf/proto"
)

// RetrievalServer is a artifact retrieval server backed by Google
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/artifact/gcsproxy/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/protobuf/proto"
)

// StagingServer is a artifact staging server backed by Google Cloud Storage
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/artifact/materialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/artifact/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)

// TestRetrieve tests that we can successfully retrieve fresh files.
Expand Down
12 changes: 6 additions & 6 deletions sdks/go/pkg/beam/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
protov1 "github.com/golang/protobuf/proto"
protov2 "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
"google.golang.org/protobuf/reflect/protoreflect"
)

Expand All @@ -51,7 +51,7 @@ type jsonCoder interface {
json.Unmarshaler
}

var protoMessageType = reflect.TypeOf((*protov1.Message)(nil)).Elem()
var protoMessageType = reflect.TypeOf((*protoadapt.MessageV1)(nil)).Elem()
var protoReflectMessageType = reflect.TypeOf((*protoreflect.ProtoMessage)(nil)).Elem()
var jsonCoderType = reflect.TypeOf((*jsonCoder)(nil)).Elem()

Expand Down Expand Up @@ -276,8 +276,8 @@ func protoEnc(in T) ([]byte, error) {
switch it := in.(type) {
case protoreflect.ProtoMessage:
p = it
case protov1.Message:
p = protov1.MessageV2(it)
case protoadapt.MessageV1:
p = protoadapt.MessageV2Of(it)
}
b, err := protov2.MarshalOptions{Deterministic: true}.Marshal(p)
if err != nil {
Expand All @@ -293,8 +293,8 @@ func protoDec(t reflect.Type, in []byte) (T, error) {
switch it := reflect.New(t.Elem()).Interface().(type) {
case protoreflect.ProtoMessage:
p = it
case protov1.Message:
p = protov1.MessageV2(it)
case protoadapt.MessageV1:
p = protoadapt.MessageV2Of(it)
}
err := protov2.UnmarshalOptions{}.Unmarshal(in, p)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

// TODO(lostluck): 2018/05/28 Extract these from the canonical enums in beam_runner_api.proto
Expand Down
8 changes: 4 additions & 4 deletions sdks/go/pkg/beam/core/runtime/graphx/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

const (
Expand Down Expand Up @@ -615,8 +615,8 @@ func (b *CoderMarshaller) internRowCoder(schema *pipepb.Schema) string {
}

func (b *CoderMarshaller) internCoder(coder *pipepb.Coder) string {
key := proto.MarshalTextString(coder)
if id, exists := b.coder2id[key]; exists {
key := coder.String()
if id, exists := b.coder2id[(key)]; exists {
return id
}

Expand All @@ -626,7 +626,7 @@ func (b *CoderMarshaller) internCoder(coder *pipepb.Coder) string {
} else {
id = fmt.Sprintf("c%v@%v", len(b.coder2id), b.Namespace)
}
b.coder2id[key] = id
b.coder2id[string(key)] = id
b.coders[id] = coder
return id
}
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
)

// Initialize registered schemas. For use by the beam package at beam.Init time.
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/testing/protocmp"
Expand Down Expand Up @@ -806,7 +805,7 @@ func TestSchemaConversion(t *testing.T) {
}
if d := cmp.Diff(test.st, got,
protocmp.Transform(),
protocmp.IgnoreFields(proto.MessageV2(&pipepb.Schema{}), "id"),
protocmp.IgnoreFields(&pipepb.Schema{}, "id"),
); d != "" {
t.Errorf("diff (-want, +got): %v", d)
}
Expand Down
8 changes: 4 additions & 4 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand Down Expand Up @@ -1209,13 +1209,13 @@ func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) (string,
}

func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) string {
key := proto.MarshalTextString(w)
if id, exists := m.windowing2id[key]; exists {
key := w.String()
if id, exists := m.windowing2id[(key)]; exists {
return id
}

id := fmt.Sprintf("w%v", len(m.windowing2id))
m.windowing2id[key] = id
m.windowing2id[string(key)] = id
m.windowing[id] = w
return id
}
Expand Down
10 changes: 5 additions & 5 deletions sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
)

func init() {
Expand Down Expand Up @@ -181,13 +181,13 @@ func TestMarshal(t *testing.T) {
}

if got, want := len(p.GetComponents().GetTransforms()), test.transforms; got != want {
t.Errorf("got %d transforms, want %d : %v", got, want, proto.MarshalTextString(p))
t.Errorf("got %d transforms, want %d : %v", got, want, p.String())
}
if got, want := len(p.GetRootTransformIds()), test.roots; got != want {
t.Errorf("got %d roots, want %d : %v", got, want, proto.MarshalTextString(p))
t.Errorf("got %d roots, want %d : %v", got, want, p.String())
}
if got, want := p.GetRequirements(), test.requirements; !cmp.Equal(got, want, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
t.Errorf("incorrect requirements: got %v, want %v : %v", got, want, proto.MarshalTextString(p))
t.Errorf("incorrect requirements: got %v, want %v : %v", got, want, p.String())
}
})
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestMarshal_PTransformAnnotations(t *testing.T) {

pts := p.GetComponents().GetTransforms()
if got, want := len(pts), test.transforms; got != want {
t.Errorf("got %d transforms, want %d : %v", got, want, proto.MarshalTextString(p))
t.Errorf("got %d transforms, want %d : %v", got, want, p.String())
}
for _, pt := range pts {
// Context annotations only apply to composites, and are not duplicated to leaves.
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/harness/harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

// validDescriptor describes a valid pipeline with a source and a sink, but doesn't do anything else.
Expand Down
5 changes: 2 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -633,15 +632,15 @@ func (c *StateChannel) read(ctx context.Context) {
if !ok {
// This can happen if Send returns an error that write handles, but
// the message was actually sent.
log.Errorf(ctx, "StateChannel[%v].read: no consumer for state response: %v", c.id, proto.MarshalTextString(msg))
log.Errorf(ctx, "StateChannel[%v].read: no consumer for state response: %v", c.id, msg.String())
continue
}

select {
case ch <- msg:
// ok
default:
panic(fmt.Sprintf("StateChannel[%v].read: failed to consume state response: %v", c.id, proto.MarshalTextString(msg)))
panic(fmt.Sprintf("StateChannel[%v].read: failed to consume state response: %v", c.id, msg.String()))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"testing"

pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
)

func TestShallowClonePTransform(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

// Update merges a pipeline with the given components, which may add, replace
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"

pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
)

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/pipelinex/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"sort"

pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

// Bounded returns true iff all PCollections are bounded.
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/xlangx/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
)

func createExternalEdge(typeUrn string, typePayload []byte) *graph.MultiEdge {
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/core/util/protox/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package protox

import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/golang/protobuf/proto"
protobuf "github.com/golang/protobuf/ptypes/any"
protobufw "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
protobuf "google.golang.org/protobuf/types/known/anypb"
protobufw "google.golang.org/protobuf/types/known/wrapperspb"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/util/protox/any_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"bytes"
"testing"

"github.com/golang/protobuf/proto"
protobufw "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
protobufw "google.golang.org/protobuf/types/known/wrapperspb"
)

func TestProtoPackingInvertibility(t *testing.T) {
Expand Down
Loading

0 comments on commit 828717a

Please sign in to comment.