Skip to content

Commit

Permalink
Merge #65972
Browse files Browse the repository at this point in the history
65972: colbuilder: optimize casting in edge cases r=yuzefovich a=yuzefovich

**colbuilder: return custom error for LocalPlanNode core**

If a particular processor core is not supported, we return a general
error (in order to not incur any allocations when printing out the type
of the core). However, the `LocalPlanNode` core is quite special (because we
don't have any plans to vectorize it at the moment, if ever), so it
deserves a separate error object.

Release note: None

**colbuilder: optimize casting in edge cases**

In order to make sure that a `NewColOperator` call produces the type
schema as expected, we are planning casts between the mismatched types.
If a particular cast is not supported natively by the vectorized engine,
we have to fall back to wrapping a row-execution noop processor.
Previously, we would create a separate processor for each such
mismatched column. However, this is quite inefficient if multiple
columns need to be cast this way (not only is the processor itself not
free, but we also need to plan another pair of materializer-columnarizer
wrappers).

This commit switches that casting planning to be a bit smarter - to
check explicitly whether all necessary casts are supported natively -
which allows us to plan a single row-execution processor to handle all
unsupported casts at once. Currently, if a row-execution processor is
needed, we will use it to handle all mismatched types, even the ones for
which we have a native cast operation. The reasoning behind this change is
that this commit needs to be backportable (so it must be as bullet-proof
as possible) but also it might actually be a better option since all
vectorized casts will append more columns to go through the
materializer-columnarizer pair. It is left as a TODO to explore which
way is better in more detail.

Fixes: https://github.com/cockroachlabs/support/issues/1008.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jun 2, 2021
2 parents 0237afb + eb24f95 commit 7869b78
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 41 deletions.
110 changes: 69 additions & 41 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,20 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
}
return nil

case spec.Core.LocalPlanNode != nil:
// LocalPlanNode core is special (we don't have any plans on vectorizing
// it at the moment), so we want to return a custom error for it to
// distinguish from other unsupported cores.
return errLocalPlanNodeWrap

default:
return errCoreUnsupportedNatively
}
}

