Skip to content

Commit

Permalink
feat: support json index
Browse files Browse the repository at this point in the history
This PR adds json index support for json and dynamic fields. Now you can only do unary query like 'a["b"] > 1' using this index. We will support more filter type later.

basic usage:
```
collection.create_index("json_field", {"index_type": "INVERTED",
    "params": {"json_cast_type": DataType.STRING, "json_path":
'json_field["a"]["b"]'}})
```

There are some limits to use this index:
1. If a record does not have the json path you specify, it will be ignored and there will not throw an error.
2. If a value of the json path fails to be cast to the type you specify,  it will be ignored and there will not throw an error.
3. A specific json field can have only one json index.
4. If you try to create more than one json indexes for one json field, sdk(pymilvus<=2.4.7) may return immediately because of internal implementation. This will be fixed soon.

Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Oct 10, 2024
1 parent 290ceb4 commit 0f2e61d
Show file tree
Hide file tree
Showing 40 changed files with 779 additions and 105 deletions.
14 changes: 14 additions & 0 deletions internal/core/src/common/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,20 @@ class FieldDataJsonImpl : public FieldDataImpl<Json, true> {
}
length_ += n;
}

// only for test
void
add_json_data(const std::vector<Json>& json) {
std::lock_guard lck(tell_mutex_);
if (length_ + json.size() > get_num_rows()) {
resize_field_data(length_ + json.size());
}

for (size_t i = 0; i < json.size(); ++i) {
data_[length_ + i] = json[i];
}
length_ += json.size();
}
};

class FieldDataSparseVectorImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/BinaryRangeExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/ExistsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PhyExistsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
45 changes: 38 additions & 7 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
#include <memory>
#include <string>

#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Types.h"
#include "exec/expression/EvalCtx.h"
#include "exec/expression/VectorFunction.h"
#include "exec/expression/Utils.h"
#include "exec/QueryContext.h"
#include "expr/ITypeExpr.h"
#include "log/Log.h"
#include "query/PlanProto.h"

