From 3d0127affeec04ccda5371df65a4062312bda2cf Mon Sep 17 00:00:00 2001
From: jiangxinmeng1 <51114574+jiangxinmeng1@users.noreply.github.com>
Date: Tue, 5 Nov 2024 13:26:55 +0800
Subject: [PATCH] fix dedup (#19774)

fix dedup and add debug log

Approved by: @XuPeng-SH
---
 pkg/vm/engine/tae/db/test/db_test.go  | 42 +++++++++++++++++
 pkg/vm/engine/tae/tables/aobj.go      |  3 +-
 pkg/vm/engine/tae/tables/appender.go  |  3 +-
 pkg/vm/engine/tae/tables/base.go      | 66 ++++++++++++++++++++++-----
 pkg/vm/engine/tae/tables/functions.go | 44 ++++++++++++++++--
 5 files changed, 142 insertions(+), 16 deletions(-)

diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go
index 490a8fe06818..4e92c55a8a85 100644
--- a/pkg/vm/engine/tae/db/test/db_test.go
+++ b/pkg/vm/engine/tae/db/test/db_test.go
@@ -10118,6 +10118,48 @@ func TestTransferInMerge(t *testing.T) {
 	assert.NoError(t, txn.Commit(context.Background()))
 }
 
+func TestDeleteAndMerge(t *testing.T) {
+	ctx := context.Background()
+
+	opts := config.WithLongScanAndCKPOpts(nil)
+	tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
+	defer tae.Close()
+	schema := catalog.MockSchemaAll(3, 2)
+	schema.Extra.BlockMaxRows = 5
+	schema.Extra.ObjectMaxBlocks = 256
+	tae.BindSchema(schema)
+	bat := catalog.MockBatch(schema, 10)
+	defer bat.Close()
+	tae.CreateRelAndAppend(bat, true)
+	tae.CompactBlocks(true)
+
+	txn, rel := tae.GetRelation()
+	var objs []*catalog.ObjectEntry
+	objIt := rel.MakeObjectIt(false)
+	for objIt.Next() {
+		obj := objIt.GetObject().GetMeta().(*catalog.ObjectEntry)
+		if !obj.IsAppendable() {
+			objs = append(objs, obj)
+		}
+	}
+	task, err := jobs.NewMergeObjectsTask(nil, txn, objs, tae.Runtime, 0, false)
+	assert.NoError(t, err)
+	err = task.OnExec(context.Background())
+	assert.NoError(t, err)
+	var appendTxn txnif.AsyncTxn
+	{
+		tae.DeleteAll(true)
+		appendTxn, err = tae.StartTxn(nil)
+		assert.NoError(t, err)
+	}
+	assert.NoError(t, txn.Commit(context.Background()))
+	tae.CompactBlocks(true)
+	tae.DoAppendWithTxn(bat, appendTxn, false)
+
+	assert.NoError(t, appendTxn.Commit(ctx))
+	t.Log(tae.Catalog.SimplePPString(3))
+}
+
 func TestTransferInMerge2(t *testing.T) {
 	ctx := context.Background()
 
diff --git a/pkg/vm/engine/tae/tables/aobj.go b/pkg/vm/engine/tae/tables/aobj.go
index 728cdaaa55f8..c35b7d859f63 100644
--- a/pkg/vm/engine/tae/tables/aobj.go
+++ b/pkg/vm/engine/tae/tables/aobj.go
@@ -256,7 +256,8 @@ func (obj *aobject) GetMaxRowByTS(ts types.TS) (uint32, error) {
 		return 0, err
 	}
 	defer vec.Close()
-	tsVec := vector.MustFixedColNoTypeCheck[types.TS](vec.GetDownstreamVector())
+	tsVec := vector.MustFixedColNoTypeCheck[types.TS](
+		vec.GetDownstreamVector())
 	for i := range tsVec {
 		if tsVec[i].GT(&ts) {
 			return uint32(i), nil
diff --git a/pkg/vm/engine/tae/tables/appender.go b/pkg/vm/engine/tae/tables/appender.go
index 790f18435039..318a151c6243 100644
--- a/pkg/vm/engine/tae/tables/appender.go
+++ b/pkg/vm/engine/tae/tables/appender.go
@@ -120,7 +120,8 @@ func (appender *objectAppender) ApplyAppend(
 			continue
 		}
 		if colDef.IsRealPrimary() && !schema.IsSecondaryIndexTable() {
-			if err = node.pkIndex.BatchUpsert(bat.Vecs[colDef.Idx].GetDownstreamVector(), from); err != nil {
+			if err = node.pkIndex.BatchUpsert(
+				bat.Vecs[colDef.Idx].GetDownstreamVector(), from); err != nil {
 				panic(err)
 			}
 		}
diff --git a/pkg/vm/engine/tae/tables/base.go b/pkg/vm/engine/tae/tables/base.go
index a694acc76a1f..8b28c09dbae3 100644
--- a/pkg/vm/engine/tae/tables/base.go
+++ b/pkg/vm/engine/tae/tables/base.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/matrixorigin/matrixone/pkg/fileservice"
 	"github.com/matrixorigin/matrixone/pkg/logutil"
+	"go.uber.org/zap"
 
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/common/mpool"
@@ -36,6 +37,7 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/updates"
 )
 
@@ -67,7 +69,8 @@ func newBaseObject(
 		appendMVCC: updates.NewAppendMVCCHandle(meta),
 	}
 	obj.meta.Store(meta)
-	obj.appendMVCC.SetAppendListener(obj.OnApplyAppend)
+	obj.appendMVCC.SetAppendListener(
+		obj.OnApplyAppend)
 	obj.RWMutex = obj.appendMVCC.RWMutex
 	return obj
 }
@@ -233,7 +236,8 @@ func (obj *baseObject) ResolvePersistedColumnData(
 	col int,
 	mp *mpool.MPool,
 ) (bat *containers.Batch, err error) {
-	err = obj.Scan(ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
+	err = obj.Scan(
+		ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
 	return
 }
 
@@ -267,14 +271,33 @@ func (obj *baseObject) getDuplicateRowsWithLoad(
 	var dedupFn any
 	if isAblk {
 		dedupFn = containers.MakeForeachVectorOp(
-			keys.GetType().Oid, getRowIDAlkFunctions, data.Vecs[0], rowIDs, blkID, maxVisibleRow, obj.LoadPersistedCommitTS, txn, skipCommittedBeforeTxnForAblk,
+			keys.GetType().Oid,
+			getRowIDAlkFunctions,
+			data.Vecs[0],
+			rowIDs,
+			blkID,
+			maxVisibleRow,
+			obj.LoadPersistedCommitTS,
+			txn,
+			skipCommittedBeforeTxnForAblk,
 		)
 	} else {
 		dedupFn = containers.MakeForeachVectorOp(
-			keys.GetType().Oid, getDuplicatedRowIDNABlkFunctions, data.Vecs[0], rowIDs, blkID,
+			keys.GetType().Oid,
+			getDuplicatedRowIDNABlkFunctions,
+			data.Vecs[0],
+			rowIDs,
+			blkID,
 		)
 	}
 	err = containers.ForeachVector(keys, dedupFn, sels)
+	if err != nil {
+		logutil.Info("Dedup-Err",
+			zap.Any("err", err),
+			zap.String("txn", txn.Repr()),
+			zap.String("obj", obj.meta.Load().ID().String()),
+			zap.Uint16("blk offset", blkOffset))
+	}
 	return
 }
 
@@ -306,13 +329,28 @@ func (obj *baseObject) containsWithLoad(
 	if isAblk {
 		dedupFn = containers.MakeForeachVectorOp(
 			keys.GetType().Oid, containsAlkFunctions, data.Vecs[0], keys, obj.LoadPersistedCommitTS, txn,
+			func(vrowID any) *model.TransDels {
+				rowID := vrowID.(types.Rowid)
+				blkID := rowID.BorrowBlockID()
+				return obj.rt.TransferDelsMap.GetDelsForBlk(*blkID)
+			},
 		)
 	} else {
 		dedupFn = containers.MakeForeachVectorOp(
-			keys.GetType().Oid, containsNABlkFunctions, data.Vecs[0], keys,
+			keys.GetType().Oid,
+			containsNABlkFunctions,
+			data.Vecs[0],
+			keys,
 		)
 	}
 	err = containers.ForeachVector(keys, dedupFn, sels)
+	if err != nil {
+		logutil.Info("Dedup-Err",
+			zap.Any("err", err),
+			zap.String("txn", txn.Repr()),
+			zap.String("obj", obj.meta.Load().ID().String()),
+			zap.Uint16("blk offset", blkOffset))
+	}
 	return
 }
 
@@ -348,7 +386,8 @@ func (obj *baseObject) persistedGetDuplicatedRows(
 		if err == nil || !moerr.IsMoErrCode(err, moerr.OkExpectedPossibleDup) {
 			continue
 		}
-		err = obj.getDuplicateRowsWithLoad(ctx, txn, keys, sels, rowIDs, uint16(i), isAblk, skipCommittedBeforeTxnForAblk, maxVisibleRow, mp)
+		err = obj.getDuplicateRowsWithLoad(
+			ctx, txn, keys, sels, rowIDs, uint16(i), isAblk, skipCommittedBeforeTxnForAblk, maxVisibleRow, mp)
 		if err != nil {
 			return err
 		}
@@ -386,7 +425,8 @@ func (obj *baseObject) persistedContains(
 		if err == nil || !moerr.IsMoErrCode(err, moerr.OkExpectedPossibleDup) {
 			continue
 		}
-		err = obj.containsWithLoad(ctx, txn, keys, sels, uint16(i), isAblk, isCommitting, mp)
+		err = obj.containsWithLoad(
+			ctx, txn, keys, sels, uint16(i), isAblk, isCommitting, mp)
 		if err != nil {
 			return err
 		}
@@ -441,7 +481,8 @@ func (obj *baseObject) Scan(
 ) (err error) {
 	node := obj.PinNode()
 	defer node.Unref()
-	return node.Scan(ctx, bat, txn, readSchema.(*catalog.Schema), blkID, colIdxes, mp)
+	return node.Scan(
+		ctx, bat, txn, readSchema.(*catalog.Schema), blkID, colIdxes, mp)
 }
 
 func (obj *baseObject) FillBlockTombstones(
@@ -503,12 +544,14 @@ func (obj *baseObject) GetValue(
 	if !obj.meta.Load().IsTombstone && !skipCheckDelete {
 		var bat *containers.Batch
 		blkID := objectio.NewBlockidWithObjectID(obj.meta.Load().ID(), blkOffset)
-		err = HybridScanByBlock(ctx, obj.meta.Load().GetTable(), txn, &bat, readSchema.(*catalog.Schema), []int{col}, blkID, mp)
+		err = HybridScanByBlock(
+			ctx, obj.meta.Load().GetTable(), txn, &bat, readSchema.(*catalog.Schema), []int{col}, blkID, mp)
 		if err != nil {
 			return
 		}
 		defer bat.Close()
-		err = txn.GetStore().FillInWorkspaceDeletes(obj.meta.Load().AsCommonID(), &bat.Deletes, uint64(0))
+		err = txn.GetStore().FillInWorkspaceDeletes(
+			obj.meta.Load().AsCommonID(), &bat.Deletes, uint64(0))
 		if err != nil {
 			return
 		}
@@ -523,7 +566,8 @@ func (obj *baseObject) GetValue(
 		return
 	}
 	var bat *containers.Batch
-	err = obj.Scan(ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
+	err = obj.Scan(
+		ctx, &bat, txn, readSchema, blkOffset, []int{col}, mp)
 	if err != nil {
 		return
 	}
diff --git a/pkg/vm/engine/tae/tables/functions.go b/pkg/vm/engine/tae/tables/functions.go
index 88f32350ccc3..d6c7fd8e730d 100644
--- a/pkg/vm/engine/tae/tables/functions.go
+++ b/pkg/vm/engine/tae/tables/functions.go
@@ -19,12 +19,15 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/container/nulls"
 	"github.com/matrixorigin/matrixone/pkg/container/types"
 	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/logutil"
 	"github.com/matrixorigin/matrixone/pkg/objectio"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/compute"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
+	"go.uber.org/zap"
 )
 
 var getDuplicatedRowIDNABlkFunctions = map[types.T]any{
@@ -159,7 +162,7 @@ func parseAGetDuplicateRowIDsArgs(args ...any) (
 func parseAContainsArgs(args ...any) (
 	vec containers.Vector,
 	rowIDs containers.Vector,
-	scanFn func(uint16) (vec containers.Vector, err error), txn txnif.TxnReader,
+	scanFn func(uint16) (vec containers.Vector, err error), txn txnif.TxnReader, delsFn func(rowID any) *model.TransDels,
 ) {
 	vec = args[0].(containers.Vector)
 	if args[1] != nil {
@@ -171,6 +174,9 @@ func parseAContainsArgs(args ...any) (
 	if args[3] != nil {
 		txn = args[3].(txnif.TxnReader)
 	}
+	if args[4] != nil {
+		delsFn = args[4].(func(rowID any) *model.TransDels)
+	}
 	return
 }
 
@@ -296,6 +302,11 @@ func getDuplicatedRowIDABlkBytesFunc(args ...any) func([]byte, bool, int) error
 			commitTS := vector.GetFixedAtNoTypeCheck[types.TS](tsVec.GetDownstreamVector(), row)
 			startTS := txn.GetStartTS()
 			if commitTS.GT(&startTS) {
+				logutil.Info("Dedup-WW",
+					zap.String("txn", txn.Repr()),
+					zap.Int("row offset", row),
+					zap.String("commit ts", commitTS.ToString()),
+				)
 				return txnif.ErrTxnWWConflict
 			}
 			if skip && commitTS.LT(&startTS) {
@@ -346,6 +357,11 @@ func getDuplicatedRowIDABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int)
 			commitTS := tsVec.Get(row).(types.TS)
 			startTS := txn.GetStartTS()
 			if commitTS.GT(&startTS) {
+				logutil.Info("Dedup-WW",
+					zap.String("txn", txn.Repr()),
+					zap.Int("row offset", row),
+					zap.String("commit ts", commitTS.ToString()),
+				)
 				return txnif.ErrTxnWWConflict
 			}
 			if skip && commitTS.LT(&startTS) {
@@ -361,7 +377,7 @@ func getDuplicatedRowIDABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int)
 
 func containsABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int) func(args ...any) func(T, bool, int) error {
 	return func(args ...any) func(T, bool, int) error {
-		vec, rowIDs, scanFn, txn := parseAContainsArgs(args...)
+		vec, rowIDs, scanFn, txn, delsFn := parseAContainsArgs(args...)
 		vs := vector.MustFixedColNoTypeCheck[T](vec.GetDownstreamVector())
 		return func(v1 T, _ bool, rowOffset int) error {
 			if rowIDs.IsNull(rowOffset) {
@@ -391,7 +407,24 @@ func containsABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int) func(args
 				commitTS := tsVec.Get(row).(types.TS)
 				startTS := txn.GetStartTS()
 				if commitTS.GT(&startTS) {
-					return txnif.ErrTxnWWConflict
+					dels := delsFn(v1)
+					ts, ok := dels.Mapping[row]
+					if !ok {
+						logutil.Info("Dedup-WW",
+							zap.String("txn", txn.Repr()),
+							zap.Int("row offset", row),
+							zap.String("commit ts", commitTS.ToString()),
+						)
+						return txnif.ErrTxnWWConflict
+					}
+					if ts.GT(&startTS) {
+						logutil.Info("Dedup-WW",
+							zap.String("txn", txn.Repr()),
+							zap.Int("row offset", row),
+							zap.String("commit ts", commitTS.ToString()),
+						)
+						return txnif.ErrTxnWWConflict
+					}
 				}
 			}
 			return nil
@@ -447,6 +480,11 @@ func dedupABlkClosureFactory(
 		commitTS := tsVec.Get(row).(types.TS)
 		startTS := txn.GetStartTS()
 		if commitTS.GT(&startTS) {
+			logutil.Info("Dedup-WW",
+				zap.String("txn", txn.Repr()),
+				zap.Int("row offset", row),
+				zap.String("commit ts", commitTS.ToString()),
+			)
 			return txnif.ErrTxnWWConflict
 		}
 		entry := common.TypeStringValue(*vec.GetType(), v1, false)
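
Note (reviewer sketch, not part of the patch): the behavioral change above is concentrated in containsABlkFuncFactory, which now consults the block's transfer-deletes mapping before reporting a write-write conflict during dedup. Below is a minimal, self-contained Go sketch of that decision rule only. TransDels and Mapping mirror model.TransDels from the diff; TS, isWWConflict, and the example timestamps are simplified stand-ins invented for illustration, not MatrixOne's actual API.

package main

import "fmt"

// TS is a simplified stand-in for types.TS; a larger value means a later time.
type TS int64

// TransDels mirrors the role of model.TransDels in the patch:
// row offset in the block -> commit timestamp of the transferred delete.
type TransDels struct {
	Mapping map[int]TS
}

// isWWConflict applies the rule the patch adds to containsABlkFuncFactory:
// a candidate row whose commit timestamp is newer than the reader's start
// timestamp is a write-write conflict only if the transfer-deletes mapping
// cannot prove the row was already deleted at or before the start timestamp.
func isWWConflict(commitTS, startTS TS, row int, dels *TransDels) bool {
	if commitTS <= startTS {
		// Row is visible to the reader; the normal dedup comparison handles it.
		return false
	}
	if dels == nil {
		return true // no transferred deletes recorded for this block
	}
	delTS, ok := dels.Mapping[row]
	if !ok {
		return true // row was not deleted via transfer: genuine conflict
	}
	// Deleted via transfer: conflict only if that delete also committed
	// after the reader started.
	return delTS > startTS
}

func main() {
	dels := &TransDels{Mapping: map[int]TS{3: 90}}
	fmt.Println(isWWConflict(120, 100, 3, dels)) // false: delete at 90 <= start 100
	fmt.Println(isWWConflict(120, 100, 4, dels)) // true: no transferred delete for row 4
	fmt.Println(isWWConflict(120, 100, 3, &TransDels{Mapping: map[int]TS{3: 150}})) // true: delete after start
}

This matches the scenario exercised by TestDeleteAndMerge: rows rewritten by a merge carry a commit timestamp newer than a concurrent appender's start timestamp, but if the deletes transferred onto the merged block committed before that appender started, dedup no longer reports a spurious txnif.ErrTxnWWConflict.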