Skip to content

Commit

Permalink
Don't lose 'any' flag after CBO. (#8674)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tony-Romanov authored Sep 24, 2024
1 parent f3bb311 commit d2b896d
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 61 deletions.
Empty file.
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/join/kqp_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
Cout << FormatResultSetYson(result.GetResultSet(0));
CompareYson(R"(
[[["02"];#;["02"];["03"];#;["03"];["1"];#;["1"]];[["02"];#;["02"];["03"];#;["04"];["1"];#;["1"]];[["02"];#;["02"];["05"];#;["05"];["2"];#;["2"]];[["02"];#;["02"];["05"];#;["06"];["2"];#;["2"]];[["02"];#;["02"];["06"];#;["05"];["2"];#;["2"]];[["02"];#;["02"];["06"];#;["06"];["2"];#;["2"]];[["03"];["03"];["03"];["08"];["02"];["07"];["1"];["1"];["1"]];[["03"];["03"];["03"];["09"];["03"];["08"];["2"];["2"];["2"]];[["09"];#;["09"];["20"];#;["09"];["1"];#;["1"]];[["09"];#;["09"];["21"];#;["10"];["2"];#;["2"]]]
[[["02"];#;["02"];["03"];#;["03"];["1"];#;["1"]];[["02"];#;["02"];["05"];#;["05"];["2"];#;["2"]];[["02"];#;["02"];["06"];#;["05"];["2"];#;["2"]];[["03"];["03"];["03"];["08"];["02"];["07"];["1"];["1"];["1"]];[["03"];["03"];["03"];["09"];["03"];["08"];["2"];["2"];["2"]];[["09"];#;["09"];["20"];#;["09"];["1"];#;["1"]];[["09"];#;["09"];["21"];#;["10"];["2"];#;["2"]]]
)", FormatResultSetYson(result.GetResultSet(0)));
}
}
Expand Down
45 changes: 31 additions & 14 deletions ydb/library/yql/core/cbo/cbo_optimizer_new.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,30 @@ void TRelOptimizerNode::Print(std::stringstream& stream, int ntabs) {
stream << *Stats << "\n";
}

TJoinOptimizerNode::TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>& left, const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const EJoinKind joinType, const EJoinAlgoType joinAlgo, bool nonReorderable) :
IBaseOptimizerNode(JoinNodeType),
LeftArg(left),
RightArg(right),
JoinConditions(joinConditions),
JoinType(joinType),
JoinAlgo(joinAlgo) {
IsReorderable = !nonReorderable;
for (auto [l,r] : joinConditions ) {
LeftJoinKeys.push_back(l.AttributeName);
RightJoinKeys.push_back(r.AttributeName);
}
TJoinOptimizerNode::TJoinOptimizerNode(
const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
const EJoinKind joinType,
const EJoinAlgoType joinAlgo,
bool leftAny,
bool rightAny,
bool nonReorderable
) : IBaseOptimizerNode(JoinNodeType)
, LeftArg(left)
, RightArg(right)
, JoinConditions(joinConditions)
, JoinType(joinType)
, JoinAlgo(joinAlgo)
, LeftAny(leftAny)
, RightAny(rightAny)
, IsReorderable(!nonReorderable)
{
for (const auto& [l,r] : joinConditions ) {
LeftJoinKeys.push_back(l.AttributeName);
RightJoinKeys.push_back(r.AttributeName);
}
}

TVector<TString> TJoinOptimizerNode::Labels() {
auto res = LeftArg->Labels();
Expand All @@ -101,7 +111,14 @@ void TJoinOptimizerNode::Print(std::stringstream& stream, int ntabs) {
stream << " ";
}

stream << "Join: (" << ToString(JoinType) << "," << ToString(JoinAlgo) << ") ";
stream << "Join: (" << ToString(JoinType) << "," << ToString(JoinAlgo);
if (LeftAny) {
stream << ",LeftAny";
}
if (RightAny) {
stream << ",RightAny";
}
stream << ") ";

for (auto c : JoinConditions){
stream << c.first.RelName << "." << c.first.AttributeName
Expand Down
9 changes: 8 additions & 1 deletion ydb/library/yql/core/cbo/cbo_optimizer_new.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,21 @@ struct TJoinOptimizerNode : public IBaseOptimizerNode {
TVector<TString> RightJoinKeys;
EJoinKind JoinType;
EJoinAlgoType JoinAlgo;
/////////////////// 'ANY' flag means leaving only one row from the join side.
bool LeftAny;
bool RightAny;
///////////////////
bool IsReorderable;

TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
const EJoinKind joinType,
const EJoinAlgoType joinAlgo,
bool nonReorderable=false);
bool leftAny,
bool rightAny,
bool nonReorderable = false
);
virtual ~TJoinOptimizerNode() {}
virtual TVector<TString> Labels();
virtual void Print(std::stringstream& stream, int ntabs=0);
Expand Down
24 changes: 16 additions & 8 deletions ydb/library/yql/dq/opt/dq_cbo_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@ Y_UNIT_TEST(JoinSearch2Rels) {
std::static_pointer_cast<IBaseOptimizerNode>(rel2),
joinConditions,
InnerJoin,
EJoinAlgoType::GraceJoin
EJoinAlgoType::GraceJoin,
true,
false
);

auto res = optimizer->JoinSearch(op);
std::stringstream ss;
res->Print(ss);
TString expected = R"__(Join: (InnerJoin,MapJoin) b.1=a.1,
Cout << ss.str() << '\n';
TString expected = R"__(Join: (InnerJoin,MapJoin,RightAny) b.1=a.1,
Type: ManyManyJoin, Nrows: 2e+10, Ncols: 2, ByteSize: 0, Cost: 2.00112e+10, Sel: 1, Storage: NA
Rel: b
Type: BaseTable, Nrows: 1e+06, Ncols: 1, ByteSize: 0, Cost: 9.00001e+06, Sel: 1, Storage: NA
Expand Down Expand Up @@ -93,8 +96,10 @@ Y_UNIT_TEST(JoinSearch3Rels) {
std::static_pointer_cast<IBaseOptimizerNode>(rel2),
joinConditions,
InnerJoin,
EJoinAlgoType::GraceJoin
);
EJoinAlgoType::GraceJoin,
false,
false
);

joinConditions.insert({
NDq::TJoinColumn("a", "1"),
Expand All @@ -106,14 +111,17 @@ Y_UNIT_TEST(JoinSearch3Rels) {
std::static_pointer_cast<IBaseOptimizerNode>(rel3),
joinConditions,
InnerJoin,
EJoinAlgoType::GraceJoin
);
EJoinAlgoType::GraceJoin,
true,
false
);

auto res = optimizer->JoinSearch(op2);
std::stringstream ss;
res->Print(ss);
Cout << ss.str() << '\n';

TString expected = R"__(Join: (InnerJoin,MapJoin) a.1=b.1,a.1=c.1,
TString expected = R"__(Join: (InnerJoin,MapJoin,LeftAny) a.1=b.1,a.1=c.1,
Type: ManyManyJoin, Nrows: 4e+13, Ncols: 3, ByteSize: 0, Cost: 4.004e+13, Sel: 1, Storage: NA
Join: (InnerJoin,MapJoin) b.1=a.1,
Type: ManyManyJoin, Nrows: 2e+10, Ncols: 2, ByteSize: 0, Cost: 2.00112e+10, Sel: 1, Storage: NA
Expand Down Expand Up @@ -223,7 +231,7 @@ void _DqOptimizeEquiJoinWithCosts(const std::function<IOptimizerNew*()>& optFact
UNIT_ASSERT(equiJoin.Maybe<TCoEquiJoin>());
auto resStr = NCommon::ExprToPrettyString(ctx, *res.Ptr());
auto expected = R"__((
(let $1 '('"Inner" '"orders" '"customer" '('"orders" '"a") '('"customer" '"b") '('('"join_algo" '"MapJoin"))))
(let $1 '('"Inner" '"orders" '"customer" '('"orders" '"a") '('"customer" '"b") '('('join_algo 'MapJoin))))
(return (EquiJoin '('() '"orders") '('() '"customer") $1 '()))
)
)__";
Expand Down
24 changes: 19 additions & 5 deletions ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ class TConflictRulesCollector {
private:
auto GetLeftConflictsVisitor() {
auto visitor = [this](const std::shared_ptr<TJoinOptimizerNode>& child) {
if (!OperatorsAreAssociative(child->JoinType, Root_->JoinType) || !Root_->IsReorderable || !child->IsReorderable) {
if (!OperatorsAreAssociative(child->JoinType, Root_->JoinType)) {
ConflictRules_.emplace_back(
SubtreeNodes_[child->RightArg],
SubtreeNodes_[child->LeftArg]
);
}

if (!OperatorsAreLeftAsscom(child->JoinType, Root_->JoinType) || !Root_->IsReorderable || !child->IsReorderable) {
if (!OperatorsAreLeftAsscom(child->JoinType, Root_->JoinType)) {
ConflictRules_.emplace_back(
SubtreeNodes_[child->LeftArg],
SubtreeNodes_[child->RightArg]
Expand All @@ -77,18 +77,18 @@ class TConflictRulesCollector {

auto GetRightConflictsVisitor() {
auto visitor = [this](const std::shared_ptr<TJoinOptimizerNode>& child) {
if (!OperatorsAreAssociative(Root_->JoinType, child->JoinType) || !Root_->IsReorderable || !child->IsReorderable) {
if (!OperatorsAreAssociative(Root_->JoinType, child->JoinType)) {
ConflictRules_.emplace_back(
SubtreeNodes_[child->LeftArg],
SubtreeNodes_[child->RightArg]
);
}

if (!OperatorsAreRightAsscom(Root_->JoinType, child->JoinType) || !Root_->IsReorderable || !child->IsReorderable) {
if (!OperatorsAreRightAsscom(Root_->JoinType, child->JoinType)) {
ConflictRules_.emplace_back(
SubtreeNodes_[child->RightArg],
SubtreeNodes_[child->LeftArg]
);
);
}
};

Expand All @@ -106,6 +106,20 @@ class TConflictRulesCollector {
VisitJoinTree(childJoinNode->LeftArg, visitor);
VisitJoinTree(childJoinNode->RightArg, visitor);

if (childJoinNode->LeftAny || !childJoinNode->IsReorderable) {
ConflictRules_.emplace_back(
SubtreeNodes_[childJoinNode->LeftArg],
SubtreeNodes_[childJoinNode->RightArg]
);
}

if (childJoinNode->RightAny || !childJoinNode->IsReorderable) {
ConflictRules_.emplace_back(
SubtreeNodes_[childJoinNode->RightArg],
SubtreeNodes_[childJoinNode->LeftArg]
);
}

visitor(childJoinNode);
}

Expand Down
14 changes: 10 additions & 4 deletions ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class TDPHypSolver {
const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
EJoinKind joinKind,
bool leftAny,
bool rightAny,
bool isCommutative,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& reversedJoinConditions,
Expand Down Expand Up @@ -409,6 +411,8 @@ template <typename TNodeSet> std::shared_ptr<TJoinOptimizerNodeInternal> TDPHypS
const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
EJoinKind joinKind,
bool leftAny,
bool rightAny,
bool isCommutative,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& reversedJoinConditions,
Expand All @@ -419,7 +423,7 @@ template <typename TNodeSet> std::shared_ptr<TJoinOptimizerNodeInternal> TDPHypS
TJoinAlgoHints::TJoinAlgoHint* maybeJoinHint
) {
double bestCost = std::numeric_limits<double>::infinity();
EJoinAlgoType bestAlgo{};
EJoinAlgoType bestAlgo = EJoinAlgoType::Undefined;
bool bestJoinIsReversed = false;

for (auto joinAlgo : AllJoinAlgos) {
Expand Down Expand Up @@ -452,13 +456,13 @@ template <typename TNodeSet> std::shared_ptr<TJoinOptimizerNodeInternal> TDPHypS
}
}

Y_ENSURE(bestCost != std::numeric_limits<double>::infinity(), "No join was chosen!");
Y_ENSURE(bestAlgo != EJoinAlgoType::Undefined, "No join was chosen!");

if (bestJoinIsReversed) {
return MakeJoinInternal(right, left, reversedJoinConditions, rightJoinKeys, leftJoinKeys, joinKind, bestAlgo, ctx, maybeCardHint);
return MakeJoinInternal(right, left, reversedJoinConditions, rightJoinKeys, leftJoinKeys, joinKind, bestAlgo, rightAny, leftAny, ctx, maybeCardHint);
}

return MakeJoinInternal(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinKind, bestAlgo, ctx, maybeCardHint);
return MakeJoinInternal(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinKind, bestAlgo, leftAny, rightAny, ctx, maybeCardHint);
}

/*
Expand Down Expand Up @@ -489,6 +493,8 @@ template<typename TNodeSet> void TDPHypSolver<TNodeSet>::EmitCsgCmp(const TNodeS
leftNodes,
rightNodes,
csgCmpEdge->JoinKind,
csgCmpEdge->LeftAny,
csgCmpEdge->RightAny,
csgCmpEdge->IsCommutative,
csgCmpEdge->JoinConditions,
reversedEdge->JoinConditions,
Expand Down
Loading

0 comments on commit d2b896d

Please sign in to comment.