diff --git a/go.mod b/go.mod index 7133b28db154..694921c4a600 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,6 @@ require ( github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.673 github.com/tidwall/btree v1.6.0 github.com/tidwall/pretty v1.2.1 - go.uber.org/multierr v1.11.0 go.uber.org/ratelimit v0.2.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc @@ -126,6 +125,7 @@ require ( github.com/valyala/histogram v1.2.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/tools v0.9.1 // indirect diff --git a/pkg/dnservice/store.go b/pkg/dnservice/store.go index 5919ff83fd79..61658ee9d1ae 100644 --- a/pkg/dnservice/store.go +++ b/pkg/dnservice/store.go @@ -16,6 +16,7 @@ package dnservice import ( "context" + "errors" "sync" "time" @@ -39,7 +40,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/txn/service" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -200,27 +200,18 @@ func (s *store) Start() error { func (s *store) Close() error { s.stopper.Stop() - var err error s.moCluster.Close() - if e := s.ctlservice.Close(); e != nil { - err = multierr.Append(e, err) - } - if e := s.hakeeperClient.Close(); e != nil { - err = multierr.Append(e, err) - } - if e := s.sender.Close(); e != nil { - err = multierr.Append(e, err) - } - if e := s.server.Close(); e != nil { - err = multierr.Append(e, err) - } - if e := s.lockTableAllocator.Close(); e != nil { - err = multierr.Append(e, err) - } + err := errors.Join( + s.ctlservice.Close(), + s.hakeeperClient.Close(), + s.sender.Close(), + s.server.Close(), + s.lockTableAllocator.Close(), + ) s.replicas.Range(func(_, value any) bool { r := value.(*replica) if e := r.close(false); e != nil { - err = multierr.Append(e, err) + err = errors.Join(e, err) } return true }) @@ -228,7 +219,7 @@ func (s *store) Close() error { ts := s.task.serviceHolder s.task.RUnlock() if ts != nil { - err = ts.Close() + err = errors.Join(err, ts.Close()) } // stop I/O pipeline blockio.Stop() @@ -293,13 +284,7 @@ func (s *store) createReplica(shard metadata.DNShard) error { continue } - err = r.start(service.NewTxnService( - r.rt, - shard, - storage, - s.sender, - s.cfg.Txn.ZombieTimeout.Duration, - s.lockTableAllocator)) + err = r.start(service.NewTxnService(shard, storage, s.sender, s.cfg.Txn.ZombieTimeout.Duration, s.lockTableAllocator)) if err != nil { r.logger.Fatal("start DNShard failed", zap.Error(err)) diff --git a/pkg/taskservice/mysql_task_storage.go b/pkg/taskservice/mysql_task_storage.go index 3b4bf9494e50..12dff845f52f 100644 --- a/pkg/taskservice/mysql_task_storage.go +++ b/pkg/taskservice/mysql_task_storage.go @@ -26,7 +26,6 @@ import ( "github.com/go-sql-driver/mysql" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/pb/task" - "go.uber.org/multierr" ) var ( @@ -567,12 +566,7 @@ func (m *mysqlTaskStorage) QueryCronTask(ctx context.Context) (tasks []task.Cron return nil, err } defer func() { - if e := rows.Close(); e != nil { - err = errors.Join(err, e) - } - if e := rows.Err(); e != nil { - err = errors.Join(err, e) - } + err = errors.Join(err, rows.Close(), rows.Err()) }() tasks = make([]task.CronTask, 0) @@ -775,10 +769,10 @@ func (m *mysqlTaskStorage) getDB() (*sql.DB, func() error, error) { } if err = m.useDB(db); err != nil { - return nil, nil, multierr.Append(err, db.Close()) + return nil, nil, errors.Join(err, db.Close()) } - return db, func() error { return db.Close() }, nil + return db, db.Close, nil } func (m *mysqlTaskStorage) useDB(db *sql.DB) (err error) { @@ -786,7 +780,7 @@ func (m *mysqlTaskStorage) useDB(db *sql.DB) (err error) { return errNotReady } if _, err := db.Exec("use " + m.dbname); err != nil { - return multierr.Append(err, db.Close()) + return errors.Join(err, db.Close()) } return nil } diff --git a/pkg/tests/txn/cluster.go b/pkg/tests/txn/cluster.go index 0f15c6a95e8a..acaf90b1d369 100644 --- a/pkg/tests/txn/cluster.go +++ b/pkg/tests/txn/cluster.go @@ -88,7 +88,7 @@ func (c *cluster) Env() service.Cluster { } func (c *cluster) NewClient() Client { - cli, err := newSQLClient(c.logger, c.env) + cli, err := newSQLClient(c.env) require.NoError(c.t, err) return cli } diff --git a/pkg/tests/txn/sql_client.go b/pkg/tests/txn/sql_client.go index 0949de1c5984..1c306441f446 100644 --- a/pkg/tests/txn/sql_client.go +++ b/pkg/tests/txn/sql_client.go @@ -25,8 +25,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/tests/service" "github.com/matrixorigin/matrixone/pkg/txn/client" - "go.uber.org/multierr" - "go.uber.org/zap" ) var ( @@ -40,7 +38,7 @@ type sqlClient struct { cn service.CNService } -func newSQLClient(logger *zap.Logger, env service.Cluster) (Client, error) { +func newSQLClient(env service.Cluster) (Client, error) { cn, err := env.GetCNServiceIndexed(0) if err != nil { return nil, err @@ -53,25 +51,25 @@ func newSQLClient(logger *zap.Logger, env service.Cluster) (Client, error) { _, err = db.Exec(createDB) if err != nil { - return nil, multierr.Append(err, db.Close()) + return nil, errors.Join(err, db.Close()) } _, err = db.Exec(useDB) if err != nil { - return nil, multierr.Append(err, db.Close()) + return nil, errors.Join(err, db.Close()) } _, err = db.Exec(createSql) if err != nil { - return nil, multierr.Append(err, db.Close()) + return nil, errors.Join(err, db.Close()) } return &sqlClient{ cn: cn, - }, multierr.Append(err, db.Close()) + }, errors.Join(err, db.Close()) } -func (c *sqlClient) NewTxn(options ...client.TxnOption) (Txn, error) { +func (c *sqlClient) NewTxn(...client.TxnOption) (Txn, error) { return newSQLTxn(c.cn) } @@ -93,7 +91,7 @@ func newSQLTxn(cn service.CNService) (Txn, error) { txn, err := db.Begin() if err != nil { - return nil, multierr.Append(err, db.Close()) + return nil, errors.Join(err, db.Close()) } return &sqlTxn{ db: db, @@ -109,11 +107,7 @@ func (kop *sqlTxn) Commit() error { } kop.mu.closed = true - err := kop.txn.Commit() - if err != nil { - return multierr.Append(err, kop.db.Close()) - } - return kop.db.Close() + return errors.Join(kop.txn.Commit(), kop.db.Close()) } func (kop *sqlTxn) Rollback() error { @@ -124,11 +118,7 @@ func (kop *sqlTxn) Rollback() error { } kop.mu.closed = true - err := kop.txn.Rollback() - if err != nil { - return multierr.Append(err, kop.db.Close()) - } - return kop.db.Close() + return errors.Join(kop.txn.Rollback(), kop.db.Close()) } func (kop *sqlTxn) Read(key string) (_ string, err error) { @@ -137,12 +127,7 @@ func (kop *sqlTxn) Read(key string) (_ string, err error) { return "", err } defer func() { - if e := rows.Close(); e != nil { - err = errors.Join(err, e) - } - if e := rows.Err(); e != nil { - err = errors.Join(err, e) - } + err = errors.Join(err, rows.Close(), rows.Err()) }() if !rows.Next() { return "", nil diff --git a/pkg/txn/client/operator.go b/pkg/txn/client/operator.go index d3573424bee5..2cd807be0ef7 100644 --- a/pkg/txn/client/operator.go +++ b/pkg/txn/client/operator.go @@ -17,6 +17,7 @@ package client import ( "bytes" "context" + "errors" "sync" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -28,7 +29,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/txn/clock" "github.com/matrixorigin/matrixone/pkg/txn/rpc" "github.com/matrixorigin/matrixone/pkg/txn/util" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -491,7 +491,7 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c if commit { if tc.workspace != nil { if err := tc.workspace.Commit(ctx); err != nil { - return nil, multierr.Append(err, tc.Rollback(ctx)) + return nil, errors.Join(err, tc.Rollback(ctx)) } } tc.mu.Lock() diff --git a/pkg/txn/service/service.go b/pkg/txn/service/service.go index abf8ef0dc46f..0bf680e03d76 100644 --- a/pkg/txn/service/service.go +++ b/pkg/txn/service/service.go @@ -17,13 +17,13 @@ package service import ( "bytes" "context" + "errors" "fmt" "sync" "time" "github.com/fagongzi/util/hack" "github.com/matrixorigin/matrixone/pkg/common/log" - "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/common/stopper" "github.com/matrixorigin/matrixone/pkg/lockservice" "github.com/matrixorigin/matrixone/pkg/pb/metadata" @@ -31,7 +31,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/txn/rpc" "github.com/matrixorigin/matrixone/pkg/txn/storage" "github.com/matrixorigin/matrixone/pkg/txn/util" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -67,7 +66,6 @@ type service struct { // NewTxnService create TxnService func NewTxnService( - rt runtime.Runtime, shard metadata.DNShard, storage storage.TxnStorage, sender rpc.TxnSender, @@ -118,10 +116,7 @@ func (s *service) Close(destroy bool) error { closer = s.storage.Destroy } // FIXME: all context.TODO() need to use tracing context - if err := closer(context.TODO()); err != nil { - return multierr.Append(err, s.sender.Close()) - } - return s.sender.Close() + return errors.Join(closer(context.TODO()), s.sender.Close()) } func (s *service) gcZombieTxn(ctx context.Context) { @@ -189,7 +184,7 @@ func (s *service) maybeAddTxn(meta txn.TxnMeta) (*txnContext, bool) { } // 1. first transaction write request at current DNShard - // 2. transaction already committed or aborted, the transcation context will removed by gcZombieTxn. + // 2. transaction already committed or aborted, the transaction context will be removed by gcZombieTxn. txnCtx.init(meta, acquireNotifier()) util.LogTxnCreateOn(meta, s.shard) return txnCtx, true @@ -230,7 +225,6 @@ func (s *service) releaseTxnContext(txnCtx *txnContext) { func (s *service) parallelSendWithRetry( ctx context.Context, - txnMeta txn.TxnMeta, requests []txn.TxnRequest, ignoreTxnErrorCodes map[uint16]struct{}) *rpc.SendResult { for { diff --git a/pkg/txn/service/service_cn_handler.go b/pkg/txn/service/service_cn_handler.go index 46c74dd45851..94d3c76b5c1f 100644 --- a/pkg/txn/service/service_cn_handler.go +++ b/pkg/txn/service/service_cn_handler.go @@ -29,11 +29,11 @@ import ( ) var ( - rollbackIngoreErrorCodes = map[uint16]struct{}{ + rollbackIgnoreErrorCodes = map[uint16]struct{}{ moerr.ErrTxnNotFound: {}, } - prepareIngoreErrorCodes = map[uint16]struct{}{ + prepareIgnoreErrorCodes = map[uint16]struct{}{ moerr.ErrTxnNotFound: {}, } ) @@ -69,7 +69,7 @@ func (s *service) Read(ctx context.Context, request *txn.TxnRequest, response *t waiters := make([]*waiter, 0, len(result.WaitTxns())) for _, txnID := range result.WaitTxns() { txnCtx := s.getTxnContext(txnID) - // The transaction can not found, it means the concurrent transaction to be waited for has already + // The transaction can not be found, it means the concurrent transaction to be waited for has already // been committed or aborted. if txnCtx == nil { continue @@ -375,7 +375,7 @@ func (s *service) startAsyncRollbackTask(txnMeta txn.TxnMeta) { }) } - s.parallelSendWithRetry(ctx, txnMeta, requests, rollbackIngoreErrorCodes) + s.parallelSendWithRetry(ctx, requests, rollbackIgnoreErrorCodes) util.LogTxnRollbackCompleted(txnMeta) }) if err != nil { @@ -433,7 +433,7 @@ func (s *service) startAsyncCommitTask(txnCtx *txnContext) error { ctx, cancel := context.WithTimeout(ctx, time.Duration(math.MaxInt64)) defer cancel() - if result := s.parallelSendWithRetry(ctx, txnMeta, requests, rollbackIngoreErrorCodes); result != nil { + if result := s.parallelSendWithRetry(ctx, requests, rollbackIgnoreErrorCodes); result != nil { result.Release() if s.logger.Enabled(zap.DebugLevel) { s.logger.Debug("other dnshards committed", diff --git a/pkg/txn/service/service_recovery.go b/pkg/txn/service/service_recovery.go index ab3e9a02a1d1..b94b7d1d2b7c 100644 --- a/pkg/txn/service/service_recovery.go +++ b/pkg/txn/service/service_recovery.go @@ -130,7 +130,7 @@ func (s *service) startAsyncCheckCommitTask(txnCtx *txnContext) error { }) } - result := s.parallelSendWithRetry(ctx, txnMeta, requests, prepareIngoreErrorCodes) + result := s.parallelSendWithRetry(ctx, requests, prepareIgnoreErrorCodes) if result == nil { return } diff --git a/pkg/txn/service/service_testutil.go b/pkg/txn/service/service_testutil.go index 701bdeb1f6d4..de865d937a72 100644 --- a/pkg/txn/service/service_testutil.go +++ b/pkg/txn/service/service_testutil.go @@ -114,8 +114,7 @@ func NewTestTxnServiceWithLogAndZombieAndLockTabkeAllocator( logutil.GetPanicLoggerWithLevel(zapcore.DebugLevel).With(zap.String("case", t.Name())), runtime.WithClock(clock)) runtime.SetupProcessLevelRuntime(rt) - return NewTxnService(rt, - NewTestDNShard(shard), + return NewTxnService(NewTestDNShard(shard), NewTestTxnStorage(log, clock), sender, zombie, diff --git a/pkg/txn/storage/tae/storage.go b/pkg/txn/storage/tae/storage.go index 3bdac3a6a212..85b2f3da0833 100644 --- a/pkg/txn/storage/tae/storage.go +++ b/pkg/txn/storage/tae/storage.go @@ -16,8 +16,7 @@ package taestorage import ( "context" - - "go.uber.org/multierr" + "errors" "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/fileservice" @@ -91,10 +90,7 @@ func (s *taeStorage) Start() error { // Close implements storage.TxnTAEStorage func (s *taeStorage) Close(ctx context.Context) error { - if err := s.logtailServer.Close(); err != nil { - return multierr.Append(err, s.taeHandler.HandleClose(ctx)) - } - return s.taeHandler.HandleClose(ctx) + return errors.Join(s.logtailServer.Close(), s.taeHandler.HandleClose(ctx)) } // Commit implements storage.TxnTAEStorage