Skip to content

Commit

Permalink
Merge branch '2.0-dev' into password-management
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 5, 2024
2 parents 9413cf8 + 7dad3b1 commit 65e6d24
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 22 deletions.
20 changes: 12 additions & 8 deletions pkg/common/malloc/fixed_size_mmap_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,9 @@ func NewFixedSizeMmapAllocator(
func(hints Hints, args *fixedSizeMmapDeallocatorArgs) {

if hints&DoNotReuse > 0 {
if err := unix.Munmap(
checkMunmap(unix.Munmap(
unsafe.Slice((*byte)(args.ptr), size),
); err != nil {
panic(err)
}
), hints)
return
}

Expand All @@ -104,11 +102,9 @@ func NewFixedSizeMmapAllocator(

default:
// unmap
if err := unix.Munmap(
checkMunmap(unix.Munmap(
unsafe.Slice((*byte)(args.ptr), size),
); err != nil {
panic(err)
}
), hints)

}

Expand Down Expand Up @@ -164,3 +160,11 @@ func (f *fixedSizeMmapAllocator) Allocate(hints Hints, clearSize uint64) (slice
length: f.size,
}), nil
}

func checkMunmap(err error, hints Hints) {
if err != nil {
if hints&IgnoreMunmapError == 0 {
panic(err)
}
}
}
36 changes: 36 additions & 0 deletions pkg/common/malloc/fixed_size_mmap_allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package malloc

import (
"io"
"testing"
)

