Skip to content

Commit

Permalink
Merge branch '2.0-dev' into debug_backup
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 5, 2024
2 parents 0514711 + cc03497 commit 3fe0223
Show file tree
Hide file tree
Showing 14 changed files with 680 additions and 185 deletions.
5 changes: 2 additions & 3 deletions pkg/cdc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ func (reader *tableReader) readTableWithTxn(
txnOp client.TxnOperator,
packer *types.Packer,
ar *ActiveRoutine) (err error) {
v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load()))

var rel engine.Relation
var changes engine.ChangesHandle
//step1 : get relation
Expand Down Expand Up @@ -249,7 +247,7 @@ func (reader *tableReader) readTableWithTxn(
defer func() {
if hasBegin {
if err == nil {
_ = reader.sinker.SendCommit(ctx)
err = reader.sinker.SendCommit(ctx)
} else {
_ = reader.sinker.SendRollback(ctx)
}
Expand All @@ -271,6 +269,7 @@ func (reader *tableReader) readTableWithTxn(
return
default:
}
v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load()))

start = time.Now()
insertData, deleteData, curHint, err = changes.Next(ctx, reader.mp)
Expand Down
12 changes: 6 additions & 6 deletions pkg/cdc/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type mysqlSinker struct {
// buf of sql statement
sqlBuf []byte
// buf of row data from batch, e.g. values part of insert statement (insert into xx values (a), (b), (c))
// or where ... in part of delete statement (delete from xx where pk in ((a), (b), (c)))
// or `where ... in ... ` part of delete statement (delete from xx where pk in ((a), (b), (c)))
rowBuf []byte
insertPrefix []byte
deletePrefix []byte
Expand Down Expand Up @@ -224,8 +224,8 @@ var NewMysqlSinker = func(
func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) (err error) {
watermark := s.watermarkUpdater.GetFromMem(s.dbTblInfo.SourceTblIdStr)
if data.toTs.LE(&watermark) {
logutil.Errorf("^^^^^ Sinker: unexpected watermark: %s, current watermark: %s",
data.toTs.ToString(), watermark.ToString())
logutil.Errorf("cdc task mysqlSinker(%v): unexpected watermark: %s, current watermark: %s",
s.dbTblInfo, data.toTs.ToString(), watermark.ToString())
return
}

Expand Down Expand Up @@ -586,15 +586,15 @@ func (s *mysqlSink) Send(ctx context.Context, ar *ActiveRoutine, sql string) (er
v2.CdcSendSqlDurationHistogram.Observe(time.Since(start).Seconds())
// return if success
if err == nil {
//logutil.Errorf("----mysql send sql----, success, sql: %s", sql)
//logutil.Errorf("----cdc task mysqlSink send sql----, success, sql: %s", sql)
return
}

logutil.Errorf("----mysql send sql----, failed, err: %v, sql: %s", err, sql[:min(len(sql), sqlPrintLen)])
logutil.Errorf("----cdc task mysqlSink send sql----, failed, err: %v, sql: %s", err, sql[:min(len(sql), sqlPrintLen)])
v2.CdcMysqlSinkErrorCounter.Inc()
time.Sleep(time.Second)
}
return moerr.NewInternalError(ctx, "mysql sink retry exceed retryTimes or retryDuration")
return moerr.NewInternalError(ctx, "cdc task mysqlSink retry exceed retryTimes or retryDuration")
}

func (s *mysqlSink) SendBegin(ctx context.Context) (err error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cdc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ var openDbConn = func(
user, password string,
ip string,
port int) (db *sql.DB, err error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?multiStatements=true",
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?readTimeout=10m&timeout=10m&writeTimeout=10m&multiStatements=true",
user,
password,
ip,
Expand All @@ -549,7 +549,7 @@ var openDbConn = func(
v2.CdcMysqlConnErrorCounter.Inc()
time.Sleep(time.Second)
}
logutil.Error("^^^^^ openDbConn failed")
logutil.Error("cdc task openDbConn failed")
return
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/colexec/window/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func (window *Window) Call(proc *process.Process) (vm.CallResult, error) {
return result, err
}
if result.Batch == nil {
ctr.status = eval
if ctr.bat != nil {
ctr.status = eval
} else {
ctr.status = done
}
break
}
ctr.bat, err = ctr.bat.AppendWithCopy(proc.Ctx, proc.Mp(), result.Batch)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/alter.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
for _, multiTableIndex := range multiTableIndexes {
switch multiTableIndex.IndexAlgo {
case catalog.MoIndexIvfFlatAlgo.ToString():
err = s.handleVectorIvfFlatIndex(c, multiTableIndex.IndexDefs, qry.Database, newTableDef, nil)
err = s.handleVectorIvfFlatIndex(c, dbSource, multiTableIndex.IndexDefs, qry.Database, newTableDef, nil)
}
if err != nil {
c.proc.Error(c.proc.Ctx, "invoke reindex for the new table for alter table",
Expand Down
86 changes: 62 additions & 24 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,16 +565,16 @@ func (s *Scope) AlterTableInplace(c *Compile) error {

if indexDef.Unique {
// 1. Unique Index related logic
err = s.handleUniqueIndexTable(c, indexDef, qry.Database, tableDef, indexInfo)
err = s.handleUniqueIndexTable(c, dbSource, indexDef, qry.Database, tableDef, indexInfo)
} else if !indexDef.Unique && catalog.IsRegularIndexAlgo(indexDef.IndexAlgo) {
// 2. Regular Secondary index
err = s.handleRegularSecondaryIndexTable(c, indexDef, qry.Database, tableDef, indexInfo)
err = s.handleRegularSecondaryIndexTable(c, dbSource, indexDef, qry.Database, tableDef, indexInfo)
} else if !indexDef.Unique && catalog.IsMasterIndexAlgo(indexDef.IndexAlgo) {
// 3. Master index
err = s.handleMasterIndexTable(c, indexDef, qry.Database, tableDef, indexInfo)
err = s.handleMasterIndexTable(c, dbSource, indexDef, qry.Database, tableDef, indexInfo)
} else if !indexDef.Unique && catalog.IsFullTextIndexAlgo(indexDef.IndexAlgo) {
// 3. FullText index
err = s.handleFullTextIndexTable(c, indexDef, qry.Database, tableDef, indexInfo)
err = s.handleFullTextIndexTable(c, dbSource, indexDef, qry.Database, tableDef, indexInfo)
} else if !indexDef.Unique && catalog.IsIvfIndexAlgo(indexDef.IndexAlgo) {
// 4. IVF indexDefs are aggregated and handled later
if _, ok := multiTableIndexes[indexDef.IndexName]; !ok {
Expand All @@ -593,7 +593,7 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
for _, multiTableIndex := range multiTableIndexes {
switch multiTableIndex.IndexAlgo { // no need for catalog.ToLower() here
case catalog.MoIndexIvfFlatAlgo.ToString():
err = s.handleVectorIvfFlatIndex(c, multiTableIndex.IndexDefs, qry.Database, tableDef, indexInfo)
err = s.handleVectorIvfFlatIndex(c, dbSource, multiTableIndex.IndexDefs, qry.Database, tableDef, indexInfo)
}

if err != nil {
Expand Down Expand Up @@ -701,7 +701,7 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
for _, multiTableIndex := range multiTableIndexes {
switch multiTableIndex.IndexAlgo {
case catalog.MoIndexIvfFlatAlgo.ToString():
err = s.handleVectorIvfFlatIndex(c, multiTableIndex.IndexDefs, qry.Database, tableDef, nil)
err = s.handleVectorIvfFlatIndex(c, dbSource, multiTableIndex.IndexDefs, qry.Database, tableDef, nil)
}

if err != nil {
Expand Down Expand Up @@ -1668,13 +1668,13 @@ func (s *Scope) CreateIndex(c *Compile) error {
}
}

d, err := c.e.Database(c.proc.Ctx, qry.Database, c.proc.GetTxnOperator())
dbSource, err := c.e.Database(c.proc.Ctx, qry.Database, c.proc.GetTxnOperator())
if err != nil {
return err
}
databaseId := d.GetDatabaseId(c.proc.Ctx)
databaseId := dbSource.GetDatabaseId(c.proc.Ctx)

r, err := d.Relation(c.proc.Ctx, qry.Table, nil)
r, err := dbSource.Relation(c.proc.Ctx, qry.Table, nil)
if err != nil {
return err
}
Expand All @@ -1685,6 +1685,7 @@ func (s *Scope) CreateIndex(c *Compile) error {
indexInfo := qry.GetIndex() // IndexInfo is named same as planner's IndexInfo
indexTableDef := indexInfo.GetTableDef()

// In MySQL, the `CREATE INDEX` syntax can only create one index instance at a time
// indexName -> meta -> indexDef[0]
// -> centroids -> indexDef[1]
// -> entries -> indexDef[2]
Expand All @@ -1694,13 +1695,15 @@ func (s *Scope) CreateIndex(c *Compile) error {
indexAlgo := indexDef.IndexAlgo
if indexDef.Unique {
// 1. Unique Index related logic
err = s.handleUniqueIndexTable(c, indexDef, qry.Database, originalTableDef, indexInfo)
//err = s.handleUniqueIndexTable(c, indexDef, qry.Database, originalTableDef, indexInfo)
err = s.handleUniqueIndexTable(c, dbSource, indexDef, qry.Database, originalTableDef, indexInfo)
} else if !indexDef.Unique && catalog.IsRegularIndexAlgo(indexAlgo) {
// 2. Regular Secondary index
err = s.handleRegularSecondaryIndexTable(c, indexDef, qry.Database, originalTableDef, indexInfo)
//err = s.handleRegularSecondaryIndexTable(c, indexDef, qry.Database, originalTableDef, indexInfo)
err = s.handleRegularSecondaryIndexTable(c, dbSource, indexDef, qry.Database, originalTableDef, indexInfo)
} else if !indexDef.Unique && catalog.IsMasterIndexAlgo(indexAlgo) {
// 3. Master index
err = s.handleMasterIndexTable(c, indexDef, qry.Database, originalTableDef, indexInfo)
err = s.handleMasterIndexTable(c, dbSource, indexDef, qry.Database, originalTableDef, indexInfo)
} else if !indexDef.Unique && catalog.IsIvfIndexAlgo(indexAlgo) {
// 4. IVF indexDefs are aggregated and handled later
if _, ok := multiTableIndexes[indexDef.IndexName]; !ok {
Expand All @@ -1712,7 +1715,7 @@ func (s *Scope) CreateIndex(c *Compile) error {
multiTableIndexes[indexDef.IndexName].IndexDefs[catalog.ToLower(indexDef.IndexAlgoTableType)] = indexDef
} else if !indexDef.Unique && catalog.IsFullTextIndexAlgo(indexAlgo) {
// 5. FullText index
err = s.handleFullTextIndexTable(c, indexDef, qry.Database, originalTableDef, indexInfo)
err = s.handleFullTextIndexTable(c, dbSource, indexDef, qry.Database, originalTableDef, indexInfo)
}
if err != nil {
return err
Expand All @@ -1722,7 +1725,7 @@ func (s *Scope) CreateIndex(c *Compile) error {
for _, multiTableIndex := range multiTableIndexes {
switch multiTableIndex.IndexAlgo {
case catalog.MoIndexIvfFlatAlgo.ToString():
err = s.handleVectorIvfFlatIndex(c, multiTableIndex.IndexDefs, qry.Database, originalTableDef, indexInfo)
err = s.handleVectorIvfFlatIndex(c, dbSource, multiTableIndex.IndexDefs, qry.Database, originalTableDef, indexInfo)
}

if err != nil {
Expand Down Expand Up @@ -1764,7 +1767,49 @@ func (s *Scope) CreateIndex(c *Compile) error {
return nil
}

func (s *Scope) handleVectorIvfFlatIndex(c *Compile, indexDefs map[string]*plan.IndexDef, qryDatabase string, originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {
// indexTableBuild is used to build the index table corresponding to the index
// It converts the column definitions and execution definitions into plan, and then create the table in target database.
func indexTableBuild(c *Compile, def *plan.TableDef, dbSource engine.Database) error {
planCols := def.GetCols()
exeCols := planColsToExeCols(planCols)
exeDefs, err := planDefsToExeDefs(def)
if err != nil {
c.proc.Info(c.proc.Ctx, "createTable",
zap.String("databaseName", c.db),
zap.String("tableName", def.GetName()),
zap.Error(err),
)
return err
}
if _, err = dbSource.Relation(c.proc.Ctx, def.Name, nil); err == nil {
c.proc.Info(c.proc.Ctx, "createTable",
zap.String("databaseName", c.db),
zap.String("tableName", def.GetName()),
zap.Error(err),
)
return moerr.NewTableAlreadyExists(c.proc.Ctx, def.Name)
}
if err = dbSource.Create(c.proc.Ctx, def.Name, append(exeCols, exeDefs...)); err != nil {
c.proc.Info(c.proc.Ctx, "createTable",
zap.String("databaseName", c.db),
zap.String("tableName", def.GetName()),
zap.Error(err),
)
return err
}

err = maybeCreateAutoIncrement(
c.proc.Ctx,
c.proc.GetService(),
dbSource,
def,
c.proc.GetTxnOperator(),
nil,
)
return err
}

func (s *Scope) handleVectorIvfFlatIndex(c *Compile, dbSource engine.Database, indexDefs map[string]*plan.IndexDef, qryDatabase string, originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {
if ok, err := s.isExperimentalEnabled(c, ivfFlatIndexFlag); err != nil {
return err
} else if !ok {
Expand All @@ -1780,15 +1825,8 @@ func (s *Scope) handleVectorIvfFlatIndex(c *Compile, indexDefs map[string]*plan.

// 2. create hidden tables
if indexInfo != nil {

tables := make([]string, 3)
tables[0] = genCreateIndexTableSqlForIvfIndex(indexInfo.GetIndexTables()[0], indexDefs[catalog.SystemSI_IVFFLAT_TblType_Metadata], qryDatabase)
tables[1] = genCreateIndexTableSqlForIvfIndex(indexInfo.GetIndexTables()[1], indexDefs[catalog.SystemSI_IVFFLAT_TblType_Centroids], qryDatabase)
tables[2] = genCreateIndexTableSqlForIvfIndex(indexInfo.GetIndexTables()[2], indexDefs[catalog.SystemSI_IVFFLAT_TblType_Entries], qryDatabase)

for _, createTableSql := range tables {
err := c.runSql(createTableSql)
if err != nil {
for _, table := range indexInfo.GetIndexTables() {
if err := indexTableBuild(c, table, dbSource); err != nil {
return err
}
}
Expand Down
54 changes: 28 additions & 26 deletions pkg/sql/compile/ddl_index_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,64 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)

const (
ivfFlatIndexFlag = "experimental_ivf_index"
fulltextIndexFlag = "experimental_fulltext_index"
)

func (s *Scope) handleUniqueIndexTable(c *Compile,
func (s *Scope) handleUniqueIndexTable(c *Compile, dbSource engine.Database,
indexDef *plan.IndexDef, qryDatabase string,
originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {
if len(indexInfo.GetIndexTables()) != 1 {
return moerr.NewInternalErrorNoCtx("index table count not equal to 1")
}

def := indexInfo.GetIndexTables()[0]
if err := indexTableBuild(c, def, dbSource); err != nil {
return err
}
// the logic of detecting whether the unique constraint is violated does not need to be done separately,
// it will be processed when inserting into the hidden table.

return s.createAndInsertForUniqueOrRegularIndexTable(c, indexDef, qryDatabase, originalTableDef, indexInfo)
}

func (s *Scope) handleRegularSecondaryIndexTable(c *Compile,
indexDef *plan.IndexDef, qryDatabase string,
originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {

return s.createAndInsertForUniqueOrRegularIndexTable(c, indexDef, qryDatabase, originalTableDef, indexInfo)
}

func (s *Scope) createAndInsertForUniqueOrRegularIndexTable(c *Compile, indexDef *plan.IndexDef,
qryDatabase string, originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {
insertSQL := genInsertIndexTableSql(originalTableDef, indexDef, qryDatabase, indexDef.Unique)
err := c.runSql(insertSQL)
if err != nil {
return err
}
return nil
}

func (s *Scope) handleRegularSecondaryIndexTable(c *Compile, dbSource engine.Database,
indexDef *plan.IndexDef, qryDatabase string,
originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {

if len(indexInfo.GetIndexTables()) != 1 {
return moerr.NewInternalErrorNoCtx("index table count not equal to 1")
}

def := indexInfo.GetIndexTables()[0]
createSQL := genCreateIndexTableSql(def, indexDef, qryDatabase)
err := c.runSql(createSQL)
if err != nil {
if err := indexTableBuild(c, def, dbSource); err != nil {
return err
}

insertSQL := genInsertIndexTableSql(originalTableDef, indexDef, qryDatabase, indexDef.Unique)
err = c.runSql(insertSQL)
if err != nil {
return err
}
return nil
return s.createAndInsertForUniqueOrRegularIndexTable(c, indexDef, qryDatabase, originalTableDef, indexInfo)
}
func (s *Scope) handleMasterIndexTable(c *Compile, indexDef *plan.IndexDef, qryDatabase string,
originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {

func (s *Scope) handleMasterIndexTable(c *Compile, dbSource engine.Database,
indexDef *plan.IndexDef, qryDatabase string, originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {
if len(indexInfo.GetIndexTables()) != 1 {
return moerr.NewInternalErrorNoCtx("index table count not equal to 1")
}

def := indexInfo.GetIndexTables()[0]
createSQL := genCreateIndexTableSql(def, indexDef, qryDatabase)
err := c.runSql(createSQL)
err := indexTableBuild(c, def, dbSource)
if err != nil {
return err
}
Expand All @@ -92,8 +95,8 @@ func (s *Scope) handleMasterIndexTable(c *Compile, indexDef *plan.IndexDef, qryD
return nil
}

func (s *Scope) handleFullTextIndexTable(c *Compile, indexDef *plan.IndexDef, qryDatabase string,
originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {
func (s *Scope) handleFullTextIndexTable(c *Compile, dbSource engine.Database, indexDef *plan.IndexDef,
qryDatabase string, originalTableDef *plan.TableDef, indexInfo *plan.CreateTable) error {
if ok, err := s.isExperimentalEnabled(c, fulltextIndexFlag); err != nil {
return err
} else if !ok {
Expand All @@ -104,8 +107,7 @@ func (s *Scope) handleFullTextIndexTable(c *Compile, indexDef *plan.IndexDef, qr
}

def := indexInfo.GetIndexTables()[0]
createSQL := genCreateIndexTableSqlForFullTextIndex(def, indexDef, qryDatabase)
err := c.runSql(createSQL)
err := indexTableBuild(c, def, dbSource)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 3fe0223

Please sign in to comment.