Skip to content

Commit

Permalink
Enable limit and offset expression of Python and HTTP API (#1905)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Fix limit and offset in Python API and HTTP API
2. Add an example to demonstrate how to use limit and offset in Python
API.

Issue links: #1903, #1896

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Sep 24, 2024
1 parent c442e08 commit 0ba8617
Show file tree
Hide file tree
Showing 18 changed files with 441 additions and 191 deletions.
2 changes: 1 addition & 1 deletion benchmark/local_infinity/knn/knn_query_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ int main(int argc, char *argv[]) {
auto select_rowid_expr = new FunctionExpr();
select_rowid_expr->func_name_ = "row_id";
output_columns->emplace_back(select_rowid_expr);
auto result = infinity->Search(db_name, table_name, search_expr, nullptr, output_columns);
auto result = infinity->Search(db_name, table_name, search_expr, nullptr, nullptr, nullptr, output_columns);
{
auto &cv = result.result_table_->GetDataBlockById(0)->column_vectors;
auto &column = *cv[0];
Expand Down
10 changes: 5 additions & 5 deletions example/hybrid_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
This example is to connect local infinity instance, create table, insert data, search the data
"""

import infinity_embedded as infinity
#import infinity
# import infinity_embedded as infinity
import infinity
import sys

try:
Expand Down Expand Up @@ -101,12 +101,12 @@
.match_text("body", "blooms", 10)
.filter("year < 2024")
.fusion(
method="match_tensor", topn=2,
method="match_tensor", topn=3,
fusion_params={"field": "tensor", "data_type": "float",
"data": [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]},
params={"filter": "year < 2024"}
"data": [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]}
)
.to_pl()
# .explain(explain_type=infinity.table.ExplainType.UnOpt)
)

print(result)
Expand Down
73 changes: 73 additions & 0 deletions example/search_with_limit_offset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This example is to connect local infinity instance, create table, insert data, search the data
'''

# import infinity_embedded as infinity
import infinity
import sys

try:
    # Embedded mode: open a local data directory directly.
    # infinity_instance = infinity.connect("/var/infinity")

    # Client/server mode: connect to a running infinity server.
    infinity_instance = infinity.connect(infinity.common.NetworkAddress("127.0.0.1", 23817))

    # Every deployment ships with a 'default_db' database.
    db_instance = infinity_instance.get_database("default_db")

    # Start from a clean slate: remove any leftover 'my_table'.
    db_instance.drop_table("my_table", infinity.common.ConflictType.Ignore)

    # Recreate the table with an integer, a varchar and a 4-d float vector column.
    table_instance = db_instance.create_table("my_table", {
        "num": {"type": "integer"},
        "body": {"type": "varchar"},
        "vec": {"type": "vector, 4, float"},
    })

    # Seed three rows so that limit/offset has something to page through.
    rows = [
        {
            "num": 1,
            "body": "unnecessary and harmful",
            "vec": [1.0, 1.2, 0.8, 0.9],
        },
        {
            "num": 2,
            "body": "Office for Harmful Blooms",
            "vec": [4.0, 4.2, 4.3, 4.5],
        },
        {
            "num": 3,
            "body": "A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set.",
            "vec": [4.0, 4.2, 4.3, 4.2],
        },
    ]
    table_instance.insert(rows)

    # Top-3 cosine similarity search, then page the hits: skip 1 row, keep 2.
    query = table_instance.output(["num", "vec", "_similarity"])
    query = query.match_dense("vec", [3.0, 2.8, 2.7, 3.1], "float", "cosine", 3)
    result = query.limit(2).offset(1).to_pl()
    print(result)

    infinity_instance.disconnect()

    print('test done')
    sys.exit(0)
except Exception as e:
    print(str(e))
    sys.exit(-1)
172 changes: 96 additions & 76 deletions src/embedded_infinity/wrap_infinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import table_def;
import third_party;
import logger;
import query_options;
import defer_op;

namespace infinity {

Expand Down Expand Up @@ -1218,44 +1219,46 @@ WrapQueryResult WrapSearch(Infinity &instance,
WrapParsedExpr *limit_expr,
WrapParsedExpr *offset_expr) {
SearchExpr *search_expr = nullptr;
DeferFn defer_fn1([&]() {
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
});
if (wrap_search_expr != nullptr) {
Status status;
search_expr = dynamic_cast<SearchExpr *>(wrap_search_expr->GetParsedExpr(status));
if (status.code_ != ErrorCode::kOk) {
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}
ParsedExpr *filter = nullptr;
DeferFn defer_fn2([&]() {
if (filter != nullptr) {
delete filter;
filter = nullptr;
}
});
if (where_expr != nullptr) {
Status status;
filter = where_expr->GetParsedExpr(status);
if (status.code_ != ErrorCode::kOk) {
if (filter != nullptr) {
delete filter;
filter = nullptr;
}
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}
Vector<ParsedExpr *> *output_columns = nullptr;

if (select_list.empty()) {
if (filter != nullptr) {
delete filter;
filter = nullptr;
}
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
DeferFn defer_fn3([&]() {
if(output_columns != nullptr) {
SizeT output_column_len = output_columns->size();
for(SizeT i = 0; i < output_column_len; ++ i) {
if ((*output_columns)[i] != nullptr) {
delete (*output_columns)[i];
(*output_columns)[i] = nullptr;
}
}
}
});
if (select_list.empty()) {
return WrapQueryResult(ErrorCode::kEmptySelectFields, "[error] Select fields are empty");
} else {
output_columns = new Vector<ParsedExpr *>();
Expand All @@ -1264,30 +1267,47 @@ WrapQueryResult WrapSearch(Infinity &instance,
Status status;
output_columns->emplace_back(select_list[i].GetParsedExpr(status));
if (status.code_ != ErrorCode::kOk) {
if (output_columns != nullptr) {
for (SizeT j = 0; j <= i; ++j) {
if ((*output_columns)[j] != nullptr) {
delete (*output_columns)[j];
(*output_columns)[j] = nullptr;
}
}
delete output_columns;
output_columns = nullptr;
}
if (filter != nullptr) {
delete filter;
filter = nullptr;
}
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}
}

auto query_result = instance.Search(db_name, table_name, search_expr, filter, output_columns);
ParsedExpr *limit = nullptr;
DeferFn defer_fn4([&]() {
if (limit != nullptr) {
delete limit;
limit = nullptr;
}
});
if (limit_expr != nullptr) {
Status status;
limit = limit_expr->GetParsedExpr(status);
if (status.code_ != ErrorCode::kOk) {
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}

ParsedExpr *offset = nullptr;
DeferFn defer_fn5([&]() {
if (offset != nullptr) {
delete offset;
offset = nullptr;
}
});
if (offset_expr != nullptr) {
Status status;
offset = offset_expr->GetParsedExpr(status);
if (status.code_ != ErrorCode::kOk) {
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}

auto query_result = instance.Search(db_name, table_name, search_expr, filter, limit, offset, output_columns);
search_expr = nullptr;
filter = nullptr;
limit = nullptr;
offset = nullptr;
output_columns = nullptr;
if (!query_result.IsOk()) {
return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg());
}
Expand All @@ -1306,63 +1326,63 @@ WrapQueryResult WrapExplain(Infinity &instance,
WrapSearchExpr *wrap_search_expr,
WrapParsedExpr *wrap_filter) {
SearchExpr *search_expr = nullptr;
DeferFn defer_fn1([&]() {
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
});
if (wrap_search_expr != nullptr) {
Status status;
search_expr = dynamic_cast<SearchExpr *>(wrap_search_expr->GetParsedExpr(status));
if (status.code_ != ErrorCode::kOk) {
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}
ParsedExpr *filter = nullptr;
DeferFn defer_fn2([&]() {
if (filter != nullptr) {
delete filter;
filter = nullptr;
}
});
if (wrap_filter != nullptr) {
Status status;
filter = wrap_filter->GetParsedExpr(status);
if (status.code_ != ErrorCode::kOk) {
if (filter != nullptr) {
delete filter;
filter = nullptr;
}
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}
Vector<ParsedExpr *> *output_columns = new Vector<ParsedExpr *>();
output_columns->reserve(wrap_output_columns.size());
for (SizeT i = 0; i < wrap_output_columns.size(); ++i) {
Status status;
output_columns->emplace_back(wrap_output_columns[i].GetParsedExpr(status));
if (status.code_ != ErrorCode::kOk) {
if (output_columns != nullptr) {
for (SizeT j = 0; j <= i; ++j) {
if ((*output_columns)[j] != nullptr) {
delete (*output_columns)[j];
(*output_columns)[j] = nullptr;
}
Vector<ParsedExpr *> *output_columns = nullptr;
DeferFn defer_fn3([&]() {
if(output_columns != nullptr) {
SizeT output_column_len = output_columns->size();
for(SizeT i = 0; i < output_column_len; ++ i) {
if ((*output_columns)[i] != nullptr) {
delete (*output_columns)[i];
(*output_columns)[i] = nullptr;
}
delete output_columns;
output_columns = nullptr;
}
if (filter != nullptr) {
delete filter;
filter = nullptr;
}
if (search_expr != nullptr) {
delete search_expr;
search_expr = nullptr;
}
});
if (wrap_output_columns.empty()) {
return WrapQueryResult(ErrorCode::kEmptySelectFields, "[error] Select fields are empty");
} else {
output_columns = new Vector<ParsedExpr *>();
output_columns->reserve(wrap_output_columns.size());
for (SizeT i = 0; i < wrap_output_columns.size(); ++i) {
Status status;
output_columns->emplace_back(wrap_output_columns[i].GetParsedExpr(status));
if (status.code_ != ErrorCode::kOk) {
return WrapQueryResult(status.code_, status.msg_->c_str());
}
return WrapQueryResult(status.code_, status.msg_->c_str());
}
}

auto query_result = instance.Explain(db_name, table_name, explain_type, search_expr, filter, output_columns);

auto query_result = instance.Explain(db_name, table_name, explain_type, search_expr, filter, nullptr, nullptr, output_columns);
search_expr = nullptr;
filter = nullptr;
output_columns = nullptr;
if (!query_result.IsOk()) {
return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg());
}
Expand Down
29 changes: 29 additions & 0 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,35 @@ void ExplainPhysicalPlan::Explain(const PhysicalMatch *match_node, SharedPtr<Vec
}
}

void ExplainPhysicalPlan::Explain(const PhysicalMatchSparseScan *match_sparse_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
String explain_header_str;
if (intent_size != 0) {
explain_header_str = String(intent_size - 2, ' ') + "-> MatchSparseScan ";
} else {
explain_header_str = "MatchSparseScan ";
}
explain_header_str += "(" + std::to_string(match_sparse_node->node_id()) + ")";
result->emplace_back(MakeShared<String>(explain_header_str));

// Table index
String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(match_sparse_node->table_index());
result->emplace_back(MakeShared<String>(table_index));

// Output columns
String output_columns = String(intent_size, ' ') + " - output columns: [";
SizeT column_count = match_sparse_node->GetOutputNames()->size();
if (column_count == 0) {
String error_message = "No column in match sparse node.";
UnrecoverableError(error_message);
}
for (SizeT idx = 0; idx < column_count - 1; ++idx) {
output_columns += match_sparse_node->GetOutputNames()->at(idx) + ", ";
}
output_columns += match_sparse_node->GetOutputNames()->back();
output_columns += "]";
result->emplace_back(MakeShared<String>(output_columns));
}

void ExplainPhysicalPlan::Explain(const PhysicalMatchTensorScan *match_tensor_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
String explain_header_str;
if (intent_size != 0) {
Expand Down
Loading

0 comments on commit 0ba8617

Please sign in to comment.