func TestCheckMunmap(t *testing.T) {
checkMunmap(io.EOF, IgnoreMunmapError)
func() {
defer func() {
p := recover()
if p == nil {
t.Fatal("should panic")
}
if p != io.EOF {
t.Fatalf("got %v", p)
}
}()
checkMunmap(io.EOF, NoHints)
}()
}
1 change: 1 addition & 0 deletions pkg/common/malloc/hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ const (
NoHints Hints = 0
DoNotReuse Hints = 1 << iota
NoClear
IgnoreMunmapError
)
3 changes: 2 additions & 1 deletion pkg/common/mpool/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,8 @@ func (mp *MPool) Free(bs []byte) {
if pHdr.offHeap {
allocator().Deallocate(
unsafe.Slice((*byte)(hdr), 1),
malloc.NoHints,
// issue: https://github.com/matrixorigin/matrixone/issues/19758
malloc.IgnoreMunmapError,
)
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/vm/engine/disttae/cache/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,12 @@ func (cc *CatalogCache) GetTable(tbl *TableItem) bool {
return find
}

func (cc *CatalogCache) GetStartTS() types.TS {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.mu.start
}

func (cc *CatalogCache) HasNewerVersion(qry *TableChangeQuery) bool {
var find bool

Expand Down
37 changes: 29 additions & 8 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,19 +1213,33 @@ func (tbl *txnTable) UpdateConstraint(ctx context.Context, c *engine.ConstraintD
// and then the second alter will be treated as an operation on a newly-created table if txn.CreateTable is used.
//
// 2. This check depends on replaying all catalog cache when cn starts.
func (tbl *txnTable) isCreatedInTxn() bool {
func (tbl *txnTable) isCreatedInTxn(ctx context.Context) (bool, error) {
if tbl.remoteWorkspace {
return tbl.createdInTxn
return tbl.createdInTxn, nil
}

if tbl.db.op.IsSnapOp() {
// if the operation is snapshot read, isCreatedInTxn can not be called by AlterTable
// So if the snapshot read want to subcribe logtail tail, let it go ahead.
return false
return false, nil
}
idAckedbyTN := tbl.db.getEng().GetLatestCatalogCache().
GetTableByIdAndTime(tbl.accountId, tbl.db.databaseId, tbl.tableId, tbl.db.op.SnapshotTS())
return idAckedbyTN == nil

cache := tbl.db.getEng().GetLatestCatalogCache()
cacheTS := cache.GetStartTS().ToTimestamp()
if cacheTS.Greater(tbl.db.op.SnapshotTS()) {
if err := tbl.db.op.UpdateSnapshot(ctx, cacheTS); err != nil {
return false, err
}
return false, moerr.NewTxnNeedRetry(ctx)
}

idAckedbyTN := cache.GetTableByIdAndTime(
tbl.accountId,
tbl.db.databaseId,
tbl.tableId,
tbl.db.op.SnapshotTS(),
)
return idAckedbyTN == nil, nil
}

func (tbl *txnTable) AlterTable(ctx context.Context, c *engine.ConstraintDef, reqs []*api.AlterTableReq) error {
Expand Down Expand Up @@ -1319,7 +1333,10 @@ func (tbl *txnTable) AlterTable(ctx context.Context, c *engine.ConstraintDef, re
// 0. check if the table is created in txn.
// For a table created in txn, alter means to recreate table and put relating dml/alter batch behind the new create batch.
// For a normal table, alter means sending Alter request to TN, no creating command, and no dropping command.
createdInTxn := tbl.isCreatedInTxn()
createdInTxn, err := tbl.isCreatedInTxn(ctx)
if err != nil {
return err
}
if !createdInTxn {
tbl.version += 1
// For normal Alter, send Alter request to TN
Expand Down Expand Up @@ -1865,7 +1882,11 @@ func (tbl *txnTable) tryToSubscribe(ctx context.Context) (ps *logtailreplay.Part
}
}()

if tbl.isCreatedInTxn() {
createdInTxn, err := tbl.isCreatedInTxn(ctx)
if err != nil {
return nil, err
}
if createdInTxn {
return
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/vm/engine/disttae/txn_table_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ func (tbl *txnTableDelegate) hasAllLocalReplicas() (bool, error) {
}

func (tbl *txnTableDelegate) getReadRequest(
ctx context.Context,
method int,
apply func([]byte),
) (shardservice.ReadRequest, error) {
Expand All @@ -1018,6 +1019,11 @@ func (tbl *txnTableDelegate) getReadRequest(
return shardservice.ReadRequest{}, err
}

createdInTx, err := tbl.origin.isCreatedInTxn(ctx)
if err != nil {
return shardservice.ReadRequest{}, err
}

return shardservice.ReadRequest{
TableID: tbl.shard.tableID,
Method: method,
Expand All @@ -1028,7 +1034,7 @@ func (tbl *txnTableDelegate) getReadRequest(
DatabaseName: tbl.origin.db.databaseName,
AccountID: uint64(tbl.origin.accountId),
TableName: tbl.origin.tableName,
CreatedInTxn: tbl.origin.isCreatedInTxn(),
CreatedInTxn: createdInTx,
},
},
Apply: apply,
Expand Down Expand Up @@ -1079,6 +1085,7 @@ func (tbl *txnTableDelegate) forwardRead(
apply func([]byte),
) error {
request, err := tbl.getReadRequest(
ctx,
method,
apply,
)
Expand Down
39 changes: 35 additions & 4 deletions pkg/vm/engine/disttae/txn_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"context"
"testing"

"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/txn/client"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache"
"github.com/stretchr/testify/assert"
)

Expand All @@ -43,13 +46,20 @@ func newTxnTableForTest() *txnTable {
packer.Close()
},
),
catalog: cache.NewCatalog(),
}
var tnStore DNStore
txn := &Transaction{
engine: engine,
tnStores: []DNStore{tnStore},
}
c := client.NewTxnClient("", nil, nil, nil)
rt := runtime.DefaultRuntime()
s, err := rpc.NewSender(rpc.Config{}, rt)
if err != nil {
panic(err)
}
c := client.NewTxnClient("", s)
c.Resume()
op, _ := c.New(context.Background(), timestamp.Timestamp{})
op.AddWorkspace(txn)

Expand All @@ -59,10 +69,31 @@ func newTxnTableForTest() *txnTable {
table := &txnTable{
db: db,
primaryIdx: 0,
eng: engine,
}
return table
}

func TestIsCreatedInTxn(t *testing.T) {
ctx := context.Background()
tbl := newTxnTableForTest()
e := tbl.eng.(*Engine)

ts0 := types.BuildTS(0, 0)
ts1 := types.BuildTS(10, 10)
e.catalog.UpdateDuration(ts0, ts0)
assert.NoError(t, tbl.db.op.UpdateSnapshot(ctx, ts1.ToTimestamp()))
inTxn, err := tbl.isCreatedInTxn(ctx)
assert.NoError(t, err)
assert.True(t, inTxn)

ts2 := types.BuildTS(20, 20)
e.catalog.UpdateStart(ts2)
inTxn, err = tbl.isCreatedInTxn(ctx)
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry))
assert.False(t, inTxn)
}

func makeBatchForTest(
mp *mpool.MPool,
ints ...int64,
Expand Down

0 comments on commit 65e6d24

Please sign in to comment.