
[SPARK-50589][SQL] Avoid extra expression duplication when push filter #49202

Open
wants to merge 17 commits into master

Conversation

zml1206
Contributor

@zml1206 zml1206 commented Dec 16, 2024

What changes were proposed in this pull request?

Pushing predicates through Project/Aggregate introduces extra expression duplication. This makes the execution plan larger, and may also cause performance regressions when the common expression is non-cheap.

This PR uses the With expression to rewrite conditions containing attributes whose replacement expressions are non-cheap and are consumed multiple times. Each predicate generates zero or one With. Before With is rewritten, extra expression duplication is further reduced through the new rule MergeWithExpression. In addition, the With rewrite rule is optimized so that common expressions used only once are not pre-evaluated.
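The effect described above can be sketched with a toy model in plain Python (not Spark code): inlining a Project alias at every reference site duplicates the expensive expression, while a With-style rewrite keeps a single shared definition.

```python
def substitute(expr, attr, replacement):
    """Replace every reference to `attr` with `replacement`."""
    if expr == ("attr", attr):
        return replacement
    if isinstance(expr, tuple):
        return tuple(substitute(e, attr, replacement) for e in expr)
    return expr

def count(expr, name):
    """Count occurrences of a leaf `name` in a nested expression."""
    if isinstance(expr, (tuple, list)):
        return sum(count(e, name) for e in expr)
    return 1 if expr == name else 0

expensive = ("expensive_udf", ("col", "x"))   # stands in for a non-cheap expression
# Filter condition a > 0 AND a < 10, where the Project defines expensive AS a.
predicate = ("and", (">", ("attr", "a"), 0), ("<", ("attr", "a"), 10))

inlined = substitute(predicate, "a", expensive)          # naive pushdown
with_form = ("with", [("_common_expr_0", expensive)],    # With-based pushdown
             ("and", (">", ("ref", "_common_expr_0"), 0),
                     ("<", ("ref", "_common_expr_0"), 10)))

count(inlined, "expensive_udf")    # 2 evaluations of the UDF
count(with_form, "expensive_udf")  # 1 shared definition
```

The counts show why the plan grows under naive pushdown: the expensive expression appears once per reference site, while the With form carries exactly one definition.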

Why are the changes needed?

  1. Avoid nested expansion of filter expressions, which causes execution plans to become too large.
  2. Avoid duplicating non-cheap expressions, for better performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT.

Was this patch authored or co-authored using generative AI tooling?

No.

@zml1206 zml1206 marked this pull request as draft December 16, 2024 09:47
@github-actions github-actions bot added the SQL label Dec 16, 2024
@zml1206
Contributor Author

zml1206 commented Dec 17, 2024

@cloud-fan Could you take a first look at whether the overall design is feasible? It touches a lot of UTs, so I will update them step by step.

@cloud-fan
Contributor

I'm very confused. When pushing predicates through Project, the problem we hit is we need to rewrite the attributes in the predicate with the corresponding expression in the Project. If the attribute appears more than once in the predicate, and the expression from Project is expensive, filter pushdown may make the query slower.

The solution is to wrap the predicate with With expression, adding the expression from Project as the common expression, and replace attributes in predicate with the common expression reference. But this PR does some rewriting and I don't understand what it does.
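The solution described above can be sketched in plain Python (hypothetical names, not the actual Catalyst API): attributes backed by a Project alias are replaced with a CommonExpressionRef-like marker, and the alias's expression becomes a shared common-expression definition.

```python
def rewrite_with_refs(predicate, alias_map):
    defs = {}
    def go(expr):
        if isinstance(expr, tuple):
            return tuple(go(e) for e in expr)
        if isinstance(expr, str) and expr in alias_map:
            ref_id = "_common_expr_" + expr       # hypothetical id scheme
            defs[ref_id] = alias_map[expr]        # expression from the Project
            return ("ref", ref_id)                # reference replaces the attribute
        return expr
    body = go(predicate)
    return ("with", defs, body)

pred = ("and", (">", "a", 0), ("<", "a", 10))
rewrite_with_refs(pred, {"a": "expensive_udf(x)"})
# ('with', {'_common_expr_a': 'expensive_udf(x)'},
#  ('and', ('>', ('ref', '_common_expr_a'), 0),
#          ('<', ('ref', '_common_expr_a'), 10)))
```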

@zml1206
Contributor Author

zml1206 commented Dec 19, 2024

I'm very confused. When pushing predicates through Project, the problem we hit is we need to rewrite the attributes in the predicate with the corresponding expression in the Project. If the attribute appears more than once in the predicate, and the expression from Project is expensive, filter pushdown may make the query slower.

The solution is to wrap the predicate with With expression, adding the expression from Project as the common expression, and replace attributes in predicate with the common expression reference. But this PR does some rewriting and I don't understand what it does.

Filters support partial pushdown. If a common expression exists in two conditions, one condition may be pushed down one level while the other is pushed down multiple levels. Therefore, after splitting by And, each filter needs to generate its own With, and those Withs need to share the CommonExpressionDef. Only the lowest CommonExpressionDef will generate a Project for the common expression. In addition, the child of the original common-expression Alias also needs to be rewritten with the new common-expression attribute.

@cloud-fan
Contributor

Why do we care about common expressions in the original predicate? This is an optimization opportunity but what we should focus on is to avoid perf regression when pushing filter through project that duplicate expensive expressions.

Maybe we should discuss this with a concrete example.

@zml1206
Contributor Author

zml1206 commented Dec 19, 2024

Why do we care about common expressions in the original predicate? This is an optimization opportunity but what we should focus on is to avoid perf regression when pushing filter through project that duplicate expensive expressions.

Maybe we should discuss this with a concrete example.

As the UT in this PR shows, I want the common expression to be calculated only once: _common_expr_0 ends up being used in two Filters and as e.

@zml1206
Contributor Author

zml1206 commented Dec 19, 2024

Do you want to make it simpler: the CommonExpressionDef is not shared, and the original alias is ignored? That way the common expression will still be evaluated multiple times, but fewer times than before, especially in nested cases.

@cloud-fan
Contributor

OK let's make the scope clear. We do not aim to find all duplicated expressions in the query plan tree and optimize them with With. We aim to avoid extra expression duplication that happens during pushing filter through project. So the change should be we update the filter pushdown rule and use With to avoid the aforementioned expression duplication.

@zml1206
Contributor Author

zml1206 commented Dec 19, 2024

OK let's make the scope clear. We do not aim to find all duplicated expressions in the query plan tree and optimize them with With. We aim to avoid extra expression duplication that happens during pushing filter through project. So the change should be we update the filter pushdown rule and use With to avoid the aforementioned expression duplication.

OK, I'll do it that way later, thank you.

@zml1206 zml1206 changed the title [SPARK-50589][SQL] Avoid produce duplicate non-cheap expressions when push down predicate [SPARK-50589][SQL] Avoid extra expression duplication when push filter Dec 19, 2024
* @param replaceMap Replaced attributes and common expressions
*/
def apply(expr: Expression, replaceMap: Map[Attribute, Expression]): With = {
val commonExprDefsMap = replaceMap.map(m => m._1 -> CommonExpressionDef(m._2))
Contributor

why do we create a map if we never look up from it?

Contributor Author

It is convenient for generating commonExprRefsMap.

@zml1206 zml1206 marked this pull request as ready for review December 20, 2024 08:35
@@ -65,6 +65,8 @@ class SparkOptimizer(
RewriteDistinctAggregates),
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
PushDownPredicates),
Batch("Rewrite With expression", fixedPoint,
Contributor

shall we just rewrite With once at the very end of the optimizer?

Contributor Author

Let me try.

Contributor Author

We can't push a filter containing With into the scan, so With is rewritten at the end of operatorOptimizationBatch.

@cloud-fan
Contributor

cloud-fan commented Dec 23, 2024

how about the rule PushPredicateThroughJoin?

@zml1206
Contributor Author

zml1206 commented Dec 23, 2024

how about the rule PushPredicateThroughJoin?

There is no replaceAlias logic in PushPredicateThroughJoin.

@cloud-fan
Contributor

Ah, OK. Shall we implement the better predicate split-and-combine in this PR to make it more useful? Otherwise we can't handle common cases like a + a > 5 AND a < 10.
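The splitting half of that split-and-combine can be sketched in plain Python (a toy model, not Catalyst's splitConjunctivePredicates): an AND tree flattens into independent conjuncts, so a + a > 5 and a < 10 can be handled and pushed separately.

```python
def split_conjunctive(expr):
    """Flatten a nested AND tree into a list of conjuncts."""
    if isinstance(expr, tuple) and expr[0] == "and":
        return split_conjunctive(expr[1]) + split_conjunctive(expr[2])
    return [expr]

cond = ("and", (">", ("+", "a", "a"), 5), ("<", "a", 10))
split_conjunctive(cond)
# [('>', ('+', 'a', 'a'), 5), ('<', 'a', 10)]
```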

@zml1206
Contributor Author

zml1206 commented Dec 23, 2024

@cloud-fan It seems that the change in filter order causes the execution plans to mismatch. Is there any good solution?

@@ -107,6 +107,14 @@ trait PredicateHelper extends AliasHelper with Logging {
}
}

/**
* First split the predicates by And, then combine the predicates with the same references.
Contributor

I think there is a misunderstanding. What I proposed is splitConjunctivePredicates should split AND/OR under With: With(c1, c2, c1 + c1 > 0 AND c2 + c2 > 0 AND c1 + c2 > 0) can be split into With(c1, c1 + c1 > 0), With(c2, c2 + c2 > 0), With(c1, c2, c1 + c2 > 0). When combine the predicates later, we also recognize the With and merge them to avoid duplicated expressions.
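The proposal above can be modeled in plain Python (a toy sketch, not Catalyst code): split the AND under a With into one With per conjunct, each keeping only the definitions it references; because the definition ids are not refreshed, merging the Withs back is just a union keyed by id.

```python
def refs(expr):
    """Collect the ids of all ('ref', id) markers in a nested expression."""
    if isinstance(expr, tuple):
        if expr[0] == "ref":
            return {expr[1]}
        return set().union(*(refs(e) for e in expr[1:])) if len(expr) > 1 else set()
    return set()

def split_with(defs, conjuncts):
    """One narrowed (defs, conjunct) pair per conjunct, keeping used defs only."""
    return [({i: defs[i] for i in refs(c) if i in defs}, c) for c in conjuncts]

defs = {"c1": "expensive1", "c2": "expensive2"}
conjuncts = [(">", ("+", ("ref", "c1"), ("ref", "c1")), 0),
             (">", ("+", ("ref", "c2"), ("ref", "c2")), 0),
             (">", ("+", ("ref", "c1"), ("ref", "c2")), 0)]
parts = split_with(defs, conjuncts)   # per-conjunct Withs with narrowed defs
merged = {}
for part_defs, _ in parts:            # merging back: ids line up, so union suffices
    merged.update(part_defs)
```

Since ids are stable across the split, the merge never has to compare expression trees; it only unions dictionaries keyed by id.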

Contributor Author

Why split OR?

Contributor Author

Merging With has a certain cost. It seems we only need to merge once, right before rewriting With.

Contributor

note: when splitting the predicates under With (by AND), we don't refresh the CTE ids, so when merging the With back, it should be pretty easy.

Contributor Author

It is unnecessary to continuously split and merge With. We only need to split by And when generating With, and merge With once at the end, before rewriting it.

@@ -112,6 +112,22 @@ object With {
With(replaced(commonExprRefs), commonExprDefs)
}

/**
* Helper function to create a [[With]] statement when push down filter.
Contributor

Since this is very specific to filter pushdown, shall we put this method in the filter pushdown rule?

Contributor Author

OK. Waiting for CI to finish running.

@@ -1831,8 +1841,8 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
}

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
val replacedByWith = rewriteConditionByWith(pushDown.reduce(And), aliasMap)
Contributor

rewriteConditionByWith will split the predicate anyway, why do we combine them with And here?

Contributor Author

To calculate the number of times each common expression is used across the entire condition.

val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
val replacedByWith = rewriteConditionByWith(pushDown.reduce(And), aliasMap)
val replaced = replaceAlias(replacedByWith, aliasMap)
Contributor

doesn't rewriteConditionByWith already replace the aliases?

Contributor Author

No, rewriteConditionByWith only rewrites the common attributes to common-expression refs.

@@ -96,14 +96,17 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans, isNestedWith = true))
val refToExpr = mutable.HashMap.empty[CommonExpressionId, Expression]
val childProjections = Array.fill(inputPlans.length)(mutable.ArrayBuffer.empty[Alias])
val refsCount = child.collect { case r: CommonExpressionRef => r}
Contributor

we should only collect references to the current With, not the nested With.

Suggested change
val refsCount = child.collect { case r: CommonExpressionRef => r}
val refsCount = child.collect { case r: CommonExpressionRef if defs.exists(_.id == r.id) => r}

val newDefs = left.defs.toBuffer
val replaceMap = mutable.HashMap.empty[CommonExpressionId, CommonExpressionRef]
right.defs.foreach { rDef =>
  val index = left.defs.indexWhere(lDef => rDef.child.fastEquals(lDef.child))
Contributor

plan comparison can be expensive. Since it's a very special case for filter pushdown, shall we use the same CommonExpressionDef ids for different Withs in the split predicates?

@zml1206
Contributor Author

zml1206 commented Dec 25, 2024

The CI failure is unrelated.

if (!SQLConf.get.getConf(SQLConf.ALWAYS_INLINE_COMMON_EXPR)) {
  val canRewriteConf = cond.filter(canRewriteByWith)
  if (canRewriteConf.nonEmpty) {
    val replaceWithMap = canRewriteConf.reduce(And)
Contributor

we can do canRewriteConf.flatMap(_.collect...)


// With does not support inline subquery
private def canRewriteByWith(expr: Expression): Boolean = {
  !expr.containsPattern(PLAN_EXPRESSION)
Contributor

is this check too strong? We only require the common expression to not contain subqueries.

Contributor Author

Both rewriting subqueries and pushing down predicates are in batch "operator optimization before inferring filters". Pushing down predicates may cause SubqueryExpression to contain common refs, and then rewriting subqueries cannot replace common refs.

.groupBy(identity)
.transform((_, v) => v.size)
.filter(m => aliasMap.contains(m._1) && m._2 > 1)
.map(m => m._1 -> trimAliases(aliasMap.getOrElse(m._1, m._1)))
Contributor

So we've already done the work of replaceAlias here; why do we still need to call replaceAlias?

Contributor Author

This just builds the map from attribute to common expression in order to generate the With, without rewriting the condition.
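The selection logic in the snippet above can be modeled in plain Python (names here are illustrative, not the PR's actual helpers): count the attribute references in the condition, and keep only attributes that come from the Project's alias map and are referenced more than once.

```python
from collections import Counter

def with_candidates(attrs_in_condition, alias_map):
    """Attributes worth a common-expression definition: aliased and used > 1 time."""
    counts = Counter(attrs_in_condition)
    return {a: alias_map[a] for a, n in counts.items()
            if a in alias_map and n > 1}

alias_map = {"a": "expensive_udf(x)"}       # Project: expensive_udf(x) AS a
attrs = ["a", "a", "b"]                     # condition references a twice, b once
with_candidates(attrs, alias_map)
# {'a': 'expensive_udf(x)'} — only `a` gets a common-expression definition
```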

val defs = mutable.HashSet.empty[CommonExpressionDef]
val replaced = expr.transform {
case a: Attribute if refsMap.contains(a) =>
defs.add(defsMap.get(a).get)
Contributor

Suggested change
defs.add(defsMap.get(a).get)
defs.add(defsMap(a))

val refsCount = child.collect {
  case r: CommonExpressionRef
    if defs.exists {
      case d: CommonExpressionDef => d.id == r.id
Contributor

nit: we can create an id set to avoid repeated linear search.
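The nit above, modeled in plain Python: build the set of definition ids once, so each reference check is O(1) set membership instead of a linear scan over the definitions.

```python
def count_matching_refs(ref_ids, def_ids):
    ids = set(def_ids)                        # built once, O(len(def_ids))
    return sum(1 for r in ref_ids if r in ids)  # O(1) lookup per reference

count_matching_refs([1, 1, 3], [1, 2])  # 2
```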

Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)))
Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition),
Batch("Merge With expression", fixedPoint, MergeWithExpression),
Batch("Rewrite With expression", fixedPoint,
Contributor

how many places do we need to put this batch?

Contributor Author

Before "Extract Python UDFs", before inferring filters, and at the end of each optimizer.

Input [4]: [w_warehouse_name#12, i_item_id#7, inv_before#19, inv_after#20]
Condition : (CASE WHEN (inv_before#19 > 0) THEN ((cast(inv_after#20 as double) / cast(inv_before#19 as double)) >= 0.666667) END AND CASE WHEN (inv_before#19 > 0) THEN ((cast(inv_after#20 as double) / cast(inv_before#19 as double)) <= 1.5) END)
Contributor

Ah, so this is a positive plan change: we avoid computing the CASE WHEN multiple times in the Filter.
