Skip to content

Commit

Permalink
replace multierr.Append with errors.Join (#10798)
Browse files Browse the repository at this point in the history
replace multierr.Append with errors.Join

Approved by: @zhangxu19830126
  • Loading branch information
w-zr authored Jul 21, 2023
1 parent 974a4c8 commit 06b4dab
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 88 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ require (
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.673
github.com/tidwall/btree v1.6.0
github.com/tidwall/pretty v1.2.1
go.uber.org/multierr v1.11.0
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
Expand Down Expand Up @@ -126,6 +125,7 @@ require (
github.com/valyala/histogram v1.2.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/tools v0.9.1 // indirect
Expand Down
37 changes: 11 additions & 26 deletions pkg/dnservice/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dnservice

import (
"context"
"errors"
"sync"
"time"

Expand All @@ -39,7 +40,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/txn/service"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -200,35 +200,26 @@ func (s *store) Start() error {

func (s *store) Close() error {
s.stopper.Stop()
var err error
s.moCluster.Close()
if e := s.ctlservice.Close(); e != nil {
err = multierr.Append(e, err)
}
if e := s.hakeeperClient.Close(); e != nil {
err = multierr.Append(e, err)
}
if e := s.sender.Close(); e != nil {
err = multierr.Append(e, err)
}
if e := s.server.Close(); e != nil {
err = multierr.Append(e, err)
}
if e := s.lockTableAllocator.Close(); e != nil {
err = multierr.Append(e, err)
}
err := errors.Join(
s.ctlservice.Close(),
s.hakeeperClient.Close(),
s.sender.Close(),
s.server.Close(),
s.lockTableAllocator.Close(),
)
s.replicas.Range(func(_, value any) bool {
r := value.(*replica)
if e := r.close(false); e != nil {
err = multierr.Append(e, err)
err = errors.Join(e, err)
}
return true
})
s.task.RLock()
ts := s.task.serviceHolder
s.task.RUnlock()
if ts != nil {
err = ts.Close()
err = errors.Join(err, ts.Close())
}
// stop I/O pipeline
blockio.Stop()
Expand Down Expand Up @@ -293,13 +284,7 @@ func (s *store) createReplica(shard metadata.DNShard) error {
continue
}

err = r.start(service.NewTxnService(
r.rt,
shard,
storage,
s.sender,
s.cfg.Txn.ZombieTimeout.Duration,
s.lockTableAllocator))
err = r.start(service.NewTxnService(shard, storage, s.sender, s.cfg.Txn.ZombieTimeout.Duration, s.lockTableAllocator))
if err != nil {
r.logger.Fatal("start DNShard failed",
zap.Error(err))
Expand Down
14 changes: 4 additions & 10 deletions pkg/taskservice/mysql_task_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/pb/task"
"go.uber.org/multierr"
)

var (
Expand Down Expand Up @@ -567,12 +566,7 @@ func (m *mysqlTaskStorage) QueryCronTask(ctx context.Context) (tasks []task.Cron
return nil, err
}
defer func() {
if e := rows.Close(); e != nil {
err = errors.Join(err, e)
}
if e := rows.Err(); e != nil {
err = errors.Join(err, e)
}
err = errors.Join(err, rows.Close(), rows.Err())
}()

tasks = make([]task.CronTask, 0)
Expand Down Expand Up @@ -775,18 +769,18 @@ func (m *mysqlTaskStorage) getDB() (*sql.DB, func() error, error) {
}

if err = m.useDB(db); err != nil {
return nil, nil, multierr.Append(err, db.Close())
return nil, nil, errors.Join(err, db.Close())
}

return db, func() error { return db.Close() }, nil
return db, db.Close, nil
}

func (m *mysqlTaskStorage) useDB(db *sql.DB) (err error) {
if err := db.Ping(); err != nil {
return errNotReady
}
if _, err := db.Exec("use " + m.dbname); err != nil {
return multierr.Append(err, db.Close())
return errors.Join(err, db.Close())
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/txn/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *cluster) Env() service.Cluster {
}

func (c *cluster) NewClient() Client {
cli, err := newSQLClient(c.logger, c.env)
cli, err := newSQLClient(c.env)
require.NoError(c.t, err)
return cli
}
35 changes: 10 additions & 25 deletions pkg/tests/txn/sql_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/tests/service"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"go.uber.org/multierr"
"go.uber.org/zap"
)

var (
Expand All @@ -40,7 +38,7 @@ type sqlClient struct {
cn service.CNService
}

func newSQLClient(logger *zap.Logger, env service.Cluster) (Client, error) {
func newSQLClient(env service.Cluster) (Client, error) {
cn, err := env.GetCNServiceIndexed(0)
if err != nil {
return nil, err
Expand All @@ -53,25 +51,25 @@ func newSQLClient(logger *zap.Logger, env service.Cluster) (Client, error) {

_, err = db.Exec(createDB)
if err != nil {
return nil, multierr.Append(err, db.Close())
return nil, errors.Join(err, db.Close())
}

_, err = db.Exec(useDB)
if err != nil {
return nil, multierr.Append(err, db.Close())
return nil, errors.Join(err, db.Close())
}

_, err = db.Exec(createSql)
if err != nil {
return nil, multierr.Append(err, db.Close())
return nil, errors.Join(err, db.Close())
}

return &sqlClient{
cn: cn,
}, multierr.Append(err, db.Close())
}, errors.Join(err, db.Close())
}

func (c *sqlClient) NewTxn(options ...client.TxnOption) (Txn, error) {
func (c *sqlClient) NewTxn(...client.TxnOption) (Txn, error) {
return newSQLTxn(c.cn)
}

Expand All @@ -93,7 +91,7 @@ func newSQLTxn(cn service.CNService) (Txn, error) {

txn, err := db.Begin()
if err != nil {
return nil, multierr.Append(err, db.Close())
return nil, errors.Join(err, db.Close())
}
return &sqlTxn{
db: db,
Expand All @@ -109,11 +107,7 @@ func (kop *sqlTxn) Commit() error {
}

kop.mu.closed = true
err := kop.txn.Commit()
if err != nil {
return multierr.Append(err, kop.db.Close())
}
return kop.db.Close()
return errors.Join(kop.txn.Commit(), kop.db.Close())
}

func (kop *sqlTxn) Rollback() error {
Expand All @@ -124,11 +118,7 @@ func (kop *sqlTxn) Rollback() error {
}

kop.mu.closed = true
err := kop.txn.Rollback()
if err != nil {
return multierr.Append(err, kop.db.Close())
}
return kop.db.Close()
return errors.Join(kop.txn.Rollback(), kop.db.Close())
}

func (kop *sqlTxn) Read(key string) (_ string, err error) {
Expand All @@ -137,12 +127,7 @@ func (kop *sqlTxn) Read(key string) (_ string, err error) {
return "", err
}
defer func() {
if e := rows.Close(); e != nil {
err = errors.Join(err, e)
}
if e := rows.Err(); e != nil {
err = errors.Join(err, e)
}
err = errors.Join(err, rows.Close(), rows.Err())
}()
if !rows.Next() {
return "", nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/txn/client/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"bytes"
"context"
"errors"
"sync"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand All @@ -28,7 +29,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
"github.com/matrixorigin/matrixone/pkg/txn/util"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -491,7 +491,7 @@ func (tc *txnOperator) doWrite(ctx context.Context, requests []txn.TxnRequest, c
if commit {
if tc.workspace != nil {
if err := tc.workspace.Commit(ctx); err != nil {
return nil, multierr.Append(err, tc.Rollback(ctx))
return nil, errors.Join(err, tc.Rollback(ctx))
}
}
tc.mu.Lock()
Expand Down
12 changes: 3 additions & 9 deletions pkg/txn/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@ package service
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/fagongzi/util/hack"
"github.com/matrixorigin/matrixone/pkg/common/log"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/lockservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
"github.com/matrixorigin/matrixone/pkg/txn/storage"
"github.com/matrixorigin/matrixone/pkg/txn/util"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -67,7 +66,6 @@ type service struct {

// NewTxnService create TxnService
func NewTxnService(
rt runtime.Runtime,
shard metadata.DNShard,
storage storage.TxnStorage,
sender rpc.TxnSender,
Expand Down Expand Up @@ -118,10 +116,7 @@ func (s *service) Close(destroy bool) error {
closer = s.storage.Destroy
}
// FIXME: all context.TODO() need to use tracing context
if err := closer(context.TODO()); err != nil {
return multierr.Append(err, s.sender.Close())
}
return s.sender.Close()
return errors.Join(closer(context.TODO()), s.sender.Close())
}

func (s *service) gcZombieTxn(ctx context.Context) {
Expand Down Expand Up @@ -189,7 +184,7 @@ func (s *service) maybeAddTxn(meta txn.TxnMeta) (*txnContext, bool) {
}

// 1. first transaction write request at current DNShard
// 2. transaction already committed or aborted, the transcation context will removed by gcZombieTxn.
// 2. transaction already committed or aborted, the transaction context will be removed by gcZombieTxn.
txnCtx.init(meta, acquireNotifier())
util.LogTxnCreateOn(meta, s.shard)
return txnCtx, true
Expand Down Expand Up @@ -230,7 +225,6 @@ func (s *service) releaseTxnContext(txnCtx *txnContext) {

func (s *service) parallelSendWithRetry(
ctx context.Context,
txnMeta txn.TxnMeta,
requests []txn.TxnRequest,
ignoreTxnErrorCodes map[uint16]struct{}) *rpc.SendResult {
for {
Expand Down
10 changes: 5 additions & 5 deletions pkg/txn/service/service_cn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
)

var (
rollbackIngoreErrorCodes = map[uint16]struct{}{
rollbackIgnoreErrorCodes = map[uint16]struct{}{
moerr.ErrTxnNotFound: {},
}

prepareIngoreErrorCodes = map[uint16]struct{}{
prepareIgnoreErrorCodes = map[uint16]struct{}{
moerr.ErrTxnNotFound: {},
}
)
Expand Down Expand Up @@ -69,7 +69,7 @@ func (s *service) Read(ctx context.Context, request *txn.TxnRequest, response *t
waiters := make([]*waiter, 0, len(result.WaitTxns()))
for _, txnID := range result.WaitTxns() {
txnCtx := s.getTxnContext(txnID)
// The transaction can not found, it means the concurrent transaction to be waited for has already
// The transaction can not be found, it means the concurrent transaction to be waited for has already
// been committed or aborted.
if txnCtx == nil {
continue
Expand Down Expand Up @@ -375,7 +375,7 @@ func (s *service) startAsyncRollbackTask(txnMeta txn.TxnMeta) {
})
}

s.parallelSendWithRetry(ctx, txnMeta, requests, rollbackIngoreErrorCodes)
s.parallelSendWithRetry(ctx, requests, rollbackIgnoreErrorCodes)
util.LogTxnRollbackCompleted(txnMeta)
})
if err != nil {
Expand Down Expand Up @@ -433,7 +433,7 @@ func (s *service) startAsyncCommitTask(txnCtx *txnContext) error {
ctx, cancel := context.WithTimeout(ctx, time.Duration(math.MaxInt64))
defer cancel()

if result := s.parallelSendWithRetry(ctx, txnMeta, requests, rollbackIngoreErrorCodes); result != nil {
if result := s.parallelSendWithRetry(ctx, requests, rollbackIgnoreErrorCodes); result != nil {
result.Release()
if s.logger.Enabled(zap.DebugLevel) {
s.logger.Debug("other dnshards committed",
Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/service/service_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *service) startAsyncCheckCommitTask(txnCtx *txnContext) error {
})
}

result := s.parallelSendWithRetry(ctx, txnMeta, requests, prepareIngoreErrorCodes)
result := s.parallelSendWithRetry(ctx, requests, prepareIgnoreErrorCodes)
if result == nil {
return
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/txn/service/service_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ func NewTestTxnServiceWithLogAndZombieAndLockTabkeAllocator(
logutil.GetPanicLoggerWithLevel(zapcore.DebugLevel).With(zap.String("case", t.Name())),
runtime.WithClock(clock))
runtime.SetupProcessLevelRuntime(rt)
return NewTxnService(rt,
NewTestDNShard(shard),
return NewTxnService(NewTestDNShard(shard),
NewTestTxnStorage(log, clock),
sender,
zombie,
Expand Down
Loading

0 comments on commit 06b4dab

Please sign in to comment.