From 83f7e714c17bfd705e23f209fb484221c40d27c6 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Mon, 4 Nov 2024 10:30:24 +0800 Subject: [PATCH] [bug] txn client: push the txn's snapshot ts --- pkg/cnservice/distributed_tae.go | 1 + pkg/frontend/test/engine_mock.go | 74 ++++++++++++++++++- pkg/frontend/test/txn_mock.go | 49 ++++++++++++ pkg/txn/client/client.go | 33 +++++++-- pkg/txn/client/client_test.go | 3 +- pkg/txn/client/operator.go | 10 +++ pkg/txn/client/operator_test.go | 17 +++++ pkg/txn/client/types.go | 10 +++ .../memorystorage/storage_txn_client.go | 4 + pkg/vm/engine/disttae/cache/catalog.go | 6 ++ pkg/vm/engine/disttae/db.go | 4 + pkg/vm/engine/entire_engine.go | 4 + pkg/vm/engine/entire_engine_test.go | 4 + pkg/vm/engine/memoryengine/binded.go | 4 + pkg/vm/engine/memoryengine/engine.go | 4 + pkg/vm/engine/types.go | 2 + 16 files changed, 217 insertions(+), 12 deletions(-) diff --git a/pkg/cnservice/distributed_tae.go b/pkg/cnservice/distributed_tae.go index 3d8ddc8450ed..d019e6f9f1cd 100644 --- a/pkg/cnservice/distributed_tae.go +++ b/pkg/cnservice/distributed_tae.go @@ -76,6 +76,7 @@ func (s *service) initDistributedTAE( s.cfg.Engine.CNTransferTxnLifespanThreshold), ) pu.StorageEngine = s.storeEngine + client.SetCacheTSGetter(s.storeEngine) // cdc mp if s.cdcMp, err = mpool.NewMPool("cdc", 0, mpool.NoFixed); err != nil { diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index d38bec098d05..d614583a543d 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -455,17 +455,17 @@ func (mr *MockRelDataMockRecorder) AttachTombstones(tombstones interface{}) *gom } // BuildEmptyRelData mocks base method. -func (m *MockRelData) BuildEmptyRelData() engine.RelData { +func (m *MockRelData) BuildEmptyRelData(preAllocSize int) engine.RelData { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BuildEmptyRelData") + ret := m.ctrl.Call(m, "BuildEmptyRelData", preAllocSize) ret0, _ := ret[0].(engine.RelData) return ret0 } // BuildEmptyRelData indicates an expected call of BuildEmptyRelData. -func (mr *MockRelDataMockRecorder) BuildEmptyRelData() *gomock.Call { +func (mr *MockRelDataMockRecorder) BuildEmptyRelData(preAllocSize interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildEmptyRelData", reflect.TypeOf((*MockRelData)(nil).BuildEmptyRelData)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildEmptyRelData", reflect.TypeOf((*MockRelData)(nil).BuildEmptyRelData), preAllocSize) } // DataCnt mocks base method. @@ -1454,6 +1454,58 @@ func (mr *MockRelationMockRecorder) Write(arg0, arg1 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockRelation)(nil).Write), arg0, arg1) } +// MockBaseReader is a mock of BaseReader interface. +type MockBaseReader struct { + ctrl *gomock.Controller + recorder *MockBaseReaderMockRecorder +} + +// MockBaseReaderMockRecorder is the mock recorder for MockBaseReader. +type MockBaseReaderMockRecorder struct { + mock *MockBaseReader +} + +// NewMockBaseReader creates a new mock instance. +func NewMockBaseReader(ctrl *gomock.Controller) *MockBaseReader { + mock := &MockBaseReader{ctrl: ctrl} + mock.recorder = &MockBaseReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBaseReader) EXPECT() *MockBaseReaderMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockBaseReader) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockBaseReaderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockBaseReader)(nil).Close)) +} + +// Read mocks base method. +func (m *MockBaseReader) Read(arg0 context.Context, arg1 []string, arg2 *plan.Expr, arg3 *mpool.MPool, arg4 *batch.Batch) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Read", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Read indicates an expected call of Read. +func (mr *MockBaseReaderMockRecorder) Read(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockBaseReader)(nil).Read), arg0, arg1, arg2, arg3, arg4) +} + // MockReader is a mock of Reader interface. type MockReader struct { ctrl *gomock.Controller @@ -1844,6 +1896,20 @@ func (mr *MockEngineMockRecorder) Delete(ctx, databaseName, op interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockEngine)(nil).Delete), ctx, databaseName, op) } +// GetCacheTS mocks base method. +func (m *MockEngine) GetCacheTS() timestamp.Timestamp { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCacheTS") + ret0, _ := ret[0].(timestamp.Timestamp) + return ret0 +} + +// GetCacheTS indicates an expected call of GetCacheTS. +func (mr *MockEngineMockRecorder) GetCacheTS() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCacheTS", reflect.TypeOf((*MockEngine)(nil).GetCacheTS)) +} + // GetMessageCenter mocks base method. func (m *MockEngine) GetMessageCenter() any { m.ctrl.T.Helper() diff --git a/pkg/frontend/test/txn_mock.go b/pkg/frontend/test/txn_mock.go index 4a5739a33938..6f7570c506c6 100644 --- a/pkg/frontend/test/txn_mock.go +++ b/pkg/frontend/test/txn_mock.go @@ -392,6 +392,18 @@ func (mr *MockTxnClientMockRecorder) Resume() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resume", reflect.TypeOf((*MockTxnClient)(nil).Resume)) } +// SetCacheTSGetter mocks base method. +func (m *MockTxnClient) SetCacheTSGetter(t client.CacheTSGetter) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetCacheTSGetter", t) +} + +// SetCacheTSGetter indicates an expected call of SetCacheTSGetter. +func (mr *MockTxnClientMockRecorder) SetCacheTSGetter(t interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCacheTSGetter", reflect.TypeOf((*MockTxnClient)(nil).SetCacheTSGetter), t) +} + // SyncLatestCommitTS mocks base method. func (m *MockTxnClient) SyncLatestCommitTS(arg0 timestamp.Timestamp) { m.ctrl.T.Helper() @@ -1336,3 +1348,40 @@ func (mr *MockWorkspaceMockRecorder) UpdateSnapshotWriteOffset() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSnapshotWriteOffset", reflect.TypeOf((*MockWorkspace)(nil).UpdateSnapshotWriteOffset)) } + +// MockCacheTSGetter is a mock of CacheTSGetter interface. +type MockCacheTSGetter struct { + ctrl *gomock.Controller + recorder *MockCacheTSGetterMockRecorder +} + +// MockCacheTSGetterMockRecorder is the mock recorder for MockCacheTSGetter. +type MockCacheTSGetterMockRecorder struct { + mock *MockCacheTSGetter +} + +// NewMockCacheTSGetter creates a new mock instance. +func NewMockCacheTSGetter(ctrl *gomock.Controller) *MockCacheTSGetter { + mock := &MockCacheTSGetter{ctrl: ctrl} + mock.recorder = &MockCacheTSGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCacheTSGetter) EXPECT() *MockCacheTSGetterMockRecorder { + return m.recorder +} + +// GetCacheTS mocks base method. +func (m *MockCacheTSGetter) GetCacheTS() timestamp.Timestamp { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCacheTS") + ret0, _ := ret[0].(timestamp.Timestamp) + return ret0 +} + +// GetCacheTS indicates an expected call of GetCacheTS. +func (mr *MockCacheTSGetterMockRecorder) GetCacheTS() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCacheTS", reflect.TypeOf((*MockCacheTSGetter)(nil).GetCacheTS)) +} diff --git a/pkg/txn/client/client.go b/pkg/txn/client/client.go index 3b8c32ca6775..ab2552c0938d 100644 --- a/pkg/txn/client/client.go +++ b/pkg/txn/client/client.go @@ -173,6 +173,9 @@ type txnClient struct { enableRefreshExpression bool txnOpenedCallbacks []func(TxnOperator) + // cacheTSGetter is the interface which gets the start TS of catalog. + cacheTSGetter CacheTSGetter + // normalStateNoWait is used to control if wait for the txn client's // state to be normal. If it is false, which is default value, wait // until the txn client's state to be normal; otherwise, if it is true, @@ -228,6 +231,11 @@ func (client *txnClient) GetState() TxnState { } } +// SetCacheTSGetter implements the TxnClient interface. +func (client *txnClient) SetCacheTSGetter(t CacheTSGetter) { + client.cacheTSGetter = t +} + // NewTxnClient create a txn client with TxnSender and Options func NewTxnClient( sid string, @@ -328,7 +336,8 @@ func (client *txnClient) doCreateTxn( client.closeTxn, ) - if err := client.openTxn(op); err != nil { + wait, err := client.openTxn(op) + if err != nil { return nil, err } @@ -347,6 +356,12 @@ func (client *txnClient) doCreateTxn( return nil, err } } + // If this transaction waits for the TN logtail service, + // we should check ts of catalog. If the ts is greater than + // snapshot ts, update snapshot ts to catalog ts. + if wait { + op.TryPushSnapshot(client.cacheTSGetter.GetCacheTS()) + } util.LogTxnSnapshotTimestamp( client.logger, @@ -500,7 +515,9 @@ func (client *txnClient) GetSyncLatestCommitTSTimes() uint64 { return client.atomic.forceSyncCommitTimes.Load() } -func (client *txnClient) openTxn(op *txnOperator) error { +// openTxn tries to open transaction. The first return value is boolean, which indicates +// if the transaction waits for the status of TN logtail service. +func (client *txnClient) openTxn(op *txnOperator) (bool, error) { client.mu.Lock() defer func() { v2.TxnActiveQueueSizeGauge.Set(float64(len(client.mu.activeTxns))) @@ -508,15 +525,17 @@ func (client *txnClient) openTxn(op *txnOperator) error { client.mu.Unlock() }() + var wait bool if !op.opts.skipWaitPushClient { for client.mu.state == paused { if client.normalStateNoWait { - return moerr.NewInternalErrorNoCtx("cn service is not ready, retry later") + return false, moerr.NewInternalErrorNoCtx("cn service is not ready, retry later") } if op.opts.options.WaitPausedDisabled() { - return moerr.NewInvalidStateNoCtx("txn client is in pause state") + return false, moerr.NewInvalidStateNoCtx("txn client is in pause state") } + wait = true client.logger.Warn("txn client is in pause state, wait for it to be ready", zap.String("txn ID", hex.EncodeToString(op.reset.txnID))) @@ -531,19 +550,19 @@ func (client *txnClient) openTxn(op *txnOperator) error { if !op.opts.options.UserTxn() || client.mu.users < client.maxActiveTxn { client.addActiveTxnLocked(op) - return nil + return wait, nil } var cancelC chan struct{} if client.timestampWaiter != nil { cancelC = client.timestampWaiter.CancelC() if cancelC == nil { - return moerr.NewWaiterPausedNoCtx() + return false, moerr.NewWaiterPausedNoCtx() } } op.reset.waiter = newWaiter(timestamp.Timestamp{}, cancelC) op.reset.waiter.ref() client.mu.waitActiveTxns = append(client.mu.waitActiveTxns, op) - return nil + return wait, nil } func (client *txnClient) closeTxn(event TxnEvent) { diff --git a/pkg/txn/client/client_test.go b/pkg/txn/client/client_test.go index 1453331bc295..d3ca4d2cb89e 100644 --- a/pkg/txn/client/client_test.go +++ b/pkg/txn/client/client_test.go @@ -278,5 +278,6 @@ func TestOpenTxnWithWaitPausedDisabled(t *testing.T) { op := &txnOperator{} op.opts.options = op.opts.options.WithDisableWaitPaused() - require.Error(t, c.openTxn(op)) + _, err := c.openTxn(op) + require.Error(t, err) } diff --git a/pkg/txn/client/operator.go b/pkg/txn/client/operator.go index fe44fff9b99e..889b45ab4193 100644 --- a/pkg/txn/client/operator.go +++ b/pkg/txn/client/operator.go @@ -530,6 +530,16 @@ func (tc *txnOperator) UpdateSnapshot( return err } +// TryPushSnapshot tries to push the snapshot ts byb the cache ts, which is +// from catalog. If the current snapshot ts is smaller than cache ts, push it. +func (tc *txnOperator) TryPushSnapshot(cacheTS timestamp.Timestamp) { + tc.mu.Lock() + defer tc.mu.Unlock() + if cacheTS.Greater(tc.mu.txn.SnapshotTS) { + tc.mu.txn.SnapshotTS = cacheTS + } +} + func (tc *txnOperator) ApplySnapshot(data []byte) error { if !tc.opts.coordinator { tc.logger.Fatal("apply snapshot on non-coordinator txn operator") diff --git a/pkg/txn/client/operator_test.go b/pkg/txn/client/operator_test.go index 545989d597ff..ba5becb3b897 100644 --- a/pkg/txn/client/operator_test.go +++ b/pkg/txn/client/operator_test.go @@ -650,6 +650,23 @@ func TestBase(t *testing.T) { ) } +func TestPushSnapshot(t *testing.T) { + runOperatorTests(t, func( + ctx context.Context, + tc *txnOperator, + _ *testTxnSender, + ) { + require.NotNil(t, tc.TxnRef()) + require.Equal(t, tc.Txn().SnapshotTS, tc.SnapshotTS()) + cacheTS := timestamp.Timestamp{ + PhysicalTime: 10, + LogicalTime: 10, + } + tc.TryPushSnapshot(cacheTS) + require.Equal(t, tc.SnapshotTS(), cacheTS) + }) +} + func runOperatorTests( t *testing.T, tc func(context.Context, *txnOperator, *testTxnSender), diff --git a/pkg/txn/client/types.go b/pkg/txn/client/types.go index e05207c28c33..3418a3b10f0d 100644 --- a/pkg/txn/client/types.go +++ b/pkg/txn/client/types.go @@ -96,6 +96,12 @@ type TxnClient interface { IterTxns(func(TxnOverview) bool) // GetState returns the current state of txn client. GetState() TxnState + // SetCacheTSGetter set the CacheTSGetter to the client. + // The CacheTSGetter gets the start ts of catalog. When TN first + // starts or restarts, some transactions wait for the TN ready, + // the snapshot TS of those transactions should be after the + // start ts of catalog. + SetCacheTSGetter(t CacheTSGetter) } type TxnState struct { @@ -338,3 +344,7 @@ func (e TxnEvent) Committed() bool { func (e TxnEvent) Aborted() bool { return e.Txn.Status == txn.TxnStatus_Aborted } + +type CacheTSGetter interface { + GetCacheTS() timestamp.Timestamp +} diff --git a/pkg/txn/storage/memorystorage/storage_txn_client.go b/pkg/txn/storage/memorystorage/storage_txn_client.go index 553953b6a294..37304a92fcae 100644 --- a/pkg/txn/storage/memorystorage/storage_txn_client.go +++ b/pkg/txn/storage/memorystorage/storage_txn_client.go @@ -76,6 +76,10 @@ func (s *StorageTxnClient) GetState() client.TxnState { panic("unimplemented") } +func (s *StorageTxnClient) SetCacheTSGetter(t client.CacheTSGetter) { + panic("unimplemented") +} + func (s *StorageTxnClient) IterTxns(func(client.TxnOverview) bool) { panic("unimplemented") } diff --git a/pkg/vm/engine/disttae/cache/catalog.go b/pkg/vm/engine/disttae/cache/catalog.go index df44046c0e64..cbd09d3e9601 100644 --- a/pkg/vm/engine/disttae/cache/catalog.go +++ b/pkg/vm/engine/disttae/cache/catalog.go @@ -84,6 +84,12 @@ func (cc *CatalogCache) CanServe(ts types.TS) bool { return ts.GE(&cc.mu.start) && ts.LE(&cc.mu.end) } +func (cc *CatalogCache) GetStartTs() types.TS { + cc.mu.Lock() + defer cc.mu.Unlock() + return cc.mu.start +} + type GCReport struct { TScanItem int TStaleItem int diff --git a/pkg/vm/engine/disttae/db.go b/pkg/vm/engine/disttae/db.go index 85a17aeb54e5..25ac2aac714c 100644 --- a/pkg/vm/engine/disttae/db.go +++ b/pkg/vm/engine/disttae/db.go @@ -369,6 +369,10 @@ func (e *Engine) GetLatestCatalogCache() *cache.CatalogCache { return e.catalog } +func (e *Engine) GetCacheTS() timestamp.Timestamp { + return e.catalog.GetStartTs().ToTimestamp() +} + func requestSnapshotRead(ctx context.Context, tbl *txnTable, snapshot *types.TS) (resp any, err error) { whichTN := func(string) ([]uint64, error) { return nil, nil } payload := func(tnShardID uint64, parameter string, proc *process.Process) ([]byte, error) { diff --git a/pkg/vm/engine/entire_engine.go b/pkg/vm/engine/entire_engine.go index a62b68909e6c..a7f1552d29e1 100644 --- a/pkg/vm/engine/entire_engine.go +++ b/pkg/vm/engine/entire_engine.go @@ -117,3 +117,7 @@ func (e *EntireEngine) GetMessageCenter() any { func (e *EntireEngine) GetService() string { return e.Engine.GetService() } + +func (e *EntireEngine) GetCacheTS() timestamp.Timestamp { + return e.Engine.GetCacheTS() +} diff --git a/pkg/vm/engine/entire_engine_test.go b/pkg/vm/engine/entire_engine_test.go index f298f82e47f4..bf338ffcb378 100644 --- a/pkg/vm/engine/entire_engine_test.go +++ b/pkg/vm/engine/entire_engine_test.go @@ -350,6 +350,10 @@ func (e *testEngine) GetService() string { return "" } +func (e *testEngine) GetCacheTS() timestamp.Timestamp { + return timestamp.Timestamp{} +} + func (e *testEngine) LatestLogtailAppliedTime() timestamp.Timestamp { return timestamp.Timestamp{} } diff --git a/pkg/vm/engine/memoryengine/binded.go b/pkg/vm/engine/memoryengine/binded.go index b9f48cd1803c..1a1b2a82a230 100644 --- a/pkg/vm/engine/memoryengine/binded.go +++ b/pkg/vm/engine/memoryengine/binded.go @@ -120,3 +120,7 @@ func (b *BindedEngine) GetMessageCenter() any { func (b *BindedEngine) GetService() string { return b.engine.GetService() } + +func (b *BindedEngine) GetCacheTS() timestamp.Timestamp { + return b.engine.GetCacheTS() +} diff --git a/pkg/vm/engine/memoryengine/engine.go b/pkg/vm/engine/memoryengine/engine.go index 35cfc8cc5f93..714831aa3d99 100644 --- a/pkg/vm/engine/memoryengine/engine.go +++ b/pkg/vm/engine/memoryengine/engine.go @@ -262,3 +262,7 @@ func getTNServices(cluster clusterservice.MOCluster) []metadata.TNService { func (e *Engine) GetMessageCenter() any { return nil } + +func (e *Engine) GetCacheTS() timestamp.Timestamp { + return timestamp.Timestamp{} +} diff --git a/pkg/vm/engine/types.go b/pkg/vm/engine/types.go index 750e7d94fd36..d3b0d0b24a5c 100644 --- a/pkg/vm/engine/types.go +++ b/pkg/vm/engine/types.go @@ -989,6 +989,8 @@ type Engine interface { GetService() string LatestLogtailAppliedTime() timestamp.Timestamp + + GetCacheTS() timestamp.Timestamp } type VectorPool interface {