Skip to content

Commit

Permalink
change cluster fiterratio as collection properties
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Jan 9, 2024
1 parent e9f4f37 commit 848eb46
Show file tree
Hide file tree
Showing 24 changed files with 103 additions and 64 deletions.
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ message LoadSegmentsRequest {
bool need_transfer = 11;
LoadScope load_scope = 12;
repeated index.IndexInfo index_info_list = 13;
repeated common.KeyValuePair collection_properties = 14;
}

message ReleaseSegmentsRequest {
Expand Down
23 changes: 12 additions & 11 deletions internal/querycoordv2/task/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,18 @@ func packLoadSegmentRequest(
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
commonpbutil.WithMsgID(task.ID()),
),
Infos: []*querypb.SegmentLoadInfo{loadInfo},
Schema: schema, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3
LoadMeta: loadMeta, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3
CollectionID: task.CollectionID(),
ReplicaID: task.ReplicaID(),
DeltaPositions: []*msgpb.MsgPosition{loadInfo.GetDeltaPosition()}, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3
DstNodeID: action.Node(),
Version: time.Now().UnixNano(),
NeedTransfer: true,
IndexInfoList: indexInfo,
LoadScope: loadScope,
Infos: []*querypb.SegmentLoadInfo{loadInfo},
Schema: schema, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3
LoadMeta: loadMeta, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3
CollectionID: task.CollectionID(),
ReplicaID: task.ReplicaID(),
DeltaPositions: []*msgpb.MsgPosition{loadInfo.GetDeltaPosition()}, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3
DstNodeID: action.Node(),
Version: time.Now().UnixNano(),
NeedTransfer: true,
IndexInfoList: indexInfo,
LoadScope: loadScope,
CollectionProperties: collectionProperties,
}
}

Expand Down
33 changes: 25 additions & 8 deletions internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,24 +747,41 @@ func getVectorFieldFromSchema(schema *schemapb.CollectionSchema) (*VectorField,
// Todo support call external hook
func (sd *shardDelegator) OptimizeSearchBasedOnClustering(req *querypb.SearchRequest, sealeds []SnapshotItem) (*querypb.SearchRequest, []SnapshotItem) {
log := log.With(zap.Int64("searchRequestID", req.GetReq().GetReqID()))
if !paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.GetAsBool() {
if !paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.GetAsBool() {
log.Debug("skip OptimizeSearchBasedOnClustering by system config")
return req, sealeds
}
if req.GetReq().GetClusteringOptions().GetFilterRatio() >= 1 {
log.Debug("skip OptimizeSearchBasedOnClustering by user config")

if sd.vectorField.dataType != schemapb.DataType_FloatVector {
log.Debug("Currently we only support FloatVector")
return req, sealeds
}

// filter ratio set priority: user client config > collection config > system config
var filterRatio float64
collectionPropertiesMap := funcutil.KeyValuePair2Map(sd.collection.GetCollectionProperties())
ratio, exist := collectionPropertiesMap[clustering.CollectionClusteringOptimizeSearchFilterRatio]
if !exist {
ratio = clustering.DefaultCollectionClusteringOptimizeSearchFilterRatio
}
filterRatio, err := strconv.ParseFloat(ratio, 64)
if err != nil {
log.Error("Error format for filterRatio", zap.String("ratio", ratio), zap.Error(err))
return req, sealeds
}
if req.GetReq().GetClusteringOptions().GetFilterRatio() != 0 {
filterRatio = float64(req.GetReq().GetClusteringOptions().GetFilterRatio())
}

metricType := req.GetReq().GetMetricType()
topK := req.GetReq().GetTopk()
filterRatio := req.GetReq().GetClusteringOptions().GetFilterRatio()
log.Debug("SearchRequest parameter",
log.Debug("Search parameter",
zap.String("metricType", metricType),
zap.Int64("topK", topK),
zap.Float32("filterRatio", filterRatio))
zap.Float64("filterRatio", filterRatio))

var phg commonpb.PlaceholderGroup
err := proto.Unmarshal(req.GetReq().GetPlaceholderGroup(), &phg)
err = proto.Unmarshal(req.GetReq().GetPlaceholderGroup(), &phg)
if err != nil {
log.Warn("fail to parse SearchRequest PlaceholderGroup", zap.Error(err))
return req, sealeds
Expand Down Expand Up @@ -832,7 +849,7 @@ func (sd *shardDelegator) OptimizeSearchBasedOnClustering(req *querypb.SearchReq
}

toFilterSegNum := len(vectorSegmentDistance)
targetSegNum := int(float32(toFilterSegNum) * filterRatio)
targetSegNum := int(float64(toFilterSegNum) * filterRatio)
var optimizedRowNums int64
for i, segmentDistance := range vectorSegmentDistance {
if i < targetSegNum || optimizedRowNums < topK {
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *DelegatorDataSuite) SetupTest() {
},
}, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
}, nil)

s.mq = &msgstream.MockMsgStream{}

Expand Down
12 changes: 6 additions & 6 deletions internal/querynodev2/delegator/delegator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *DelegatorSuite) SetupTest() {
},
}, &querypb.LoadMetaInfo{
PartitionIDs: s.partitionIDs,
})
}, nil)

