Skip to content

Commit

Permalink
[bug] txn client: push the txn's snapshot ts
Browse files Browse the repository at this point in the history
  • Loading branch information
volgariver6 committed Nov 4, 2024
1 parent 48c7e16 commit 5cfa70e
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 12 deletions.
1 change: 1 addition & 0 deletions pkg/cnservice/distributed_tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (s *service) initDistributedTAE(
s.cfg.Engine.CNTransferTxnLifespanThreshold),
)
pu.StorageEngine = s.storeEngine
client.SetCacheTSGetter(s.storeEngine)

// cdc mp
if s.cdcMp, err = mpool.NewMPool("cdc", 0, mpool.NoFixed); err != nil {
Expand Down
74 changes: 70 additions & 4 deletions pkg/frontend/test/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions pkg/frontend/test/txn_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 26 additions & 7 deletions pkg/txn/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ type txnClient struct {
enableRefreshExpression bool
txnOpenedCallbacks []func(TxnOperator)

// cacheTSGetter is the interface which gets the start TS of catalog.
cacheTSGetter CacheTSGetter

// normalStateNoWait is used to control if wait for the txn client's
// state to be normal. If it is false, which is default value, wait
// until the txn client's state to be normal; otherwise, if it is true,
Expand Down Expand Up @@ -228,6 +231,11 @@ func (client *txnClient) GetState() TxnState {
}
}

// SetCacheTSGetter implements the TxnClient interface.
func (client *txnClient) SetCacheTSGetter(t CacheTSGetter) {
client.cacheTSGetter = t
}

// NewTxnClient create a txn client with TxnSender and Options
func NewTxnClient(
sid string,
Expand Down Expand Up @@ -328,7 +336,8 @@ func (client *txnClient) doCreateTxn(
client.closeTxn,
)

if err := client.openTxn(op); err != nil {
wait, err := client.openTxn(op)
if err != nil {
return nil, err
}

Expand All @@ -347,6 +356,12 @@ func (client *txnClient) doCreateTxn(
return nil, err
}
}
// If this transaction waits for the TN logtail service,
// we should check ts of catalog. If the ts is greater than
// snapshot ts, update snapshot ts to catalog ts.
if wait {
op.TryPushSnapshot(client.cacheTSGetter.GetCacheTS())
}

util.LogTxnSnapshotTimestamp(
client.logger,
Expand Down Expand Up @@ -500,23 +515,27 @@ func (client *txnClient) GetSyncLatestCommitTSTimes() uint64 {
return client.atomic.forceSyncCommitTimes.Load()
}

func (client *txnClient) openTxn(op *txnOperator) error {
// openTxn tries to open transaction. The first return value is boolean, which indicates
// if the transaction waits for the status of TN logtail service.
func (client *txnClient) openTxn(op *txnOperator) (bool, error) {
client.mu.Lock()
defer func() {
v2.TxnActiveQueueSizeGauge.Set(float64(len(client.mu.activeTxns)))
v2.TxnWaitActiveQueueSizeGauge.Set(float64(len(client.mu.waitActiveTxns)))
client.mu.Unlock()
}()

var wait bool
if !op.opts.skipWaitPushClient {
for client.mu.state == paused {
if client.normalStateNoWait {
return moerr.NewInternalErrorNoCtx("cn service is not ready, retry later")
return false, moerr.NewInternalErrorNoCtx("cn service is not ready, retry later")
}

if op.opts.options.WaitPausedDisabled() {
return moerr.NewInvalidStateNoCtx("txn client is in pause state")
return false, moerr.NewInvalidStateNoCtx("txn client is in pause state")
}
wait = true

client.logger.Warn("txn client is in pause state, wait for it to be ready",
zap.String("txn ID", hex.EncodeToString(op.reset.txnID)))
Expand All @@ -531,19 +550,19 @@ func (client *txnClient) openTxn(op *txnOperator) error {
if !op.opts.options.UserTxn() ||
client.mu.users < client.maxActiveTxn {
client.addActiveTxnLocked(op)
return nil
return wait, nil
}
var cancelC chan struct{}
if client.timestampWaiter != nil {
cancelC = client.timestampWaiter.CancelC()
if cancelC == nil {
return moerr.NewWaiterPausedNoCtx()
return false, moerr.NewWaiterPausedNoCtx()
}
}
op.reset.waiter = newWaiter(timestamp.Timestamp{}, cancelC)
op.reset.waiter.ref()
client.mu.waitActiveTxns = append(client.mu.waitActiveTxns, op)
return nil
return wait, nil
}

func (client *txnClient) closeTxn(event TxnEvent) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/txn/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,5 +278,6 @@ func TestOpenTxnWithWaitPausedDisabled(t *testing.T) {
op := &txnOperator{}
op.opts.options = op.opts.options.WithDisableWaitPaused()

require.Error(t, c.openTxn(op))
_, err := c.openTxn(op)
require.Error(t, err)
}
10 changes: 10 additions & 0 deletions pkg/txn/client/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,16 @@ func (tc *txnOperator) UpdateSnapshot(
return err
}

// TryPushSnapshot tries to push the snapshot ts byb the cache ts, which is
// from catalog. If the current snapshot ts is smaller than cache ts, push it.
func (tc *txnOperator) TryPushSnapshot(cacheTS timestamp.Timestamp) {
tc.mu.Lock()
defer tc.mu.Unlock()
if cacheTS.Greater(tc.mu.txn.SnapshotTS) {
tc.mu.txn.SnapshotTS = cacheTS
}
}

func (tc *txnOperator) ApplySnapshot(data []byte) error {
if !tc.opts.coordinator {
tc.logger.Fatal("apply snapshot on non-coordinator txn operator")
Expand Down
10 changes: 10 additions & 0 deletions pkg/txn/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ type TxnClient interface {
IterTxns(func(TxnOverview) bool)
// GetState returns the current state of txn client.
GetState() TxnState
// SetCacheTSGetter set the CacheTSGetter to the client.
// The CacheTSGetter gets the start ts of catalog. When TN first
// starts or restarts, some transactions wait for the TN ready,
// the snapshot TS of those transactions should be after the
// start ts of catalog.
SetCacheTSGetter(t CacheTSGetter)
}

type TxnState struct {
Expand Down Expand Up @@ -338,3 +344,7 @@ func (e TxnEvent) Committed() bool {
func (e TxnEvent) Aborted() bool {
return e.Txn.Status == txn.TxnStatus_Aborted
}

type CacheTSGetter interface {
GetCacheTS() timestamp.Timestamp
}
4 changes: 4 additions & 0 deletions pkg/txn/storage/memorystorage/storage_txn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (s *StorageTxnClient) GetState() client.TxnState {
panic("unimplemented")
}

func (s *StorageTxnClient) SetCacheTSGetter(t client.CacheTSGetter) {
panic("unimplemented")
}

func (s *StorageTxnClient) IterTxns(func(client.TxnOverview) bool) {
panic("unimplemented")
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/vm/engine/disttae/cache/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (cc *CatalogCache) CanServe(ts types.TS) bool {
return ts.GE(&cc.mu.start) && ts.LE(&cc.mu.end)
}

func (cc *CatalogCache) GetStartTs() types.TS {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.mu.start
}

type GCReport struct {
TScanItem int
TStaleItem int
Expand Down
4 changes: 4 additions & 0 deletions pkg/vm/engine/disttae/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ func (e *Engine) GetLatestCatalogCache() *cache.CatalogCache {
return e.catalog
}

func (e *Engine) GetCacheTS() timestamp.Timestamp {
return e.catalog.GetStartTs().ToTimestamp()
}

func requestSnapshotRead(ctx context.Context, tbl *txnTable, snapshot *types.TS) (resp any, err error) {
whichTN := func(string) ([]uint64, error) { return nil, nil }
payload := func(tnShardID uint64, parameter string, proc *process.Process) ([]byte, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/vm/engine/entire_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,7 @@ func (e *EntireEngine) GetMessageCenter() any {
func (e *EntireEngine) GetService() string {
return e.Engine.GetService()
}

func (e *EntireEngine) GetCacheTS() timestamp.Timestamp {
return e.Engine.GetCacheTS()
}
4 changes: 4 additions & 0 deletions pkg/vm/engine/entire_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ func (e *testEngine) GetService() string {
return ""
}

func (e *testEngine) GetCacheTS() timestamp.Timestamp {
return timestamp.Timestamp{}
}

func (e *testEngine) LatestLogtailAppliedTime() timestamp.Timestamp {
return timestamp.Timestamp{}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/vm/engine/memoryengine/binded.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,7 @@ func (b *BindedEngine) GetMessageCenter() any {
func (b *BindedEngine) GetService() string {
return b.engine.GetService()
}

func (b *BindedEngine) GetCacheTS() timestamp.Timestamp {
return b.engine.GetCacheTS()
}
4 changes: 4 additions & 0 deletions pkg/vm/engine/memoryengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,7 @@ func getTNServices(cluster clusterservice.MOCluster) []metadata.TNService {
func (e *Engine) GetMessageCenter() any {
return nil
}

func (e *Engine) GetCacheTS() timestamp.Timestamp {
return timestamp.Timestamp{}
}
2 changes: 2 additions & 0 deletions pkg/vm/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,8 @@ type Engine interface {
GetService() string

LatestLogtailAppliedTime() timestamp.Timestamp

GetCacheTS() timestamp.Timestamp
}

type VectorPool interface {
Expand Down

0 comments on commit 5cfa70e

Please sign in to comment.