diff --git a/pkg/cdc/reader.go b/pkg/cdc/reader.go index 6b6c49ac20bd..a3f5204b566c 100644 --- a/pkg/cdc/reader.go +++ b/pkg/cdc/reader.go @@ -41,7 +41,7 @@ type tableReader struct { sinker Sinker wMarkUpdater *WatermarkUpdater tick *time.Ticker - restartFunc func(*DbTableInfo) error + resetWatermarkFunc func(*DbTableInfo) error initSnapshotSplitTxn bool tableDef *plan.TableDef @@ -58,7 +58,7 @@ func NewTableReader( sinker Sinker, wMarkUpdater *WatermarkUpdater, tableDef *plan.TableDef, - restartFunc func(*DbTableInfo) error, + resetWatermarkFunc func(*DbTableInfo) error, initSnapshotSplitTxn bool, ) Reader { reader := &tableReader{ @@ -70,7 +70,7 @@ func NewTableReader( sinker: sinker, wMarkUpdater: wMarkUpdater, tick: time.NewTicker(200 * time.Millisecond), - restartFunc: restartFunc, + resetWatermarkFunc: resetWatermarkFunc, initSnapshotSplitTxn: initSnapshotSplitTxn, tableDef: tableDef, } @@ -112,19 +112,22 @@ func (reader *tableReader) Run( } if err := reader.readTable(ctx, ar); err != nil { - logutil.Errorf("cdc tableReader(%v) failed, err: %v\n", reader.info, err) + logutil.Errorf("cdc tableReader(%v) failed, err: %v", reader.info, err) // if stale read, try to restart reader if moerr.IsMoErrCode(err, moerr.ErrStaleRead) { - if err = reader.restartFunc(reader.info); err != nil { - logutil.Errorf("cdc tableReader(%v) restart failed, err: %v\n", reader.info, err) + // reset sinker + reader.sinker.Reset() + // reset watermark + if err = reader.resetWatermarkFunc(reader.info); err != nil { + logutil.Errorf("cdc tableReader(%v) restart failed, err: %v", reader.info, err) return } - logutil.Errorf("cdc tableReader(%v) restart successfully\n", reader.info) + logutil.Errorf("cdc tableReader(%v) restart successfully", reader.info) continue } - logutil.Errorf("cdc tableReader(%v) err is not stale read, quit\n", reader.info) + logutil.Errorf("cdc tableReader(%v) err is not stale read, quit", reader.info) return } } @@ -247,9 +250,15 @@ func (reader *tableReader) readTableWithTxn( defer func() { if hasBegin { if err == nil { - err = reader.sinker.SendCommit(ctx) - } else { - _ = reader.sinker.SendRollback(ctx) + // error may can't be caught immediately, but must be caught when next call + reader.sinker.SendCommit() + // so send a dummy sql to guarantee previous commit is sent successfully + reader.sinker.SendDummy() + err = reader.sinker.Error() + } + + if err != nil { + reader.sinker.SendRollback() } } @@ -269,8 +278,12 @@ func (reader *tableReader) readTableWithTxn( return default: } - v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load())) + // check sinker error of last round + if err = reader.sinker.Error(); err != nil { + return + } + v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load())) start = time.Now() insertData, deleteData, curHint, err = changes.Next(ctx, reader.mp) v2.CdcReadDurationHistogram.Observe(time.Since(start).Seconds()) @@ -281,11 +294,12 @@ func (reader *tableReader) readTableWithTxn( // both nil denote no more data (end of this tail) if insertData == nil && deleteData == nil { // heartbeat - err = reader.sinker.Sink(ctx, &DecoderOutput{ + reader.sinker.Sink(ctx, &DecoderOutput{ noMoreData: true, fromTs: fromTs, toTs: toTs, }) + err = reader.sinker.Error() return } @@ -295,14 +309,12 @@ func (reader *tableReader) readTableWithTxn( case engine.ChangesHandle_Snapshot: // output sql in a txn if !hasBegin && !reader.initSnapshotSplitTxn { - if err = reader.sinker.SendBegin(ctx); err != nil { - return err - } + reader.sinker.SendBegin() hasBegin = true } // transform into insert instantly - err = reader.sinker.Sink(ctx, &DecoderOutput{ + reader.sinker.Sink(ctx, &DecoderOutput{ outputTyp: OutputTypeSnapshot, checkpointBat: insertData, fromTs: fromTs, @@ -310,7 +322,7 @@ func (reader *tableReader) readTableWithTxn( }) addSnapshotEndMetrics() insertData.Clean(reader.mp) - if err != nil { + if err = reader.sinker.Error(); err != nil { return } case engine.ChangesHandle_Tail_wip: @@ -333,13 +345,11 @@ func (reader *tableReader) readTableWithTxn( // output sql in a txn if !hasBegin { - if err = reader.sinker.SendBegin(ctx); err != nil { - return err - } + reader.sinker.SendBegin() hasBegin = true } - err = reader.sinker.Sink(ctx, &DecoderOutput{ + reader.sinker.Sink(ctx, &DecoderOutput{ outputTyp: OutputTypeTail, insertAtmBatch: insertAtmBatch, deleteAtmBatch: deleteAtmBatch, @@ -350,7 +360,7 @@ func (reader *tableReader) readTableWithTxn( addTailEndMetrics(deleteAtmBatch) insertAtmBatch.Close() deleteAtmBatch.Close() - if err != nil { + if err = reader.sinker.Error(); err != nil { return } diff --git a/pkg/cdc/reader_test.go b/pkg/cdc/reader_test.go index a27cb09b26fe..d9a681cf7c8a 100644 --- a/pkg/cdc/reader_test.go +++ b/pkg/cdc/reader_test.go @@ -213,7 +213,7 @@ func Test_tableReader_Run(t *testing.T) { sinker: tt.fields.sinker, wMarkUpdater: tt.fields.wMarkUpdater, tick: tt.fields.tick, - restartFunc: tt.fields.restartFunc, + resetWatermarkFunc: tt.fields.restartFunc, insTsColIdx: tt.fields.insTsColIdx, insCompositedPkColIdx: tt.fields.insCompositedPkColIdx, delTsColIdx: tt.fields.delTsColIdx, @@ -235,9 +235,9 @@ func Test_tableReader_Run_StaleRead(t *testing.T) { // restart success reader := &tableReader{ - tick: time.NewTicker(time.Millisecond * 300), - sinker: NewConsoleSinker(nil, nil), - restartFunc: func(*DbTableInfo) error { return nil }, + tick: time.NewTicker(time.Millisecond * 300), + sinker: NewConsoleSinker(nil, nil), + resetWatermarkFunc: func(*DbTableInfo) error { return nil }, } reader.Run(ctx, NewCdcActiveRoutine()) cancel() @@ -245,9 +245,9 @@ func Test_tableReader_Run_StaleRead(t *testing.T) { // restart failed ctx, cancel = context.WithTimeout(context.Background(), time.Second) reader = &tableReader{ - tick: time.NewTicker(time.Millisecond * 300), - sinker: NewConsoleSinker(nil, nil), - restartFunc: func(*DbTableInfo) error { return moerr.NewInternalErrorNoCtx("") }, + tick: time.NewTicker(time.Millisecond * 300), + sinker: NewConsoleSinker(nil, nil), + resetWatermarkFunc: func(*DbTableInfo) error { return moerr.NewInternalErrorNoCtx("") }, } reader.Run(ctx, NewCdcActiveRoutine()) cancel() @@ -264,9 +264,9 @@ func Test_tableReader_Run_NonStaleReadErr(t *testing.T) { defer stub.Reset() reader := &tableReader{ - tick: time.NewTicker(time.Millisecond * 300), - sinker: NewConsoleSinker(nil, nil), - restartFunc: func(*DbTableInfo) error { return nil }, + tick: time.NewTicker(time.Millisecond * 300), + sinker: NewConsoleSinker(nil, nil), + resetWatermarkFunc: func(*DbTableInfo) error { return nil }, } reader.Run(ctx, NewCdcActiveRoutine()) } @@ -334,77 +334,6 @@ func Test_tableReader_readTableWithTxn(t *testing.T) { assert.NoError(t, err) } -//func Test_tableReader_readTable(t *testing.T) { -// type fields struct { -// cnTxnClient client.TxnClient -// cnEngine engine.Engine -// mp *mpool.MPool -// packerPool *fileservice.Pool[*types.Packer] -// info *DbTableInfo -// sinker Sinker -// wMarkUpdater *WatermarkUpdater -// tick *time.Ticker -// restartFunc func(*DbTableInfo) error -// insTsColIdx int -// insCompositedPkColIdx int -// delTsColIdx int -// delCompositedPkColIdx int -// } -// -// type args struct { -// ctx context.Context -// ar *ActiveRoutine -// } -// tests := []struct { -// name string -// fields fields -// args args -// wantErr assert.ErrorAssertionFunc -// }{ -// { -// name: "t1", -// fields: fields{ -// packerPool: fileservice.NewPool( -// 128, -// func() *types.Packer { -// return types.NewPacker() -// }, -// func(packer *types.Packer) { -// packer.Reset() -// }, -// func(packer *types.Packer) { -// packer.Close() -// }, -// ), -// }, -// args: args{ -// ctx: context.Background(), -// ar: NewCdcActiveRoutine(), -// }, -// }, -// } -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// reader := &tableReader{ -// cnTxnClient: tt.fields.cnTxnClient, -// cnEngine: tt.fields.cnEngine, -// mp: tt.fields.mp, -// packerPool: tt.fields.packerPool, -// info: tt.fields.info, -// sinker: tt.fields.sinker, -// wMarkUpdater: tt.fields.wMarkUpdater, -// tick: tt.fields.tick, -// restartFunc: tt.fields.restartFunc, -// insTsColIdx: tt.fields.insTsColIdx, -// insCompositedPkColIdx: tt.fields.insCompositedPkColIdx, -// delTsColIdx: tt.fields.delTsColIdx, -// delCompositedPkColIdx: tt.fields.delCompositedPkColIdx, -// } -// reader.readTable(tt.args.ctx, tt.args.ar) -// }) -// } -//} - var _ engine.ChangesHandle = new(testChangesHandle) const ( diff --git a/pkg/cdc/sinker.go b/pkg/cdc/sinker.go index da10efcda19f..074f1c2bb40e 100644 --- a/pkg/cdc/sinker.go +++ b/pkg/cdc/sinker.go @@ -19,6 +19,7 @@ import ( "database/sql" "fmt" "strings" + "sync/atomic" "time" "github.com/matrixorigin/matrixone/pkg/catalog" @@ -81,7 +82,9 @@ func NewConsoleSinker( } } -func (s *consoleSinker) Sink(ctx context.Context, data *DecoderOutput) error { +func (s *consoleSinker) Run(_ context.Context, _ *ActiveRoutine) {} + +func (s *consoleSinker) Sink(ctx context.Context, data *DecoderOutput) { logutil.Info("====console sinker====") logutil.Infof("output type %s", data.outputTyp) @@ -110,22 +113,22 @@ func (s *consoleSinker) Sink(ctx context.Context, data *DecoderOutput) error { iter.Close() } } - - return nil } -func (s *consoleSinker) SendBegin(_ context.Context) error { - return nil -} +func (s *consoleSinker) SendBegin() {} -func (s *consoleSinker) SendCommit(_ context.Context) error { - return nil -} +func (s *consoleSinker) SendCommit() {} + +func (s *consoleSinker) SendRollback() {} + +func (s *consoleSinker) SendDummy() {} -func (s *consoleSinker) SendRollback(_ context.Context) error { +func (s *consoleSinker) Error() error { return nil } +func (s *consoleSinker) Reset() {} + func (s *consoleSinker) Close() {} var _ Sinker = new(mysqlSinker) @@ -136,10 +139,14 @@ type mysqlSinker struct { watermarkUpdater *WatermarkUpdater ar *ActiveRoutine - // buffers, allocate only once maxAllowedPacket uint64 + // buf of sql statement - sqlBuf []byte + sqlBufs [2][]byte + curBufIdx int + sqlBuf []byte + sqlBufSendCh chan []byte + // buf of row data from batch, e.g. values part of insert statement (insert into xx values (a), (b), (c)) // or `where ... in ... ` part of delete statement (delete from xx where pk in ((a), (b), (c))) rowBuf []byte @@ -161,6 +168,8 @@ type mysqlSinker struct { preRowType RowType // the length of all completed sql statement in sqlBuf preSqlBufLen int + + err atomic.Value } var NewMysqlSinker = func( @@ -179,8 +188,13 @@ var NewMysqlSinker = func( } _ = mysql.(*mysqlSink).conn.QueryRow("SELECT @@max_allowed_packet").Scan(&s.maxAllowedPacket) - // buf - s.sqlBuf = make([]byte, 0, s.maxAllowedPacket) + // sqlBuf + s.sqlBufs[0] = make([]byte, 0, s.maxAllowedPacket) + s.sqlBufs[1] = make([]byte, 0, s.maxAllowedPacket) + s.curBufIdx = 0 + s.sqlBuf = s.sqlBufs[s.curBufIdx] + s.sqlBufSendCh = make(chan []byte) + s.rowBuf = make([]byte, 0, 1024) // prefix @@ -218,13 +232,65 @@ var NewMysqlSinker = func( // pre s.preRowType = NoOp s.preSqlBufLen = 0 + + // err + s.err = atomic.Value{} return s } -func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) (err error) { +func (s *mysqlSinker) Run(ctx context.Context, ar *ActiveRoutine) { + logutil.Infof("cdc mysqlSinker(%v).Run: start", s.dbTblInfo) + defer func() { + logutil.Infof("cdc mysqlSinker(%v).Run: end", s.dbTblInfo) + }() + + for sqlBuf := range s.sqlBufSendCh { + // already have error, skip + if s.err.Load() != nil { + continue + } + + // dummy sql + if len(sqlBuf) == 0 { + continue + } + + if sql := util.UnsafeBytesToString(sqlBuf); sql == "begin" { + if err := s.mysql.SendBegin(ctx, ar); err != nil { + logutil.Errorf("cdc mysqlSinker(%v) SendBegin, err: %v", s.dbTblInfo, err) + // record error + s.err.Store(err) + continue + } + } else if sql == "commit" { + if err := s.mysql.SendCommit(ctx, ar); err != nil { + logutil.Errorf("cdc mysqlSinker(%v) SendCommit, err: %v", s.dbTblInfo, err) + // record error + s.err.Store(err) + continue + } + } else if sql == "rollback" { + if err := s.mysql.SendRollback(ctx, ar); err != nil { + logutil.Errorf("cdc mysqlSinker(%v) SendRollback, err: %v", s.dbTblInfo, err) + // record error + s.err.Store(err) + continue + } + } else { + if err := s.mysql.Send(ctx, ar, sql); err != nil { + logutil.Errorf("cdc mysqlSinker(%v) send sql failed, err: %v", s.dbTblInfo, err) + // record error + s.err.Store(err) + continue + } + } + } +} + +func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) { watermark := s.watermarkUpdater.GetFromMem(s.dbTblInfo.SourceTblIdStr) if data.toTs.LE(&watermark) { - logutil.Errorf("cdc task mysqlSinker(%v): unexpected watermark: %s, current watermark: %s", + logutil.Errorf("cdc mysqlSinker(%v): unexpected watermark: %s, current watermark: %s", s.dbTblInfo, data.toTs.ToString(), watermark.ToString()) return } @@ -242,9 +308,9 @@ func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) (err error) // output the left sql if s.preSqlBufLen != 0 { - if err = s.mysql.Send(ctx, s.ar, util.UnsafeBytesToString(s.sqlBuf)); err != nil { - return - } + s.sqlBufSendCh <- s.sqlBuf + s.curBufIdx ^= 1 + s.sqlBuf = s.sqlBufs[s.curBufIdx] } // reset status @@ -268,26 +334,69 @@ func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) (err error) s.tsDeletePrefix = append(s.tsDeletePrefix, s.deletePrefix...) if data.outputTyp == OutputTypeSnapshot { - return s.sinkSnapshot(ctx, data.checkpointBat) + s.sinkSnapshot(ctx, data.checkpointBat) } else if data.outputTyp == OutputTypeTail { - return s.sinkTail(ctx, data.insertAtmBatch, data.deleteAtmBatch) + s.sinkTail(ctx, data.insertAtmBatch, data.deleteAtmBatch) + } else { + s.err.Store(moerr.NewInternalError(ctx, fmt.Sprintf("cdc mysqlSinker unexpected output type: %v", data.outputTyp))) } - return } -func (s *mysqlSinker) SendBegin(ctx context.Context) error { - return s.mysql.SendBegin(ctx) +func (s *mysqlSinker) SendBegin() { + s.sqlBufSendCh <- []byte("begin") } -func (s *mysqlSinker) SendCommit(ctx context.Context) error { - return s.mysql.SendCommit(ctx) +func (s *mysqlSinker) SendCommit() { + s.sqlBufSendCh <- []byte("commit") +} + +func (s *mysqlSinker) SendRollback() { + s.sqlBufSendCh <- []byte("rollback") +} + +func (s *mysqlSinker) SendDummy() { + s.sqlBufSendCh <- []byte("") +} + +func (s *mysqlSinker) Error() error { + if val := s.err.Load(); val == nil { + return nil + } else { + return val.(error) + } } -func (s *mysqlSinker) SendRollback(ctx context.Context) error { - return s.mysql.SendRollback(ctx) +func (s *mysqlSinker) Reset() { + s.sqlBufs[0] = s.sqlBufs[0][:0] + s.sqlBufs[1] = s.sqlBufs[1][:0] + s.curBufIdx = 0 + s.sqlBuf = s.sqlBufs[s.curBufIdx] + s.preRowType = NoOp + s.preSqlBufLen = 0 + s.err = atomic.Value{} } -func (s *mysqlSinker) sinkSnapshot(ctx context.Context, bat *batch.Batch) (err error) { +func (s *mysqlSinker) Close() { + // stop Run goroutine + close(s.sqlBufSendCh) + s.mysql.Close() + s.sqlBufs[0] = nil + s.sqlBufs[1] = nil + s.sqlBuf = nil + s.rowBuf = nil + s.insertPrefix = nil + s.deletePrefix = nil + s.tsInsertPrefix = nil + s.tsDeletePrefix = nil + s.insertTypes = nil + s.deleteTypes = nil + s.insertRow = nil + s.deleteRow = nil +} + +func (s *mysqlSinker) sinkSnapshot(ctx context.Context, bat *batch.Batch) { + var err error + // if last row is not insert row, means this is the first snapshot batch if s.preRowType != InsertRow { s.sqlBuf = append(s.sqlBuf[:0], s.tsInsertPrefix...) @@ -297,25 +406,29 @@ func (s *mysqlSinker) sinkSnapshot(ctx context.Context, bat *batch.Batch) (err e for i := 0; i < batchRowCount(bat); i++ { // step1: get row from the batch if err = extractRowFromEveryVector(ctx, bat, i, s.insertRow); err != nil { + s.err.Store(err) return } // step2: transform rows into sql parts if err = s.getInsertRowBuf(ctx); err != nil { + s.err.Store(err) return } // step3: append to sqlBuf, send sql if sqlBuf is full if err = s.appendSqlBuf(ctx, InsertRow); err != nil { + s.err.Store(err) return } } - return } // insertBatch and deleteBatch is sorted by ts // for the same ts, delete first, then insert -func (s *mysqlSinker) sinkTail(ctx context.Context, insertBatch, deleteBatch *AtomicBatch) (err error) { +func (s *mysqlSinker) sinkTail(ctx context.Context, insertBatch, deleteBatch *AtomicBatch) { + var err error + insertIter := insertBatch.GetRowIterator().(*atomicBatchRowIter) deleteIter := deleteBatch.GetRowIterator().(*atomicBatchRowIter) defer func() { @@ -330,12 +443,14 @@ func (s *mysqlSinker) sinkTail(ctx context.Context, insertBatch, deleteBatch *At // compare ts, ignore pk if insertItem.Ts.LT(&deleteItem.Ts) { if err = s.sinkInsert(ctx, insertIter); err != nil { + s.err.Store(err) return } // get next item insertIterHasNext = insertIter.Next() } else { if err = s.sinkDelete(ctx, deleteIter); err != nil { + s.err.Store(err) return } // get next item @@ -346,6 +461,7 @@ func (s *mysqlSinker) sinkTail(ctx context.Context, insertBatch, deleteBatch *At // output the rest of insert iterator for insertIterHasNext { if err = s.sinkInsert(ctx, insertIter); err != nil { + s.err.Store(err) return } // get next item @@ -355,12 +471,12 @@ func (s *mysqlSinker) sinkTail(ctx context.Context, insertBatch, deleteBatch *At // output the rest of delete iterator for deleteIterHasNext { if err = s.sinkDelete(ctx, deleteIter); err != nil { + s.err.Store(err) return } // get next item deleteIterHasNext = deleteIter.Next() } - return } func (s *mysqlSinker) sinkInsert(ctx context.Context, insertIter *atomicBatchRowIter) (err error) { @@ -434,9 +550,9 @@ func (s *mysqlSinker) appendSqlBuf(ctx context.Context, rowType RowType) (err er } // send it to downstream - if err = s.mysql.Send(ctx, s.ar, util.UnsafeBytesToString(s.sqlBuf)); err != nil { - return - } + s.sqlBufSendCh <- s.sqlBuf + s.curBufIdx ^= 1 + s.sqlBuf = s.sqlBufs[s.curBufIdx] // reset s.sqlBuf s.preSqlBufLen = 0 @@ -512,19 +628,7 @@ func (s *mysqlSinker) getDeleteRowBuf(ctx context.Context) (err error) { return } -func (s *mysqlSinker) Close() { - s.mysql.Close() - s.sqlBuf = nil - s.rowBuf = nil - s.insertPrefix = nil - s.deletePrefix = nil - s.tsInsertPrefix = nil - s.tsDeletePrefix = nil - s.insertTypes = nil - s.deleteTypes = nil - s.insertRow = nil - s.deleteRow = nil -} +var _ Sink = new(mysqlSink) type mysqlSink struct { conn *sql.DB @@ -555,12 +659,62 @@ var NewMysqlSink = func( return ret, err } +func (s *mysqlSink) Send(ctx context.Context, ar *ActiveRoutine, sql string) error { + return s.retry(ctx, ar, func() (err error) { + if s.tx != nil { + _, err = s.tx.Exec(sql) + } else { + _, err = s.conn.Exec(sql) + } + + if err != nil { + logutil.Errorf("cdc mysqlSink Send failed, err: %v, sql: %s", err, sql[:min(len(sql), sqlPrintLen)]) + } + //logutil.Errorf("----cdc mysqlSink send sql----, success, sql: %s", sql) + return + }) +} + +func (s *mysqlSink) SendBegin(ctx context.Context, ar *ActiveRoutine) (err error) { + return s.retry(ctx, ar, func() (err error) { + s.tx, err = s.conn.BeginTx(ctx, nil) + return err + }) +} + +func (s *mysqlSink) SendCommit(ctx context.Context, ar *ActiveRoutine) error { + defer func() { + s.tx = nil + }() + + return s.retry(ctx, ar, func() error { + return s.tx.Commit() + }) +} + +func (s *mysqlSink) SendRollback(ctx context.Context, ar *ActiveRoutine) error { + defer func() { + s.tx = nil + }() + + return s.retry(ctx, ar, func() error { + return s.tx.Rollback() + }) +} + +func (s *mysqlSink) Close() { + if s.conn != nil { + _ = s.conn.Close() + s.conn = nil + } +} + func (s *mysqlSink) connect() (err error) { s.conn, err = openDbConn(s.user, s.password, s.ip, s.port) return err } -func (s *mysqlSink) Send(ctx context.Context, ar *ActiveRoutine, sql string) (err error) { +func (s *mysqlSink) retry(ctx context.Context, ar *ActiveRoutine, fn func() error) (err error) { needRetry := func(retry int, startTime time.Time) bool { // retryTimes == -1 means retry forever // do not exceed retryTimes and retryDuration @@ -578,51 +732,19 @@ func (s *mysqlSink) Send(ctx context.Context, ar *ActiveRoutine, sql string) (er } start := time.Now() - if s.tx != nil { - _, err = s.tx.Exec(sql) - } else { - _, err = s.conn.Exec(sql) - } + err = fn() v2.CdcSendSqlDurationHistogram.Observe(time.Since(start).Seconds()) + // return if success if err == nil { - //logutil.Errorf("----cdc task mysqlSink send sql----, success, sql: %s", sql) return } - logutil.Errorf("----cdc task mysqlSink send sql----, failed, err: %v, sql: %s", err, sql[:min(len(sql), sqlPrintLen)]) + logutil.Errorf("cdc mysqlSink retry failed, err: %v", err) v2.CdcMysqlSinkErrorCounter.Inc() time.Sleep(time.Second) } - return moerr.NewInternalError(ctx, "cdc task mysqlSink retry exceed retryTimes or retryDuration") -} - -func (s *mysqlSink) SendBegin(ctx context.Context) (err error) { - s.tx, err = s.conn.BeginTx(ctx, nil) - return -} - -func (s *mysqlSink) SendCommit(_ context.Context) (err error) { - defer func() { - s.tx = nil - }() - - return s.tx.Commit() -} - -func (s *mysqlSink) SendRollback(_ context.Context) (err error) { - defer func() { - s.tx = nil - }() - - return s.tx.Rollback() -} - -func (s *mysqlSink) Close() { - if s.conn != nil { - _ = s.conn.Close() - s.conn = nil - } + return moerr.NewInternalError(ctx, "cdc mysqlSink retry exceed retryTimes or retryDuration") } //type matrixoneSink struct { diff --git a/pkg/cdc/sinker_test.go b/pkg/cdc/sinker_test.go index a685a933912e..5eb4a0431074 100644 --- a/pkg/cdc/sinker_test.go +++ b/pkg/cdc/sinker_test.go @@ -190,7 +190,7 @@ func Test_consoleSinker_Sink(t *testing.T) { dbTblInfo: tt.fields.dbTblInfo, watermarkUpdater: tt.fields.watermarkUpdater, } - tt.wantErr(t, s.Sink(tt.args.ctx, tt.args.data), fmt.Sprintf("Sink(%v, %v)", tt.args.ctx, tt.args.data)) + s.Sink(tt.args.ctx, tt.args.data) }) } } @@ -324,6 +324,8 @@ func TestNewMysqlSinker(t *testing.T) { } func Test_mysqlSinker_appendSqlBuf(t *testing.T) { + ctx := context.Background() + tsInsertPrefix := "/* tsInsertPrefix */REPLACE INTO `db`.`table` VALUES " tsDeletePrefix := "/* tsDeletePrefix */DELETE FROM `db`.`table` WHERE a IN (" @@ -331,7 +333,6 @@ func Test_mysqlSinker_appendSqlBuf(t *testing.T) { assert.NoError(t, err) mock.ExpectExec(".*").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(".*").WillReturnResult(sqlmock.NewResult(1, 1)) - sink := &mysqlSink{ user: "root", password: "123456", @@ -342,24 +343,35 @@ func Test_mysqlSinker_appendSqlBuf(t *testing.T) { conn: db, } + ar := NewCdcActiveRoutine() s := &mysqlSinker{ mysql: sink, - sqlBuf: make([]byte, 0, len(tsDeletePrefix)+len("delete")+SqlBufReserved), tsInsertPrefix: []byte(tsInsertPrefix), tsDeletePrefix: []byte(tsDeletePrefix), preRowType: NoOp, - ar: NewCdcActiveRoutine(), - } + ar: ar, + sqlBufSendCh: make(chan []byte), + } + s.sqlBufs[0] = make([]byte, 0, len(tsDeletePrefix)+len("delete")+SqlBufReserved) + s.sqlBufs[1] = make([]byte, 0, len(tsDeletePrefix)+len("delete")+SqlBufReserved) + s.curBufIdx = 0 + s.sqlBuf = s.sqlBufs[s.curBufIdx] + go s.Run(ctx, ar) + defer func() { + // call dummy to guarantee sqls has been sent, then close + s.SendDummy() + s.Close() + }() // test insert s.sqlBuf = append(s.sqlBuf[:0], s.tsInsertPrefix...) s.rowBuf = []byte("insert") // not exceed cap - err = s.appendSqlBuf(context.Background(), InsertRow) + err = s.appendSqlBuf(ctx, InsertRow) assert.NoError(t, err) assert.Equal(t, []byte(tsInsertPrefix+"insert"), s.sqlBuf) // exceed cap - err = s.appendSqlBuf(context.Background(), InsertRow) + err = s.appendSqlBuf(ctx, InsertRow) assert.NoError(t, err) assert.Equal(t, []byte(tsInsertPrefix+"insert"), s.sqlBuf) @@ -367,11 +379,11 @@ func Test_mysqlSinker_appendSqlBuf(t *testing.T) { s.sqlBuf = append(s.sqlBuf[:0], s.tsDeletePrefix...) s.rowBuf = []byte("delete") // not exceed cap - err = s.appendSqlBuf(context.Background(), DeleteRow) + err = s.appendSqlBuf(ctx, DeleteRow) assert.NoError(t, err) assert.Equal(t, []byte(tsDeletePrefix+"delete"), s.sqlBuf) // exceed cap - err = s.appendSqlBuf(context.Background(), DeleteRow) + err = s.appendSqlBuf(ctx, DeleteRow) assert.NoError(t, err) assert.Equal(t, []byte(tsDeletePrefix+"delete"), s.sqlBuf) } @@ -424,6 +436,7 @@ func Test_mysqlSinker_getInsertRowBuf(t *testing.T) { } func Test_mysqlSinker_Sink(t *testing.T) { + ctx := context.Background() t0 := types.BuildTS(0, 1) t1 := types.BuildTS(1, 1) t2 := types.BuildTS(2, 1) @@ -472,7 +485,15 @@ func Test_mysqlSinker_Sink(t *testing.T) { }, } - sinker := NewMysqlSinker(sink, dbTblInfo, watermarkUpdater, tableDef, NewCdcActiveRoutine()) + ar := NewCdcActiveRoutine() + + s := NewMysqlSinker(sink, dbTblInfo, watermarkUpdater, tableDef, ar) + go s.Run(ctx, ar) + defer func() { + // call dummy to guarantee sqls has been sent, then close + s.SendDummy() + s.Close() + }() packerPool := fileservice.NewPool( 128, @@ -496,14 +517,14 @@ func Test_mysqlSinker_Sink(t *testing.T) { ckpBat.Vecs[1] = testutil.MakeInt32Vector([]int32{1, 2, 3}, nil) ckpBat.SetRowCount(3) - err = sinker.Sink(context.Background(), &DecoderOutput{ + s.Sink(ctx, &DecoderOutput{ outputTyp: OutputTypeSnapshot, fromTs: t0, toTs: t1, checkpointBat: ckpBat, }) assert.NoError(t, err) - err = sinker.Sink(context.Background(), &DecoderOutput{ + s.Sink(ctx, &DecoderOutput{ noMoreData: true, fromTs: t0, toTs: t1, @@ -525,7 +546,7 @@ func Test_mysqlSinker_Sink(t *testing.T) { deleteBat.SetRowCount(1) deleteAtomicBat.Append(packer, deleteBat, 1, 0) - err = sinker.Sink(context.Background(), &DecoderOutput{ + s.Sink(ctx, &DecoderOutput{ outputTyp: OutputTypeTail, fromTs: t1, toTs: t2, @@ -534,7 +555,7 @@ func Test_mysqlSinker_Sink(t *testing.T) { }) assert.NoError(t, err) - err = sinker.Sink(context.Background(), &DecoderOutput{ + s.Sink(ctx, &DecoderOutput{ outputTyp: OutputTypeTail, fromTs: t1, toTs: t2, @@ -543,7 +564,7 @@ func Test_mysqlSinker_Sink(t *testing.T) { }) assert.NoError(t, err) - err = sinker.Sink(context.Background(), &DecoderOutput{ + s.Sink(ctx, &DecoderOutput{ outputTyp: OutputTypeTail, fromTs: t1, toTs: t2, @@ -552,7 +573,7 @@ func Test_mysqlSinker_Sink(t *testing.T) { }) assert.NoError(t, err) - err = sinker.Sink(context.Background(), &DecoderOutput{ + s.Sink(ctx, &DecoderOutput{ noMoreData: true, fromTs: t1, toTs: t2, @@ -561,6 +582,8 @@ func Test_mysqlSinker_Sink(t *testing.T) { } func Test_mysqlSinker_Sink_NoMoreData(t *testing.T) { + ctx := context.Background() + db, mock, err := sqlmock.New() assert.NoError(t, err) mock.ExpectExec(".*").WillReturnError(moerr.NewInternalErrorNoCtx("")) @@ -574,7 +597,9 @@ func Test_mysqlSinker_Sink_NoMoreData(t *testing.T) { } watermarkUpdater.UpdateMem("1_0", types.BuildTS(0, 1)) - sinker := &mysqlSinker{ + ar := NewCdcActiveRoutine() + + s := &mysqlSinker{ mysql: &mysqlSink{ user: "root", password: "123456", @@ -584,17 +609,30 @@ func Test_mysqlSinker_Sink_NoMoreData(t *testing.T) { retryDuration: 3 * time.Second, conn: db, }, - ar: NewCdcActiveRoutine(), + ar: ar, dbTblInfo: dbTblInfo, watermarkUpdater: watermarkUpdater, - sqlBuf: make([]byte, 1024), preRowType: DeleteRow, } - - err = sinker.Sink(context.Background(), &DecoderOutput{ + s.sqlBufs[0] = make([]byte, 128, 1024) + s.sqlBufs[1] = make([]byte, 0, 1024) + s.curBufIdx = 0 + s.sqlBuf = s.sqlBufs[s.curBufIdx] + s.preSqlBufLen = 128 + s.sqlBufSendCh = make(chan []byte) + go s.Run(ctx, ar) + defer func() { + // call dummy to guarantee sqls has been sent, then close + s.SendDummy() + s.Close() + }() + + s.Sink(ctx, &DecoderOutput{ noMoreData: true, toTs: types.BuildTS(1, 1), }) + s.SendDummy() + err = s.Error() assert.Error(t, err) } @@ -620,8 +658,7 @@ func Test_mysqlSinker_sinkSnapshot(t *testing.T) { insertBat := batch.New([]string{"a", "ts"}) insertBat.Vecs[0] = testutil.MakeUint64Vector([]uint64{1}, nil) insertBat.Vecs[1] = testutil.MakeTSVector([]types.TS{types.BuildTS(0, 1)}, nil) - err = sinker.sinkSnapshot(context.Background(), insertBat) - assert.NoError(t, err) + sinker.sinkSnapshot(context.Background(), insertBat) } func Test_mysqlSinker_sinkDelete(t *testing.T) { @@ -745,8 +782,7 @@ func Test_mysqlsink(t *testing.T) { LogicalTime: 100, } sink.watermarkUpdater.watermarkMap.Store(uint64(0), types.TimestampToTS(tts)) - err := sink.Sink(context.Background(), &DecoderOutput{}) - assert.NoError(t, err) + sink.Sink(context.Background(), &DecoderOutput{}) } func Test_mysqlSinker_sinkTail(t *testing.T) { @@ -792,8 +828,7 @@ func Test_mysqlSinker_sinkTail(t *testing.T) { insertAtomicBat.Append(packer, insertBat, 1, 0) deleteAtomicBat := NewAtomicBatch(testutil.TestUtilMp) - err = sinker.sinkTail(context.Background(), insertAtomicBat, deleteAtomicBat) - assert.NoError(t, err) + sinker.sinkTail(context.Background(), insertAtomicBat, deleteAtomicBat) } func Test_consoleSinker_Close(t *testing.T) { @@ -834,7 +869,8 @@ func Test_mysqlSinker_Close(t *testing.T) { } sinker := &mysqlSinker{ - mysql: sink, + mysql: sink, + sqlBufSendCh: make(chan []byte), } sinker.Close() @@ -848,33 +884,37 @@ func Test_mysqlSinker_SendBeginCommitRollback(t *testing.T) { mock.ExpectBegin() mock.ExpectRollback() - ctx := context.Background() - sinker := &mysqlSinker{ + ar := NewCdcActiveRoutine() + s := &mysqlSinker{ mysql: &mysqlSink{ retryTimes: 3, retryDuration: 3 * time.Second, conn: db, }, - ar: NewCdcActiveRoutine(), + ar: ar, + sqlBufSendCh: make(chan []byte), } - err = sinker.SendBegin(ctx) + go s.Run(context.Background(), ar) + defer func() { + // call dummy to guarantee sqls has been sent, then close + s.SendDummy() + s.Close() + }() + + s.SendBegin() assert.NoError(t, err) - err = sinker.SendCommit(ctx) + s.SendCommit() assert.NoError(t, err) - err = sinker.SendBegin(ctx) + s.SendBegin() assert.NoError(t, err) - err = sinker.SendRollback(ctx) + s.SendRollback() assert.NoError(t, err) } func Test_consoleSinker_SendBeginCommitRollback(t *testing.T) { - ctx := context.Background() s := &consoleSinker{} - err := s.SendBegin(ctx) - assert.NoError(t, err) - err = s.SendCommit(ctx) - assert.NoError(t, err) - err = s.SendRollback(ctx) - assert.NoError(t, err) + s.SendBegin() + s.SendCommit() + s.SendRollback() } diff --git a/pkg/cdc/types.go b/pkg/cdc/types.go index 7e3a4826bc1d..75fcde5cddc5 100644 --- a/pkg/cdc/types.go +++ b/pkg/cdc/types.go @@ -60,19 +60,24 @@ type Reader interface { // Sinker manages and drains the sql parts type Sinker interface { - Sink(ctx context.Context, data *DecoderOutput) error - SendBegin(ctx context.Context) error - SendCommit(ctx context.Context) error - SendRollback(ctx context.Context) error + Run(ctx context.Context, ar *ActiveRoutine) + Sink(ctx context.Context, data *DecoderOutput) + SendBegin() + SendCommit() + SendRollback() + SendDummy() + // Error must be called after Sink + Error() error + Reset() Close() } // Sink represents the destination mysql or matrixone type Sink interface { Send(ctx context.Context, ar *ActiveRoutine, sql string) error - SendBegin(ctx context.Context) error - SendCommit(ctx context.Context) error - SendRollback(ctx context.Context) error + SendBegin(ctx context.Context, ar *ActiveRoutine) error + SendCommit(ctx context.Context, ar *ActiveRoutine) error + SendRollback(ctx context.Context, ar *ActiveRoutine) error Close() } diff --git a/pkg/frontend/cdc.go b/pkg/frontend/cdc.go index 241b0272fff3..087245ee5b6a 100644 --- a/pkg/frontend/cdc.go +++ b/pkg/frontend/cdc.go @@ -1410,6 +1410,7 @@ func (cdc *CdcTask) addExecPipelineForTable(info *cdc2.DbTableInfo, txnOp client if err != nil { return err } + go sinker.Run(ctx, cdc.activeRoutine) // make reader reader := cdc2.NewTableReader(