Skip to content

Commit

Permalink
Merge branch 'main' into fslist
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 28, 2024
2 parents f19cb22 + 5931813 commit 568303b
Show file tree
Hide file tree
Showing 110 changed files with 2,792 additions and 1,697 deletions.
30 changes: 30 additions & 0 deletions pkg/bootstrap/versions/v2_0_0/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ var clusterUpgEntries = []versions.UpgradeEntry{
upg_rename_system_metrics_metric_120,
upg_create_system_metrics_metric_130,
upg_system_metrics_sql_stmt_cu_comment,
upg_system_stmt_info_add_column_conn_id,
upg_system_stmt_info_add_column_cu,
}

var needUpgradePubSub = false
Expand Down Expand Up @@ -281,3 +283,31 @@ var upg_system_metrics_sql_stmt_cu_comment = versions.UpgradeEntry{
return false, nil
},
}

var upg_system_stmt_info_add_column_conn_id = versions.UpgradeEntry{
Schema: catalog.MO_SYSTEM,
TableName: catalog.MO_STATEMENT,
UpgType: versions.ADD_COLUMN,
UpgSql: `alter table system.statement_info ADD COLUMN connection_id BIGINT DEFAULT '0' comment "connection id", ADD COLUMN cu DOUBLE DEFAULT '0.0' COMMENT 'cu cost';`,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_SYSTEM, catalog.MO_STATEMENT, "connection_id")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
}

var upg_system_stmt_info_add_column_cu = versions.UpgradeEntry{
Schema: catalog.MO_SYSTEM,
TableName: catalog.MO_STATEMENT,
UpgType: versions.ADD_COLUMN,
UpgSql: `alter table system.statement_info add column cu DOUBLE DEFAULT '0.0' comment "cu cost"`,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_SYSTEM, catalog.MO_STATEMENT, "cu")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
}
2 changes: 2 additions & 0 deletions pkg/bootstrap/versions/v2_0_0/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var tenantUpgEntries = []versions.UpgradeEntry{
upg_mo_retention,
upg_information_schema_columns,
upg_information_schema_schemata,
upg_system_stmt_info_add_column_conn_id,
upg_system_stmt_info_add_column_cu,
}

const viewServerSnapshotUsage = "server_snapshot_usage"
Expand Down
109 changes: 109 additions & 0 deletions pkg/bootstrap/versions/v2_0_0/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,65 @@ func Test_HandleClusterUpgrade_upg_system_metrics_schema(t *testing.T) {
},
)

// test upg_system_stmt_info_add_column_conn_id, case NotExist
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()
executor := MockExecutor_CheckTableColumn_NotExist(txnOperator)
got := upg_system_stmt_info_add_column_conn_id.Upgrade(executor, uint32(0))
require.Equal(t, nil, got)
},
)
// test upg_system_stmt_info_add_column_conn_id, case Exist
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()
executor := MockExecutor_CheckTableColumn_Exist(txnOperator)
got := upg_system_stmt_info_add_column_conn_id.Upgrade(executor, uint32(0))
require.Equal(t, nil, got)
},
)
// test upg_system_stmt_info_add_column_conn_id, case Error
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()
err := moerr.GetOkExpectedEOF()
executor := MockExecutor_CheckTableColumn_Error(txnOperator, err)
got := upg_system_stmt_info_add_column_conn_id.Upgrade(executor, uint32(0))
require.Equal(t, err, got)
},
)

// test upg_system_stmt_info_add_column_cu, case NotExist
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()
executor := MockExecutor_CheckTableColumn_NotExist(txnOperator)
got := upg_system_stmt_info_add_column_cu.Upgrade(executor, uint32(0))
require.Equal(t, nil, got)
},
)
// test upg_system_stmt_info_add_column_cu, case Error
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()
err := moerr.GetOkExpectedEOF()
executor := MockExecutor_CheckTableColumn_Error(txnOperator, err)
got := upg_system_stmt_info_add_column_cu.Upgrade(executor, uint32(0))
require.Equal(t, err, got)
},
)

}

