Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ RD pass UV from parser to filter #10721

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 45 additions & 42 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <ydb/library/yql/providers/common/schema/parser/yql_type_parser.h>
#include <ydb/library/yql/public/udf/udf_version.h>
#include <ydb/library/yql/public/purecalc/purecalc.h>
#include <ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
Expand All @@ -10,7 +11,6 @@
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>


namespace {

using TCallback = NFq::TJsonFilter::TCallback;
Expand All @@ -23,6 +23,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) {
.Add(fieldType);
}

// Builds the list node ["OptionalType", CreateTypeNode(fieldType)],
// i.e. the optional wrapper around the type node for `fieldType`.
NYT::TNode CreateOptionalTypeNode(const TString& fieldType) {
    auto optionalType = NYT::TNode::CreateList();
    optionalType.Add("OptionalType");
    optionalType.Add(CreateTypeNode(fieldType));
    return optionalType;
}

void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(
NYT::TNode::CreateList()
Expand All @@ -31,18 +37,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
);
}

void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(NYT::TNode::CreateList()
.Add(fieldName)
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
// Appends a [fieldName, type] member to `node`, where the type is parsed
// from its YSON text representation `fieldTypeYson`.
//
// Fails via Y_ENSURE with "Invalid field type" when ParseYson reports failure
// (Cerr is handed to the parser, presumably as its error stream — confirm).
// Json and optional Json types are replaced by their String counterparts
// before the member is added.
void AddTypedField(NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) {
    NYT::TNode memberType;
    Y_ENSURE(NYql::NCommon::ParseYson(memberType, fieldTypeYson, Cerr), "Invalid field type");

    // TODO: remove this when the re-parsing is removed from pq read actor
    if (memberType == CreateTypeNode("Json")) {
        memberType = CreateTypeNode("String");
    } else if (memberType == CreateOptionalTypeNode("Json")) {
        memberType = CreateOptionalTypeNode("String");
    }

    // Assemble the [name, type] pair first, then hand it over to the struct member list.
    auto member = NYT::TNode::CreateList();
    member.Add(fieldName);
    member.Add(memberType);
    node.Add(std::move(member));
}

NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
NYT::TNode MakeInputSchema(const TVector<TString>& columns, const TVector<TString>& types) {
auto structMembers = NYT::TNode::CreateList();
AddField(structMembers, OffsetFieldName, "Uint64");
for (const auto& col : columns) {
AddOptionalField(structMembers, col, "String");
for (size_t i = 0; i < columns.size(); ++i) {
AddTypedField(structMembers, columns[i], types[i]);
}
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
}
Expand All @@ -68,7 +85,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -106,15 +123,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
}
}

void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;
with_lock (Worker->GetScopedAlloc()) {
auto& holderFactory = Worker->GetGraph().GetHolderFactory();

// TODO: use blocks here
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
Expand All @@ -126,13 +143,16 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T

size_t fieldId = 0;
for (const auto& column : values.second) {
items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
: NKikimr::NUdf::TUnboxedValuePod();
items[FieldsPositions[fieldId++]] = column->at(rowId);
}

Worker->Push(std::move(result));
}

// Clear the cache after each object because the values are
// allocated on another allocator and should be released
Cache.Clear();
Worker->GetGraph().Invalidate();
}
}

Expand Down Expand Up @@ -216,7 +236,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -244,12 +264,15 @@ class TJsonFilter::TImpl {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
: Sql(GenerateSql(columns, types, whereFilter)) {
: Sql(GenerateSql(whereFilter)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());

// Program should be stateless because input values are
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = factory->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns)),
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
NYql::NPureCalc::ETranslationMode::SQL
Expand All @@ -258,7 +281,7 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offsets, values));
}
Expand All @@ -268,29 +291,9 @@ class TJsonFilter::TImpl {
}

private:
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes, const TString& whereFilter) {
TString GenerateSql(const TString& whereFilter) {
TStringStream str;
str << "$fields = SELECT ";
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
str << OffsetFieldName << ", ";
for (size_t i = 0; i < columnNames.size(); ++i) {
TString columnType = columnTypes[i];
TString columnName = NFq::EncloseAndEscapeString(columnNames[i], '`');
if (columnType == "Json") {
columnType = "String";
} else if (columnType == "Optional<Json>") {
columnType = "Optional<String>";
}

if (columnType.StartsWith("Optional")) {
str << "IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << ")), NULL)";
} else {
str << "Unwrap(CAST(" << columnName << " as " << columnType << "))";
}
str << " as " << columnName << ((i != columnNames.size() - 1) ? "," : "");
}
str << " FROM Input;\n";
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";

str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
str << "\"])))) as data FROM $filtered";
Expand All @@ -300,7 +303,7 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
const TString Sql;
};

Expand All @@ -315,7 +318,7 @@ TJsonFilter::TJsonFilter(
// Out-of-line destructor with no custom logic.
// NOTE(review): presumably kept in this translation unit so that TImpl is a
// complete type where Impl is destroyed — confirm against the header.
TJsonFilter::~TJsonFilter() = default;

void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Impl->Push(offsets, values);
}

Expand Down
5 changes: 2 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>

namespace NFq {

Expand All @@ -18,7 +17,7 @@ class TJsonFilter {

~TJsonFilter();

void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
TString GetSql();

private:
Expand Down
Loading
Loading