From 48a080bd82a95be35a88fd011d2ad7e4e76b0ff3 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 13:03:16 +0800 Subject: [PATCH 1/8] update 1 --- pkg/vm/engine/disttae/logtailreplay/partition_state.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state.go b/pkg/vm/engine/disttae/logtailreplay/partition_state.go index 87f0854c86bf..47c07bfb10e9 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state.go @@ -625,7 +625,7 @@ func NewPartitionState( opts := btree.Options{ Degree: 64, } - return &PartitionState{ + ps := &PartitionState{ service: service, tid: tid, noData: noData, @@ -639,6 +639,13 @@ func NewPartitionState( shared: new(sharedStates), start: types.MaxTs(), } + logutil.Info( + "PS-CREATED", + zap.Uint64("table-id", tid), + zap.String("service", service), + zap.String("addr", fmt.Sprintf("%p", ps)), + ) + return ps } func (p *PartitionState) truncateTombstoneObjects( From 4f6206b544eb435ffeb201889ea379280224f21c Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 13:40:18 +0800 Subject: [PATCH 2/8] update 2 --- pkg/objectio/injects.go | 16 +++++ .../disttae/local_disttae_datasource.go | 69 +++++++++++++++---- pkg/vm/engine/engine_util/reader.go | 26 +++++++ 3 files changed, 98 insertions(+), 13 deletions(-) diff --git a/pkg/objectio/injects.go b/pkg/objectio/injects.go index 8f2ff7fdef15..bc9e3d6fa62d 100644 --- a/pkg/objectio/injects.go +++ b/pkg/objectio/injects.go @@ -16,6 +16,7 @@ package objectio import ( "context" + "strings" "github.com/matrixorigin/matrixone/pkg/util/fault" ) @@ -30,8 +31,23 @@ const ( FJ_TracePartitionState = "fj/trace/partitionstate" FJ_Debug19524 = "fj/debug/19524" + + FJ_LogReader = "fj/log/reader" ) +// `name` is the table name +// return injected, logLevel +func LogReaderInjected(name string) (bool, int) { + iarg, sarg, injected := fault.TriggerFault(FJ_LogReader) + if !injected { + return false, 0 + } + if !strings.Contains(sarg, name) { + return false, 0 + } + return true, int(iarg) +} + func Debug19524Injected() bool { _, _, injected := fault.TriggerFault(FJ_Debug19524) return injected diff --git a/pkg/vm/engine/disttae/local_disttae_datasource.go b/pkg/vm/engine/disttae/local_disttae_datasource.go index cb003ac0b560..6fb9ba7cf6d8 100644 --- a/pkg/vm/engine/disttae/local_disttae_datasource.go +++ b/pkg/vm/engine/disttae/local_disttae_datasource.go @@ -19,10 +19,11 @@ import ( "context" "encoding/hex" "fmt" - "go.uber.org/zap" "slices" "sort" + "go.uber.org/zap" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -271,7 +272,7 @@ func (ls *LocalDisttaeDataSource) Next( filter any, mp *mpool.MPool, outBatch *batch.Batch, -) (*objectio.BlockInfo, engine.DataState, error) { +) (info *objectio.BlockInfo, state engine.DataState, err error) { if ls.memPKFilter == nil { ff := filter.(engine_util.MemPKFilter) @@ -279,7 +280,43 @@ func (ls *LocalDisttaeDataSource) Next( } if len(cols) == 0 { - return nil, engine.End, nil + state = engine.End + return + } + + injected, logLevel := objectio.LogReaderInjected(ls.table.tableName) + if injected && logLevel > 0 { + defer func() { + if err != nil { + logutil.Error( + "LOGREADER-INJECTED-2", + zap.String("table", ls.table.tableName), + zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.Error(err), + ) + return + } + if info != nil { + logutil.Info( + "LOGREADER-INJECTED-2", + zap.String("table", ls.table.tableName), + zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.String("blk", info.String()), + ) + } else { + maxLogCnt := 10 + if logLevel > 1 { + maxLogCnt = outBatch.RowCount() + } + logutil.Info( + "LOGREADER-INJECTED-2", + zap.String("table", ls.table.tableName), + zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.String("data", common.MoBatchToString(outBatch, maxLogCnt)), + zap.String("ps", fmt.Sprintf("%p", ls.pState)), + ) + } + }() } // bathed prefetch block data and deletes @@ -289,9 +326,11 @@ func (ls *LocalDisttaeDataSource) Next( switch ls.iteratePhase { case engine.InMem: outBatch.CleanOnlyData() - err := ls.iterateInMemData(ctx, cols, types, seqNums, outBatch, mp) - if err != nil { - return nil, engine.InMem, err + if err = ls.iterateInMemData( + ctx, cols, types, seqNums, outBatch, mp, + ); err != nil { + state = engine.InMem + return } if outBatch.RowCount() == 0 { @@ -299,26 +338,30 @@ func (ls *LocalDisttaeDataSource) Next( continue } - return nil, engine.InMem, nil + state = engine.InMem + return case engine.Persisted: if ls.rangesCursor >= ls.rangeSlice.Len() { - return nil, engine.End, nil + state = engine.End + return } ls.handleOrderBy() if ls.rangesCursor >= ls.rangeSlice.Len() { - return nil, engine.End, nil + state = engine.End + return } - blk := ls.rangeSlice.Get(ls.rangesCursor) + info = ls.rangeSlice.Get(ls.rangesCursor) ls.rangesCursor++ - - return blk, engine.Persisted, nil + state = engine.Persisted + return case engine.End: - return nil, ls.iteratePhase, nil + state = ls.iteratePhase + return } } } diff --git a/pkg/vm/engine/engine_util/reader.go b/pkg/vm/engine/engine_util/reader.go index 6d93a520bea5..94d2db2c3a44 100644 --- a/pkg/vm/engine/engine_util/reader.go +++ b/pkg/vm/engine/engine_util/reader.go @@ -38,6 +38,7 @@ import ( v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) @@ -402,6 +403,31 @@ func (r *reader) Read( if err != nil || dataState == engine.End { r.Close() } + if injected, logLevel := objectio.LogReaderInjected(r.name); injected { + logger := logutil.Info + if err != nil { + logger = logutil.Error + } + if logLevel == 0 { + logger( + "LOGREADER-INJECTED-1", + zap.String("name", r.name), + zap.Int("data-len", outBatch.RowCount()), + zap.Error(err), + ) + } else { + maxLogCnt := 10 + if logLevel > 1 { + maxLogCnt = outBatch.RowCount() + } + logger( + "LOGREADER-INJECTED-1", + zap.String("name", r.name), + zap.Error(err), + zap.String("data", common.MoBatchToString(outBatch, maxLogCnt)), + ) + } + } }() r.tryUpdateColumns(cols) From 1b93c36318c1943af3e3c64341f260015432cc47 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 14:31:33 +0800 Subject: [PATCH 3/8] update 3 --- pkg/objectio/injects.go | 39 ++++++++++++++++++- .../disttae/local_disttae_datasource.go | 2 + pkg/vm/engine/test/reader_test.go | 6 +++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/pkg/objectio/injects.go b/pkg/objectio/injects.go index bc9e3d6fa62d..4a7376bfb4cb 100644 --- a/pkg/objectio/injects.go +++ b/pkg/objectio/injects.go @@ -16,7 +16,6 @@ package objectio import ( "context" - "strings" "github.com/matrixorigin/matrixone/pkg/util/fault" ) @@ -42,12 +41,48 @@ func LogReaderInjected(name string) (bool, int) { if !injected { return false, 0 } - if !strings.Contains(sarg, name) { + if sarg != name { return false, 0 } return true, int(iarg) } +// inject log reader and partition state +// `name` is the table name +func InjectLog1( + name string, + level int, +) (rmFault func(), err error) { + rmFault = func() {} + if err = fault.AddFaultPoint( + context.Background(), + FJ_LogReader, + ":::", + "echo", + int64(level), + name, + ); err != nil { + return + } + if err = fault.AddFaultPoint( + context.Background(), + FJ_TracePartitionState, + ":::", + "echo", + 0, + name, + ); err != nil { + fault.RemoveFaultPoint(context.Background(), FJ_LogReader) + return + } + + rmFault = func() { + fault.RemoveFaultPoint(context.Background(), FJ_TracePartitionState) + fault.RemoveFaultPoint(context.Background(), FJ_LogReader) + } + return +} + func Debug19524Injected() bool { _, _, injected := fault.TriggerFault(FJ_Debug19524) return injected diff --git a/pkg/vm/engine/disttae/local_disttae_datasource.go b/pkg/vm/engine/disttae/local_disttae_datasource.go index 6fb9ba7cf6d8..a64c59f7dcb0 100644 --- a/pkg/vm/engine/disttae/local_disttae_datasource.go +++ b/pkg/vm/engine/disttae/local_disttae_datasource.go @@ -292,6 +292,7 @@ func (ls *LocalDisttaeDataSource) Next( "LOGREADER-INJECTED-2", zap.String("table", ls.table.tableName), zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.String("ps", fmt.Sprintf("%p", ls.pState)), zap.Error(err), ) return @@ -301,6 +302,7 @@ func (ls *LocalDisttaeDataSource) Next( "LOGREADER-INJECTED-2", zap.String("table", ls.table.tableName), zap.String("txn", ls.table.db.op.Txn().DebugString()), + zap.String("ps", fmt.Sprintf("%p", ls.pState)), zap.String("blk", info.String()), ) } else { diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index c3ae401aef6a..b1a4e2f8ffde 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -39,6 +39,7 @@ import ( plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" testutil3 "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/txn/client" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae" "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" @@ -78,6 +79,11 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) { schema := catalog2.MockSchemaAll(4, primaryKeyIdx) schema.Name = tableName + fault.Enable() + defer fault.Disable() + rmFault, err := objectio.InjectLog1(tableName, 2) + require.NoError(t, err) + defer rmFault() opt, err := testutil.GetS3SharedFileServiceOption(ctx, testutil.GetDefaultTestPath("test", t)) require.NoError(t, err) From 581c39be624a6b21a430bc0f3b6942db56f2d898 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 14:38:16 +0800 Subject: [PATCH 4/8] update 4 --- .../engine/disttae/local_disttae_datasource.go | 3 +++ pkg/vm/engine/engine_util/reader.go | 17 ++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/vm/engine/disttae/local_disttae_datasource.go b/pkg/vm/engine/disttae/local_disttae_datasource.go index a64c59f7dcb0..1ee4a985f7c8 100644 --- a/pkg/vm/engine/disttae/local_disttae_datasource.go +++ b/pkg/vm/engine/disttae/local_disttae_datasource.go @@ -297,6 +297,9 @@ func (ls *LocalDisttaeDataSource) Next( ) return } + if state == engine.End { + return + } if info != nil { logutil.Info( "LOGREADER-INJECTED-2", diff --git a/pkg/vm/engine/engine_util/reader.go b/pkg/vm/engine/engine_util/reader.go index 94d2db2c3a44..54912fc10194 100644 --- a/pkg/vm/engine/engine_util/reader.go +++ b/pkg/vm/engine/engine_util/reader.go @@ -403,13 +403,20 @@ func (r *reader) Read( if err != nil || dataState == engine.End { r.Close() } - if injected, logLevel := objectio.LogReaderInjected(r.name); injected { - logger := logutil.Info + if injected, logLevel := objectio.LogReaderInjected(r.name); injected || err != nil { if err != nil { - logger = logutil.Error + logutil.Error( + "LOGREADER-ERROR", + zap.String("name", r.name), + zap.Error(err), + ) + return + } + if isEnd == true { + return } if logLevel == 0 { - logger( + logutil.Info( "LOGREADER-INJECTED-1", zap.String("name", r.name), zap.Int("data-len", outBatch.RowCount()), @@ -420,7 +427,7 @@ func (r *reader) Read( if logLevel > 1 { maxLogCnt = outBatch.RowCount() } - logger( + logutil.Info( "LOGREADER-INJECTED-1", zap.String("name", r.name), zap.Error(err), From 9fc810fccbfddbf6eca7c505650bbeb475b7f47a Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 14:46:45 +0800 Subject: [PATCH 5/8] update 5 --- pkg/vm/engine/test/disttae_engine_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index 3624e940cb5a..7861f4165175 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -1052,6 +1052,14 @@ func TestApplyDeletesForWorkspaceAndPart(t *testing.T) { p := testutil.InitEnginePack(testutil.TestOptions{TaeEngineOptions: opts}, t) defer p.Close() tae := p.T.GetDB() + fault.Enable() + defer fault.Disable() + rmFault, err := objectio.InjectLog1( + "mo_account", + 0, + ) + require.NoError(t, err) + defer rmFault() schema := catalog2.MockSchemaAll(5, 1) schema.Name = "mo_account" @@ -1072,7 +1080,7 @@ func TestApplyDeletesForWorkspaceAndPart(t *testing.T) { exec := v.(executor.SQLExecutor) txnop = p.StartCNTxn() - _, err := exec.Exec(p.Ctx, "delete from db.mo_account where mock_1 in (0, 1)", executor.Options{}.WithTxn(txnop)) + _, err = exec.Exec(p.Ctx, "delete from db.mo_account where mock_1 in (0, 1)", executor.Options{}.WithTxn(txnop)) require.NoError(t, err) require.NoError(t, txnop.Commit(p.Ctx)) From 8de0d2902d0dd9026dbf9b68608469b4bac52fb2 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 15:18:39 +0800 Subject: [PATCH 6/8] update 6 --- pkg/vm/engine/engine_util/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/vm/engine/engine_util/reader.go b/pkg/vm/engine/engine_util/reader.go index 54912fc10194..e6965cc89bb9 100644 --- a/pkg/vm/engine/engine_util/reader.go +++ b/pkg/vm/engine/engine_util/reader.go @@ -412,7 +412,7 @@ func (r *reader) Read( ) return } - if isEnd == true { + if isEnd { return } if logLevel == 0 { From 7b427120f39e01383aff5caf78a616861c9c8c2f Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 16:05:05 +0800 Subject: [PATCH 7/8] update 7 --- pkg/objectio/injects.go | 32 ++++++++++++++++++++- pkg/vm/engine/disttae/txn.go | 34 +++++++++++++++++++++++ pkg/vm/engine/test/disttae_engine_test.go | 2 +- pkg/vm/engine/test/reader_test.go | 2 +- 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/pkg/objectio/injects.go b/pkg/objectio/injects.go index 4a7376bfb4cb..c069047881e1 100644 --- a/pkg/objectio/injects.go +++ b/pkg/objectio/injects.go @@ -31,9 +31,25 @@ const ( FJ_Debug19524 = "fj/debug/19524" - FJ_LogReader = "fj/log/reader" + FJ_LogReader = "fj/log/reader" + FJ_LogWorkspace = "fj/log/workspace" ) +const ( + FJ_C_AllNames = "_%_all_" +) + +func LogWorkspaceInjected(name string) (bool, int) { + iarg, sarg, injected := fault.TriggerFault(FJ_LogWorkspace) + if !injected { + return false, 0 + } + if sarg == name || sarg == FJ_C_AllNames { + return true, int(iarg) + } + return false, 0 +} + // `name` is the table name // return injected, logLevel func LogReaderInjected(name string) (bool, int) { @@ -76,7 +92,21 @@ func InjectLog1( return } + if err = fault.AddFaultPoint( + context.Background(), + FJ_LogWorkspace, + ":::", + "echo", + int64(level), + name, + ); err != nil { + fault.RemoveFaultPoint(context.Background(), FJ_LogReader) + fault.RemoveFaultPoint(context.Background(), FJ_TracePartitionState) + return + } + rmFault = func() { + fault.RemoveFaultPoint(context.Background(), FJ_LogWorkspace) fault.RemoveFaultPoint(context.Background(), FJ_TracePartitionState) fault.RemoveFaultPoint(context.Background(), FJ_LogReader) } diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index 73cf155a7ec9..1cad0ac3fe19 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -137,6 +137,40 @@ func (txn *Transaction) WriteBatch( txn.approximateInMemDeleteCnt += bat.RowCount() } + if injected, logLevel := objectio.LogWorkspaceInjected(tableName); injected { + if logLevel == 0 { + rowCnt := 0 + if bat != nil { + rowCnt = bat.RowCount() + } + logutil.Info( + "INJECT-LOG-WORKSPACE", + zap.String("table", tableName), + zap.String("txn", txn.op.Txn().DebugString()), + zap.String("typ", typesNames[typ]), + zap.Int("offset", len(txn.writes)), + zap.Int("rows", rowCnt), + ) + } else { + maxCnt := 10 + if logLevel > 1 && bat != nil { + maxCnt = bat.RowCount() + } + var dataStr string + if bat != nil { + dataStr = common.MoBatchToString(bat, maxCnt) + } + logutil.Info( + "INJECT-LOG-WORKSPACE", + zap.String("table", tableName), + zap.String("txn", txn.op.Txn().DebugString()), + zap.String("typ", typesNames[typ]), + zap.Int("offset", len(txn.writes)), + zap.String("data", dataStr), + ) + } + } + e := Entry{ typ: typ, accountId: accountId, diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index 7861f4165175..d64f3c04b0f9 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -1056,7 +1056,7 @@ func TestApplyDeletesForWorkspaceAndPart(t *testing.T) { defer fault.Disable() rmFault, err := objectio.InjectLog1( "mo_account", - 0, + 2, ) require.NoError(t, err) defer rmFault() diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index b1a4e2f8ffde..60716a19a097 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -81,7 +81,7 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) { schema.Name = tableName fault.Enable() defer fault.Disable() - rmFault, err := objectio.InjectLog1(tableName, 2) + rmFault, err := objectio.InjectLog1(tableName, 0) require.NoError(t, err) defer rmFault() From 2f67a01a9f05ad66c080f1b254bee51cf4e33b30 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 1 Nov 2024 21:54:52 +0800 Subject: [PATCH 8/8] update 8 --- pkg/vm/engine/disttae/local_disttae_datasource.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/vm/engine/disttae/local_disttae_datasource.go b/pkg/vm/engine/disttae/local_disttae_datasource.go index 6e5711fef2d9..ad5e87cf15b7 100644 --- a/pkg/vm/engine/disttae/local_disttae_datasource.go +++ b/pkg/vm/engine/disttae/local_disttae_datasource.go @@ -36,6 +36,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" )