s.mq = &msgstream.MockMsgStream{}

Expand Down Expand Up @@ -1269,7 +1269,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_config() {
assert.NotEmpty(s.T(), snapshots)
assert.Equal(s.T(), 2, len(snapshots[0].Segments))

paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "false")
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "false")
sr2, snapshots2 := s.delegator.OptimizeSearchBasedOnClustering(req, sealeds)
assert.NotEmpty(s.T(), sr2)
assert.NotEmpty(s.T(), snapshots2)
Expand All @@ -1279,7 +1279,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_config() {
func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1() {
s.delegator.Start()
paramtable.SetNodeID(1)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true")
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true")
vector1 := []float32{0.40448594, 0.16214314, 0.17850745, 0.6640584, 0.77309024, 0.48807725, 0.66572666, 0.15990956}
vectors := [][]float32{vector1}

Expand Down Expand Up @@ -1360,7 +1360,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1() {
func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq2() {
s.delegator.Start()
paramtable.SetNodeID(1)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true")
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true")
vector1 := []float32{0.8877872002188053, 0.6131822285635065, 0.8476814632326242, 0.6645877829359371, 0.9962627712600025, 0.8976183052440327, 0.41941169325798844, 0.7554387854258499}
vector2 := []float32{0.8644394874390322, 0.023327886647378615, 0.08330118483461302, 0.7068040179963112, 0.6983994910799851, 0.5562075958994153, 0.3288536247938002, 0.07077341010237759}
vectors := [][]float32{vector1, vector2}
Expand Down Expand Up @@ -1440,7 +1440,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq2() {
func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1_LargeTopK() {
s.delegator.Start()
paramtable.SetNodeID(1)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true")
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true")
vector1 := []float32{0.40448594, 0.16214314, 0.17850745, 0.6640584, 0.77309024, 0.48807725, 0.66572666, 0.15990956}
vectors := [][]float32{vector1}

Expand Down Expand Up @@ -1531,7 +1531,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1_LargeTopK() {
func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_WrongClusteringInfo() {
s.delegator.Start()
paramtable.SetNodeID(1)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true")
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true")
vector1 := []float32{0.40448594, 0.16214314, 0.17850745, 0.6640584, 0.77309024, 0.48807725, 0.66572666, 0.15990956}
vectors := [][]float32{vector1}

Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/local_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) {

suite.schema = segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
suite.indexMeta = segments.GenTestIndexMeta(suite.collectionID, suite.schema)
collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, querypb.LoadType_LoadCollection)
collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, querypb.LoadType_LoadCollection, nil)
loadMata := &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
}
suite.node.manager.Collection.PutOrRef(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata)
suite.node.manager.Collection.PutOrRef(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata, nil)
suite.worker = NewLocalWorker(suite.node)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/pipeline/insert_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (suite *InsertNodeSuite) TestBasic() {
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
in := suite.buildInsertNodeMsg(schema)

collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection)
collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil)
collection.AddPartition(suite.partitionID)

// init mock
Expand Down Expand Up @@ -95,7 +95,7 @@ func (suite *InsertNodeSuite) TestDataTypeNotSupported() {
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
in := suite.buildInsertNodeMsg(schema)

collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection)
collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil)
collection.AddPartition(suite.partitionID)

// init mock
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (suite *PipelineTestSuite) TestBasic() {
// init mock
// mock collection manager
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection)
collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil)
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection)

// mock mq factory
Expand Down
29 changes: 19 additions & 10 deletions internal/querynodev2/segments/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (

type CollectionManager interface {
Get(collectionID int64) *Collection
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo, collectionProperties []*commonpb.KeyValuePair)
Ref(collectionID int64, count uint32) bool
// unref the collection,
// returns true if the collection ref count goes 0, or the collection not exists,
Expand All @@ -71,18 +71,20 @@ func (m *collectionManager) Get(collectionID int64) *Collection {
return m.collections[collectionID]
}

func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) {
func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo, collectionProperties []*commonpb.KeyValuePair) {
m.mut.Lock()
defer m.mut.Unlock()

if collection, ok := m.collections[collectionID]; ok {
// the schema may be changed even the collection is loaded
collection.schema.Store(schema)
// update collectionProperties
collection.collectionProperties = collectionProperties
collection.Ref(1)
return
}

collection := NewCollection(collectionID, schema, meta, loadMeta.GetLoadType())
collection := NewCollection(collectionID, schema, meta, loadMeta.GetLoadType(), collectionProperties)
collection.AddPartition(loadMeta.GetPartitionIDs()...)
collection.Ref(1)
m.collections[collectionID] = collection
Expand Down Expand Up @@ -129,6 +131,8 @@ type Collection struct {
isGpuIndex bool

refCount *atomic.Uint32

collectionProperties []*commonpb.KeyValuePair
}

// ID returns collection id
Expand Down Expand Up @@ -192,8 +196,12 @@ func (c *Collection) Unref(count uint32) uint32 {
return refCount
}

func (c *Collection) GetCollectionProperties() []*commonpb.KeyValuePair {
return c.collectionProperties
}

// newCollection returns a new Collection
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadType querypb.LoadType) *Collection {
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadType querypb.LoadType, collectionProperties []*commonpb.KeyValuePair) *Collection {
/*
CCollection
NewCollection(const char* schema_proto_blob);
Expand Down Expand Up @@ -226,12 +234,13 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
}

coll := &Collection{
collectionPtr: collection,
id: collectionID,
partitions: typeutil.NewConcurrentSet[int64](),
loadType: loadType,
refCount: atomic.NewUint32(0),
isGpuIndex: isGpuIndex,
collectionPtr: collection,
id: collectionID,
partitions: typeutil.NewConcurrentSet[int64](),
loadType: loadType,
refCount: atomic.NewUint32(0),
isGpuIndex: isGpuIndex,
collectionProperties: collectionProperties,
}
coll.schema.Store(schema)

Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *ManagerSuite) SetupTest() {
schema := GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64)
segment, err := NewSegment(
context.Background(),
NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection),
NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection, nil),
id,
s.partitionIDs[i],
s.collectionIDs[i],
Expand Down
18 changes: 10 additions & 8 deletions internal/querynodev2/segments/mock_collection_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/querynodev2/segments/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (suite *PlanSuite) SetupTest() {
suite.partitionID = 10
suite.segmentID = 1
schema := GenTestCollectionSchema("plan-suite", schemapb.DataType_Int64)
suite.collection = NewCollection(suite.collectionID, schema, GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection)
suite.collection = NewCollection(suite.collectionID, schema, GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil)
suite.collection.AddPartition(suite.partitionID)
}

Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (suite *ReduceSuite) SetupTest() {
schema,
GenTestIndexMeta(suite.collectionID, schema),
querypb.LoadType_LoadCollection,
nil,
)
suite.segment, err = NewSegment(ctx,
suite.collection,
Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (suite *RetrieveSuite) SetupTest() {
CollectionID: suite.collectionID,
PartitionIDs: []int64{suite.partitionID},
},
nil,
)
suite.collection = suite.manager.Collection.Get(suite.collectionID)

Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (suite *SearchSuite) SetupTest() {
CollectionID: suite.collectionID,
PartitionIDs: []int64{suite.partitionID},
},
nil,
)
suite.collection = suite.manager.Collection.Get(suite.collectionID)

Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/segments/segment_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
CollectionID: suite.collectionID,
PartitionIDs: []int64{suite.partitionID},
}
suite.manager.Collection.PutOrRef(suite.collectionID, suite.schema, indexMeta, loadMeta)
suite.manager.Collection.PutOrRef(suite.collectionID, suite.schema, indexMeta, loadMeta, nil)
}

func (suite *SegmentLoaderSuite) TearDownTest() {
Expand Down Expand Up @@ -684,7 +684,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
PartitionIDs: []int64{suite.partitionID},
}

collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta.GetLoadType())
collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta.GetLoadType(), nil)
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection).Maybe()
}

Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (suite *SegmentSuite) SetupTest() {
CollectionID: suite.collectionID,
PartitionIDs: []int64{suite.partitionID},
},
nil,
)
suite.collection = suite.manager.Collection.Get(suite.collectionID)

Expand Down
Loading

0 comments on commit 848eb46

Please sign in to comment.