Skip to content

Commit

Permalink
Merge branch 'main' into moc4369
Browse files Browse the repository at this point in the history
  • Loading branch information
volgariver6 authored Nov 7, 2024
2 parents 16c572f + 0ec2b30 commit dff0c35
Show file tree
Hide file tree
Showing 162 changed files with 17,762 additions and 10,726 deletions.
7 changes: 7 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const (
ErrWrongDatetimeSpec uint16 = 20310
ErrUpgrateError uint16 = 20311
ErrInvalidTz uint16 = 20312
ErrUnsupportedDML uint16 = 20313

// Group 4: unexpected state and io errors
ErrInvalidState uint16 = 20400
Expand Down Expand Up @@ -341,6 +342,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrBadFieldError: {ER_BAD_FIELD_ERROR, []string{MySQLDefaultSqlState}, "Unknown column '%s' in '%s'"},
ErrWrongDatetimeSpec: {ER_WRONG_DATETIME_SPEC, []string{MySQLDefaultSqlState}, "wrong date/time format specifier: %s"},
ErrUpgrateError: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "CN upgrade table or view '%s.%s' under tenant '%s:%d' reports error: %s"},
ErrUnsupportedDML: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "unsupported DML: %s"},

// Group 4: unexpected state or file io error
ErrInvalidState: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "invalid state %s"},
Expand Down Expand Up @@ -854,6 +856,11 @@ func NewConstraintViolation(ctx context.Context, msg string) *Error {
return newError(ctx, ErrConstraintViolation, msg)
}

func NewUnsupportedDML(ctx context.Context, msg string, args ...any) *Error {
xmsg := fmt.Sprintf(msg, args...)
return newError(ctx, ErrUnsupportedDML, xmsg)
}

func NewEmptyVector(ctx context.Context) *Error {
return newError(ctx, ErrEmptyVector)
}
Expand Down
95 changes: 71 additions & 24 deletions pkg/container/batch/compact_batchs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package batch

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/vector"
)
Expand Down Expand Up @@ -49,11 +51,49 @@ func (bats *CompactBatchs) Get(idx int) *Batch {
return bats.batchs[idx]
}

// Push push one batch to CompactBatchs
// CompactBatchs donot obtain ownership of inBatch
// Push append inBatch to CompactBatchs.
// CompactBatchs will obtain ownership of inBatch
func (bats *CompactBatchs) Push(mpool *mpool.MPool, inBatch *Batch) error {
batLen := bats.Length()
var err error

// empty input
if inBatch.rowCount == 0 {
return nil
}

// empty bats
if batLen == 0 {
bats.batchs = append(bats.batchs, inBatch)
return nil
}

// fast path 1
lastBatRowCount := bats.batchs[batLen-1].rowCount
if lastBatRowCount == 0 {
bats.batchs = append(bats.batchs, inBatch)
return nil
}

defer func() {
inBatch.Clean(mpool)
}()

// fast path 2
if lastBatRowCount+inBatch.RowCount() <= DefaultBatchMaxRow {
bats.batchs[batLen-1], err = bats.batchs[batLen-1].Append(context.TODO(), mpool, inBatch)
return err
}

// slow path
return bats.fillData(mpool, inBatch)
}

// Extend extend one batch'data to CompactBatchs
// CompactBatchs donot obtain ownership of inBatch
func (bats *CompactBatchs) Extend(mpool *mpool.MPool, inBatch *Batch) error {
batLen := bats.Length()
var err error
var tmpBat *Batch

// empty input
Expand All @@ -71,12 +111,40 @@ func (bats *CompactBatchs) Push(mpool *mpool.MPool, inBatch *Batch) error {
return nil
}

return bats.fillData(mpool, inBatch)
}

func (bats *CompactBatchs) RowCount() int {
rowCount := 0
for _, bat := range bats.batchs {
rowCount += bat.rowCount
}
return rowCount
}

func (bats *CompactBatchs) Clean(mpool *mpool.MPool) {
for _, bat := range bats.batchs {
bat.Clean(mpool)
}
bats.batchs = nil
}

func (bats *CompactBatchs) TakeBatchs() []*Batch {
batchs := bats.batchs
bats.batchs = nil
return batchs
}

