Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry-pick] Improve cdc send sql performance #19843

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions pkg/cdc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type tableReader struct {
sinker Sinker
wMarkUpdater *WatermarkUpdater
tick *time.Ticker
restartFunc func(*DbTableInfo) error
resetWatermarkFunc func(*DbTableInfo) error
initSnapshotSplitTxn bool

tableDef *plan.TableDef
Expand All @@ -58,7 +58,7 @@ func NewTableReader(
sinker Sinker,
wMarkUpdater *WatermarkUpdater,
tableDef *plan.TableDef,
restartFunc func(*DbTableInfo) error,
resetWatermarkFunc func(*DbTableInfo) error,
initSnapshotSplitTxn bool,
) Reader {
reader := &tableReader{
Expand All @@ -70,7 +70,7 @@ func NewTableReader(
sinker: sinker,
wMarkUpdater: wMarkUpdater,
tick: time.NewTicker(200 * time.Millisecond),
restartFunc: restartFunc,
resetWatermarkFunc: resetWatermarkFunc,
initSnapshotSplitTxn: initSnapshotSplitTxn,
tableDef: tableDef,
}
Expand Down Expand Up @@ -112,19 +112,22 @@ func (reader *tableReader) Run(
}

if err := reader.readTable(ctx, ar); err != nil {
logutil.Errorf("cdc tableReader(%v) failed, err: %v\n", reader.info, err)
logutil.Errorf("cdc tableReader(%v) failed, err: %v", reader.info, err)

// if stale read, try to restart reader
if moerr.IsMoErrCode(err, moerr.ErrStaleRead) {
if err = reader.restartFunc(reader.info); err != nil {
logutil.Errorf("cdc tableReader(%v) restart failed, err: %v\n", reader.info, err)
// reset sinker
reader.sinker.Reset()
// reset watermark
if err = reader.resetWatermarkFunc(reader.info); err != nil {
logutil.Errorf("cdc tableReader(%v) restart failed, err: %v", reader.info, err)
return
}
logutil.Errorf("cdc tableReader(%v) restart successfully\n", reader.info)
logutil.Errorf("cdc tableReader(%v) restart successfully", reader.info)
continue
}

logutil.Errorf("cdc tableReader(%v) err is not stale read, quit\n", reader.info)
logutil.Errorf("cdc tableReader(%v) err is not stale read, quit", reader.info)
return
}
}
Expand Down Expand Up @@ -247,9 +250,15 @@ func (reader *tableReader) readTableWithTxn(
defer func() {
if hasBegin {
if err == nil {
err = reader.sinker.SendCommit(ctx)
} else {
_ = reader.sinker.SendRollback(ctx)
// error may can't be caught immediately, but must be caught when next call
reader.sinker.SendCommit()
// so send a dummy sql to guarantee previous commit is sent successfully
reader.sinker.SendDummy()
err = reader.sinker.Error()
}

if err != nil {
reader.sinker.SendRollback()
}
}

Expand All @@ -269,8 +278,12 @@ func (reader *tableReader) readTableWithTxn(
return
default:
}
v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load()))
// check sinker error of last round
if err = reader.sinker.Error(); err != nil {
return
}

v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load()))
start = time.Now()
insertData, deleteData, curHint, err = changes.Next(ctx, reader.mp)
v2.CdcReadDurationHistogram.Observe(time.Since(start).Seconds())
Expand All @@ -281,11 +294,12 @@ func (reader *tableReader) readTableWithTxn(
// both nil denote no more data (end of this tail)
if insertData == nil && deleteData == nil {
// heartbeat
err = reader.sinker.Sink(ctx, &DecoderOutput{
reader.sinker.Sink(ctx, &DecoderOutput{
noMoreData: true,
fromTs: fromTs,
toTs: toTs,
})
err = reader.sinker.Error()
return
}

Expand All @@ -295,22 +309,20 @@ func (reader *tableReader) readTableWithTxn(
case engine.ChangesHandle_Snapshot:
// output sql in a txn
if !hasBegin && !reader.initSnapshotSplitTxn {
if err = reader.sinker.SendBegin(ctx); err != nil {
return err
}
reader.sinker.SendBegin()
hasBegin = true
}

// transform into insert instantly
err = reader.sinker.Sink(ctx, &DecoderOutput{
reader.sinker.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeSnapshot,
checkpointBat: insertData,
fromTs: fromTs,
toTs: toTs,
})
addSnapshotEndMetrics()
insertData.Clean(reader.mp)
if err != nil {
if err = reader.sinker.Error(); err != nil {
return
}
case engine.ChangesHandle_Tail_wip:
Expand All @@ -333,13 +345,11 @@ func (reader *tableReader) readTableWithTxn(

// output sql in a txn
if !hasBegin {
if err = reader.sinker.SendBegin(ctx); err != nil {
return err
}
reader.sinker.SendBegin()
hasBegin = true
}

err = reader.sinker.Sink(ctx, &DecoderOutput{
reader.sinker.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeTail,
insertAtmBatch: insertAtmBatch,
deleteAtmBatch: deleteAtmBatch,
Expand All @@ -350,7 +360,7 @@ func (reader *tableReader) readTableWithTxn(
addTailEndMetrics(deleteAtmBatch)
insertAtmBatch.Close()
deleteAtmBatch.Close()
if err != nil {
if err = reader.sinker.Error(); err != nil {
return
}

Expand Down
91 changes: 10 additions & 81 deletions pkg/cdc/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func Test_tableReader_Run(t *testing.T) {
sinker: tt.fields.sinker,
wMarkUpdater: tt.fields.wMarkUpdater,
tick: tt.fields.tick,
restartFunc: tt.fields.restartFunc,
resetWatermarkFunc: tt.fields.restartFunc,
insTsColIdx: tt.fields.insTsColIdx,
insCompositedPkColIdx: tt.fields.insCompositedPkColIdx,
delTsColIdx: tt.fields.delTsColIdx,
Expand All @@ -235,19 +235,19 @@ func Test_tableReader_Run_StaleRead(t *testing.T) {

// restart success
reader := &tableReader{
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
restartFunc: func(*DbTableInfo) error { return nil },
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
resetWatermarkFunc: func(*DbTableInfo) error { return nil },
}
reader.Run(ctx, NewCdcActiveRoutine())
cancel()

// restart failed
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
reader = &tableReader{
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
restartFunc: func(*DbTableInfo) error { return moerr.NewInternalErrorNoCtx("") },
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
resetWatermarkFunc: func(*DbTableInfo) error { return moerr.NewInternalErrorNoCtx("") },
}
reader.Run(ctx, NewCdcActiveRoutine())
cancel()
Expand All @@ -264,9 +264,9 @@ func Test_tableReader_Run_NonStaleReadErr(t *testing.T) {
defer stub.Reset()

reader := &tableReader{
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
restartFunc: func(*DbTableInfo) error { return nil },
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
resetWatermarkFunc: func(*DbTableInfo) error { return nil },
}
reader.Run(ctx, NewCdcActiveRoutine())
}
Expand Down Expand Up @@ -334,77 +334,6 @@ func Test_tableReader_readTableWithTxn(t *testing.T) {
assert.NoError(t, err)
}

//func Test_tableReader_readTable(t *testing.T) {
// type fields struct {
// cnTxnClient client.TxnClient
// cnEngine engine.Engine
// mp *mpool.MPool
// packerPool *fileservice.Pool[*types.Packer]
// info *DbTableInfo
// sinker Sinker
// wMarkUpdater *WatermarkUpdater
// tick *time.Ticker
// restartFunc func(*DbTableInfo) error
// insTsColIdx int
// insCompositedPkColIdx int
// delTsColIdx int
// delCompositedPkColIdx int
// }
//
// type args struct {
// ctx context.Context
// ar *ActiveRoutine
// }
// tests := []struct {
// name string
// fields fields
// args args
// wantErr assert.ErrorAssertionFunc
// }{
// {
// name: "t1",
// fields: fields{
// packerPool: fileservice.NewPool(
// 128,
// func() *types.Packer {
// return types.NewPacker()
// },
// func(packer *types.Packer) {
// packer.Reset()
// },
// func(packer *types.Packer) {
// packer.Close()
// },
// ),
// },
// args: args{
// ctx: context.Background(),
// ar: NewCdcActiveRoutine(),
// },
// },
// }
// for _, tt := range tests {
// t.Run(tt.name, func(t *testing.T) {
// reader := &tableReader{
// cnTxnClient: tt.fields.cnTxnClient,
// cnEngine: tt.fields.cnEngine,
// mp: tt.fields.mp,
// packerPool: tt.fields.packerPool,
// info: tt.fields.info,
// sinker: tt.fields.sinker,
// wMarkUpdater: tt.fields.wMarkUpdater,
// tick: tt.fields.tick,
// restartFunc: tt.fields.restartFunc,
// insTsColIdx: tt.fields.insTsColIdx,
// insCompositedPkColIdx: tt.fields.insCompositedPkColIdx,
// delTsColIdx: tt.fields.delTsColIdx,
// delCompositedPkColIdx: tt.fields.delCompositedPkColIdx,
// }
// reader.readTable(tt.args.ctx, tt.args.ar)
// })
// }
//}

var _ engine.ChangesHandle = new(testChangesHandle)

const (
Expand Down
Loading
Loading