Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#31927,#31928][prism] Support StringSet and Gauge metrics. #32184

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2,177 changes: 1,094 additions & 1,083 deletions sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/pipeline_v1/endpoints.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

206 changes: 165 additions & 41 deletions sdks/go/pkg/beam/model/pipeline_v1/external_transforms.pb.go

Large diffs are not rendered by default.

698 changes: 362 additions & 336 deletions sdks/go/pkg/beam/model/pipeline_v1/metrics.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/pipeline_v1/schema.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 122 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ func buildUrnToOpsMap(mUrn2Spec map[string]*pipepb.MonitoringInfoSpec) map[strin
// Defaults should be safe since the metric only exists if we get any values at all.
return &distributionInt64{dist: metrics.DistributionValue{Min: math.MaxInt64, Max: math.MinInt64}}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_INT64_TYPE): func() metricAccumulator {
// Initializes the gauge so any new value will override it.
return &gaugeInt64{millisSinceEpoch: math.MinInt64}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_DOUBLE_TYPE): func() metricAccumulator {
// Initializes the gauge so any new value will override it.
return &gaugeFloat64{millisSinceEpoch: math.MinInt64}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE): func() metricAccumulator {
return &stringSet{set: map[string]struct{}{}}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_PROGRESS_TYPE): func() metricAccumulator { return &progress{} },
}

Expand Down Expand Up @@ -347,6 +358,116 @@ func (m *distributionInt64) toProto(key metricKey) *pipepb.MonitoringInfo {
}
}

type gaugeInt64 struct {
millisSinceEpoch int64
val int64
}

func (m *gaugeInt64) accumulate(pyld []byte) error {
buf := bytes.NewBuffer(pyld)

timestamp, err := coder.DecodeVarInt(buf)
if err != nil {
return err
}
if m.millisSinceEpoch > timestamp {
// Drop values that are older than what we have already.
return nil
}
val, err := coder.DecodeVarInt(buf)
if err != nil {
return err
}
m.millisSinceEpoch = timestamp
m.val = val
return nil
}

func (m *gaugeInt64) toProto(key metricKey) *pipepb.MonitoringInfo {
var buf bytes.Buffer
coder.EncodeVarInt(m.millisSinceEpoch, &buf)
coder.EncodeVarInt(m.val, &buf)
return &pipepb.MonitoringInfo{
Urn: key.Urn(),
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_INT64_TYPE),
Payload: buf.Bytes(),
Labels: key.Labels(),
}
}

type gaugeFloat64 struct {
millisSinceEpoch int64
val float64
}

func (m *gaugeFloat64) accumulate(pyld []byte) error {
buf := bytes.NewBuffer(pyld)

timestamp, err := coder.DecodeVarInt(buf)
if err != nil {
return err
}
if m.millisSinceEpoch > timestamp {
// Drop values that are older than what we have already.
return nil
}
val, err := coder.DecodeDouble(buf)
if err != nil {
return err
}
m.millisSinceEpoch = timestamp
m.val = val
return nil
}

func (m *gaugeFloat64) toProto(key metricKey) *pipepb.MonitoringInfo {
var buf bytes.Buffer
coder.EncodeVarInt(m.millisSinceEpoch, &buf)
coder.EncodeDouble(m.val, &buf)
return &pipepb.MonitoringInfo{
Urn: key.Urn(),
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_DOUBLE_TYPE),
Payload: buf.Bytes(),
Labels: key.Labels(),
}
}

type stringSet struct {
set map[string]struct{}
}

func (m *stringSet) accumulate(pyld []byte) error {
buf := bytes.NewBuffer(pyld)

n, err := coder.DecodeInt32(buf)
if err != nil {
return err
}
// Assume it's a fixed iterator size.
for i := int32(0); i < n; i++ {
val, err := coder.DecodeStringUTF8(buf)
if err != nil {
return err
}
m.set[val] = struct{}{}
}
return nil
}

func (m *stringSet) toProto(key metricKey) *pipepb.MonitoringInfo {
var buf bytes.Buffer
coder.EncodeInt32(int32(len(m.set)), &buf)
for k := range m.set {
coder.EncodeStringUTF8(k, &buf)
}
return &pipepb.MonitoringInfo{
Urn: key.Urn(),
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE),
Payload: buf.Bytes(),
Labels: key.Labels(),
}
}

type durability int

const (
Expand Down Expand Up @@ -507,7 +628,7 @@ func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte)
a = ops.newAccum()
}
if err := a.accumulate(payload); err != nil {
panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", key.Urn(), key, a))
panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v :%v", key.Urn(), key, a, err))
}
accums[key] = a
switch u := key.Urn(); u {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,44 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) {
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{4, 19, 2, 7}),
},
}, {
name: "int64Gauge",
input: []map[string][]byte{
{"a": []byte{3, 5}},
{"a": []byte{14, 2}},
{"a": []byte{10, 18}},
},
shortIDs: map[string]*pipepb.MonitoringInfo{
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_LATEST_INT64),
},
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_LATEST_INT64, []byte{14, 2}),
},
}, {
name: "float64Gauge",
input: []map[string][]byte{
{"a": append([]byte{2}, doubleBytes(45)...)},
{"a": append([]byte{17}, doubleBytes(2)...)},
{"a": append([]byte{16}, doubleBytes(200)...)},
},
shortIDs: map[string]*pipepb.MonitoringInfo{
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_LATEST_DOUBLE),
},
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_LATEST_DOUBLE, append([]byte{17}, doubleBytes(2)...)),
},
}, {
name: "stringSet",
input: []map[string][]byte{
{"a": []byte{0, 0, 0, 1, 1, 63}},
{"a": []byte{0, 0, 0, 2, 1, 63, 1, 63}},
},
shortIDs: map[string]*pipepb.MonitoringInfo{
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_SET_STRING),
},
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SET_STRING, []byte{0, 0, 0, 1, 1, 63}),
},
},
}

Expand Down
Loading