From da3a62dc708dae988fdfb30af59853901cad8d3a Mon Sep 17 00:00:00 2001
From: aptend <49832303+aptend@users.noreply.github.com>
Date: Mon, 10 Apr 2023 18:25:05 +0800
Subject: [PATCH] fix full segment (#8919)

- create new appendable segments during appending to avoid the object index
  overflowing within a segment

Approved by: @XuPeng-SH
---
 pkg/vm/engine/tae/catalog/segment.go        |  10 ++
 pkg/vm/engine/tae/db/db_test.go             | 102 ++++++++++++++++++++
 pkg/vm/engine/tae/db/scannerop.go           |   3 +-
 pkg/vm/engine/tae/options/types.go          |   2 +
 pkg/vm/engine/tae/tables/handle.go          |  11 +++
 pkg/vm/engine/tae/tables/jobs/compactblk.go |  23 ++++-
 6 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/pkg/vm/engine/tae/catalog/segment.go b/pkg/vm/engine/tae/catalog/segment.go
index 1a2e47632016..d2f0d79a7658 100644
--- a/pkg/vm/engine/tae/catalog/segment.go
+++ b/pkg/vm/engine/tae/catalog/segment.go
@@ -17,6 +17,7 @@ package catalog
 import (
     "bytes"
     "fmt"
+    "math"
 
     "github.com/matrixorigin/matrixone/pkg/common/moerr"
     "github.com/matrixorigin/matrixone/pkg/container/types"
@@ -279,6 +280,12 @@ func (entry *SegmentEntry) LastAppendableBlock() (blk *BlockEntry) {
     return
 }
 
+func (entry *SegmentEntry) GetNextObjectIndex() uint16 {
+    entry.RLock()
+    defer entry.RUnlock()
+    return entry.nextObjectIdx
+}
+
 func (entry *SegmentEntry) CreateBlock(
     txn txnif.AsyncTxn,
     state EntryState,
@@ -296,6 +303,9 @@ func (entry *SegmentEntry) CreateBlock(
         id = common.NewBlockid(&entry.ID, entry.nextObjectIdx, 0)
         entry.nextObjectIdx += 1
     }
+    if entry.nextObjectIdx == math.MaxUint16 {
+        panic("bad logic: full object offset")
+    }
     if _, ok := entry.entries[id]; ok {
         panic(fmt.Sprintf("duplicate bad block id: %s", id.String()))
     }
diff --git a/pkg/vm/engine/tae/db/db_test.go b/pkg/vm/engine/tae/db/db_test.go
index cef9d9096f18..413714cbe904 100644
--- a/pkg/vm/engine/tae/db/db_test.go
+++ b/pkg/vm/engine/tae/db/db_test.go
@@ -4677,6 +4677,108 @@ func TestUpdate(t *testing.T) {
     }
 }
 
+// This test is used to observe a large number of compactions overflowing a segment; it is not compulsory
+func TestAlwaysUpdate(t *testing.T) {
+    t.Skip("This is a long test, run it manually to observe catalog")
+    defer testutils.AfterTest(t)()
+    opts := config.WithQuickScanAndCKPOpts2(nil, 100)
+    opts.GCCfg.ScanGCInterval = 3600 * time.Second
+    opts.CatalogCfg.GCInterval = 3600 * time.Second
+    // opts := config.WithLongScanAndCKPOpts(nil)
+    tae := newTestEngine(t, opts)
+    defer tae.Close()
+
+    schema := catalog.MockSchemaAll(5, 3)
+    schema.Name = "testupdate"
+    schema.BlockMaxRows = 8192
+    schema.SegmentMaxBlocks = 200
+    tae.bindSchema(schema)
+
+    bats := catalog.MockBatch(schema, 400*100).Split(100)
+    metalocs := make([]string, 0, 100)
+    // write only one segment
+    for i := 0; i < 1; i++ {
+        objName1 := common.NewSegmentid().ToString() + "-0"
+        writer, err := blockio.NewBlockWriter(tae.Fs.Service, objName1)
+        assert.Nil(t, err)
+        writer.SetPrimaryKey(3)
+        for _, bat := range bats[i*25 : (i+1)*25] {
+            _, err := writer.WriteBlock(bat)
+            assert.Nil(t, err)
+        }
+        blocks, _, err := writer.Sync(context.Background())
+        assert.Nil(t, err)
+        assert.Equal(t, 25, len(blocks))
+        for _, blk := range blocks {
+            loc, err := blockio.EncodeLocation(blk.GetExtent(), 8192, blocks)
+            assert.Nil(t, err)
+            metalocs = append(metalocs, loc)
+        }
+    }
+
+    txn, _ := tae.StartTxn(nil)
+    db, err := txn.CreateDatabase("db", "", "")
+    assert.NoError(t, err)
+    tbl, err := db.CreateRelation(schema)
+    assert.NoError(t, err)
+    assert.NoError(t, tbl.AddBlksWithMetaLoc(nil, metalocs))
+    assert.NoError(t, txn.Commit())
+
+    t.Log(tae.Catalog.SimplePPString(common.PPL1))
+
+    wg := &sync.WaitGroup{}
+
+    updateFn := func(i, j int) {
+        defer wg.Done()
+        tuples := bats[0].CloneWindow(0, 1)
+        defer tuples.Close()
+        for x := i; x < j; x++ {
+            txn, rel := tae.getRelation()
+            filter := handle.NewEQFilter(int64(x))
+            id, offset, err := rel.GetByFilter(filter)
+            assert.NoError(t, err)
+            _, err = rel.GetValue(id, offset, 2)
+            assert.NoError(t, err)
+            err = rel.RangeDelete(id, offset, offset, handle.DT_Normal)
+            if err != nil {
+                t.Logf("range delete %v, rolling back", err)
+                _ = txn.Rollback()
+                return
+            }
+            tuples.Vecs[3].Update(0, int64(x))
+            err = rel.Append(tuples)
+            assert.NoError(t, err)
+            assert.NoError(t, txn.Commit())
+        }
+        t.Logf("(%d, %d) done", i, j)
+    }
+
+    p, _ := ants.NewPool(10)
+    defer p.Release()
+
+    ch := make(chan int, 1)
+    ticker := time.NewTicker(10 * time.Second)
+    go func() {
+        for {
+            select {
+            case <-ticker.C:
+                t.Log(tae.Catalog.SimplePPString(common.PPL1))
+            case <-ch:
+            }
+        }
+    }()
+
+    for r := 0; r < 10; r++ {
+        for i := 0; i < 40; i++ {
+            wg.Add(1)
+            start, end := i*200, (i+1)*200
+            f := func() { updateFn(start, end) }
+            p.Submit(f)
+        }
+        wg.Wait()
+    }
+}
+
 func TestInsertPerf(t *testing.T) {
     t.Skip(any("for debug"))
     opts := new(options.Options)
diff --git a/pkg/vm/engine/tae/db/scannerop.go b/pkg/vm/engine/tae/db/scannerop.go
index 9ff64471d22c..8f35d8682bc0 100644
--- a/pkg/vm/engine/tae/db/scannerop.go
+++ b/pkg/vm/engine/tae/db/scannerop.go
@@ -404,7 +404,8 @@ func (s *MergeTaskBuilder) onSegment(segmentEntry *catalog.SegmentEntry) (err er
     if !segmentEntry.IsActive() || (!segmentEntry.IsAppendable() && segmentEntry.IsSorted()) {
         return moerr.GetOkStopCurrRecur()
     }
-    // handle appendable segs and unsorted non-appendable segs(which was written by cn)
+    // handle appendable segs
+    // TODO: iterate non-appendable segs to delete all of them. A typical occasion is TPCC
     s.segBuilder.resetForNewSeg()
     return
 }
diff --git a/pkg/vm/engine/tae/options/types.go b/pkg/vm/engine/tae/options/types.go
index 1d87a41e530a..c37386e378ec 100644
--- a/pkg/vm/engine/tae/options/types.go
+++ b/pkg/vm/engine/tae/options/types.go
@@ -36,6 +36,8 @@ const (
     DefaultBlockMaxRows     = uint32(8192)
     DefaultBlocksPerSegment = uint16(256)
 
+    DefaultObejctPerSegment = uint16(512)
+
     DefaultScannerInterval         = time.Second * 5
     DefaultCheckpointFlushInterval = time.Minute
     DefaultCheckpointMinCount      = int64(100)
diff --git a/pkg/vm/engine/tae/tables/handle.go b/pkg/vm/engine/tae/tables/handle.go
index 090d2e021392..c76ca647022e 100644
--- a/pkg/vm/engine/tae/tables/handle.go
+++ b/pkg/vm/engine/tae/tables/handle.go
@@ -15,9 +15,11 @@
 package tables
 
 import (
+    "github.com/matrixorigin/matrixone/pkg/logutil"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data"
+    "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
 )
 
 type tableHandle struct {
@@ -81,6 +83,15 @@ func (h *tableHandle) GetAppender() (appender data.BlockAppender, err error) {
             panic(err)
         }
     }
+
+    // Instead of checking in ThrowAppenderAndErr, check the object index here because
+    // it is better to create a new appendable segment early under busy update workloads
+    if seg := h.block.meta.GetSegment(); seg.GetNextObjectIndex() >= options.DefaultObejctPerSegment {
+        logutil.Infof("%s create new seg due to large object index %d",
+            seg.ID.ToString(), seg.GetNextObjectIndex())
+        return nil, data.ErrAppendableSegmentNotFound
+    }
+
     dropped := h.block.meta.HasDropCommitted()
     if !h.appender.IsAppendable() || !h.block.IsAppendable() || dropped {
         return h.ThrowAppenderAndErr()
diff --git a/pkg/vm/engine/tae/tables/jobs/compactblk.go b/pkg/vm/engine/tae/tables/jobs/compactblk.go
index 377cf1e3b62e..90d92b80f468 100644
--- a/pkg/vm/engine/tae/tables/jobs/compactblk.go
+++ b/pkg/vm/engine/tae/tables/jobs/compactblk.go
@@ -18,6 +18,7 @@ import (
     "fmt"
     "time"
 
+    "github.com/matrixorigin/matrixone/pkg/common/moerr"
     "github.com/matrixorigin/matrixone/pkg/container/types"
 
     "github.com/RoaringBitmap/roaring"
@@ -30,6 +31,7 @@ import (
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
+    "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/txnentries"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
 )
@@ -163,7 +165,26 @@ func (task *compactBlockTask) Execute() (err error) {
     }
 
     if !empty {
-        if _, err = task.createAndFlushNewBlock(seg, preparer, deletes); err != nil {
+        createOnSeg := seg
+        curSeg := seg.GetMeta().(*catalog.SegmentEntry)
+        // Double the threshold to make more room for creating a new appendable segment during appending; just a piece of defensive code.
+        // See the GetAppender function in tableHandle.
+        if curSeg.GetNextObjectIndex() > options.DefaultObejctPerSegment*2 {
+            nextSeg := curSeg.GetTable().LastAppendableSegmemt()
+            if nextSeg.ID == curSeg.ID {
+                // We can't create an appendable seg here because the compaction can be rolled back,
+                // so just wait until a new appendable seg is available.
+                // In practice this log can barely be printed.
+                logutil.Infof("do not compact on seg %s %d, wait", curSeg.ID.ToString(), curSeg.GetNextObjectIndex())
+                return moerr.GetOkExpectedEOB()
+            }
+            createOnSeg, err = task.compacted.GetSegment().GetRelation().GetSegment(nextSeg.ID)
+            if err != nil {
+                return err
+            }
+        }
+
+        if _, err = task.createAndFlushNewBlock(createOnSeg, preparer, deletes); err != nil {
             return
         }
     }