Skip to content

Commit

Permalink
Add object's bloom filter (#9432)
Browse files Browse the repository at this point in the history
Approved by: @XuPeng-SH
  • Loading branch information
LeftHandCold authored May 14, 2023
1 parent 09ebd31 commit b7ceaaa
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/objectio/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func constructorFactory(size int64, algo uint8) CacheConstructor {
}

// no compress
if algo == 0 {
if algo == compress.None {
return data, int64(len(data)), nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/objectio/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (bf BloomFilter) GetBloomFilter(BlockID uint32) []byte {
return bf[offset : offset+length]
}

func (bf BloomFilter) GetObjectBloomFilter() []byte {
return bf.GetBloomFilter(bf.BlockCount())
}

type ZoneMapArea []byte

func (zma ZoneMapArea) BlockCount() uint32 {
Expand Down
16 changes: 13 additions & 3 deletions pkg/objectio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type objectWriterV1 struct {
lastId uint32
name ObjectName
compressBuf []byte
bloomFilter []byte
}

type blockData struct {
Expand Down Expand Up @@ -141,6 +142,11 @@ func (w *objectWriterV1) WriteBF(blkIdx int, seqnum uint16, buf []byte) (err err
return
}

func (w *objectWriterV1) WriteObjectMetaBF(buf []byte) (err error) {
w.bloomFilter = buf
return
}

func (w *objectWriterV1) WriteObjectMeta(ctx context.Context, totalrow uint32, metas []ColumnMeta) {
w.totalRow = totalrow
w.colmeta = metas
Expand Down Expand Up @@ -219,19 +225,23 @@ func (w *objectWriterV1) prepareBloomFilter(blockCount uint32, offset uint32) ([
h := IOEntryHeader{IOET_BF, IOET_BloomFilter_CurrVer}
buf.Write(EncodeIOEntryHeader(&h))
bloomFilterStart := uint32(0)
bloomFilterIndex := BuildBlockIndex(blockCount)
bloomFilterIndex.SetBlockCount(blockCount)
bloomFilterIndex := BuildBlockIndex(blockCount + 1)
bloomFilterIndex.SetBlockCount(blockCount + 1)
bloomFilterStart += bloomFilterIndex.Length()
for i, block := range w.blocks {
n := uint32(len(block.bloomFilter))
bloomFilterIndex.SetBlockMetaPos(uint32(i), bloomFilterStart, n)
bloomFilterStart += n
}
bloomFilterIndex.SetBlockMetaPos(blockCount, bloomFilterStart, uint32(len(w.bloomFilter)))
buf.Write(bloomFilterIndex)
for _, block := range w.blocks {
buf.Write(block.bloomFilter)
}
return w.WriteWithCompress(offset, buf.Bytes())
buf.Write(w.bloomFilter)
length := uint32(len(buf.Bytes()))
extent := NewExtent(compress.None, offset, length, length)
return buf.Bytes(), extent, nil
}

func (w *objectWriterV1) prepareZoneMapArea(blockCount uint32, offset uint32) ([]byte, Extent, error) {
Expand Down
21 changes: 17 additions & 4 deletions pkg/vm/engine/tae/blockio/write_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ObjectColumnMetasBuilder struct {
metas []objectio.ColumnMeta
sks []*hll.Sketch
zms []index.ZM
pkData []containers.Vector
}

func NewObjectColumnMetasBuilder(colIdx int) *ObjectColumnMetasBuilder {
Expand All @@ -34,17 +35,22 @@ func NewObjectColumnMetasBuilder(colIdx int) *ObjectColumnMetasBuilder {
metas[i] = objectio.BuildObjectColumnMeta()
}
return &ObjectColumnMetasBuilder{
metas: metas,
sks: make([]*hll.Sketch, colIdx),
zms: make([]index.ZM, colIdx),
metas: metas,
sks: make([]*hll.Sketch, colIdx),
zms: make([]index.ZM, colIdx),
pkData: make([]containers.Vector, 0),
}
}

func (b *ObjectColumnMetasBuilder) AddRowCnt(rows int) {
b.totalRow += uint32(rows)
}

func (b *ObjectColumnMetasBuilder) InspectVector(idx int, vec containers.Vector) {
func (b *ObjectColumnMetasBuilder) AddPKData(data containers.Vector) {
b.pkData = append(b.pkData, data)
}

func (b *ObjectColumnMetasBuilder) InspectVector(idx int, vec containers.Vector, isPK bool) {
if vec.HasNull() {
cnt := b.metas[idx].NullCnt()
cnt += uint32(vec.NullCount())
Expand All @@ -54,6 +60,9 @@ func (b *ObjectColumnMetasBuilder) InspectVector(idx int, vec containers.Vector)
if b.zms[idx] == nil {
b.zms[idx] = index.NewZM(vec.GetType().Oid, vec.GetType().Scale)
}
if isPK {
return
}
if b.sks[idx] == nil {
b.sks[idx] = hll.New()
}
Expand All @@ -76,6 +85,10 @@ func (b *ObjectColumnMetasBuilder) UpdateZm(idx int, zm index.ZM) {
index.UpdateZM(b.zms[idx], zm.GetMaxBuf())
}

func (b *ObjectColumnMetasBuilder) GetPKData() []containers.Vector {
return b.pkData
}

func (b *ObjectColumnMetasBuilder) Build() (uint32, []objectio.ColumnMeta) {
for i := range b.metas {
if b.sks[i] != nil { // rowid or types.TS
Expand Down
17 changes: 16 additions & 1 deletion pkg/vm/engine/tae/blockio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,19 @@ func (w *BlockWriter) WriteBatch(batch *batch.Batch) (objectio.BlockObject, erro
}
seqnums := w.writer.GetSeqnums()
for i, vec := range batch.Vecs {
isPK := false
if i == 0 {
w.objMetaBuilder.AddRowCnt(vec.Length())
}
if vec.GetType().Oid == types.T_Rowid || vec.GetType().Oid == types.T_TS {
continue
}
if w.isSetPK && w.pk == uint16(i) {
isPK = true
}
columnData := containers.ToDNVector(vec)
// update null count and distinct value
w.objMetaBuilder.InspectVector(i, columnData)
w.objMetaBuilder.InspectVector(i, columnData, isPK)

// Build ZM
zm := index.NewZM(vec.GetType().Oid, vec.GetType().Scale)
Expand All @@ -103,6 +107,7 @@ func (w *BlockWriter) WriteBatch(batch *batch.Batch) (objectio.BlockObject, erro
if !w.isSetPK || w.pk != uint16(i) {
continue
}
w.objMetaBuilder.AddPKData(columnData)
bf, err := index.NewBinaryFuseFilter(columnData)
if err != nil {
return nil, err
Expand All @@ -128,6 +133,16 @@ func (w *BlockWriter) Sync(ctx context.Context) ([]objectio.BlockObject, objecti
if w.objMetaBuilder != nil {
cnt, meta := w.objMetaBuilder.Build()
w.writer.WriteObjectMeta(ctx, cnt, meta)
columnsData := w.objMetaBuilder.GetPKData()
bf, err := index.NewBinaryFuseFilterByVectors(columnsData)
if err != nil {
return nil, nil, err
}
buf, err := bf.Marshal()
if err != nil {
return nil, nil, err
}
w.writer.WriteObjectMetaBF(buf)
}
blocks, err := w.writer.WriteEnd(ctx)
if len(blocks) == 0 {
Expand Down
27 changes: 0 additions & 27 deletions pkg/vm/engine/tae/containers/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,33 +317,6 @@ func MockVector2(typ types.Type, rows int, offset int) Vector {
return vec
}

func MockVector3(typ types.Type, rows int) Vector {
vec := MakeVector(typ)
switch typ.Oid {
case types.T_int32:
for i := 0; i < rows; i++ {
vec.Append(int32(rows), false)
}
case types.T_int64:
for i := 0; i < rows; i++ {
vec.Append(int64(rows), false)
}
case types.T_uint32:
for i := 0; i < rows; i++ {
vec.Append(uint32(i), false)
vec.Append(uint32(i), false)
i++
}
case types.T_uint64:
for i := 0; i < rows; i++ {
vec.Append(uint64(rows), false)
}
default:
panic("not support")
}
return vec
}

func MockBatchWithAttrs(vecTypes []types.Type, attrs []string, rows int, uniqueIdx int, provider *MockDataProvider) (bat *Batch) {
bat = MockNullableBatch(vecTypes, rows, uniqueIdx, provider)
bat.Attrs = attrs
Expand Down
31 changes: 21 additions & 10 deletions pkg/vm/engine/tae/index/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,12 @@ type binaryFuseFilter struct {
}

func NewBinaryFuseFilter(data containers.Vector) (StaticFilter, error) {
hashes := make([]uint64, 0, data.Length())
op := func(v []byte, _ bool, _ int) error {
hash := hashV1(v)
hashes = append(hashes, hash)
return nil
}
var err error
if err = containers.ForeachWindowBytes(data, 0, data.Length(), op, nil); err != nil {
return nil, err
}
return NewBinaryFuseFilterByVectors([]containers.Vector{data})
}

func buildFuseFilter(hashes []uint64) (StaticFilter, error) {
var inner *xorfilter.BinaryFuse8
var err error
if inner, err = xorfilter.PopulateBinaryFuse8(hashes); err != nil {
if err.Error() == FuseFilterError {
// 230+ duplicate keys in hashes
Expand All @@ -82,6 +77,22 @@ func NewBinaryFuseFilter(data containers.Vector) (StaticFilter, error) {
return sf, nil
}

func NewBinaryFuseFilterByVectors(datas []containers.Vector) (StaticFilter, error) {
hashes := make([]uint64, 0)
op := func(v []byte, _ bool, _ int) error {
hash := hashV1(v)
hashes = append(hashes, hash)
return nil
}
var err error
for _, data := range datas {
if err = containers.ForeachWindowBytes(data, 0, data.Length(), op, nil); err != nil {
return nil, err
}
}
return buildFuseFilter(hashes)
}

func (filter *binaryFuseFilter) MayContainsKey(key []byte) (bool, error) {
hash := hashV1(key)
return filter.Contains(hash), nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/index/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestStaticFilterNumeric(t *testing.T) {
func TestNewBinaryFuseFilter(t *testing.T) {
testutils.EnsureNoLeak(t)
typ := types.T_uint32.ToType()
data := containers.MockVector3(typ, 2000)
data := containers.MockVector2(typ, 2000, 0)
defer data.Close()
_, err := NewBinaryFuseFilter(data)
require.NoError(t, err)
Expand Down

0 comments on commit b7ceaaa

Please sign in to comment.