func MockExecutor_CheckTableDefinitionExist(txnOperator *mock_frontend.MockTxnOperator) executor.TxnExecutor {
Expand Down Expand Up @@ -450,3 +509,53 @@ func MockExecutor_CheckTableComment_Error(txnOperator *mock_frontend.MockTxnOper
return executor.Result{}, nil
}, txnOperator)
}

func MockExecutor_CheckTableColumn_NotExist(txnOperator *mock_frontend.MockTxnOperator) executor.TxnExecutor {
return executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
return executor.Result{}, nil
}, txnOperator)
}

func MockExecutor_CheckTableColumn_Error(txnOperator *mock_frontend.MockTxnOperator, err error) executor.TxnExecutor {
return executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
return executor.Result{}, err
}, txnOperator)
}

func MockExecutor_CheckTableColumn_Exist(txnOperator *mock_frontend.MockTxnOperator) executor.TxnExecutor {
const checkTableColumnKeySql = `att_comment AS COLUMN_COMMENT FROM mo_catalog.mo_columns'`

return executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
if strings.Contains(strings.ToLower(sql), strings.ToLower(checkTableColumnKeySql)) {
typs := []types.Type{
types.New(types.T_varchar, 64, 0), // DATA_TYPE
types.New(types.T_varchar, 64, 0), // IS_NULLABLE
types.New(types.T_uint64, 0, 0), // CHARACTER_MAXIMUM_LENGTH
types.New(types.T_uint64, 0, 0), // NUMERIC_PRECISION
types.New(types.T_uint64, 0, 0), // NUMERIC_SCALE
types.New(types.T_uint64, 0, 0), // NUMERIC_SCALE
types.New(types.T_uint32, 0, 0), // ORDINAL_POSITION
types.New(types.T_varchar, 1024, 0), // COLUMN_DEFAULT
types.New(types.T_varchar, 1024, 0), // EXTRA
types.New(types.T_varchar, 1024, 0), // COLUMN_COMMENT
}

memRes := executor.NewMemResult(
typs,
mpool.MustNewZero())
memRes.NewBatch()
executor.AppendStringRows(memRes, 0, []string{"DATA_TYPE_str"})
executor.AppendStringRows(memRes, 1, []string{"YES"}) // or "NO"
executor.AppendFixedRows(memRes, 2, []uint64{0})
executor.AppendFixedRows(memRes, 3, []uint64{0})
executor.AppendFixedRows(memRes, 4, []uint64{0})
executor.AppendFixedRows(memRes, 5, []uint64{0})
executor.AppendFixedRows(memRes, 6, []uint64{0})
executor.AppendStringRows(memRes, 7, []string{"COLUMN_DEFAULT"})
executor.AppendStringRows(memRes, 8, []string{"EXTRA"}) // '' or 'auto_increment'
executor.AppendStringRows(memRes, 9, []string{"COLUMN_COMMENT"})
return memRes.GetResult(), nil
}
return executor.Result{}, nil
}, txnOperator)
}
3 changes: 3 additions & 0 deletions pkg/cnservice/distributed_tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (s *service) initDistributedTAE(
hakeeper,
s.gossipNode.StatsKeyRouter(),
s.cfg.LogtailUpdateWorkerFactor,

disttae.WithCNTransferTxnLifespanThreshold(
s.cfg.Engine.CNTransferTxnLifespanThreshold),
)
pu.StorageEngine = s.storeEngine

