Skip to content

Commit

Permalink
refactor tombstone read related code (#18506)
Browse files Browse the repository at this point in the history
refactor tombstone related code for further usage

Approved by: @triump2020, @LeftHandCold
  • Loading branch information
XuPeng-SH authored Sep 3, 2024
1 parent 7e3e503 commit b409c91
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 197 deletions.
9 changes: 9 additions & 0 deletions pkg/objectio/object_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package objectio
import (
"bytes"
"fmt"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
)
Expand Down Expand Up @@ -118,6 +119,14 @@ func (des *ObjectStats) ObjectLocation() Location {
return BuildLocation(des.ObjectName(), des.Extent(), 0, 0)
}

func (des *ObjectStats) BlockLocation(blk uint16, maxRows uint32) Location {
row := maxRows
if blk == uint16(des.BlkCnt())-1 {
row = des.Rows() - uint32(blk)*maxRows
}
return BuildLocation(des.ObjectName(), des.Extent(), row, blk)
}

func (des *ObjectStats) ObjectName() ObjectName {
return ObjectName(des[objectNameOffset : objectNameOffset+ObjectNameLen])
}
Expand Down
191 changes: 37 additions & 154 deletions pkg/vm/engine/disttae/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/matrixorigin/matrixone/pkg/vm/process"
Expand Down Expand Up @@ -767,51 +765,6 @@ func (ls *LocalDataSource) filterInMemCommittedInserts(
return nil
}

func loadBlockDeletesByLocation(
ctx context.Context,
fs fileservice.FileService,
blockId types.Blockid,
location objectio.Location,
snapshotTS types.TS,
) (deleteMask *nulls.Nulls, err error) {

var (
rows *nulls.Nulls
//bisect time.Duration
release func()
persistedByCN bool
persistedDeletes *batch.Batch
)

if !location.IsEmpty() {
//t1 := time.Now()

if persistedDeletes, persistedByCN, release, err = blockio.ReadBlockDelete(ctx, location, fs); err != nil {
return nil, err
}
defer release()

//readCost := time.Since(t1)

if persistedByCN {
rows = blockio.EvalDeleteRowsByTimestampForDeletesPersistedByCN(blockId, persistedDeletes)
} else {
//t2 := time.Now()
rows = blockio.EvalDeleteRowsByTimestamp(persistedDeletes, snapshotTS, &blockId)
//bisect = time.Since(t2)
}

if rows != nil {
deleteMask = rows
}

//readTotal := time.Since(t1)
//blockio.RecordReadDel(readTotal, readCost, bisect)
}

return deleteMask, nil
}

// ApplyTombstones check if any deletes exist in
// 1. unCommittedInmemDeletes:
// a. workspace writes
Expand Down Expand Up @@ -1000,24 +953,23 @@ func (ls *LocalDataSource) applyWorkspaceFlushedS3Deletes(
deletedRows.InitWithSize(8192)
}

var obj logtailreplay.ObjectEntry
scanOp := func(onTombstone func(tombstone logtailreplay.ObjectEntry) (bool, error)) error {
for _, stats := range s3FlushedDeletes.data {
obj.ObjectStats = stats
if goOn, err := onTombstone(obj); err != nil || !goOn {
return err
}
var curr int
getTombstone := func() (*objectio.ObjectStats, error) {
if curr >= len(s3FlushedDeletes.data) {
return nil, nil
}
return nil
i := curr
curr++
return &s3FlushedDeletes.data[i], nil
}

if err = GetTombstonesByBlockId(
if err = blockio.GetTombstonesByBlockId(
ls.ctx,
ls.fs,
bid,
ls.snapshotTS,
bid,
getTombstone,
deletedRows,
scanOp,
ls.fs,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1100,22 +1052,41 @@ func (ls *LocalDataSource) applyPStateTombstoneObjects(
return offsets, nil
}

scanOp := func(onTombstone func(tombstone logtailreplay.ObjectEntry) (bool, error)) (err error) {
return ForeachTombstoneObject(ls.snapshotTS, onTombstone, ls.pState)
var iter logtailreplay.ObjectsIter
getTombstone := func() (*objectio.ObjectStats, error) {
var err error
if iter == nil {
if iter, err = ls.pState.NewObjectsIter(
ls.snapshotTS, true, true,
); err != nil {
return nil, err
}
}
if iter.Next() {
entry := iter.Entry()
return &entry.ObjectStats, nil
}
return nil, nil
}
defer func() {
if iter != nil {
iter.Close()
}
}()

if deletedRows == nil {
deletedRows = &nulls.Nulls{}
deletedRows.InitWithSize(8192)
}

if err := GetTombstonesByBlockId(
if err := blockio.GetTombstonesByBlockId(
ls.ctx,
ls.fs,
bid,
ls.snapshotTS,
bid,
getTombstone,
deletedRows,
scanOp); err != nil {
ls.fs,
); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1155,94 +1126,6 @@ func (ls *LocalDataSource) batchPrefetch(seqNums []uint16) {
ls.rc.batchPrefetchCursor = end
}

func GetTombstonesByBlockId(
ctx context.Context,
fs fileservice.FileService,
bid objectio.Blockid,
snapshot types.TS,
deleteMask *nulls.Nulls,
scanOp func(func(tombstone logtailreplay.ObjectEntry) (bool, error)) error,
) (err error) {

var (
totalBlk int
zmBreak int
blBreak int
loaded int
totalScanned int
)

onTombstone := func(obj logtailreplay.ObjectEntry) (bool, error) {
totalScanned++
if !obj.ZMIsEmpty() {
objZM := obj.SortKeyZoneMap()
if skip := !objZM.PrefixEq(bid[:]); skip {
zmBreak++
return true, nil
}
}

var objMeta objectio.ObjectMeta

location := obj.Location()

if objMeta, err = objectio.FastLoadObjectMeta(
ctx, &location, false, fs,
); err != nil {
return false, err
}
dataMeta := objMeta.MustDataMeta()

blkCnt := int(dataMeta.BlockCount())
totalBlk += blkCnt

startIdx := sort.Search(blkCnt, func(i int) bool {
return dataMeta.GetBlockMeta(uint32(i)).MustGetColumn(0).ZoneMap().AnyGEByValue(bid[:])
})

for pos := startIdx; pos < blkCnt; pos++ {
blkMeta := dataMeta.GetBlockMeta(uint32(pos))
columnZonemap := blkMeta.MustGetColumn(0).ZoneMap()
// block id is the prefix of the rowid and zonemap is min-max of rowid
// !PrefixEq means there is no rowid of this block in this zonemap, so skip
if !columnZonemap.PrefixEq(bid[:]) {
if columnZonemap.PrefixGT(bid[:]) {
// all zone maps are sorted by the rowid
// if the block id is less than the prefix of the min rowid, skip the rest blocks
break
}
continue
}
loaded++
tombstoneLoc := catalog2.BuildLocation(obj.ObjectStats, uint16(pos), options.DefaultBlockMaxRows)

var mask *nulls.Nulls

if mask, err = loadBlockDeletesByLocation(
ctx, fs, bid, tombstoneLoc, snapshot,
); err != nil {
return false, err
}

deleteMask.Or(mask)
}
return true, nil
}

err = scanOp(onTombstone)

v2.TxnReaderEachBLKLoadedTombstoneHistogram.Observe(float64(loaded))
v2.TxnReaderScannedTotalTombstoneHistogram.Observe(float64(totalScanned))
if totalScanned != 0 {
v2.TxnReaderTombstoneZMSelectivityHistogram.Observe(float64(zmBreak) / float64(totalScanned))
}
if totalBlk != 0 {
v2.TxnReaderTombstoneBLSelectivityHistogram.Observe(float64(blBreak) / float64(totalBlk))
}

return err
}

func (ls *LocalDataSource) batchApplyTombstoneObjects(
minTS types.TS,
rowIds []types.Rowid,
Expand Down Expand Up @@ -1321,7 +1204,7 @@ func (ls *LocalDataSource) batchApplyTombstoneObjects(
return nil, err
}

location = catalog2.BuildLocation(obj.ObjectStats, uint16(idx), options.DefaultBlockMaxRows)
location = obj.ObjectStats.BlockLocation(uint16(idx), options.DefaultBlockMaxRows)

if loaded, persistedByCN, release, err = blockio.ReadBlockDelete(ls.ctx, location, ls.fs); err != nil {
return nil, err
Expand Down
5 changes: 1 addition & 4 deletions pkg/vm/engine/disttae/logtailreplay/change_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ func readObjects(stats objectio.ObjectStats, blockID uint32, fs fileservice.File
if isTombstone {
metaType = objectio.SchemaTombstone
}
loc := catalog.BuildLocation(
stats,
uint16(blockID),
8192)
loc := stats.BlockLocation(uint16(blockID), 8192)
bat, _, err = blockio.LoadOneBlock(
ctx,
fs,
Expand Down
29 changes: 13 additions & 16 deletions pkg/vm/engine/disttae/tombstones.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"sort"

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"go.uber.org/zap"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
)

Expand Down Expand Up @@ -228,7 +226,7 @@ func (tomb *tombstoneData) PrefetchTombstones(
for i, end := 0, tomb.files.Len(); i < end; i++ {
stats := tomb.files.Get(i)
for j := 0; j < int(stats.BlkCnt()); j++ {
loc := catalog.BuildLocation(*stats, uint16(j), options.DefaultBlockMaxRows)
loc := stats.BlockLocation(uint16(j), options.DefaultBlockMaxRows)
if err := blockio.Prefetch(
srvId,
[]uint16{0, 1, 2},
Expand Down Expand Up @@ -277,30 +275,29 @@ func (tomb *tombstoneData) ApplyPersistedTombstones(
return
}

var obj logtailreplay.ObjectEntry
scanOp := func(onTombstone func(tombstone logtailreplay.ObjectEntry) (bool, error)) error {
for i, end := 0, tomb.files.Len(); i < end; i++ {
stats := tomb.files.Get(i)
obj.ObjectStats = *stats
if goOn, err := onTombstone(obj); err != nil || !goOn {
return err
}
var curr int
getTombstone := func() (*objectio.ObjectStats, error) {
if curr >= tomb.files.Len() {
return nil, nil
}
return nil
i := curr
curr++
return tomb.files.Get(i), nil
}

if deletedMask == nil {
deletedMask = &nulls.Nulls{}
deletedMask.InitWithSize(8192)
}

if err = GetTombstonesByBlockId(
if err = blockio.GetTombstonesByBlockId(
ctx,
fs,
bid,
snapshot,
bid,
getTombstone,
deletedMask,
scanOp); err != nil {
fs,
); err != nil {
return nil, err
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/vm/engine/tae/blockio/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,44 @@ func FindIntervalForBlock(rowids []types.Rowid, id *types.Blockid) (start int, e
end = i
return
}

func FillBlockDeleteMask(
ctx context.Context,
snapshotTS types.TS,
blockId types.Blockid,
location objectio.Location,
fs fileservice.FileService,
) (deleteMask *nulls.Nulls, err error) {
var (
rows *nulls.Nulls
//bisect time.Duration
release func()
persistedByCN bool
persistedDeletes *batch.Batch
)

if !location.IsEmpty() {
//t1 := time.Now()

if persistedDeletes, persistedByCN, release, err = ReadBlockDelete(ctx, location, fs); err != nil {
return nil, err
}
defer release()

//readCost := time.Since(t1)

if persistedByCN {
rows = EvalDeleteRowsByTimestampForDeletesPersistedByCN(blockId, persistedDeletes)
} else {
//t2 := time.Now()
rows = EvalDeleteRowsByTimestamp(persistedDeletes, snapshotTS, &blockId)
//bisect = time.Since(t2)
}

if rows != nil {
deleteMask = rows
}
}

return deleteMask, nil
}
Loading

0 comments on commit b409c91

Please sign in to comment.