namespace milvus {
Expand Down Expand Up @@ -88,12 +91,15 @@ class SegmentExpr : public Expr {
SegmentExpr(const std::vector<ExprPtr>&& input,
const std::string& name,
const segcore::SegmentInternalInterface* segment,
const FieldId& field_id,
const FieldId field_id,
const std::vector<std::string> nested_path,
int64_t active_count,
int64_t batch_size)
: Expr(DataType::BOOL, std::move(input), name),
segment_(segment),
field_id_(field_id),
nested_path_(nested_path),

active_count_(active_count),
batch_size_(batch_size) {
size_per_chunk_ = segment_->size_per_chunk();
Expand All @@ -108,6 +114,7 @@ class SegmentExpr : public Expr {
InitSegmentExpr() {
auto& schema = segment_->get_schema();
auto& field_meta = schema[field_id_];
field_type_ = field_meta.get_data_type();

if (schema.get_primary_field_id().has_value() &&
schema.get_primary_field_id().value() == field_id_ &&
Expand All @@ -116,9 +123,18 @@ class SegmentExpr : public Expr {
pk_type_ = field_meta.get_data_type();
}

is_index_mode_ = segment_->HasIndex(field_id_);
if (is_index_mode_) {
num_index_chunk_ = segment_->num_chunk_index(field_id_);
if (field_meta.get_data_type() == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);
if (segment_->HasIndex(pointer)) {
// FIXME: sunby
is_index_mode_ = true;
num_index_chunk_ = 1;
}
} else {
is_index_mode_ = segment_->HasIndex(field_id_);
if (is_index_mode_) {
num_index_chunk_ = segment_->num_chunk_index(field_id_);
}
}
// if index not include raw data, also need load data
if (segment_->HasFieldData(field_id_)) {
Expand Down Expand Up @@ -300,9 +316,21 @@ class SegmentExpr : public Expr {
// It avoids indexing execute for evevy batch because indexing
// executing costs quite much time.
if (cached_index_chunk_id_ != i) {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
auto* index_ptr = const_cast<Index*>(&index);
Index* index_ptr = nullptr;

if (field_type_ == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);

const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(pointer,
i);
index_ptr = const_cast<Index*>(&index);
} else {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_,
i);
index_ptr = const_cast<Index*>(&index);
}
cached_index_chunk_res_ = std::move(func(index_ptr, values...));
cached_index_chunk_id_ = i;
}
Expand Down Expand Up @@ -427,6 +455,9 @@ class SegmentExpr : public Expr {
DataType pk_type_;
int64_t batch_size_;

std::vector<std::string> nested_path_;
DataType field_type_;

bool is_index_mode_{false};
bool is_data_mode_{false};
// sometimes need to skip index and using raw data
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/JsonContainsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/expression/TermExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class PhyTermFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr),
Expand Down
74 changes: 54 additions & 20 deletions internal/core/src/exec/expression/UnaryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include "UnaryExpr.h"
#include "common/Json.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "log/Log.h"

namespace milvus {
namespace exec {
Expand Down Expand Up @@ -187,25 +190,49 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
}
case DataType::JSON: {
auto val_type = expr_->val_.val_case();
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplJson<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplJson<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplJson<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplJson<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplJson<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
if (CanUseIndexForJson()) {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplForIndex<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplForIndex<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplForIndex<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplForIndex<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result =
ExecRangeVisitorImplForIndex<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
} else {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplJson<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplJson<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplJson<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplJson<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplJson<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
}
break;
}
Expand Down Expand Up @@ -364,7 +391,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {

// filtering by index, get candidates.
auto size_per_chunk = segment_->size_per_chunk();
auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
auto retrieve = [size_per_chunk, this](int64_t offset) -> auto {
auto chunk_idx = offset / size_per_chunk;
auto chunk_offset = offset % size_per_chunk;
const auto& chunk =
Expand Down Expand Up @@ -861,6 +888,13 @@ PhyUnaryRangeFilterExpr::CanUseIndex() {
return res;
}

bool
PhyUnaryRangeFilterExpr::CanUseIndexForJson() {
use_index_ =
segment_->HasIndex(milvus::Json::pointer(expr_->column_.nested_path_));
return use_index_;
}

VectorPtr
PhyUnaryRangeFilterExpr::ExecTextMatch() {
using Index = index::TextMatchIndex;
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/exec/expression/UnaryExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
active_count,
batch_size),
expr_(expr) {
Expand Down Expand Up @@ -331,6 +332,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
bool
CanUseIndexForArray();

bool
CanUseIndexForJson();

VectorPtr
ExecTextMatch();

Expand Down
55 changes: 53 additions & 2 deletions internal/core/src/index/IndexFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
// limitations under the License.

#include "index/IndexFactory.h"
#include <cstdlib>
#include <memory>
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Types.h"
#include "index/VectorMemIndex.h"
#include "index/Utils.h"
#include "index/Meta.h"
#include "indexbuilder/JsonInvertedIndexCreator.h"
#include "knowhere/utils.h"

#include "index/VectorDiskIndex.h"
Expand All @@ -29,6 +33,8 @@
#include "index/InvertedIndexTantivy.h"
#include "index/HybridScalarIndex.h"
#include "knowhere/comp/knowhere_check.h"
#include "log/Log.h"
#include "pb/schema.pb.h"

namespace milvus::index {

Expand Down Expand Up @@ -359,6 +365,49 @@ IndexFactory::CreateComplexScalarIndex(
PanicInfo(Unsupported, "Complex index not supported now");
}

IndexBasePtr
IndexFactory::CreateJsonIndex(
IndexType index_type,
DataType cast_dtype,
const std::string& nested_path,
const storage::FileManagerContext& file_manager_context) {
AssertInfo(index_type == INVERTED_INDEX_TYPE,
"Invalid index type for json index");
switch (cast_dtype) {
case DataType::BOOL:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<bool>>(
proto::schema::DataType::Bool,
nested_path,
file_manager_context);
case milvus::DataType::INT8:
case milvus::DataType::INT16:
case milvus::DataType::INT32:
case DataType::INT64:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<int64_t>>(
proto::schema::DataType::Int64,
nested_path,
file_manager_context);
case DataType::FLOAT:
case DataType::DOUBLE:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<double>>(
proto::schema::DataType::Double,
nested_path,
file_manager_context);
case DataType::STRING:
case DataType::VARCHAR:
return std::make_unique<
indexbuilder::JsonInvertedIndexCreator<std::string>>(
proto::schema::DataType::String,
nested_path,
file_manager_context);
default:
PanicInfo(DataTypeInvalid, "Invalid data type:{}", cast_dtype);
}
}

IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
Expand All @@ -381,8 +430,10 @@ IndexFactory::CreateScalarIndex(
file_manager_context);
}
case DataType::JSON: {
return CreateComplexScalarIndex(create_index_info.index_type,
file_manager_context);
return CreateJsonIndex(create_index_info.index_type,
create_index_info.json_cast_type,
create_index_info.json_path,
file_manager_context);
}
default:
PanicInfo(DataTypeInvalid, "Invalid data type:{}", data_type);
Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/index/IndexFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <mutex>
#include <shared_mutex>

#include "common/Types.h"
#include "common/type_c.h"
#include "index/Index.h"
#include "index/ScalarIndex.h"
Expand Down Expand Up @@ -103,6 +104,13 @@ class IndexFactory {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

IndexBasePtr
CreateJsonIndex(IndexType index_type,
DataType cast_dtype,
const std::string& nested_path,
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

IndexBasePtr
CreateScalarIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context =
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/index/IndexInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ struct CreateIndexInfo {
IndexVersion index_engine_version;
std::string field_name;
int64_t dim;
DataType json_cast_type;
std::string json_path;
};

} // namespace milvus::index
Loading

0 comments on commit 0f2e61d

Please sign in to comment.