fix full segment (#8919)
- create new appendable segments during appending, avoid index overflowing in a segment
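For context, the overflow referred to above is the per-segment object index: each block id minted by CreateBlock consumes one value of a uint16 counter (entry.nextObjectIdx in the diff below), and heavy compaction inside one long-lived appendable segment keeps consuming values. A minimal, self-contained sketch of the wraparound risk, using plain Go types rather than MatrixOne's:

```go
// Illustration only (plain Go types, not MatrixOne's): the per-segment object
// index is a uint16, every new object consumes one value, and unsigned
// arithmetic wraps silently at math.MaxUint16, at which point freshly minted
// block ids would collide with existing ones.
package main

import (
	"fmt"
	"math"
)

func main() {
	var nextObjectIdx uint16 = math.MaxUint16
	fmt.Println("last representable index:", nextObjectIdx) // 65535
	nextObjectIdx++                                         // wraps around silently
	fmt.Println("after overflow:", nextObjectIdx)           // 0 -> would reuse old ids
}
```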

Approved by: @XuPeng-SH
aptend authored Apr 10, 2023
1 parent cce87e6 commit da3a62d
Showing 6 changed files with 149 additions and 2 deletions.
10 changes: 10 additions & 0 deletions pkg/vm/engine/tae/catalog/segment.go
@@ -17,6 +17,7 @@ package catalog
import (
"bytes"
"fmt"
"math"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -279,6 +280,12 @@ func (entry *SegmentEntry) LastAppendableBlock() (blk *BlockEntry) {
return
}

func (entry *SegmentEntry) GetNextObjectIndex() uint16 {
entry.RLock()
defer entry.RUnlock()
return entry.nextObjectIdx
}

func (entry *SegmentEntry) CreateBlock(
txn txnif.AsyncTxn,
state EntryState,
@@ -296,6 +303,9 @@ func (entry *SegmentEntry) CreateBlock(
id = common.NewBlockid(&entry.ID, entry.nextObjectIdx, 0)
entry.nextObjectIdx += 1
}
if entry.nextObjectIdx == math.MaxUint16 {
panic("bad logic: full object offset")
}
if _, ok := entry.entries[id]; ok {
panic(fmt.Sprintf("duplicate bad block id: %s", id.String()))
}
102 changes: 102 additions & 0 deletions pkg/vm/engine/tae/db/db_test.go
@@ -4677,6 +4677,108 @@ func TestUpdate(t *testing.T) {
}
}

// This is used to observe a lot of compactions overflowing a segment; it is not compulsory
func TestAlwaysUpdate(t *testing.T) {
t.Skip("This is a long test, run it manully to observe catalog")
defer testutils.AfterTest(t)()
opts := config.WithQuickScanAndCKPOpts2(nil, 100)
opts.GCCfg.ScanGCInterval = 3600 * time.Second
opts.CatalogCfg.GCInterval = 3600 * time.Second
// opts := config.WithLongScanAndCKPOpts(nil)
tae := newTestEngine(t, opts)
defer tae.Close()

schema := catalog.MockSchemaAll(5, 3)
schema.Name = "testupdate"
schema.BlockMaxRows = 8192
schema.SegmentMaxBlocks = 200
tae.bindSchema(schema)

bats := catalog.MockBatch(schema, 400*100).Split(100)
metalocs := make([]string, 0, 100)
// write only one segment
for i := 0; i < 1; i++ {
objName1 := common.NewSegmentid().ToString() + "-0"
writer, err := blockio.NewBlockWriter(tae.Fs.Service, objName1)
assert.Nil(t, err)
writer.SetPrimaryKey(3)
for _, bat := range bats[i*25 : (i+1)*25] {
_, err := writer.WriteBlock(bat)
assert.Nil(t, err)
}
blocks, _, err := writer.Sync(context.Background())
assert.Nil(t, err)
assert.Equal(t, 25, len(blocks))
for _, blk := range blocks {
loc, err := blockio.EncodeLocation(blk.GetExtent(), 8192, blocks)
assert.Nil(t, err)
metalocs = append(metalocs, loc)
}
}

txn, _ := tae.StartTxn(nil)
db, err := txn.CreateDatabase("db", "", "")
assert.NoError(t, err)
tbl, err := db.CreateRelation(schema)
assert.NoError(t, err)
assert.NoError(t, tbl.AddBlksWithMetaLoc(nil, metalocs))
assert.NoError(t, txn.Commit())

t.Log(tae.Catalog.SimplePPString(common.PPL1))

wg := &sync.WaitGroup{}

updateFn := func(i, j int) {
defer wg.Done()
tuples := bats[0].CloneWindow(0, 1)
defer tuples.Close()
for x := i; x < j; x++ {
txn, rel := tae.getRelation()
filter := handle.NewEQFilter(int64(x))
id, offset, err := rel.GetByFilter(filter)
assert.NoError(t, err)
_, err = rel.GetValue(id, offset, 2)
assert.NoError(t, err)
err = rel.RangeDelete(id, offset, offset, handle.DT_Normal)
if err != nil {
t.Logf("range delete %v, rollbacking", err)
_ = txn.Rollback()
return
}
tuples.Vecs[3].Update(0, int64(x))
err = rel.Append(tuples)
assert.NoError(t, err)
assert.NoError(t, txn.Commit())
}
t.Logf("(%d, %d) done", i, j)
}

p, _ := ants.NewPool(10)
defer p.Release()

ch := make(chan int, 1)
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ticker.C:
t.Log(tae.Catalog.SimplePPString(common.PPL1))
case <-ch:
}
}
}()

for r := 0; r < 10; r++ {
for i := 0; i < 40; i++ {
wg.Add(1)
start, end := i*200, (i+1)*200
f := func() { updateFn(start, end) }
p.Submit(f)
}
wg.Wait()
}
}

func TestInsertPerf(t *testing.T) {
t.Skip(any("for debug"))
opts := new(options.Options)
3 changes: 2 additions & 1 deletion pkg/vm/engine/tae/db/scannerop.go
@@ -404,7 +404,8 @@ func (s *MergeTaskBuilder) onSegment(segmentEntry *catalog.SegmentEntry) (err er
if !segmentEntry.IsActive() || (!segmentEntry.IsAppendable() && segmentEntry.IsSorted()) {
return moerr.GetOkStopCurrRecur()
}
// handle appendable segs and unsorted non-appendable segs(which was written by cn)
// handle appendable segs
// TODO: iterate non-appendable segs to delete all. A typical occasion is TPCC
s.segBuilder.resetForNewSeg()
return
}
2 changes: 2 additions & 0 deletions pkg/vm/engine/tae/options/types.go
@@ -36,6 +36,8 @@ const (
DefaultBlockMaxRows = uint32(8192)
DefaultBlocksPerSegment = uint16(256)

DefaultObejctPerSegment = uint16(512)

DefaultScannerInterval = time.Second * 5
DefaultCheckpointFlushInterval = time.Minute
DefaultCheckpointMinCount = int64(100)
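
Rough numbers for these defaults, as a sketch only (it assumes a schema that keeps the defaults above unchanged): the new DefaultObejctPerSegment caps how many object offsets GetAppender will hand out before it forces a fresh appendable segment, and even the doubled cap used by the compaction path stays far below the uint16 maximum.

```go
// Back-of-the-envelope numbers only; the constant names are lowercased copies
// of the defaults above (the DefaultObejctPerSegment spelling follows the diff).
package main

import "fmt"

const (
	defaultBlockMaxRows     = 8192 // rows per block
	defaultBlocksPerSegment = 256  // blocks per segment
	defaultObjectPerSegment = 512  // new cap on object offsets per segment
)

func main() {
	fmt.Println("rows per segment with defaults:", defaultBlockMaxRows*defaultBlocksPerSegment) // 2097152
	fmt.Println("object-index cap before rotation:", defaultObjectPerSegment)
	// Even the doubled cap used by the compaction path (2*512 = 1024) leaves
	// plenty of headroom below the uint16 maximum of 65535.
	fmt.Println("headroom below uint16 max:", 65535-2*defaultObjectPerSegment) // 64511
}
```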
11 changes: 11 additions & 0 deletions pkg/vm/engine/tae/tables/handle.go
@@ -15,9 +15,11 @@
package tables

import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
)

type tableHandle struct {
@@ -81,6 +83,15 @@ func (h *tableHandle) GetAppender() (appender data.BlockAppender, err error) {
panic(err)
}
}

// Instead of in ThrowAppenderAndErr, check the object index here because
// it is better to create a new appendable segment early under a busy update workload
if seg := h.block.meta.GetSegment(); seg.GetNextObjectIndex() >= options.DefaultObejctPerSegment {
logutil.Infof("%s create new seg due to large object index %d",
seg.ID.ToString(), seg.GetNextObjectIndex())
return nil, data.ErrAppendableSegmentNotFound
}

dropped := h.block.meta.HasDropCommitted()
if !h.appender.IsAppendable() || !h.block.IsAppendable() || dropped {
return h.ThrowAppenderAndErr()
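
The early return of data.ErrAppendableSegmentNotFound above relies on the append path already knowing how to recover from that error by creating a new appendable segment and retrying. A hypothetical caller-side sketch of that loop, with invented names rather than MatrixOne's API:

```go
// Hypothetical sketch (invented names, not MatrixOne's API) of how a
// "segment full" signal from GetAppender is consumed: the append path treats
// the error as a cue to create a fresh appendable segment and retry, which is
// why returning data.ErrAppendableSegmentNotFound early is sufficient here.
package main

import (
	"errors"
	"fmt"
)

var errAppendableSegmentNotFound = errors.New("appendable segment not found")

type seg struct {
	id            int
	nextObjectIdx uint16
}

type table struct {
	segments      []*seg
	objectsPerSeg uint16
}

// getAppender refuses to hand out the last segment once its object index
// reaches the cap, mirroring the check added in GetAppender above.
func (t *table) getAppender() (*seg, error) {
	last := t.segments[len(t.segments)-1]
	if last.nextObjectIdx >= t.objectsPerSeg {
		return nil, errAppendableSegmentNotFound
	}
	return last, nil
}

func (t *table) createAppendableSegment() *seg {
	s := &seg{id: len(t.segments)}
	t.segments = append(t.segments, s)
	return s
}

// append rotates to a new appendable segment when the current one is full.
func (t *table) append() *seg {
	s, err := t.getAppender()
	if errors.Is(err, errAppendableSegmentNotFound) {
		s = t.createAppendableSegment()
	}
	s.nextObjectIdx++ // pretend this append consumed one object offset
	return s
}

func main() {
	t := &table{segments: []*seg{{}}, objectsPerSeg: 2}
	for i := 0; i < 5; i++ {
		fmt.Println("appended to segment", t.append().id) // 0 0 1 1 2
	}
}
```

The real flow goes through the txn table's handling of data.ErrAppendableSegmentNotFound rather than this toy loop; the change only makes the signal fire earlier, well before the object index approaches the uint16 limit.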
23 changes: 22 additions & 1 deletion pkg/vm/engine/tae/tables/jobs/compactblk.go
@@ -18,6 +18,7 @@ import (
"fmt"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"

"github.com/RoaringBitmap/roaring"
@@ -30,6 +31,7 @@
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/txnentries"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
)
@@ -163,7 +165,26 @@ func (task *compactBlockTask) Execute() (err error) {
}

if !empty {
if _, err = task.createAndFlushNewBlock(seg, preparer, deletes); err != nil {
createOnSeg := seg
curSeg := seg.GetMeta().(*catalog.SegmentEntry)
// double the threshold to leave more room for creating a new appendable segment during appending; this is just defensive code
// see the GetAppender function in tableHandle
if curSeg.GetNextObjectIndex() > options.DefaultObejctPerSegment*2 {
nextSeg := curSeg.GetTable().LastAppendableSegmemt()
if nextSeg.ID == curSeg.ID {
// we can't create an appendable seg here because the compaction can be rolled back,
// so just wait until the new appendable seg is available.
// in practice this log is rarely printed.
logutil.Infof("do not compact on seg %s %d, wait", curSeg.ID.ToString(), curSeg.GetNextObjectIndex())
return moerr.GetOkExpectedEOB()
}
createOnSeg, err = task.compacted.GetSegment().GetRelation().GetSegment(nextSeg.ID)
if err != nil {
return err
}
}

if _, err = task.createAndFlushNewBlock(createOnSeg, preparer, deletes); err != nil {
return
}
}
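
To summarize the branch added above in isolation: compaction never creates an appendable segment itself, because the task may be rolled back; once the current segment's object index passes twice the configured cap, it either redirects the new block to an already existing newer appendable segment or backs off with moerr.GetOkExpectedEOB and lets a later run retry. A condensed, standalone restatement with invented types:

```go
// Condensed restatement (invented types, not MatrixOne's) of the decision in
// compactBlockTask.Execute: compact in place while within the doubled
// threshold, redirect to a newer appendable segment if one exists, otherwise
// back off and retry later.
package main

import (
	"errors"
	"fmt"
)

var errExpectedEOB = errors.New("expected: back off and retry later")

const objectsPerSegment = 512 // stands in for options.DefaultObejctPerSegment

type segMeta struct {
	id            int
	nextObjectIdx uint16
}

// chooseTargetSegment picks the segment on which the compacted block is created.
func chooseTargetSegment(cur, lastAppendable *segMeta) (*segMeta, error) {
	if cur.nextObjectIdx <= objectsPerSegment*2 {
		return cur, nil // still within the doubled threshold: compact in place
	}
	if lastAppendable.id == cur.id {
		// No newer appendable segment exists yet; creating one here would be
		// unsafe because this compaction may be rolled back, so just wait.
		return nil, errExpectedEOB
	}
	return lastAppendable, nil // redirect to the newer appendable segment
}

func main() {
	cur := &segMeta{id: 7, nextObjectIdx: 1100}
	fresh := &segMeta{id: 8}
	target, err := chooseTargetSegment(cur, fresh)
	fmt.Println(target.id, err) // 8 <nil>
}
```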
