Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug] txn client: push the txn's snapshot ts #19757

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cnservice/distributed_tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (s *service) initDistributedTAE(
s.cfg.Engine.CNTransferTxnLifespanThreshold),
)
pu.StorageEngine = s.storeEngine
client.SetCacheTSGetter(s.storeEngine)

// cdc mp
if s.cdcMp, err = mpool.NewMPool("cdc", 0, mpool.NoFixed); err != nil {
Expand Down
74 changes: 70 additions & 4 deletions pkg/frontend/test/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions pkg/frontend/test/txn_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 26 additions & 7 deletions pkg/txn/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ type txnClient struct {
enableRefreshExpression bool
txnOpenedCallbacks []func(TxnOperator)

// cacheTSGetter is the interface which gets the start TS of catalog.
cacheTSGetter CacheTSGetter

// normalStateNoWait is used to control if wait for the txn client's
// state to be normal. If it is false, which is default value, wait
// until the txn client's state to be normal; otherwise, if it is true,
Expand Down Expand Up @@ -228,6 +231,11 @@ func (client *txnClient) GetState() TxnState {
}
}

// SetCacheTSGetter implements the TxnClient interface.
func (client *txnClient) SetCacheTSGetter(t CacheTSGetter) {
client.cacheTSGetter = t
}

// NewTxnClient create a txn client with TxnSender and Options
func NewTxnClient(
sid string,
Expand Down Expand Up @@ -328,7 +336,8 @@ func (client *txnClient) doCreateTxn(
client.closeTxn,
)

if err := client.openTxn(op); err != nil {
wait, err := client.openTxn(op)
if err != nil {
return nil, err
}

Expand All @@ -347,6 +356,12 @@ func (client *txnClient) doCreateTxn(
return nil, err
}
}
// If this transaction waits for the TN logtail service,
// we should check ts of catalog. If the ts is greater than
// snapshot ts, update snapshot ts to catalog ts.
if wait {
op.TryPushSnapshot(client.cacheTSGetter.GetCacheTS())
}

util.LogTxnSnapshotTimestamp(
client.logger,
Expand Down Expand Up @@ -500,23 +515,27 @@ func (client *txnClient) GetSyncLatestCommitTSTimes() uint64 {
return client.atomic.forceSyncCommitTimes.Load()
}

func (client *txnClient) openTxn(op *txnOperator) error {
// openTxn tries to open transaction. The first return value is boolean, which indicates
// if the transaction waits for the status of TN logtail service.
func (client *txnClient) openTxn(op *txnOperator) (bool, error) {
client.mu.Lock()
defer func() {
v2.TxnActiveQueueSizeGauge.Set(float64(len(client.mu.activeTxns)))
v2.TxnWaitActiveQueueSizeGauge.Set(float64(len(client.mu.waitActiveTxns)))
client.mu.Unlock()
}()

var wait bool
if !op.opts.skipWaitPushClient {
for client.mu.state == paused {
if client.normalStateNoWait {
return moerr.NewInternalErrorNoCtx("cn service is not ready, retry later")
return false, moerr.NewInternalErrorNoCtx("cn service is not ready, retry later")
}

if op.opts.options.WaitPausedDisabled() {
return moerr.NewInvalidStateNoCtx("txn client is in pause state")
return false, moerr.NewInvalidStateNoCtx("txn client is in pause state")
}
wait = true

client.logger.Warn("txn client is in pause state, wait for it to be ready",
zap.String("txn ID", hex.EncodeToString(op.reset.txnID)))
Expand All @@ -531,19 +550,19 @@ func (client *txnClient) openTxn(op *txnOperator) error {
if !op.opts.options.UserTxn() ||
client.mu.users < client.maxActiveTxn {
client.addActiveTxnLocked(op)
return nil
return wait, nil
}
var cancelC chan struct{}
if client.timestampWaiter != nil {
cancelC = client.timestampWaiter.CancelC()
if cancelC == nil {
return moerr.NewWaiterPausedNoCtx()
return false, moerr.NewWaiterPausedNoCtx()
}
}
op.reset.waiter = newWaiter(timestamp.Timestamp{}, cancelC)
op.reset.waiter.ref()
client.mu.waitActiveTxns = append(client.mu.waitActiveTxns, op)
return nil
return wait, nil
}

func (client *txnClient) closeTxn(event TxnEvent) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/txn/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,5 +278,6 @@ func TestOpenTxnWithWaitPausedDisabled(t *testing.T) {
op := &txnOperator{}
op.opts.options = op.opts.options.WithDisableWaitPaused()

require.Error(t, c.openTxn(op))
_, err := c.openTxn(op)
require.Error(t, err)
}
10 changes: 10 additions & 0 deletions pkg/txn/client/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,16 @@ func (tc *txnOperator) UpdateSnapshot(
return err
}

// TryPushSnapshot tries to push the snapshot ts byb the cache ts, which is
// from catalog. If the current snapshot ts is smaller than cache ts, push it.
func (tc *txnOperator) TryPushSnapshot(cacheTS timestamp.Timestamp) {
tc.mu.Lock()
defer tc.mu.Unlock()
if cacheTS.Greater(tc.mu.txn.SnapshotTS) {
tc.mu.txn.SnapshotTS = cacheTS
}
}

func (tc *txnOperator) ApplySnapshot(data []byte) error {
if !tc.opts.coordinator {
tc.logger.Fatal("apply snapshot on non-coordinator txn operator")
Expand Down
17 changes: 17 additions & 0 deletions pkg/txn/client/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,23 @@ func TestBase(t *testing.T) {
)
}

func TestPushSnapshot(t *testing.T) {
runOperatorTests(t, func(
ctx context.Context,
tc *txnOperator,
_ *testTxnSender,
) {
require.NotNil(t, tc.TxnRef())
require.Equal(t, tc.Txn().SnapshotTS, tc.SnapshotTS())
cacheTS := timestamp.Timestamp{
PhysicalTime: 10,
LogicalTime: 10,
}
tc.TryPushSnapshot(cacheTS)
require.Equal(t, tc.SnapshotTS(), cacheTS)
})
}

func runOperatorTests(
t *testing.T,
tc func(context.Context, *txnOperator, *testTxnSender),
Expand Down
10 changes: 10 additions & 0 deletions pkg/txn/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ type TxnClient interface {
IterTxns(func(TxnOverview) bool)
// GetState returns the current state of txn client.
GetState() TxnState
// SetCacheTSGetter set the CacheTSGetter to the client.
// The CacheTSGetter gets the start ts of catalog. When TN first
// starts or restarts, some transactions wait for the TN ready,
// the snapshot TS of those transactions should be after the
// start ts of catalog.
SetCacheTSGetter(t CacheTSGetter)
}

type TxnState struct {
Expand Down Expand Up @@ -338,3 +344,7 @@ func (e TxnEvent) Committed() bool {
func (e TxnEvent) Aborted() bool {
return e.Txn.Status == txn.TxnStatus_Aborted
}

type CacheTSGetter interface {
GetCacheTS() timestamp.Timestamp
}
4 changes: 4 additions & 0 deletions pkg/txn/storage/memorystorage/storage_txn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (s *StorageTxnClient) GetState() client.TxnState {
panic("unimplemented")
}

func (s *StorageTxnClient) SetCacheTSGetter(t client.CacheTSGetter) {
panic("unimplemented")
}

func (s *StorageTxnClient) IterTxns(func(client.TxnOverview) bool) {
panic("unimplemented")
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/vm/engine/disttae/cache/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (cc *CatalogCache) CanServe(ts types.TS) bool {
return ts.GE(&cc.mu.start) && ts.LE(&cc.mu.end)
}

func (cc *CatalogCache) GetStartTs() types.TS {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.mu.start
}

type GCReport struct {
TScanItem int
TStaleItem int
Expand Down
4 changes: 4 additions & 0 deletions pkg/vm/engine/disttae/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ func (e *Engine) GetLatestCatalogCache() *cache.CatalogCache {
return e.catalog
}

func (e *Engine) GetCacheTS() timestamp.Timestamp {
return e.catalog.GetStartTs().ToTimestamp()
}

func requestSnapshotRead(ctx context.Context, tbl *txnTable, snapshot *types.TS) (resp any, err error) {
whichTN := func(string) ([]uint64, error) { return nil, nil }
payload := func(tnShardID uint64, parameter string, proc *process.Process) ([]byte, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/vm/engine/entire_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,7 @@ func (e *EntireEngine) GetMessageCenter() any {
func (e *EntireEngine) GetService() string {
return e.Engine.GetService()
}

func (e *EntireEngine) GetCacheTS() timestamp.Timestamp {
return e.Engine.GetCacheTS()
}
4 changes: 4 additions & 0 deletions pkg/vm/engine/entire_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ func (e *testEngine) GetService() string {
return ""
}

func (e *testEngine) GetCacheTS() timestamp.Timestamp {
return timestamp.Timestamp{}
}

func (e *testEngine) LatestLogtailAppliedTime() timestamp.Timestamp {
return timestamp.Timestamp{}
}
Expand Down
Loading
Loading