var (
errCoreUnsupportedNatively = errors.New("unsupported processor core")
errLocalPlanNodeWrap = errors.New("LocalPlanNode core needs to be wrapped")
errMetadataTestSenderWrap = errors.New("core.MetadataTestSender is not supported")
errMetadataTestReceiverWrap = errors.New("core.MetadataTestReceiver is not supported")
errChangeAggregatorWrap = errors.New("core.ChangeAggregator is not supported")
Expand All @@ -257,6 +264,7 @@ var (
errSamplerWrap = errors.New("core.Sampler is not supported (not an execinfra.RowSource)")
errSampleAggregatorWrap = errors.New("core.SampleAggregator is not supported (not an execinfra.RowSource)")
errExperimentalWrappingProhibited = errors.New("wrapping for non-JoinReader and non-LocalPlanNode cores is prohibited in vectorize=experimental_always")
errWrappedCast = errors.New("mismatched types in NewColOperator and unsupported casts")
)

func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
Expand Down Expand Up @@ -1370,56 +1378,72 @@ func NewColOperator(

// Check that the actual output types are equal to the expected ones and
// plan casts if they are not.
//
// For each output column we check whether the actual and expected types are
// identical, and if not, whether we support a native vectorized cast
// between them. If for at least one column the native cast is not
// supported, we will plan a wrapped row-execution noop processor that will
// be responsible for casting all mismatched columns (and for performing the
// projection of the original no longer needed types).
// TODO(yuzefovich): consider whether planning some vectorized casts is
// worth it even if we need to plan a wrapped processor for some other
// columns.
if len(args.Spec.ResultTypes) != len(r.ColumnTypes) {
return r, errors.AssertionFailedf("unexpectedly different number of columns are output: expected %v, actual %v", args.Spec.ResultTypes, r.ColumnTypes)
}
// projection is lazily allocated when the first column that needs an
// explicit cast is found. It'll remain nil if projection isn't necessary.
var projection []uint32
numMismatchedTypes, needWrappedCast := 0, false
for i := range args.Spec.ResultTypes {
expected, actual := args.Spec.ResultTypes[i], r.ColumnTypes[i]
if !actual.Identical(expected) {
input := r.Root
castedIdx := len(r.ColumnTypes)
resultTypes := appendOneType(r.ColumnTypes, expected)
r.Root, err = colexecbase.GetCastOperator(
streamingAllocator, input, i, castedIdx, actual, expected,
)
if err != nil {
// We don't support a native vectorized cast between these
// types, so we will plan a noop row-execution processor to
// handle it with a post-processing spec that simply passes
// through all of the columns from the input and appends the
// result of the cast to the end of the schema.
post := &execinfrapb.PostProcessSpec{}
post.RenderExprs = make([]execinfrapb.Expression, castedIdx+1)
for j := 0; j < castedIdx; j++ {
post.RenderExprs[j].LocalExpr = tree.NewTypedOrdinalReference(j, r.ColumnTypes[j])
}
post.RenderExprs[castedIdx].LocalExpr = tree.NewTypedCastExpr(tree.NewTypedOrdinalReference(i, r.ColumnTypes[i]), expected)
result.Root = input
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, resultTypes, factory, err); err != nil {
return r, err
}
numMismatchedTypes++
if !colexecbase.IsCastSupported(actual, expected) {
needWrappedCast = true
}
}
}

if needWrappedCast {
post := &execinfrapb.PostProcessSpec{
RenderExprs: make([]execinfrapb.Expression, len(args.Spec.ResultTypes)),
}
for i := range args.Spec.ResultTypes {
expected, actual := args.Spec.ResultTypes[i], r.ColumnTypes[i]
if !actual.Identical(expected) {
post.RenderExprs[i].LocalExpr = tree.NewTypedCastExpr(tree.NewTypedOrdinalReference(i, actual), expected)
} else {
post.RenderExprs[i].LocalExpr = tree.NewTypedOrdinalReference(i, args.Spec.ResultTypes[i])
}
r.ColumnTypes = resultTypes
if projection == nil {
// This is the first column that needs an explicit cast, so we
// need to actually allocate the slice and set all previous
// columns to be used as is.
projection = make([]uint32, len(args.Spec.ResultTypes))
for j := 0; j < i; j++ {
projection[j] = uint32(j)
}
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, errWrappedCast); err != nil {
return r, err
}
} else if numMismatchedTypes > 0 {
// We will need to project out the original mismatched columns, so
// we're keeping track of the required projection.
projection := make([]uint32, len(args.Spec.ResultTypes))
typesWithCasts := make([]*types.T, len(args.Spec.ResultTypes), len(args.Spec.ResultTypes)+numMismatchedTypes)
// All original mismatched columns will be passed through by all of the
// vectorized cast operators.
copy(typesWithCasts, r.ColumnTypes)
for i := range args.Spec.ResultTypes {
expected, actual := args.Spec.ResultTypes[i], r.ColumnTypes[i]
if !actual.Identical(expected) {
castedIdx := len(typesWithCasts)
r.Root, err = colexecbase.GetCastOperator(
streamingAllocator, r.Root, i, castedIdx, actual, expected,
)
if err != nil {
return r, errors.AssertionFailedf("unexpectedly couldn't plan a cast although IsCastSupported returned true: %v", err)
}
projection[i] = uint32(castedIdx)
typesWithCasts = append(typesWithCasts, expected)
} else {
projection[i] = uint32(i)
}
projection[i] = uint32(castedIdx)
} else if projection != nil {
projection[i] = uint32(i)
}
r.Root, r.ColumnTypes = addProjection(r.Root, typesWithCasts, projection)
}
if projection != nil {
r.Root, r.ColumnTypes = addProjection(r.Root, r.ColumnTypes, projection)
}

takeOverMetaInfo(&result.OpWithMetaInfo, inputs)
if util.CrdbTestBuild {
// TODO(yuzefovich): remove the testing knob.
Expand Down Expand Up @@ -1496,10 +1520,14 @@ func (r opResult) wrapPostProcessSpec(
}
inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root}
takeOverMetaInfo(&inputToMaterializer, args.Inputs)
return r.createAndWrapRowSource(
if err := r.createAndWrapRowSource(
ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
[][]*types.T{r.ColumnTypes}, noopSpec, factory, causeToWrap,
)
); err != nil {
return err
}
r.ColumnTypes = resultTypes
return nil
}

// planPostProcessSpec plans the post processing stage specified in post on top
Expand Down
209 changes: 209 additions & 0 deletions pkg/sql/colexec/colexecbase/cast.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7869b78

Please sign in to comment.