diff --git a/pkg/objectio/injects.go b/pkg/objectio/injects.go index 8f2ff7fdef15..c069047881e1 100644 --- a/pkg/objectio/injects.go +++ b/pkg/objectio/injects.go @@ -30,8 +30,89 @@ const ( FJ_TracePartitionState = "fj/trace/partitionstate" FJ_Debug19524 = "fj/debug/19524" + + FJ_LogReader = "fj/log/reader" + FJ_LogWorkspace = "fj/log/workspace" +) + +const ( + FJ_C_AllNames = "_%_all_" ) +func LogWorkspaceInjected(name string) (bool, int) { + iarg, sarg, injected := fault.TriggerFault(FJ_LogWorkspace) + if !injected { + return false, 0 + } + if sarg == name || sarg == FJ_C_AllNames { + return true, int(iarg) + } + return false, 0 +} + +// `name` is the table name +// return injected, logLevel +func LogReaderInjected(name string) (bool, int) { + iarg, sarg, injected := fault.TriggerFault(FJ_LogReader) + if !injected { + return false, 0 + } + if sarg != name { + return false, 0 + } + return true, int(iarg) +} + +// inject log reader and partition state +// `name` is the table name +func InjectLog1( + name string, + level int, +) (rmFault func(), err error) { + rmFault = func() {} + if err = fault.AddFaultPoint( + context.Background(), + FJ_LogReader, + ":::", + "echo", + int64(level), + name, + ); err != nil { + return + } + if err = fault.AddFaultPoint( + context.Background(), + FJ_TracePartitionState, + ":::", + "echo", + 0, + name, + ); err != nil { + fault.RemoveFaultPoint(context.Background(), FJ_LogReader) + return + } + + if err = fault.AddFaultPoint( + context.Background(), + FJ_LogWorkspace, + ":::", + "echo", + int64(level), + name, + ); err != nil { + fault.RemoveFaultPoint(context.Background(), FJ_LogReader) + fault.RemoveFaultPoint(context.Background(), FJ_TracePartitionState) + return + } + + rmFault = func() { + fault.RemoveFaultPoint(context.Background(), FJ_LogWorkspace) + fault.RemoveFaultPoint(context.Background(), FJ_TracePartitionState) + fault.RemoveFaultPoint(context.Background(), FJ_LogReader) + } + return +} + func Debug19524Injected() bool { _, _, injected := fault.TriggerFault(FJ_Debug19524) return injected diff --git a/pkg/vm/engine/disttae/local_disttae_datasource.go b/pkg/vm/engine/disttae/local_disttae_datasource.go index 89df496f7bf1..ad5e87cf15b7 100644 --- a/pkg/vm/engine/disttae/local_disttae_datasource.go +++ b/pkg/vm/engine/disttae/local_disttae_datasource.go @@ -21,6 +21,8 @@ import ( "slices" "sort" + "go.uber.org/zap" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -34,6 +36,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" ) @@ -268,7 +271,7 @@ func (ls *LocalDisttaeDataSource) Next( filter any, mp *mpool.MPool, outBatch *batch.Batch, -) (*objectio.BlockInfo, engine.DataState, error) { +) (info *objectio.BlockInfo, state engine.DataState, err error) { if ls.memPKFilter == nil { ff := filter.(engine_util.MemPKFilter) @@ -276,7 +279,48 @@ func (ls *LocalDisttaeDataSource) Next( } if len(cols) == 0 { - return nil, engine.End, nil + state = engine.End + return + } + + injected, logLevel := objectio.LogReaderInjected(ls.table.tableName) + if injected && logLevel > 0 { + defer func() { + if err != nil { + logutil.Error( + "LOGREADER-INJECTED-2", + zap.String("table", ls.table.tableName), + zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.String("ps", fmt.Sprintf("%p", ls.pState)), + zap.Error(err), + ) + return + } + if state == engine.End { + return + } + if info != nil { + logutil.Info( + "LOGREADER-INJECTED-2", + zap.String("table", ls.table.tableName), + zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.String("ps", fmt.Sprintf("%p", ls.pState)), + zap.String("blk", info.String()), + ) + } else { + maxLogCnt := 10 + if logLevel > 1 { + maxLogCnt = outBatch.RowCount() + } + logutil.Info( + "LOGREADER-INJECTED-2", + zap.String("table", ls.table.tableName), + zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.String("data", common.MoBatchToString(outBatch, maxLogCnt)), + zap.String("ps", fmt.Sprintf("%p", ls.pState)), + ) + } + }() } // bathed prefetch block data and deletes @@ -286,9 +330,11 @@ func (ls *LocalDisttaeDataSource) Next( switch ls.iteratePhase { case engine.InMem: outBatch.CleanOnlyData() - err := ls.iterateInMemData(ctx, cols, types, seqNums, outBatch, mp) - if err != nil { - return nil, engine.InMem, err + if err = ls.iterateInMemData( + ctx, cols, types, seqNums, outBatch, mp, + ); err != nil { + state = engine.InMem + return } if outBatch.RowCount() == 0 { @@ -296,26 +342,30 @@ func (ls *LocalDisttaeDataSource) Next( continue } - return nil, engine.InMem, nil + state = engine.InMem + return case engine.Persisted: if ls.rangesCursor >= ls.rangeSlice.Len() { - return nil, engine.End, nil + state = engine.End + return } ls.handleOrderBy() if ls.rangesCursor >= ls.rangeSlice.Len() { - return nil, engine.End, nil + state = engine.End + return } - blk := ls.rangeSlice.Get(ls.rangesCursor) + info = ls.rangeSlice.Get(ls.rangesCursor) ls.rangesCursor++ - - return blk, engine.Persisted, nil + state = engine.Persisted + return case engine.End: - return nil, ls.iteratePhase, nil + state = ls.iteratePhase + return } } } diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state.go b/pkg/vm/engine/disttae/logtailreplay/partition_state.go index 87f0854c86bf..47c07bfb10e9 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state.go @@ -625,7 +625,7 @@ func NewPartitionState( opts := btree.Options{ Degree: 64, } - return &PartitionState{ + ps := &PartitionState{ service: service, tid: tid, noData: noData, @@ -639,6 +639,13 @@ func NewPartitionState( shared: new(sharedStates), start: types.MaxTs(), } + logutil.Info( + "PS-CREATED", + zap.Uint64("table-id", tid), + zap.String("service", service), + zap.String("addr", fmt.Sprintf("%p", ps)), + ) + return ps } func (p *PartitionState) truncateTombstoneObjects( diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index 73cf155a7ec9..1cad0ac3fe19 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -137,6 +137,40 @@ func (txn *Transaction) WriteBatch( txn.approximateInMemDeleteCnt += bat.RowCount() } + if injected, logLevel := objectio.LogWorkspaceInjected(tableName); injected { + if logLevel == 0 { + rowCnt := 0 + if bat != nil { + rowCnt = bat.RowCount() + } + logutil.Info( + "INJECT-LOG-WORKSPACE", + zap.String("table", tableName), + zap.String("txn", txn.op.Txn().DebugString()), + zap.String("typ", typesNames[typ]), + zap.Int("offset", len(txn.writes)), + zap.Int("rows", rowCnt), + ) + } else { + maxCnt := 10 + if logLevel > 1 && bat != nil { + maxCnt = bat.RowCount() + } + var dataStr string + if bat != nil { + dataStr = common.MoBatchToString(bat, maxCnt) + } + logutil.Info( + "INJECT-LOG-WORKSPACE", + zap.String("table", tableName), + zap.String("txn", txn.op.Txn().DebugString()), + zap.String("typ", typesNames[typ]), + zap.Int("offset", len(txn.writes)), + zap.String("data", dataStr), + ) + } + } + e := Entry{ typ: typ, accountId: accountId, diff --git a/pkg/vm/engine/engine_util/reader.go b/pkg/vm/engine/engine_util/reader.go index 6d93a520bea5..e6965cc89bb9 100644 --- a/pkg/vm/engine/engine_util/reader.go +++ b/pkg/vm/engine/engine_util/reader.go @@ -38,6 +38,7 @@ import ( v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) @@ -402,6 +403,38 @@ func (r *reader) Read( if err != nil || dataState == engine.End { r.Close() } + if injected, logLevel := objectio.LogReaderInjected(r.name); injected || err != nil { + if err != nil { + logutil.Error( + "LOGREADER-ERROR", + zap.String("name", r.name), + zap.Error(err), + ) + return + } + if isEnd { + return + } + if logLevel == 0 { + logutil.Info( + "LOGREADER-INJECTED-1", + zap.String("name", r.name), + zap.Int("data-len", outBatch.RowCount()), + zap.Error(err), + ) + } else { + maxLogCnt := 10 + if logLevel > 1 { + maxLogCnt = outBatch.RowCount() + } + logutil.Info( + "LOGREADER-INJECTED-1", + zap.String("name", r.name), + zap.Error(err), + zap.String("data", common.MoBatchToString(outBatch, maxLogCnt)), + ) + } + } }() r.tryUpdateColumns(cols) diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index 3624e940cb5a..d64f3c04b0f9 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -1052,6 +1052,14 @@ func TestApplyDeletesForWorkspaceAndPart(t *testing.T) { p := testutil.InitEnginePack(testutil.TestOptions{TaeEngineOptions: opts}, t) defer p.Close() tae := p.T.GetDB() + fault.Enable() + defer fault.Disable() + rmFault, err := objectio.InjectLog1( + "mo_account", + 2, + ) + require.NoError(t, err) + defer rmFault() schema := catalog2.MockSchemaAll(5, 1) schema.Name = "mo_account" @@ -1072,7 +1080,7 @@ func TestApplyDeletesForWorkspaceAndPart(t *testing.T) { exec := v.(executor.SQLExecutor) txnop = p.StartCNTxn() - _, err := exec.Exec(p.Ctx, "delete from db.mo_account where mock_1 in (0, 1)", executor.Options{}.WithTxn(txnop)) + _, err = exec.Exec(p.Ctx, "delete from db.mo_account where mock_1 in (0, 1)", executor.Options{}.WithTxn(txnop)) require.NoError(t, err) require.NoError(t, txnop.Commit(p.Ctx)) diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index c3ae401aef6a..60716a19a097 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -39,6 +39,7 @@ import ( plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" testutil3 "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/txn/client" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae" "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" @@ -78,6 +79,11 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) { schema := catalog2.MockSchemaAll(4, primaryKeyIdx) schema.Name = tableName + fault.Enable() + defer fault.Disable() + rmFault, err := objectio.InjectLog1(tableName, 0) + require.NoError(t, err) + defer rmFault() opt, err := testutil.GetS3SharedFileServiceOption(ctx, testutil.GetDefaultTestPath("test", t)) require.NoError(t, err)