Expand Down
2 changes: 2 additions & 0 deletions pkg/cnservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type Config struct {
Engine struct {
Type EngineType `toml:"type"`
Logstore options.LogstoreType `toml:"logstore"`

CNTransferTxnLifespanThreshold time.Duration `toml:"cn-transfer-txn-lifespan-threshold"`
}

// parameters for cn-server related buffer.
Expand Down
4 changes: 2 additions & 2 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
// Group 4: unexpected state or file io error
ErrInvalidState: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "invalid state %s"},
ErrLogServiceNotReady: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "log service not ready"},
ErrBadDB: {ER_BAD_DB_ERROR, []string{MySQLDefaultSqlState}, "invalid database %s"},
ErrBadDB: {ER_BAD_DB_ERROR, []string{MySQLDefaultSqlState}, "Unknown database %s"},
ErrNoSuchTable: {ER_NO_SUCH_TABLE, []string{MySQLDefaultSqlState}, "no such table %s.%s"},
ErrNoSuchSequence: {ER_NO_SUCH_TABLE, []string{MySQLDefaultSqlState}, "no such sequence %s.%s"},
ErrEmptyVector: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "empty vector"},
Expand All @@ -357,7 +357,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrShortWrite: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "file %s io short write"},
ErrInvalidWrite: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "file %s io invalid write"},
ErrShortBuffer: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "file %s io short buffer"},
ErrNoDB: {ER_NO_DB_ERROR, []string{MySQLDefaultSqlState}, "not connect to a database"},
ErrNoDB: {ER_NO_DB_ERROR, []string{MySQLDefaultSqlState}, "No database selected"},
ErrNoWorkingStore: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "no working store"},
ErrNoHAKeeper: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "cannot locate ha keeper"},
ErrInvalidTruncateLsn: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "invalid truncate lsn, shard %d already truncated to %d"},
Expand Down
24 changes: 14 additions & 10 deletions pkg/fileservice/fifocache/data_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,21 @@ func (d *DataCache) Capacity() int64 {
func (d *DataCache) DeletePaths(ctx context.Context, paths []string) {
for _, path := range paths {
for i := 0; i < len(d.fifo.shards); i++ {
shard := &d.fifo.shards[i]
shard.Lock()
for key, item := range shard.values {
if key.Path == path {
delete(shard.values, key)
if d.fifo.postEvict != nil {
d.fifo.postEvict(item.key, item.value)
}
}
d.deletePath(i, path)
}
}
}

func (d *DataCache) deletePath(shardIndex int, path string) {
shard := &d.fifo.shards[shardIndex]
shard.Lock()
defer shard.Unlock()
for key, item := range shard.values {
if key.Path == path {
delete(shard.values, key)
if d.fifo.postEvict != nil {
d.fifo.postEvict(item.key, item.value)
}
shard.Unlock()
}
}
}
Expand Down
51 changes: 27 additions & 24 deletions pkg/fileservice/fifocache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ func New[K comparable, V any](
return ret
}

func (c *Cache[K, V]) Set(key K, value V, size int64) {
func (c *Cache[K, V]) set(key K, value V, size int64) *_CacheItem[K, V] {
shard := &c.shards[c.keyShardFunc(key)]
shard.Lock()
defer shard.Unlock()
_, ok := shard.values[key]
if ok {
// existed
shard.Unlock()
return
return nil
}

item := &_CacheItem[K, V]{
Expand All @@ -121,14 +121,19 @@ func (c *Cache[K, V]) Set(key K, value V, size int64) {
if c.postSet != nil {
c.postSet(key, value)
}
shard.Unlock()

c.queueLock.Lock()
defer c.queueLock.Unlock()
c.queue1.enqueue(item)
c.used1 += size
if c.used1+c.used2 > c.capacity() {
c.evict(nil, 0)
return item
}

func (c *Cache[K, V]) Set(key K, value V, size int64) {
if item := c.set(key, value, size); item != nil {
c.queueLock.Lock()
defer c.queueLock.Unlock()
c.queue1.enqueue(item)
c.used1 += size
if c.used1+c.used2 > c.capacity() {
c.evict(nil, 0)
}
}
}

Expand Down Expand Up @@ -205,19 +210,23 @@ func (c *Cache[K, V]) evict1() {
c.used2 += item.size
} else {
// evict
shard := &c.shards[c.keyShardFunc(item.key)]
shard.Lock()
delete(shard.values, item.key)
if c.postEvict != nil {
c.postEvict(item.key, item.value)
}
shard.Unlock()
c.deleteItem(item)
c.used1 -= item.size
return
}
}
}

func (c *Cache[K, V]) deleteItem(item *_CacheItem[K, V]) {
shard := &c.shards[c.keyShardFunc(item.key)]
shard.Lock()
defer shard.Unlock()
delete(shard.values, item.key)
if c.postEvict != nil {
c.postEvict(item.key, item.value)
}
}

func (c *Cache[K, V]) evict2() {
// queue 2
for {
Expand All @@ -232,13 +241,7 @@ func (c *Cache[K, V]) evict2() {
item.dec()
} else {
// evict
shard := &c.shards[c.keyShardFunc(item.key)]
shard.Lock()
delete(shard.values, item.key)
if c.postEvict != nil {
c.postEvict(item.key, item.value)
}
shard.Unlock()
c.deleteItem(item)
c.used2 -= item.size
return
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"github.com/google/uuid"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
cdc2 "github.com/matrixorigin/matrixone/pkg/cdc"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand All @@ -39,7 +41,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/txn/client"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"go.uber.org/zap"
)

const (
Expand Down
6 changes: 4 additions & 2 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ var RecordStatement = func(ctx context.Context, ses *Session, proc *process.Proc
requestAt = time.Now()
}

stm.ConnectionId = ses.GetConnectionID()
stm.Account = tenant.GetTenant()
stm.RoleId = proc.GetSessionInfo().RoleId
stm.User = tenant.GetUser()
Expand Down Expand Up @@ -3607,18 +3608,19 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt
int64(statsInfo.PlanStage.PlanDuration) +
int64(statsInfo.CompileStage.CompileDuration) +
statsInfo.PrepareRunStage.ScopePrepareDuration +
statsInfo.PrepareRunStage.CompilePreRunOnceDuration -
statsInfo.PrepareRunStage.CompilePreRunOnceDuration - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock -
(statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)

if totalTime < 0 {
ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-IOAccess(%d)-IOMerge(%d) = %d]",
ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d) = %d]",
uuid.UUID(h.stmt.StatementID).String(),
h.stmt.StatementType,
statsInfo.ParseStage.ParseDuration,
statsInfo.PlanStage.PlanDuration,
statsInfo.CompileStage.CompileDuration,
operatorTimeConsumed,
statsInfo.PrepareRunStage.ScopePrepareDuration+statsInfo.PrepareRunStage.CompilePreRunOnceDuration,
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock,
statsInfo.IOAccessTimeConsumption,
statsInfo.S3FSPrefetchFileIOMergerTimeConsumption,
totalTime,
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/publication_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func createPublication(ctx context.Context, bh BackgroundExec, cp *tree.CreatePu
dbName := string(cp.Database)
comment := cp.Comment
if _, ok := sysDatabases[dbName]; ok {
err = moerr.NewInternalErrorf(ctx, "invalid database name '%s', not support publishing system database", dbName)
err = moerr.NewInternalErrorf(ctx, "Unknown database name '%s', not support publishing system database", dbName)
return
}

Expand Down Expand Up @@ -424,7 +424,7 @@ func doAlterPublication(ctx context.Context, ses *Session, ap *tree.AlterPublica
if ap.DbName != "" {
dbName = ap.DbName
if _, ok := sysDatabases[dbName]; ok {
return moerr.NewInternalErrorf(ctx, "invalid database name '%s', not support publishing system database", dbName)
return moerr.NewInternalErrorf(ctx, "Unknown database name '%s', not support publishing system database", dbName)
}

if dbId, dbType, err = getDbIdAndType(ctx, bh, dbName); err != nil {
Expand Down
Loading

0 comments on commit 568303b

Please sign in to comment.