func (bats *CompactBatchs) fillData(mpool *mpool.MPool, inBatch *Batch) error {
batLen := bats.Length()
var tmpBat *Batch
var err error

if len(bats.ufs) == 0 {
for i := 0; i < inBatch.VectorCount(); i++ {
typ := *inBatch.GetVector(int32(i)).GetType()
bats.ufs = append(bats.ufs, vector.GetUnionAllFunction(typ, mpool))
}

}

//fill data
Expand Down Expand Up @@ -124,24 +192,3 @@ func (bats *CompactBatchs) Push(mpool *mpool.MPool, inBatch *Batch) error {

return nil
}

func (bats *CompactBatchs) RowCount() int {
rowCount := 0
for _, bat := range bats.batchs {
rowCount += bat.rowCount
}
return rowCount
}

func (bats *CompactBatchs) Clean(mpool *mpool.MPool) {
for _, bat := range bats.batchs {
bat.Clean(mpool)
}
bats.batchs = nil
}

func (bats *CompactBatchs) TakeBatchs() []*Batch {
batchs := bats.batchs
bats.batchs = nil
return batchs
}
80 changes: 73 additions & 7 deletions pkg/container/batch/compact_batchs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestCompactBatchsFo(t *testing.T) {
func TestCompactBatchsPush(t *testing.T) {
var err error
var bat1, bat2 *Batch
mp := mpool.MustNewZero()
Expand All @@ -32,7 +32,6 @@ func TestCompactBatchsFo(t *testing.T) {
//empty input
bat1 = NewWithSize(1)
err = bats.Push(mp, bat1)
bat1.Clean(mp)
require.NoError(t, err)
require.Nil(t, bats.Get(0))
bats.Clean(mp)
Expand All @@ -41,7 +40,6 @@ func TestCompactBatchsFo(t *testing.T) {
//simple test
bat1 = makeTestBatch(10, mp)
err = bats.Push(mp, bat1)
bat1.Clean(mp)
require.NoError(t, err)
require.Equal(t, 1, bats.Length())
require.Equal(t, 10, bats.RowCount())
Expand All @@ -55,8 +53,6 @@ func TestCompactBatchsFo(t *testing.T) {
_ = bats.Push(mp, bat1)
err = bats.Push(mp, bat2)
require.NoError(t, err)
bat1.Clean(mp)
bat2.Clean(mp)
require.Equal(t, 1, bats.Length())
require.Equal(t, 20, bats.RowCount())
require.Equal(t, 20, bats.Get(0).rowCount)
Expand All @@ -70,8 +66,6 @@ func TestCompactBatchsFo(t *testing.T) {
_ = bats.Push(mp, bat1)
err = bats.Push(mp, bat2)
require.NoError(t, err)
bat1.Clean(mp)
bat2.Clean(mp)
require.Equal(t, 2, bats.Length())
require.Equal(t, 8195, bats.RowCount())
require.Equal(t, 8192, bats.Get(0).rowCount)
Expand All @@ -86,6 +80,78 @@ func TestCompactBatchsFo(t *testing.T) {
_ = bats.Push(mp, bat1)
err = bats.Push(mp, bat2)
require.NoError(t, err)
require.Equal(t, 3, bats.Length())
require.Equal(t, 8192*2+1+3, bats.RowCount())
require.Equal(t, 8192, bats.Get(0).rowCount)
require.Equal(t, 8192, bats.Get(1).rowCount)
require.Equal(t, 4, bats.Get(2).rowCount)
bats.Clean(mp)
require.Equal(t, int64(0), mp.CurrNB())
}

func TestCompactBatchsExtend(t *testing.T) {
var err error
var bat1, bat2 *Batch
mp := mpool.MustNewZero()
bats := NewCompactBatchs()

//empty input
bat1 = NewWithSize(1)
err = bats.Extend(mp, bat1)
bat1.Clean(mp)
require.NoError(t, err)
require.Nil(t, bats.Get(0))
bats.Clean(mp)
require.Equal(t, int64(0), mp.CurrNB())

//simple test
bat1 = makeTestBatch(10, mp)
err = bats.Extend(mp, bat1)
bat1.Clean(mp)
require.NoError(t, err)
require.Equal(t, 1, bats.Length())
require.Equal(t, 10, bats.RowCount())
require.Equal(t, 10, bats.Get(0).rowCount)
bats.Clean(mp)
require.Equal(t, int64(0), mp.CurrNB())

// bat1.rowCount + bat2.rowCount < DefaultBatchMaxRow
bat1 = makeTestBatch(10, mp)
bat2 = makeTestBatch(10, mp)
_ = bats.Extend(mp, bat1)
err = bats.Extend(mp, bat2)
require.NoError(t, err)
bat1.Clean(mp)
bat2.Clean(mp)
require.Equal(t, 1, bats.Length())
require.Equal(t, 20, bats.RowCount())
require.Equal(t, 20, bats.Get(0).rowCount)
bats.Clean(mp)
require.Equal(t, int64(0), mp.CurrNB())

// bat1.rowCount + bat2.rowCount > DefaultBatchMaxRow
// but bat1.rowCount + bat2.rowCount - DefaultBatchMaxRow < DefaultBatchMaxRow
bat1 = makeTestBatch(3, mp)
bat2 = makeTestBatch(8192, mp)
_ = bats.Extend(mp, bat1)
err = bats.Extend(mp, bat2)
require.NoError(t, err)
bat1.Clean(mp)
bat2.Clean(mp)
require.Equal(t, 2, bats.Length())
require.Equal(t, 8195, bats.RowCount())
require.Equal(t, 8192, bats.Get(0).rowCount)
require.Equal(t, 3, bats.Get(1).rowCount)
bats.Clean(mp)
require.Equal(t, int64(0), mp.CurrNB())

// bat1.rowCount + bat2.rowCount > DefaultBatchMaxRow
// but bat1.rowCount + bat2.rowCount - DefaultBatchMaxRow > DefaultBatchMaxRow
bat1 = makeTestBatch(3, mp)
bat2 = makeTestBatch(8192*2+1, mp)
_ = bats.Extend(mp, bat1)
err = bats.Extend(mp, bat2)
require.NoError(t, err)
bat1.Clean(mp)
bat2.Clean(mp)
require.Equal(t, 3, bats.Length())
Expand Down
Loading

0 comments on commit dff0c35

Please sign in to comment.