Skip to content

Commit

Permalink
fix dedup (#19774)
Browse files Browse the repository at this point in the history
fix dedup and add debug log

Approved by: @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored Nov 5, 2024
1 parent 6650c8e commit 3d0127a
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 16 deletions.
42 changes: 42 additions & 0 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10118,6 +10118,48 @@ func TestTransferInMerge(t *testing.T) {
assert.NoError(t, txn.Commit(context.Background()))
}

// TestDeleteAndMerge reproduces a dedup false-positive around a merge:
// it merges the non-appendable objects inside txn while, before txn
// commits, a separate step deletes all rows and a fresh transaction
// (appendTxn) is started. Re-appending the original batch in appendTxn
// must not report a spurious duplicate-key / write-write conflict
// against the rows that were deleted and transferred by the merge.
// The exact ordering of statements below (merge exec -> delete ->
// start appendTxn -> commit merge -> append) is the scenario under
// test; do not reorder.
func TestDeleteAndMerge(t *testing.T) {
ctx := context.Background()

// Long scan/checkpoint intervals keep background jobs from racing
// with the manually driven compactions and merge below.
opts := config.WithLongScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
defer tae.Close()
schema := catalog.MockSchemaAll(3, 2) // presumably 3 columns with column 2 as PK — confirm against MockSchemaAll
schema.Extra.BlockMaxRows = 5         // 10 rows below -> data spans 2 blocks
schema.Extra.ObjectMaxBlocks = 256
tae.BindSchema(schema)
bat := catalog.MockBatch(schema, 10)
defer bat.Close()
tae.CreateRelAndAppend(bat, true)
// Flush appendable blocks so the rows land in non-appendable objects,
// which are the merge candidates collected next.
tae.CompactBlocks(true)

txn, rel := tae.GetRelation()
var objs []*catalog.ObjectEntry
// Gather every non-appendable object as input for the merge task.
objIt := rel.MakeObjectIt(false)
for objIt.Next() {
obj := objIt.GetObject().GetMeta().(*catalog.ObjectEntry)
if !obj.IsAppendable() {
objs = append(objs, obj)
}
}
// Execute the merge inside txn but deliberately do NOT commit yet.
task, err := jobs.NewMergeObjectsTask(nil, txn, objs, tae.Runtime, 0, false)
assert.NoError(t, err)
err = task.OnExec(context.Background())
assert.NoError(t, err)
var appendTxn txnif.AsyncTxn
{
// Delete all rows (helper presumably commits its own txn — verify
// DeleteAll semantics), then start appendTxn so its snapshot is
// taken before the merge transaction commits.
tae.DeleteAll(true)
appendTxn, err = tae.StartTxn(nil)
assert.NoError(t, err)
}
// Now commit the merge, after the deletes and after appendTxn started.
assert.NoError(t, txn.Commit(context.Background()))
tae.CompactBlocks(true)
// Re-insert the same primary keys; dedup must see the transferred
// deletes and not raise a WW conflict for appendTxn.
tae.DoAppendWithTxn(bat, appendTxn, false)

assert.NoError(t, appendTxn.Commit(ctx))
// Dump the catalog tree at verbosity 3 for debugging on failure.
t.Log(tae.Catalog.SimplePPString(3))
}

func TestTransferInMerge2(t *testing.T) {
ctx := context.Background()

Expand Down
3 changes: 2 additions & 1 deletion pkg/vm/engine/tae/tables/aobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ func (obj *aobject) GetMaxRowByTS(ts types.TS) (uint32, error) {
return 0, err
}
defer vec.Close()
tsVec := vector.MustFixedColNoTypeCheck[types.TS](vec.GetDownstreamVector())
tsVec := vector.MustFixedColNoTypeCheck[types.TS](
vec.GetDownstreamVector())
for i := range tsVec {
if tsVec[i].GT(&ts) {
return uint32(i), nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/vm/engine/tae/tables/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (appender *objectAppender) ApplyAppend(
continue
}
if colDef.IsRealPrimary() && !schema.IsSecondaryIndexTable() {
if err = node.pkIndex.BatchUpsert(bat.Vecs[colDef.Idx].GetDownstreamVector(), from); err != nil {
if err = node.pkIndex.BatchUpsert(
bat.Vecs[colDef.Idx].GetDownstreamVector(), from); err != nil {
panic(err)
}
}
Expand Down
66 changes: 55 additions & 11 deletions pkg/vm/engine/tae/tables/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/updates"
)

Expand Down Expand Up @@ -67,7 +69,8 @@ func newBaseObject(
appendMVCC: updates.NewAppendMVCCHandle(meta),
}
obj.meta.Store(meta)
obj.appendMVCC.SetAppendListener(obj.OnApplyAppend)
obj.appendMVCC.SetAppendListener(
obj.OnApplyAppend)
obj.RWMutex = obj.appendMVCC.RWMutex
return obj
}
Expand Down Expand Up @@ -233,7 +236,8 @@ func (obj *baseObject) ResolvePersistedColumnData(
col int,
mp *mpool.MPool,
) (bat *containers.Batch, err error) {
err = obj.Scan(ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
err = obj.Scan(
ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
return
}

Expand Down Expand Up @@ -267,14 +271,33 @@ func (obj *baseObject) getDuplicateRowsWithLoad(
var dedupFn any
if isAblk {
dedupFn = containers.MakeForeachVectorOp(
keys.GetType().Oid, getRowIDAlkFunctions, data.Vecs[0], rowIDs, blkID, maxVisibleRow, obj.LoadPersistedCommitTS, txn, skipCommittedBeforeTxnForAblk,
keys.GetType().Oid,
getRowIDAlkFunctions,
data.Vecs[0],
rowIDs,
blkID,
maxVisibleRow,
obj.LoadPersistedCommitTS,
txn,
skipCommittedBeforeTxnForAblk,
)
} else {
dedupFn = containers.MakeForeachVectorOp(
keys.GetType().Oid, getDuplicatedRowIDNABlkFunctions, data.Vecs[0], rowIDs, blkID,
keys.GetType().Oid,
getDuplicatedRowIDNABlkFunctions,
data.Vecs[0],
rowIDs,
blkID,
)
}
err = containers.ForeachVector(keys, dedupFn, sels)
if err != nil {
logutil.Info("Dedup-Err",
zap.Any("err", err),
zap.String("txn", txn.Repr()),
zap.String("obj", obj.meta.Load().ID().String()),
zap.Uint16("blk offset", blkOffset))
}
return
}

Expand Down Expand Up @@ -306,13 +329,28 @@ func (obj *baseObject) containsWithLoad(
if isAblk {
dedupFn = containers.MakeForeachVectorOp(
keys.GetType().Oid, containsAlkFunctions, data.Vecs[0], keys, obj.LoadPersistedCommitTS, txn,
func(vrowID any) *model.TransDels {
rowID := vrowID.(types.Rowid)
blkID := rowID.BorrowBlockID()
return obj.rt.TransferDelsMap.GetDelsForBlk(*blkID)
},
)
} else {
dedupFn = containers.MakeForeachVectorOp(
keys.GetType().Oid, containsNABlkFunctions, data.Vecs[0], keys,
keys.GetType().Oid,
containsNABlkFunctions,
data.Vecs[0],
keys,
)
}
err = containers.ForeachVector(keys, dedupFn, sels)
if err != nil {
logutil.Info("Dedup-Err",
zap.Any("err", err),
zap.String("txn", txn.Repr()),
zap.String("obj", obj.meta.Load().ID().String()),
zap.Uint16("blk offset", blkOffset))
}
return
}

Expand Down Expand Up @@ -348,7 +386,8 @@ func (obj *baseObject) persistedGetDuplicatedRows(
if err == nil || !moerr.IsMoErrCode(err, moerr.OkExpectedPossibleDup) {
continue
}
err = obj.getDuplicateRowsWithLoad(ctx, txn, keys, sels, rowIDs, uint16(i), isAblk, skipCommittedBeforeTxnForAblk, maxVisibleRow, mp)
err = obj.getDuplicateRowsWithLoad(
ctx, txn, keys, sels, rowIDs, uint16(i), isAblk, skipCommittedBeforeTxnForAblk, maxVisibleRow, mp)
if err != nil {
return err
}
Expand Down Expand Up @@ -386,7 +425,8 @@ func (obj *baseObject) persistedContains(
if err == nil || !moerr.IsMoErrCode(err, moerr.OkExpectedPossibleDup) {
continue
}
err = obj.containsWithLoad(ctx, txn, keys, sels, uint16(i), isAblk, isCommitting, mp)
err = obj.containsWithLoad(
ctx, txn, keys, sels, uint16(i), isAblk, isCommitting, mp)
if err != nil {
return err
}
Expand Down Expand Up @@ -441,7 +481,8 @@ func (obj *baseObject) Scan(
) (err error) {
node := obj.PinNode()
defer node.Unref()
return node.Scan(ctx, bat, txn, readSchema.(*catalog.Schema), blkID, colIdxes, mp)
return node.Scan(
ctx, bat, txn, readSchema.(*catalog.Schema), blkID, colIdxes, mp)
}

func (obj *baseObject) FillBlockTombstones(
Expand Down Expand Up @@ -503,12 +544,14 @@ func (obj *baseObject) GetValue(
if !obj.meta.Load().IsTombstone && !skipCheckDelete {
var bat *containers.Batch
blkID := objectio.NewBlockidWithObjectID(obj.meta.Load().ID(), blkOffset)
err = HybridScanByBlock(ctx, obj.meta.Load().GetTable(), txn, &bat, readSchema.(*catalog.Schema), []int{col}, blkID, mp)
err = HybridScanByBlock(
ctx, obj.meta.Load().GetTable(), txn, &bat, readSchema.(*catalog.Schema), []int{col}, blkID, mp)
if err != nil {
return
}
defer bat.Close()
err = txn.GetStore().FillInWorkspaceDeletes(obj.meta.Load().AsCommonID(), &bat.Deletes, uint64(0))
err = txn.GetStore().FillInWorkspaceDeletes(
obj.meta.Load().AsCommonID(), &bat.Deletes, uint64(0))
if err != nil {
return
}
Expand All @@ -523,7 +566,8 @@ func (obj *baseObject) GetValue(
return
}
var bat *containers.Batch
err = obj.Scan(ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
err = obj.Scan(
ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
if err != nil {
return
}
Expand Down
44 changes: 41 additions & 3 deletions pkg/vm/engine/tae/tables/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/compute"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"go.uber.org/zap"
)

var getDuplicatedRowIDNABlkFunctions = map[types.T]any{
Expand Down Expand Up @@ -159,7 +162,7 @@ func parseAGetDuplicateRowIDsArgs(args ...any) (

func parseAContainsArgs(args ...any) (
vec containers.Vector, rowIDs containers.Vector,
scanFn func(uint16) (vec containers.Vector, err error), txn txnif.TxnReader,
scanFn func(uint16) (vec containers.Vector, err error), txn txnif.TxnReader, delsFn func(rowID any) *model.TransDels,
) {
vec = args[0].(containers.Vector)
if args[1] != nil {
Expand All @@ -171,6 +174,9 @@ func parseAContainsArgs(args ...any) (
if args[3] != nil {
txn = args[3].(txnif.TxnReader)
}
if args[4] != nil {
delsFn = args[4].(func(rowID any) *model.TransDels)
}
return
}

Expand Down Expand Up @@ -296,6 +302,11 @@ func getDuplicatedRowIDABlkBytesFunc(args ...any) func([]byte, bool, int) error
commitTS := vector.GetFixedAtNoTypeCheck[types.TS](tsVec.GetDownstreamVector(), row)
startTS := txn.GetStartTS()
if commitTS.GT(&startTS) {
logutil.Info("Dedup-WW",
zap.String("txn", txn.Repr()),
zap.Int("row offset", row),
zap.String("commit ts", commitTS.ToString()),
)
return txnif.ErrTxnWWConflict
}
if skip && commitTS.LT(&startTS) {
Expand Down Expand Up @@ -346,6 +357,11 @@ func getDuplicatedRowIDABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int)
commitTS := tsVec.Get(row).(types.TS)
startTS := txn.GetStartTS()
if commitTS.GT(&startTS) {
logutil.Info("Dedup-WW",
zap.String("txn", txn.Repr()),
zap.Int("row offset", row),
zap.String("commit ts", commitTS.ToString()),
)
return txnif.ErrTxnWWConflict
}
if skip && commitTS.LT(&startTS) {
Expand All @@ -361,7 +377,7 @@ func getDuplicatedRowIDABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int)

func containsABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int) func(args ...any) func(T, bool, int) error {
return func(args ...any) func(T, bool, int) error {
vec, rowIDs, scanFn, txn := parseAContainsArgs(args...)
vec, rowIDs, scanFn, txn, delsFn := parseAContainsArgs(args...)
vs := vector.MustFixedColNoTypeCheck[T](vec.GetDownstreamVector())
return func(v1 T, _ bool, rowOffset int) error {
if rowIDs.IsNull(rowOffset) {
Expand Down Expand Up @@ -391,7 +407,24 @@ func containsABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int) func(args
commitTS := tsVec.Get(row).(types.TS)
startTS := txn.GetStartTS()
if commitTS.GT(&startTS) {
return txnif.ErrTxnWWConflict
dels := delsFn(v1)
ts, ok := dels.Mapping[row]
if !ok {
logutil.Info("Dedup-WW",
zap.String("txn", txn.Repr()),
zap.Int("row offset", row),
zap.String("commit ts", commitTS.ToString()),
)
return txnif.ErrTxnWWConflict
}
if ts.GT(&startTS) {
logutil.Info("Dedup-WW",
zap.String("txn", txn.Repr()),
zap.Int("row offset", row),
zap.String("commit ts", commitTS.ToString()),
)
return txnif.ErrTxnWWConflict
}
}
}
return nil
Expand Down Expand Up @@ -447,6 +480,11 @@ func dedupABlkClosureFactory(
commitTS := tsVec.Get(row).(types.TS)
startTS := txn.GetStartTS()
if commitTS.GT(&startTS) {
logutil.Info("Dedup-WW",
zap.String("txn", txn.Repr()),
zap.Int("row offset", row),
zap.String("commit ts", commitTS.ToString()),
)
return txnif.ErrTxnWWConflict
}
entry := common.TypeStringValue(*vec.GetType(), v1, false)
Expand Down

0 comments on commit 3d0127a

Please sign in to comment.