diff --git a/examples/gcworker/go.mod b/examples/gcworker/go.mod index 1da7b81d4..2159df803 100644 --- a/examples/gcworker/go.mod +++ b/examples/gcworker/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/rawkv/go.mod b/examples/rawkv/go.mod index a0ea6f67b..45a1c4d08 100644 --- a/examples/rawkv/go.mod +++ b/examples/rawkv/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/1pc_txn/go.mod b/examples/txnkv/1pc_txn/go.mod index 70825849b..224ff0bb6 100644 --- a/examples/txnkv/1pc_txn/go.mod +++ b/examples/txnkv/1pc_txn/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/async_commit/go.mod b/examples/txnkv/async_commit/go.mod index 046d68df6..1253bbde7 100644 --- a/examples/txnkv/async_commit/go.mod +++ b/examples/txnkv/async_commit/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/delete_range/go.mod b/examples/txnkv/delete_range/go.mod index 704dd51da..504dd55ad 100644 --- a/examples/txnkv/delete_range/go.mod +++ b/examples/txnkv/delete_range/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/go.mod b/examples/txnkv/go.mod index bb1508e21..59cb804d7 100644 --- a/examples/txnkv/go.mod +++ b/examples/txnkv/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/pessimistic_txn/go.mod b/examples/txnkv/pessimistic_txn/go.mod index 491287ca2..4059cefa2 100644 --- a/examples/txnkv/pessimistic_txn/go.mod +++ b/examples/txnkv/pessimistic_txn/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/unsafedestoryrange/go.mod b/examples/txnkv/unsafedestoryrange/go.mod index 39450ee27..202531c6d 100644 --- a/examples/txnkv/unsafedestoryrange/go.mod +++ b/examples/txnkv/unsafedestoryrange/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 824a45dec..ccf972f1e 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -468,16 +468,14 @@ func (c *PlainMutations) AppendMutation(mutation PlainMutation) { // newTwoPhaseCommitter creates a twoPhaseCommitter. func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) { committer := &twoPhaseCommitter{ - store: txn.store, - txn: txn, - startTS: txn.StartTS(), - sessionID: sessionID, - regionTxnSize: map[uint64]int{}, - isPessimistic: txn.IsPessimistic(), - binlog: txn.binlog, - diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, - resourceGroupName: txn.resourceGroupName, - } + store: txn.store, + txn: txn, + startTS: txn.StartTS(), + sessionID: sessionID, + regionTxnSize: map[uint64]int{}, + isPessimistic: txn.IsPessimistic(), + } + txn.commitActionContext.applyToCommitter(committer) return committer, nil } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 890a86d42..134f20c09 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -133,32 +133,13 @@ type KVTxn struct { valid bool - // schemaVer is the infoSchema fetched at startTS. - schemaVer SchemaVer // commitCallback is called after current transaction gets committed commitCallback func(info string, err error) - binlog BinlogExecutor - schemaLeaseChecker SchemaLeaseChecker - syncLog bool - priority txnutil.Priority - isPessimistic bool - enableAsyncCommit bool - enable1PC bool - causalConsistency bool - scope string - kvFilter KVFilter - resourceGroupTag []byte - resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil - diskFullOpt kvrpcpb.DiskFullOpt - txnSource uint64 - commitTSUpperBoundCheck func(uint64) bool - // interceptor is used to decorate the RPC request logic related to the txn. - interceptor interceptor.RPCInterceptor - assertionLevel kvrpcpb.AssertionLevel - *util.RequestSource - // resourceGroupName is the name of tenant resource group. - resourceGroupName string + isPessimistic bool + enableAsyncCommit bool + enable1PC bool + *commitActionContext aggressiveLockingContext *aggressiveLockingContext aggressiveLockingDirty atomic.Bool @@ -179,12 +160,11 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, startTime: time.Now(), valid: true, vars: tikv.DefaultVars, - scope: options.TxnScope, enableAsyncCommit: cfg.EnableAsyncCommit, enable1PC: cfg.Enable1PC, - diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, - RequestSource: snapshot.RequestSource, } + newTiKVTxn.commitActionContext = defaultCommitActionContext(newTiKVTxn) + newTiKVTxn.commitActionContext.setScope(options.TxnScope) if !options.PipelinedMemDB { newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDBWithContext(), snapshot) return newTiKVTxn, nil @@ -270,12 +250,12 @@ func (txn *KVTxn) Delete(k []byte) error { // SetSchemaLeaseChecker sets a hook to check schema version. func (txn *KVTxn) SetSchemaLeaseChecker(checker SchemaLeaseChecker) { - txn.schemaLeaseChecker = checker + txn.commitActionContext.setSchemaLeaseChecker(checker) } // EnableForceSyncLog indicates tikv to always sync log for the transaction. func (txn *KVTxn) EnableForceSyncLog() { - txn.syncLog = true + txn.commitActionContext.setForceSyncLog(true) } // SetPessimistic indicates if the transaction should use pessimictic lock. @@ -289,59 +269,47 @@ func (txn *KVTxn) SetPessimistic(b bool) { // SetSchemaVer updates schema version to validate transaction. func (txn *KVTxn) SetSchemaVer(schemaVer SchemaVer) { txn.schemaVer = schemaVer + txn.commitActionContext.setSchemaVer(schemaVer) } // SetPriority sets the priority for both write and read. func (txn *KVTxn) SetPriority(pri txnutil.Priority) { - txn.priority = pri txn.GetSnapshot().SetPriority(pri) + txn.commitActionContext.setPriority(pri) } // SetResourceGroupTag sets the resource tag for both write and read. func (txn *KVTxn) SetResourceGroupTag(tag []byte) { - txn.resourceGroupTag = tag txn.GetSnapshot().SetResourceGroupTag(tag) - if txn.committer != nil && txn.IsPipelined() { - txn.committer.resourceGroupTag = tag - } + txn.commitActionContext.setResourceGroupTag(tag) } // SetResourceGroupTagger sets the resource tagger for both write and read. // Before sending the request, if resourceGroupTag is not nil, use // resourceGroupTag directly, otherwise use resourceGroupTagger. func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { - txn.resourceGroupTagger = tagger txn.GetSnapshot().SetResourceGroupTagger(tagger) - if txn.committer != nil && txn.IsPipelined() { - txn.committer.resourceGroupTagger = tagger - } + txn.commitActionContext.setResourceGroupTagger(tagger) } // SetResourceGroupName set resource group name for both read and write. func (txn *KVTxn) SetResourceGroupName(name string) { - txn.resourceGroupName = name txn.GetSnapshot().SetResourceGroupName(name) - if txn.committer != nil && txn.IsPipelined() { - txn.committer.resourceGroupName = name - } + txn.commitActionContext.setResourceGroupName(name) } // SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot. // interceptor.RPCInterceptor will be executed before each RPC request is initiated. // Note that SetRPCInterceptor will replace the previously set interceptor. func (txn *KVTxn) SetRPCInterceptor(it interceptor.RPCInterceptor) { - txn.interceptor = it txn.GetSnapshot().SetRPCInterceptor(it) + txn.commitActionContext.setRPCInterceptor(it) } // AddRPCInterceptor adds an interceptor, the order of addition is the order of execution. func (txn *KVTxn) AddRPCInterceptor(it interceptor.RPCInterceptor) { - if txn.interceptor == nil { - txn.SetRPCInterceptor(it) - return - } - txn.interceptor = interceptor.ChainRPCInterceptors(txn.interceptor, it) txn.GetSnapshot().AddRPCInterceptor(it) + txn.commitActionContext.addRPCInterceptor(it) } // SetCommitCallback sets up a function that will be called when the transaction @@ -365,34 +333,34 @@ func (txn *KVTxn) SetEnable1PC(b bool) { // guarantee linearizability. Default value is false which means // linearizability is guaranteed. func (txn *KVTxn) SetCausalConsistency(b bool) { - txn.causalConsistency = b + txn.commitActionContext.setCausalConsistency(b) } // SetScope sets the geographical scope of the transaction. func (txn *KVTxn) SetScope(scope string) { - txn.scope = scope + txn.commitActionContext.setScope(scope) } // SetKVFilter sets the filter to ignore key-values in memory buffer. func (txn *KVTxn) SetKVFilter(filter KVFilter) { - txn.kvFilter = filter + txn.commitActionContext.setKVFilter(filter) } // SetCommitTSUpperBoundCheck provide a way to restrict the commit TS upper bound. // The 2PC processing will pass the commitTS for the checker function, if the function // returns false, the 2PC processing will abort. func (txn *KVTxn) SetCommitTSUpperBoundCheck(f func(commitTS uint64) bool) { - txn.commitTSUpperBoundCheck = f + txn.commitActionContext.setCommitTSUpperBoundCheck(f) } // SetDiskFullOpt sets whether current operation is allowed in each TiKV disk usage level. func (txn *KVTxn) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) { - txn.diskFullOpt = level + txn.commitActionContext.setDiskFullOpt(level) } // SetTxnSource sets the source of the transaction. func (txn *KVTxn) SetTxnSource(txnSource uint64) { - txn.txnSource = txnSource + txn.commitActionContext.setTxnSource(txnSource) } // GetDiskFullOpt gets the options of current operation in each TiKV disk usage level. @@ -402,12 +370,17 @@ func (txn *KVTxn) GetDiskFullOpt() kvrpcpb.DiskFullOpt { // ClearDiskFullOpt clears the options of current operation in each tikv disk usage level. func (txn *KVTxn) ClearDiskFullOpt() { - txn.diskFullOpt = kvrpcpb.DiskFullOpt_NotAllowedOnFull + txn.commitActionContext.setDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull) } // SetAssertionLevel sets how strict the assertions in the transaction should be. func (txn *KVTxn) SetAssertionLevel(assertionLevel kvrpcpb.AssertionLevel) { - txn.assertionLevel = assertionLevel + txn.commitActionContext.setAssertionLevel(assertionLevel) +} + +// SetInTest sets whether the transaction is in test, since there may be some test builders don't compatible with testing.Testing. +func (txn *KVTxn) SetInTest(intest bool) { + txn.commitActionContext.setInTest(intest) } // IsPessimistic returns true if it is pessimistic. @@ -458,6 +431,7 @@ func (txn *KVTxn) InitPipelinedMemDB() error { flushedKeys += memdb.Len() flushedSize += memdb.Size() }() + txn.commitActionContext.setRunning() logutil.BgLogger().Info("[pipelined dml] flush memdb to kv store", zap.Int("keys", memdb.Len()), zap.String("size", units.HumanSize(float64(memdb.Size()))), zap.Int("flushed keys", flushedKeys), zap.String("flushed size", units.HumanSize(float64(flushedSize)))) @@ -554,11 +528,6 @@ func (txn *KVTxn) InitPipelinedMemDB() error { } return txn.committer.pipelinedFlushMutations(bo, mutations, generation) }) - txn.committer.priority = txn.priority.ToPB() - txn.committer.syncLog = txn.syncLog - txn.committer.resourceGroupTag = txn.resourceGroupTag - txn.committer.resourceGroupTagger = txn.resourceGroupTagger - txn.committer.resourceGroupName = txn.resourceGroupName txn.us = unionstore.NewUnionStore(pipelinedMemDB, txn.snapshot) return nil } @@ -586,6 +555,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error { if !txn.valid { return tikverr.ErrInvalidTxn } + txn.commitActionContext.setRunning() defer txn.close() ctx = context.WithValue(ctx, util.RequestSourceKey, *txn.RequestSource) @@ -638,8 +608,6 @@ func (txn *KVTxn) Commit(ctx context.Context) error { txn.committer = committer } - committer.SetDiskFullOpt(txn.diskFullOpt) - committer.SetTxnSource(txn.txnSource) txn.committer.forUpdateTSConstraints = txn.forUpdateTSChecks defer committer.ttlManager.close() @@ -716,7 +684,6 @@ func (txn *KVTxn) Commit(ctx context.Context) error { func (txn *KVTxn) close() { txn.valid = false - txn.ClearDiskFullOpt() } // Rollback undoes the transaction operations to KV store. @@ -724,6 +691,7 @@ func (txn *KVTxn) Rollback() error { if !txn.valid { return tikverr.ErrInvalidTxn } + txn.commitActionContext.setRunning() if txn.IsInAggressiveLockingMode() { if len(txn.aggressiveLockingContext.currentLockedKeys) != 0 { @@ -766,6 +734,8 @@ func (txn *KVTxn) Rollback() error { txn.committer.ttlManager.close() } txn.close() + // forbid access to the committer after rollback. + txn.committer = nil logutil.BgLogger().Debug("[kv] rollback txn", zap.Uint64("txnStartTS", txn.StartTS())) if txn.isInternal() { metrics.TxnCmdHistogramWithRollbackInternal.Observe(time.Since(start).Seconds()) @@ -1105,6 +1075,8 @@ func (txn *KVTxn) LockKeysFunc(ctx context.Context, lockCtx *tikv.LockCtx, fn fu } func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func(), keysInput ...[]byte) error { + txn.commitActionContext.setRunning() + defer txn.commitActionContext.setIdle() if txn.interceptor != nil { // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute @@ -1611,10 +1583,7 @@ func (txn *KVTxn) GetSnapshot() *txnsnapshot.KVSnapshot { // SetBinlogExecutor sets the method to perform binlong synchronization. func (txn *KVTxn) SetBinlogExecutor(binlog BinlogExecutor) { - txn.binlog = binlog - if txn.committer != nil { - txn.committer.binlog = binlog - } + txn.commitActionContext.setBinlog(binlog) } // GetClusterID returns store's cluster id. @@ -1634,15 +1603,213 @@ func (txn *KVTxn) Mem() uint64 { // SetRequestSourceInternal sets the scope of the request source. func (txn *KVTxn) SetRequestSourceInternal(internal bool) { - txn.RequestSource.SetRequestSourceInternal(internal) + txn.commitActionContext.setRequestSourceInternal(internal) } // SetRequestSourceType sets the type of the request source. func (txn *KVTxn) SetRequestSourceType(tp string) { - txn.RequestSource.SetRequestSourceType(tp) + txn.commitActionContext.setRequestSourceType(tp) } // SetExplicitRequestSourceType sets the explicit type of the request source. func (txn *KVTxn) SetExplicitRequestSourceType(tp string) { - txn.RequestSource.SetExplicitRequestSourceType(tp) + txn.commitActionContext.setExplicitRequestSourceType(tp) +} + +// commitActionContext is the context will be read in committer. +// If the action is processing in background, like actionPipelinedFlush, we must ensure no write to the context before the action is done. +// the edit of committer must though this context, so that it's protected with action running status. +// the commitActionContext also guarantee the option consistency between the committer and the txn. +type commitActionContext struct { + running atomic.Bool + txn *KVTxn + intest bool + + // schemaVer is the infoSchema fetched at startTS. + schemaVer SchemaVer + syncLog bool + priority txnutil.Priority + binlog BinlogExecutor + schemaLeaseChecker SchemaLeaseChecker + causalConsistency bool + scope string + kvFilter KVFilter + resourceGroupTag []byte + diskFullOpt kvrpcpb.DiskFullOpt + txnSource uint64 + commitTSUpperBoundCheck func(uint64) bool + assertionLevel kvrpcpb.AssertionLevel + *util.RequestSource + // resourceGroupName is the name of tenant resource group. + resourceGroupName string + // use this when resourceGroupTag is nil + resourceGroupTagger tikvrpc.ResourceGroupTagger + // interceptor is used to decorate the RPC request logic related to the txn. + interceptor interceptor.RPCInterceptor +} + +func defaultCommitActionContext(txn *KVTxn) *commitActionContext { + return &commitActionContext{ + running: atomic.Bool{}, + txn: txn, + syncLog: false, + priority: txnutil.PriorityNormal, + diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, + RequestSource: txn.snapshot.RequestSource, + } +} + +func (ctx *commitActionContext) applyToCommitter(committer *twoPhaseCommitter) { + committer.syncLog = ctx.syncLog + committer.priority = ctx.priority.ToPB() + committer.binlog = ctx.binlog + committer.resourceGroupTag = ctx.resourceGroupTag + committer.resourceGroupTagger = ctx.resourceGroupTagger + committer.diskFullOpt = ctx.diskFullOpt + committer.txnSource = ctx.txnSource +} + +func (ctx *commitActionContext) setInTest(intest bool) { + ctx.intest = intest +} + +// setRunning marks the context as running, it should be read only after calling setRunning. +func (ctx *commitActionContext) setRunning() { + ctx.running.Store(true) +} + +// setIdle marks the context as idle, can set option to it after calling setIdle. +func (ctx *commitActionContext) setIdle() { + ctx.running.Store(false) +} + +func (ctx *commitActionContext) assertIdle() { + if !ctx.intest { + return + } + if ctx.running.Load() { + panic("commit action is running") + } +} + +func (ctx *commitActionContext) setSchemaVer(schemaVer SchemaVer) { + ctx.assertIdle() + ctx.schemaVer = schemaVer +} + +func (ctx *commitActionContext) setForceSyncLog(syncLog bool) { + ctx.assertIdle() + ctx.syncLog = syncLog + if ctx.txn.committer != nil { + ctx.txn.committer.syncLog = syncLog + } +} + +func (ctx *commitActionContext) setPriority(pri txnutil.Priority) { + ctx.assertIdle() + ctx.priority = pri + if ctx.txn.committer != nil { + ctx.txn.committer.priority = pri.ToPB() + } +} + +func (ctx *commitActionContext) setBinlog(binlog BinlogExecutor) { + ctx.assertIdle() + ctx.binlog = binlog + if ctx.txn.committer != nil { + ctx.txn.committer.binlog = binlog + } +} + +func (ctx *commitActionContext) setSchemaLeaseChecker(schemaLeaseChecker SchemaLeaseChecker) { + ctx.assertIdle() + ctx.schemaLeaseChecker = schemaLeaseChecker +} + +func (ctx *commitActionContext) setCausalConsistency(causalConsistency bool) { + ctx.assertIdle() + ctx.causalConsistency = causalConsistency +} + +func (ctx *commitActionContext) setScope(scope string) { + ctx.assertIdle() + ctx.scope = scope +} + +func (ctx *commitActionContext) setKVFilter(kvFilter KVFilter) { + ctx.assertIdle() + ctx.kvFilter = kvFilter +} + +func (ctx *commitActionContext) setResourceGroupTag(resourceTag []byte) { + ctx.assertIdle() + ctx.resourceGroupTag = resourceTag + if ctx.txn.committer != nil { + ctx.txn.committer.resourceGroupTag = resourceTag + } +} + +func (ctx *commitActionContext) setResourceGroupTagger(resourceTagger tikvrpc.ResourceGroupTagger) { + ctx.assertIdle() + ctx.resourceGroupTagger = resourceTagger + if ctx.txn.committer != nil { + ctx.txn.committer.resourceGroupTagger = resourceTagger + } +} + +func (ctx *commitActionContext) setDiskFullOpt(level kvrpcpb.DiskFullOpt) { + ctx.assertIdle() + ctx.diskFullOpt = level + if ctx.txn.committer != nil { + ctx.txn.committer.diskFullOpt = level + } +} + +func (ctx *commitActionContext) setTxnSource(txnSource uint64) { + ctx.assertIdle() + ctx.txnSource = txnSource + if ctx.txn.committer != nil { + ctx.txn.committer.txnSource = txnSource + } +} + +func (ctx *commitActionContext) setCommitTSUpperBoundCheck(check func(uint64) bool) { + ctx.assertIdle() + ctx.commitTSUpperBoundCheck = check +} + +func (ctx *commitActionContext) setAssertionLevel(level kvrpcpb.AssertionLevel) { + ctx.assertIdle() + ctx.assertionLevel = level +} + +func (ctx *commitActionContext) setRequestSourceInternal(internal bool) { + ctx.RequestSource.SetRequestSourceInternal(internal) +} + +func (ctx *commitActionContext) setRequestSourceType(tp string) { + ctx.RequestSource.SetRequestSourceType(tp) +} + +func (ctx *commitActionContext) setExplicitRequestSourceType(tp string) { + ctx.RequestSource.SetExplicitRequestSourceType(tp) +} + +func (ctx *commitActionContext) setResourceGroupName(name string) { + ctx.assertIdle() + ctx.resourceGroupName = name +} + +func (ctx *commitActionContext) setRPCInterceptor(it interceptor.RPCInterceptor) { + ctx.assertIdle() + ctx.interceptor = it +} + +func (ctx *commitActionContext) addRPCInterceptor(it interceptor.RPCInterceptor) { + ctx.assertIdle() + if ctx.interceptor == nil { + ctx.interceptor = it + } else { + ctx.interceptor = interceptor.ChainRPCInterceptors(ctx.interceptor, it) + } }