diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 6deaab65d366..b7157b8598de 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -212,6 +212,20 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i l, _ := coder.DecodeVarInt(r) ioutilx.ReadN(r, int(l)) } + case urns.CoderNullable: + return func(r io.Reader) { + b, _ := ioutilx.ReadN(r, 1) + if len(b) == 0 { + return + } + // Nullable coder is prefixed with 0 or 1 to indicate whether there exists remaining data. + prefix := b[0] + if prefix == 0 { + return + } + l, _ := coder.DecodeVarInt(r) + ioutilx.ReadN(r, int(l)) + } case urns.CoderVarInt: return func(r io.Reader) { coder.DecodeVarInt(r)