From ec9370800f4f403424e9dae94c1a11d07a35aeb5 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Mon, 27 May 2024 19:13:12 +0800 Subject: [PATCH 01/10] remove blk info --- pkg/container/batch/batch.go | 4 + pkg/sql/colexec/insert/insert.go | 6 +- pkg/sql/colexec/mergeblock/types.go | 24 +++-- pkg/sql/colexec/s3util.go | 101 ++++++++++--------- pkg/vm/engine/disttae/tools.go | 2 +- pkg/vm/engine/disttae/txn.go | 145 +++++++++++++++------------- pkg/vm/engine/disttae/txn_table.go | 18 ++-- pkg/vm/engine/disttae/util.go | 4 +- pkg/vm/engine/tae/rpc/handle.go | 43 +++++---- 9 files changed, 192 insertions(+), 155 deletions(-) diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go index e45accbd13fd..0e3330a5fe56 100644 --- a/pkg/container/batch/batch.go +++ b/pkg/container/batch/batch.go @@ -177,6 +177,10 @@ func (bat *Batch) SetVector(pos int32, vec *vector.Vector) { } func (bat *Batch) GetVector(pos int32) *vector.Vector { + if len(bat.Vecs) == 0 { + x := 0 + x++ + } return bat.Vecs[pos] } diff --git a/pkg/sql/colexec/insert/insert.go b/pkg/sql/colexec/insert/insert.go index cebf5c839aac..28b5a6eabea4 100644 --- a/pkg/sql/colexec/insert/insert.go +++ b/pkg/sql/colexec/insert/insert.go @@ -247,12 +247,12 @@ func (arg *Argument) insert_table(proc *process.Process) (vm.CallResult, error) // Collect all partition subtables' s3writers metaLoc information and output it func collectAndOutput(proc *process.Process, s3Writers []*colexec.S3Writer, result *vm.CallResult) (err error) { - attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats} + attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.ObjectMeta_ObjectStats} res := batch.NewWithSize(len(attrs)) res.SetAttributes(attrs) res.Vecs[0] = proc.GetVector(types.T_int16.ToType()) - res.Vecs[1] = proc.GetVector(types.T_text.ToType()) - res.Vecs[2] = proc.GetVector(types.T_binary.ToType()) + //res.Vecs[1] = proc.GetVector(types.T_text.ToType()) + res.Vecs[1] = proc.GetVector(types.T_binary.ToType()) for _, w := range s3Writers { //deep copy. diff --git a/pkg/sql/colexec/mergeblock/types.go b/pkg/sql/colexec/mergeblock/types.go index 457b47820134..6823dbc12b2f 100644 --- a/pkg/sql/colexec/mergeblock/types.go +++ b/pkg/sql/colexec/mergeblock/types.go @@ -175,7 +175,7 @@ func splitObjectStats(arg *Argument, proc *process.Process, objDataMeta := objectio.BuildObjectMeta(uint16(blkVec.Length())) var objStats objectio.ObjectStats - statsVec := bat.Vecs[2] + statsVec := bat.Vecs[1] statsIdx := 0 for idx := 0; idx < len(tblIdx); idx++ { @@ -190,8 +190,8 @@ func splitObjectStats(arg *Argument, proc *process.Process, continue } - destVec := arg.container.mp[int(tblIdx[idx])].Vecs[1] - + destVec := arg.container.mp[int(tblIdx[idx])].Vecs[0] + affectedRows := uint32(0) if needLoad { // comes from old version cn objStats, objDataMeta, err = disttae.ConstructObjStatsByLoadObjMeta(proc.Ctx, blkInfo.MetaLocation(), fs) @@ -199,12 +199,18 @@ func splitObjectStats(arg *Argument, proc *process.Process, return err } + affectedRows = objStats.Rows() vector.AppendBytes(destVec, objStats.Marshal(), false, proc.GetMPool()) } else { // not comes from old version cn vector.AppendBytes(destVec, statsVec.GetBytesAt(statsIdx), false, proc.GetMPool()) objDataMeta.BlockHeader().SetBlockID(&blkInfo.BlockID) statsIdx++ + affectedRows = uint32(0) + } + + if arg.AddAffectedRows { + arg.affectedRows += uint64(affectedRows) } } @@ -220,12 +226,12 @@ func (arg *Argument) Split(proc *process.Process, bat *batch.Batch) error { hasObject := false for i := range tblIdx { // append s3 writer returned blk info if tblIdx[i] >= 0 { - if arg.AddAffectedRows { - blkInfo := objectio.DecodeBlockInfo(blkInfosVec.GetBytesAt(i)) - arg.affectedRows += uint64(blkInfo.MetaLocation().Rows()) - } - vector.AppendBytes(arg.container.mp[int(tblIdx[i])].Vecs[0], - blkInfosVec.GetBytesAt(i), false, proc.GetMPool()) + //if arg.AddAffectedRows { + // blkInfo := objectio.DecodeBlockInfo(blkInfosVec.GetBytesAt(i)) + // arg.affectedRows += uint64(blkInfo.MetaLocation().Rows()) + //} + //vector.AppendBytes(arg.container.mp[int(tblIdx[i])].Vecs[0], + // blkInfosVec.GetBytesAt(i), false, proc.GetMPool()) hasObject = true } else { // append data idx := int(-(tblIdx[i] + 1)) diff --git a/pkg/sql/colexec/s3util.go b/pkg/sql/colexec/s3util.go index e592037de0d7..1b71e9a1b3b2 100644 --- a/pkg/sql/colexec/s3util.go +++ b/pkg/sql/colexec/s3util.go @@ -37,6 +37,11 @@ import ( "go.uber.org/zap" ) +const ( + objStatsInfoBatTblIdxOffset = 0 + objStatsInfoBatObjectOffset = 1 +) + // S3Writer is used to write table data to S3 and package a series of `BlockWriter` write operations // Currently there are two scenarios will let cn write s3 directly // scenario 1 is insert operator directly go s3, when a one-time insert/load data volume is relatively large will trigger the scenario @@ -56,8 +61,9 @@ type S3Writer struct { lengths []uint64 // the third vector only has several rows, not aligns with the other two vectors. - blockInfoBat *batch.Batch - + objStatsInfoBat *batch.Batch + // TODO(ghs) remove this later + //blkInfoBat *batch.Batch // An intermediate cache after the merge sort of all `Bats` data buffer *batch.Batch @@ -87,9 +93,9 @@ const ( ) func (w *S3Writer) Free(proc *process.Process) { - if w.blockInfoBat != nil { - w.blockInfoBat.Clean(proc.Mp()) - w.blockInfoBat = nil + if w.objStatsInfoBat != nil { + w.objStatsInfoBat.Clean(proc.Mp()) + w.objStatsInfoBat = nil } if w.buffer != nil { w.buffer.Clean(proc.Mp()) @@ -106,7 +112,7 @@ func (w *S3Writer) Free(proc *process.Process) { } func (w *S3Writer) GetBlockInfoBat() *batch.Batch { - return w.blockInfoBat + return w.objStatsInfoBat } func (w *S3Writer) SetSortIdx(sortIdx int) { @@ -234,17 +240,21 @@ func (w *S3Writer) ResetBlockInfoBat(proc *process.Process) { // A simple explanation of the two vectors held by metaLocBat // vecs[0] to mark which table this metaLoc belongs to: [0] means insertTable itself, [1] means the first uniqueIndex table, [2] means the second uniqueIndex table and so on // vecs[1] store relative block metadata - if w.blockInfoBat != nil { - proc.PutBatch(w.blockInfoBat) + if w.objStatsInfoBat != nil { + proc.PutBatch(w.objStatsInfoBat) } - attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats} + //attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats} + attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.ObjectMeta_ObjectStats} blockInfoBat := batch.NewWithSize(len(attrs)) blockInfoBat.Attrs = attrs - blockInfoBat.Vecs[0] = proc.GetVector(types.T_int16.ToType()) - blockInfoBat.Vecs[1] = proc.GetVector(types.T_text.ToType()) - blockInfoBat.Vecs[2] = proc.GetVector(types.T_binary.ToType()) + //objStatsInfoBat.Vecs[0] = proc.GetVector(types.T_int16.ToType()) + blockInfoBat.Vecs[objStatsInfoBatTblIdxOffset] = proc.GetVector(types.T_int16.ToType()) + blockInfoBat.Vecs[objStatsInfoBatObjectOffset] = proc.GetVector(types.T_binary.ToType()) - w.blockInfoBat = blockInfoBat + w.objStatsInfoBat = blockInfoBat + + //w.blkInfoBat = batch.NewWithSize(1) + //w.blkInfoBat.Vecs[0] = proc.GetVector(types.T_text.ToType()) } //func (w *S3Writer) WriteEnd(proc *process.Process) { @@ -255,18 +265,18 @@ func (w *S3Writer) ResetBlockInfoBat(proc *process.Process) { //} func (w *S3Writer) Output(proc *process.Process, result *vm.CallResult) error { - bat := batch.NewWithSize(len(w.blockInfoBat.Attrs)) - bat.SetAttributes(w.blockInfoBat.Attrs) + bat := batch.NewWithSize(len(w.objStatsInfoBat.Attrs)) + bat.SetAttributes(w.objStatsInfoBat.Attrs) - for i := range w.blockInfoBat.Attrs { - vec := proc.GetVector(*w.blockInfoBat.Vecs[i].GetType()) - if err := vec.UnionBatch(w.blockInfoBat.Vecs[i], 0, w.blockInfoBat.Vecs[i].Length(), nil, proc.GetMPool()); err != nil { + for i := range w.objStatsInfoBat.Attrs { + vec := proc.GetVector(*w.objStatsInfoBat.Vecs[i].GetType()) + if err := vec.UnionBatch(w.objStatsInfoBat.Vecs[i], 0, w.objStatsInfoBat.Vecs[i].Length(), nil, proc.GetMPool()); err != nil { vec.Free(proc.Mp()) return err } bat.SetVector(int32(i), vec) } - bat.SetRowCount(w.blockInfoBat.RowCount()) + bat.SetRowCount(w.objStatsInfoBat.RowCount()) w.ResetBlockInfoBat(proc) result.Batch = bat return nil @@ -291,12 +301,12 @@ func (w *S3Writer) WriteS3CacheBatch(proc *process.Process) error { if err := w.SortAndFlush(proc); err != nil { return err } - w.blockInfoBat.SetRowCount(w.blockInfoBat.Vecs[0].Length()) + w.objStatsInfoBat.SetRowCount(w.objStatsInfoBat.Vecs[objStatsInfoBatTblIdxOffset].Length()) return nil } for _, bat := range w.Bats { if err := vector.AppendFixed( - w.blockInfoBat.Vecs[0], -w.partitionIndex-1, + w.objStatsInfoBat.Vecs[objStatsInfoBatTblIdxOffset], -w.partitionIndex-1, false, proc.GetMPool()); err != nil { return err } @@ -305,12 +315,12 @@ func (w *S3Writer) WriteS3CacheBatch(proc *process.Process) error { return err } if err = vector.AppendBytes( - w.blockInfoBat.Vecs[1], bytes, + w.objStatsInfoBat.Vecs[objStatsInfoBatObjectOffset], bytes, false, proc.GetMPool()); err != nil { return err } } - w.blockInfoBat.SetRowCount(w.blockInfoBat.Vecs[0].Length()) + w.objStatsInfoBat.SetRowCount(w.objStatsInfoBat.Vecs[objStatsInfoBatTblIdxOffset].Length()) return nil } @@ -634,27 +644,27 @@ func (w *S3Writer) WriteBlock(bat *batch.Batch, dataType ...objectio.DataMetaTyp } func (w *S3Writer) writeEndBlocks(proc *process.Process) error { - blkInfos, stats, err := w.WriteEndBlocks(proc) + _, stats, err := w.WriteEndBlocks(proc) if err != nil { return err } - for _, blkInfo := range blkInfos { - if err := vector.AppendFixed( - w.blockInfoBat.Vecs[0], - w.partitionIndex, - false, - proc.GetMPool()); err != nil { - return err - } - if err := vector.AppendBytes( - w.blockInfoBat.Vecs[1], - //[]byte(metaLoc), - objectio.EncodeBlockInfo(blkInfo), - false, - proc.GetMPool()); err != nil { - return err - } - } + //for _, blkInfo := range blkInfos { + // if err := vector.AppendFixed( + // w.objStatsInfoBat.Vecs[0], + // w.partitionIndex, + // false, + // proc.GetMPool()); err != nil { + // return err + // } + //if err := vector.AppendBytes( + // w.blkInfoBat.Vecs[0], + // //[]byte(metaLoc), + // objectio.EncodeBlockInfo(blkInfo), + // false, + // proc.GetMPool()); err != nil { + // return err + //} + //} // append the object stats to bat, // at most one will append in @@ -663,13 +673,18 @@ func (w *S3Writer) writeEndBlocks(proc *process.Process) error { continue } - if err = vector.AppendBytes(w.blockInfoBat.Vecs[2], + if err := vector.AppendFixed(w.objStatsInfoBat.Vecs[objStatsInfoBatTblIdxOffset], + w.partitionIndex, false, proc.GetMPool()); err != nil { + return err + } + + if err = vector.AppendBytes(w.objStatsInfoBat.Vecs[objStatsInfoBatObjectOffset], stats[idx].Marshal(), false, proc.GetMPool()); err != nil { return err } } - w.blockInfoBat.SetRowCount(w.blockInfoBat.Vecs[0].Length()) + w.objStatsInfoBat.SetRowCount(w.objStatsInfoBat.Vecs[objStatsInfoBatTblIdxOffset].Length()) return nil } diff --git a/pkg/vm/engine/disttae/tools.go b/pkg/vm/engine/disttae/tools.go index ff2672347d41..b60c4838302f 100644 --- a/pkg/vm/engine/disttae/tools.go +++ b/pkg/vm/engine/disttae/tools.go @@ -899,7 +899,7 @@ func toPBEntry(e Entry) (*api.Entry, error) { if e.typ == INSERT { ebat = batch.NewWithSize(0) - if e.bat.Attrs[0] == catalog.BlockMeta_MetaLoc { + if e.bat.Attrs[0] == catalog.ObjectMeta_ObjectStats { ebat.Vecs = e.bat.Vecs ebat.Attrs = e.bat.Attrs } else { diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index a3b0a3869be6..df6534c83411 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -491,18 +491,19 @@ func (txn *Transaction) dumpBatchLocked(offset int) error { if err != nil { return err } - blockInfo := s3Writer.GetBlockInfoBat() + objStatsInfo := s3Writer.GetBlockInfoBat() - lenVecs := len(blockInfo.Attrs) - // only remain the metaLoc col and object stats - blockInfo.Vecs = blockInfo.Vecs[lenVecs-2:] - blockInfo.Attrs = blockInfo.Attrs[lenVecs-2:] - blockInfo.SetRowCount(blockInfo.Vecs[0].Length()) + lenVecs := len(objStatsInfo.Attrs) + // only remain the object stats + objStatsInfo.Vecs = objStatsInfo.Vecs[lenVecs-1:] + objStatsInfo.Attrs = objStatsInfo.Attrs[lenVecs-1:] + objStatsInfo.SetRowCount(objStatsInfo.Vecs[0].Length()) table := tbl.(*txnTable) - fileName := objectio.DecodeBlockInfo( - blockInfo.Vecs[0].GetBytesAt(0)). - MetaLocation().Name().String() + + stats := objectio.ObjectStats(objStatsInfo.Vecs[0].GetBytesAt(0)) + fileName := stats.ObjectName().String() + err = table.getTxn().WriteFileLocked( INSERT, table.accountId, @@ -511,7 +512,7 @@ func (txn *Transaction) dumpBatchLocked(offset int) error { table.db.databaseName, table.tableName, fileName, - blockInfo, + objStatsInfo, table.getTxn().tnStores[0], ) if err != nil { @@ -566,16 +567,22 @@ func (txn *Transaction) insertPosForCNBlock( b *batch.Batch, dbName string, tbName string) error { - blks, area := vector.MustVarlenaRawData(vec) - for i := range blks { - blkInfo := *objectio.DecodeBlockInfo(blks[i].GetByteSlice(area)) - txn.cnBlkId_Pos[blkInfo.BlockID] = Pos{ + //blks := vector.MustBytesCol(vec) + stats := objectio.ObjectStats(vec.GetBytesAt(0)) + iter := NewStatsBlkIter(&stats, nil) + + offset := int64(0) + for iter.Next() { + blk := iter.Entry() + txn.cnBlkId_Pos[blk.BlockID] = Pos{ bat: b, accountId: id, dbName: dbName, tbName: tbName, - offset: int64(i), - blkInfo: blkInfo} + offset: offset, + blkInfo: blk, + } + offset++ } return nil } @@ -594,24 +601,16 @@ func (txn *Transaction) WriteFileLocked( newBat := bat if typ == INSERT { newBat = batch.NewWithSize(len(bat.Vecs)) - newBat.SetAttributes([]string{catalog.BlockMeta_MetaLoc, catalog.ObjectMeta_ObjectStats}) + newBat.SetAttributes([]string{catalog.ObjectMeta_ObjectStats}) for idx := 0; idx < newBat.VectorCount(); idx++ { newBat.SetVector(int32(idx), vector.NewVec(*bat.Vecs[idx].GetType())) } - blkInfosVec := bat.Vecs[0] - for idx := 0; idx < blkInfosVec.Length(); idx++ { - blkInfo := *objectio.DecodeBlockInfo(blkInfosVec.GetBytesAt(idx)) - vector.AppendBytes(newBat.Vecs[0], []byte(blkInfo.MetaLocation().String()), - false, txn.proc.Mp()) - colexec.Get().PutCnSegment(&blkInfo.SegmentID, colexec.CnBlockIdType) - } - // append obj stats, may multiple - statsListVec := bat.Vecs[1] + statsListVec := bat.Vecs[0] for idx := 0; idx < statsListVec.Length(); idx++ { - vector.AppendBytes(newBat.Vecs[1], statsListVec.GetBytesAt(idx), false, txn.proc.Mp()) + vector.AppendBytes(newBat.Vecs[0], statsListVec.GetBytesAt(idx), false, txn.proc.Mp()) } newBat.SetRowCount(bat.Vecs[0].Length()) @@ -861,55 +860,63 @@ func (txn *Transaction) compactionBlksLocked() error { } //TODO::do parallel compaction for table tbl := rel.(*txnTable) - createdBlks, stats, err := tbl.compaction(blks) + stats, err := tbl.compaction(blks) if err != nil { return err } - if len(createdBlks) > 0 { - bat := batch.NewWithSize(2) - bat.Attrs = []string{catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats} - bat.SetVector(0, vector.NewVec(types.T_text.ToType())) - bat.SetVector(1, vector.NewVec(types.T_binary.ToType())) - for _, blkInfo := range createdBlks { - vector.AppendBytes( - bat.GetVector(0), - objectio.EncodeBlockInfo(blkInfo), - false, - tbl.getTxn().proc.GetMPool()) - } - // append the object stats to bat - for idx := 0; idx < len(stats); idx++ { - if stats[idx].IsZero() { - continue - } - if err = vector.AppendBytes(bat.Vecs[1], stats[idx].Marshal(), - false, tbl.getTxn().proc.GetMPool()); err != nil { - return err - } - } + var objStats objectio.ObjectStats + if !stats[objectio.SchemaData].IsZero() { + objStats = stats[objectio.SchemaData] + } else { + objStats = stats[objectio.SchemaTombstone] + } - bat.SetRowCount(len(createdBlks)) - defer func() { - bat.Clean(tbl.getTxn().proc.GetMPool()) - }() + //if len(createdBlks) > 0 { + bat := batch.NewWithSize(1) + bat.Attrs = []string{catalog.ObjectMeta_ObjectStats} + //bat.SetVector(0, vector.NewVec(types.T_text.ToType())) + bat.SetVector(0, vector.NewVec(types.T_binary.ToType())) + //for _, blkInfo := range createdBlks { + // vector.AppendBytes( + // bat.GetVector(0), + // objectio.EncodeBlockInfo(blkInfo), + // false, + // tbl.getTxn().proc.GetMPool()) + //} + + // append the object stats to bat + //for idx := 0; idx < len(stats); idx++ { + // if stats[idx].IsZero() { + // continue + // } + if err = vector.AppendBytes(bat.Vecs[0], objStats.Marshal(), + false, tbl.getTxn().proc.GetMPool()); err != nil { + return err + } + //} - err := txn.WriteFileLocked( - INSERT, - tbl.accountId, - tbl.db.databaseId, - tbl.tableId, - tbl.db.databaseName, - tbl.tableName, - createdBlks[0].MetaLocation().Name().String(), - bat, - tbl.getTxn().tnStores[0], - ) - if err != nil { - return err - } + bat.SetRowCount(1) + defer func() { + bat.Clean(tbl.getTxn().proc.GetMPool()) + }() + + err = txn.WriteFileLocked( + INSERT, + tbl.accountId, + tbl.db.databaseId, + tbl.tableId, + tbl.db.databaseName, + tbl.tableName, + objStats.ObjectName().String(), + bat, + tbl.getTxn().tnStores[0], + ) + if err != nil { + return err } } + //} //compaction for txn.writes for i, entry := range txn.writes { @@ -922,7 +929,7 @@ func (txn *Transaction) compactionBlksLocked() error { } if entry.typ != INSERT || - entry.bat.Attrs[0] != catalog.BlockMeta_MetaLoc { + entry.bat.Attrs[0] != catalog.ObjectMeta_ObjectStats { continue } entry.bat.Shrink(compactedEntries[entry.bat], true) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 830bb2b489c6..f041b4726cd9 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -1495,10 +1495,12 @@ func (tbl *txnTable) Write(ctx context.Context, bat *batch.Batch) error { return nil } // for writing S3 Block - if bat.Attrs[0] == catalog.BlockMeta_BlockInfo { + if bat.Attrs[0] == catalog.ObjectMeta_ObjectStats { tbl.getTxn().hasS3Op.Store(true) //bocks maybe come from different S3 object, here we just need to make sure fileName is not Nil. - fileName := objectio.DecodeBlockInfo(bat.Vecs[0].GetBytesAt(0)).MetaLocation().Name().String() + stats := objectio.ObjectStats(bat.Vecs[0].GetBytesAt(0)) + fileName := stats.ObjectName().String() + //fileName := objectio.DecodeBlockInfo(bat.Vecs[0].GetBytesAt(0)).MetaLocation().Name().String() return tbl.getTxn().WriteFile( INSERT, tbl.accountId, @@ -1608,13 +1610,13 @@ func (tbl *txnTable) ensureSeqnumsAndTypesExpectRowid() { // TODO:: do prefetch read and parallel compaction func (tbl *txnTable) compaction( - compactedBlks map[objectio.ObjectLocation][]int64) ([]objectio.BlockInfo, []objectio.ObjectStats, error) { + compactedBlks map[objectio.ObjectLocation][]int64) ([]objectio.ObjectStats, error) { s3writer := &colexec.S3Writer{} s3writer.SetTableName(tbl.tableName) s3writer.SetSchemaVer(tbl.version) _, err := s3writer.GenerateWriter(tbl.getTxn().proc) if err != nil { - return nil, nil, err + return nil, err } tbl.ensureSeqnumsAndTypesExpectRowid() s3writer.SetSeqnums(tbl.seqnums) @@ -1630,7 +1632,7 @@ func (tbl *txnTable) compaction( tbl.getTxn().engine.fs, tbl.getTxn().proc.GetMPool()) if e != nil { - return nil, nil, e + return nil, e } if bat.RowCount() == 0 { continue @@ -1639,11 +1641,11 @@ func (tbl *txnTable) compaction( bat.Clean(tbl.getTxn().proc.GetMPool()) } - createdBlks, stats, err := s3writer.WriteEndBlocks(tbl.getTxn().proc) + _, stats, err := s3writer.WriteEndBlocks(tbl.getTxn().proc) if err != nil { - return nil, nil, err + return nil, err } - return createdBlks, stats, nil + return stats, nil } func (tbl *txnTable) Delete(ctx context.Context, bat *batch.Batch, name string) error { diff --git a/pkg/vm/engine/disttae/util.go b/pkg/vm/engine/disttae/util.go index ef1dd1ccb38a..6abd6e390c13 100644 --- a/pkg/vm/engine/disttae/util.go +++ b/pkg/vm/engine/disttae/util.go @@ -1159,9 +1159,11 @@ func (i *StatsBlkIter) Entry() objectio.BlockInfo { } // assume that all blks have DefaultBlockMaxRows, except the last one - if i.meta.IsEmpty() { + if i.meta == nil || i.meta.IsEmpty() { if i.cur == int(i.blkCnt-1) { i.curBlkRows = i.totalRows - i.accRows + } else { + i.curBlkRows = options.DefaultBlockMaxRows } } else { i.curBlkRows = i.meta.GetBlockMeta(uint32(i.cur)).GetRows() diff --git a/pkg/vm/engine/tae/rpc/handle.go b/pkg/vm/engine/tae/rpc/handle.go index 0d0e03fc30b1..a2bd14330ca2 100644 --- a/pkg/vm/engine/tae/rpc/handle.go +++ b/pkg/vm/engine/tae/rpc/handle.go @@ -267,12 +267,13 @@ func (h *Handle) HandlePreCommitWrite( PkCheck: db.PKCheckType(pe.GetPkCheckByTn()), } if req.FileName != "" { - loc := req.Batch.Vecs[0] + //loc := req.Batch.Vecs[0] for i := 0; i < req.Batch.RowCount(); i++ { + stats := objectio.ObjectStats(req.Batch.Vecs[0].GetBytesAt(i)) if req.Type == db.EntryInsert { - req.MetaLocs = append(req.MetaLocs, loc.GetStringAt(i)) + req.MetaLocs = append(req.MetaLocs, stats.ObjectLocation().String()) } else { - req.DeltaLocs = append(req.DeltaLocs, loc.GetStringAt(i)) + req.DeltaLocs = append(req.DeltaLocs, stats.ObjectLocation().String()) } } } @@ -571,25 +572,25 @@ func (h *Handle) HandleWrite( if req.Type == db.EntryInsert { //Add blocks which had been bulk-loaded into S3 into table. if req.FileName != "" { - metalocations := make(map[string]struct{}) - for _, metLoc := range req.MetaLocs { - location, err := blockio.EncodeLocationFromString(metLoc) - if err != nil { - return err - } - metalocations[location.Name().String()] = struct{}{} - } - statsCNVec := req.Batch.Vecs[1] + //metalocations := make(map[string]struct{}) + //for _, metLoc := range req.MetaLocs { + // location, err := blockio.EncodeLocationFromString(metLoc) + // if err != nil { + // return err + // } + // metalocations[location.Name().String()] = struct{}{} + //} + statsCNVec := req.Batch.Vecs[0] statsVec := containers.ToTNVector(statsCNVec, common.WorkspaceAllocator) - for i := 0; i < statsVec.Length(); i++ { - s := objectio.ObjectStats(statsVec.Get(i).([]byte)) - delete(metalocations, s.ObjectName().String()) - } - if len(metalocations) != 0 { - logutil.Warnf("tbl %v, not receive stats of following locations %v", req.TableName, metalocations) - err = moerr.NewInternalError(ctx, "object stats doesn't match meta locations") - return - } + //for i := 0; i < statsVec.Length(); i++ { + // s := objectio.ObjectStats(statsVec.Get(i).([]byte)) + // delete(metalocations, s.ObjectName().String()) + //} + //if len(metalocations) != 0 { + // logutil.Warnf("tbl %v, not receive stats of following locations %v", req.TableName, metalocations) + // err = moerr.NewInternalError(ctx, "object stats doesn't match meta locations") + // return + //} err = tb.AddObjsWithMetaLoc(ctx, statsVec) return } From 7d33db40ce72ef64ab6402d892b343788999cebe Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Tue, 28 May 2024 09:54:51 +0800 Subject: [PATCH 02/10] remove log --- pkg/sql/colexec/mergeblock/types.go | 6 ------ pkg/sql/colexec/s3util.go | 28 +++------------------------- pkg/vm/engine/disttae/txn.go | 2 ++ pkg/vm/engine/disttae/txn_table.go | 1 - pkg/vm/engine/tae/rpc/handle.go | 17 ----------------- 5 files changed, 5 insertions(+), 49 deletions(-) diff --git a/pkg/sql/colexec/mergeblock/types.go b/pkg/sql/colexec/mergeblock/types.go index 6823dbc12b2f..65d44ab9a6a6 100644 --- a/pkg/sql/colexec/mergeblock/types.go +++ b/pkg/sql/colexec/mergeblock/types.go @@ -226,12 +226,6 @@ func (arg *Argument) Split(proc *process.Process, bat *batch.Batch) error { hasObject := false for i := range tblIdx { // append s3 writer returned blk info if tblIdx[i] >= 0 { - //if arg.AddAffectedRows { - // blkInfo := objectio.DecodeBlockInfo(blkInfosVec.GetBytesAt(i)) - // arg.affectedRows += uint64(blkInfo.MetaLocation().Rows()) - //} - //vector.AppendBytes(arg.container.mp[int(tblIdx[i])].Vecs[0], - // blkInfosVec.GetBytesAt(i), false, proc.GetMPool()) hasObject = true } else { // append data idx := int(-(tblIdx[i] + 1)) diff --git a/pkg/sql/colexec/s3util.go b/pkg/sql/colexec/s3util.go index 1b71e9a1b3b2..24779b46e623 100644 --- a/pkg/sql/colexec/s3util.go +++ b/pkg/sql/colexec/s3util.go @@ -243,18 +243,15 @@ func (w *S3Writer) ResetBlockInfoBat(proc *process.Process) { if w.objStatsInfoBat != nil { proc.PutBatch(w.objStatsInfoBat) } - //attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats} + attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.ObjectMeta_ObjectStats} blockInfoBat := batch.NewWithSize(len(attrs)) blockInfoBat.Attrs = attrs - //objStatsInfoBat.Vecs[0] = proc.GetVector(types.T_int16.ToType()) + blockInfoBat.Vecs[objStatsInfoBatTblIdxOffset] = proc.GetVector(types.T_int16.ToType()) blockInfoBat.Vecs[objStatsInfoBatObjectOffset] = proc.GetVector(types.T_binary.ToType()) w.objStatsInfoBat = blockInfoBat - - //w.blkInfoBat = batch.NewWithSize(1) - //w.blkInfoBat.Vecs[0] = proc.GetVector(types.T_text.ToType()) } //func (w *S3Writer) WriteEnd(proc *process.Process) { @@ -648,26 +645,7 @@ func (w *S3Writer) writeEndBlocks(proc *process.Process) error { if err != nil { return err } - //for _, blkInfo := range blkInfos { - // if err := vector.AppendFixed( - // w.objStatsInfoBat.Vecs[0], - // w.partitionIndex, - // false, - // proc.GetMPool()); err != nil { - // return err - // } - //if err := vector.AppendBytes( - // w.blkInfoBat.Vecs[0], - // //[]byte(metaLoc), - // objectio.EncodeBlockInfo(blkInfo), - // false, - // proc.GetMPool()); err != nil { - // return err - //} - //} - - // append the object stats to bat, - // at most one will append in + for idx := 0; idx < len(stats); idx++ { if stats[idx].IsZero() { continue diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index df6534c83411..6d12d7e6775e 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -138,6 +138,7 @@ func (txn *Transaction) WriteBatch( tnStore: tnStore, truncate: truncate, } + txn.writes = append(txn.writes, e) txn.pkCount += bat.RowCount() @@ -633,6 +634,7 @@ func (txn *Transaction) WriteFileLocked( bat: newBat, tnStore: tnStore, } + txn.writes = append(txn.writes, entry) return nil } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index f041b4726cd9..a6def3b2bc30 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -1500,7 +1500,6 @@ func (tbl *txnTable) Write(ctx context.Context, bat *batch.Batch) error { //bocks maybe come from different S3 object, here we just need to make sure fileName is not Nil. stats := objectio.ObjectStats(bat.Vecs[0].GetBytesAt(0)) fileName := stats.ObjectName().String() - //fileName := objectio.DecodeBlockInfo(bat.Vecs[0].GetBytesAt(0)).MetaLocation().Name().String() return tbl.getTxn().WriteFile( INSERT, tbl.accountId, diff --git a/pkg/vm/engine/tae/rpc/handle.go b/pkg/vm/engine/tae/rpc/handle.go index a2bd14330ca2..dbb4b881e54a 100644 --- a/pkg/vm/engine/tae/rpc/handle.go +++ b/pkg/vm/engine/tae/rpc/handle.go @@ -572,25 +572,8 @@ func (h *Handle) HandleWrite( if req.Type == db.EntryInsert { //Add blocks which had been bulk-loaded into S3 into table. if req.FileName != "" { - //metalocations := make(map[string]struct{}) - //for _, metLoc := range req.MetaLocs { - // location, err := blockio.EncodeLocationFromString(metLoc) - // if err != nil { - // return err - // } - // metalocations[location.Name().String()] = struct{}{} - //} statsCNVec := req.Batch.Vecs[0] statsVec := containers.ToTNVector(statsCNVec, common.WorkspaceAllocator) - //for i := 0; i < statsVec.Length(); i++ { - // s := objectio.ObjectStats(statsVec.Get(i).([]byte)) - // delete(metalocations, s.ObjectName().String()) - //} - //if len(metalocations) != 0 { - // logutil.Warnf("tbl %v, not receive stats of following locations %v", req.TableName, metalocations) - // err = moerr.NewInternalError(ctx, "object stats doesn't match meta locations") - // return - //} err = tb.AddObjsWithMetaLoc(ctx, statsVec) return } From cff91f3a7727f09e7d5a4d4348839a0c4feb5315 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Tue, 28 May 2024 15:28:02 +0800 Subject: [PATCH 03/10] replace more blkInfo --- pkg/vm/engine/disttae/partition_reader.go | 2 +- pkg/vm/engine/disttae/txn.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/vm/engine/disttae/partition_reader.go b/pkg/vm/engine/disttae/partition_reader.go index 51643485a06b..50f07d480361 100644 --- a/pkg/vm/engine/disttae/partition_reader.go +++ b/pkg/vm/engine/disttae/partition_reader.go @@ -79,7 +79,7 @@ func (p *PartitionReader) prepare() error { if entry.bat == nil || entry.bat.IsEmpty() { return } - if entry.bat.Attrs[0] == catalog.BlockMeta_MetaLoc { + if entry.bat.Attrs[0] == catalog.ObjectMeta_ObjectStats { return } inserts = append(inserts, entry.bat) diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index 6d12d7e6775e..52e53740bf17 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -755,7 +755,8 @@ func (txn *Transaction) deleteTableWrites( } // for 3 and 4 above. if e.bat.Attrs[0] == catalog.BlockMeta_MetaLoc || - e.bat.Attrs[0] == catalog.BlockMeta_DeltaLoc { + e.bat.Attrs[0] == catalog.BlockMeta_DeltaLoc || + e.bat.Attrs[0] == catalog.ObjectMeta_ObjectStats { continue } sels = sels[:0] From f77ef1349f2ebb5e723c94deaae4c8129ebfa458 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Wed, 29 May 2024 13:52:51 +0800 Subject: [PATCH 04/10] update split object --- pkg/sql/colexec/mergeblock/types.go | 79 +++++++++++++++++++---------- 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/pkg/sql/colexec/mergeblock/types.go b/pkg/sql/colexec/mergeblock/types.go index 65d44ab9a6a6..28b460d0e39c 100644 --- a/pkg/sql/colexec/mergeblock/types.go +++ b/pkg/sql/colexec/mergeblock/types.go @@ -14,6 +14,7 @@ package mergeblock import ( + "fmt" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -159,25 +160,50 @@ func (arg *Argument) GetMetaLocBat(src *batch.Batch, proc *process.Process) { } } -func splitObjectStats(arg *Argument, proc *process.Process, - bat *batch.Batch, blkVec *vector.Vector, tblIdx []int16, -) error { - // bat comes from old CN, no object stats vec in it. +func splitObjectStats(arg *Argument, proc *process.Process, bat *batch.Batch) error { + var ( + err error + fs fileservice.FileService + needLoad bool + statsIdx int + objStats objectio.ObjectStats + blkVec *vector.Vector + tblIdx []int16 + statsVec *vector.Vector + batLen = len(bat.Attrs) + objDataMeta objectio.ObjectDataMeta + ) + + tblIdx = vector.MustFixedCol[int16](bat.GetVector(0)) + + // bat comes from old CN, may has not object stats vec in it. // to ensure all bats the TN received contain the object stats column, we should // construct the object stats from block info here. - needLoad := bat.Attrs[len(bat.Attrs)-1] != catalog.ObjectMeta_ObjectStats - - fs, err := fileservice.Get[fileservice.FileService](proc.FileService, defines.SharedFileServiceName) - if err != nil { - logutil.Error("get fs failed when split object stats. ", zap.Error(err)) - return err + // + // bat comes from different version: + // V1 [table index | blk info] + // V2 [table index | blk info | object stats ] + // V3 [table index | object stats] + // only V1 need to construct the object stats from block info + // + if bat.Attrs[batLen-1] == catalog.ObjectMeta_ObjectStats { + // V2, V3 + statsVec = bat.GetVector(int32(batLen - 1)) + } else if bat.Attrs[batLen-1] == catalog.BlockMeta_MetaLoc { + // V1 + needLoad = true + blkVec = bat.GetVector(int32(batLen - 1)) + objDataMeta = objectio.BuildObjectMeta(uint16(blkVec.Length())) + fs, err = fileservice.Get[fileservice.FileService](proc.FileService, defines.SharedFileServiceName) + if err != nil { + logutil.Error("get fs failed when split object stats. ", zap.Error(err)) + return err + } + } else { + panic(fmt.Sprintf("splitObjectStats got wrong bat: [attrs=%s, rows=%d, cnt=%d]\n", + bat.Attrs, bat.RowCount(), bat.Cnt)) } - objDataMeta := objectio.BuildObjectMeta(uint16(blkVec.Length())) - var objStats objectio.ObjectStats - statsVec := bat.Vecs[1] - statsIdx := 0 - for idx := 0; idx < len(tblIdx); idx++ { if tblIdx[idx] < 0 { // will the data and blk infos mixed together in one batch? @@ -185,15 +211,15 @@ func splitObjectStats(arg *Argument, proc *process.Process, continue } - blkInfo := objectio.DecodeBlockInfo(blkVec.GetBytesAt(idx)) - if objectio.IsSameObjectLocVsMeta(blkInfo.MetaLocation(), objDataMeta) { - continue - } - - destVec := arg.container.mp[int(tblIdx[idx])].Vecs[0] affectedRows := uint32(0) + destVec := arg.container.mp[int(tblIdx[idx])].Vecs[0] + // comes from old version cn without object stats if needLoad { - // comes from old version cn + blkInfo := objectio.DecodeBlockInfo(blkVec.GetBytesAt(idx)) + if objectio.IsSameObjectLocVsMeta(blkInfo.MetaLocation(), objDataMeta) { + continue + } + objStats, objDataMeta, err = disttae.ConstructObjStatsByLoadObjMeta(proc.Ctx, blkInfo.MetaLocation(), fs) if err != nil { return err @@ -202,11 +228,12 @@ func splitObjectStats(arg *Argument, proc *process.Process, affectedRows = objStats.Rows() vector.AppendBytes(destVec, objStats.Marshal(), false, proc.GetMPool()) } else { + // with object stats in it // not comes from old version cn - vector.AppendBytes(destVec, statsVec.GetBytesAt(statsIdx), false, proc.GetMPool()) - objDataMeta.BlockHeader().SetBlockID(&blkInfo.BlockID) + stats := objectio.ObjectStats(statsVec.GetBytesAt(statsIdx)) + vector.AppendBytes(destVec, stats.Marshal(), false, proc.GetMPool()) statsIdx++ - affectedRows = uint32(0) + affectedRows = uint32(stats.Rows()) } if arg.AddAffectedRows { @@ -243,7 +270,7 @@ func (arg *Argument) Split(proc *process.Process, bat *batch.Batch) error { // exist blk info, split it if hasObject { - if err := splitObjectStats(arg, proc, bat, blkInfosVec, tblIdx); err != nil { + if err := splitObjectStats(arg, proc, bat); err != nil { return err } } From 31e977bb39c2f8ce134e107d7a7f8d9392dcc6bd Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Mon, 3 Jun 2024 16:53:24 +0800 Subject: [PATCH 05/10] update --- pkg/vm/engine/disttae/types.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 9a878d0daf3f..e1480c76b62e 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -465,12 +465,14 @@ func (txn *Transaction) gcObjs(start int) error { //2. Remove the segments generated by this statement lazily till txn commits or rollback. //3. Now, GC the s3 objects(data objects and tombstone objects) asynchronously. if txn.writes[i].fileName != "" { - vs, area := vector.MustVarlenaRawData(txn.writes[i].bat.GetVector(0)) - for i := range vs { - loc, _ := blockio.EncodeLocationFromString(vs[i].UnsafeGetString(area)) - if _, ok := objsToGC[loc.Name().String()]; !ok { - objsToGC[loc.Name().String()] = struct{}{} - objsName = append(objsName, loc.Name().String()) + col, area := vector.MustVarlenaRawData(txn.writes[i].bat.GetVector(0)) + for idx := 0; idx < len(col); idx++ { + stats := objectio.ObjectStats(col[i].GetByteSlice(area)) + objName := stats.ObjectName().String() + //loc, _ := blockio.EncodeLocationFromString(s) + if _, ok := objsToGC[objName]; !ok { + objsToGC[objName] = struct{}{} + objsName = append(objsName, objName) } } } From e8214368baf507d79c87ede23588cc1b07c6b472 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Tue, 4 Jun 2024 10:03:41 +0800 Subject: [PATCH 06/10] fix gcObjs panic --- pkg/vm/engine/disttae/types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index e1480c76b62e..793982bff645 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -467,7 +467,7 @@ func (txn *Transaction) gcObjs(start int) error { if txn.writes[i].fileName != "" { col, area := vector.MustVarlenaRawData(txn.writes[i].bat.GetVector(0)) for idx := 0; idx < len(col); idx++ { - stats := objectio.ObjectStats(col[i].GetByteSlice(area)) + stats := objectio.ObjectStats(col[idx].GetByteSlice(area)) objName := stats.ObjectName().String() //loc, _ := blockio.EncodeLocationFromString(s) if _, ok := objsToGC[objName]; !ok { From 71fe91f93c951a2a7086be85b158fcf81bee2a72 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Tue, 4 Jun 2024 14:29:16 +0800 Subject: [PATCH 07/10] fix ranges uncommitted objects --- pkg/vm/engine/disttae/txn_table.go | 14 +++++++++----- pkg/vm/engine/disttae/types.go | 6 ++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index a6def3b2bc30..cb51d4acfa02 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -924,13 +924,17 @@ func (tbl *txnTable) collectUnCommittedObjects() []objectio.ObjectStats { if entry.typ == INSERT_TXN { return } - if entry.typ != INSERT || - len(entry.bat.Attrs) < 2 || - entry.bat.Attrs[1] != catalog.ObjectMeta_ObjectStats { + + if entry.typ != INSERT || entry.fileName == "" { return } - for i := 0; i < entry.bat.Vecs[1].Length(); i++ { - stats.UnMarshal(entry.bat.Vecs[1].GetBytesAt(i)) + + if entry.bat.Attrs[0] != catalog.ObjectMeta_ObjectStats { + panic(fmt.Sprintf("expected object stats, but got: %s", entry.String())) + } + + for i := 0; i < entry.bat.Vecs[0].Length(); i++ { + stats.UnMarshal(entry.bat.Vecs[0].GetBytesAt(i)) unCommittedObjects = append(unCommittedObjects, stats) } }) diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 793982bff645..09a60842e072 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -16,6 +16,7 @@ package disttae import ( "context" + "fmt" "math" "sync" "sync/atomic" @@ -620,6 +621,11 @@ type Entry struct { truncate bool } +func (e *Entry) String() string { + return fmt.Sprintf("type=%s, accId=%d, db=(%s-%d) tbl=(%s-%d), filename=%s, batLen=%d, truncate=%v", + typesNames[e.typ], e.accountId, e.databaseName, e.databaseId, e.tableName, e.tableId, e.fileName, e.bat.Size(), e.truncate) +} + // isGeneratedByTruncate denotes the entry is yielded by the truncate operation. func (e *Entry) isGeneratedByTruncate() bool { return e.typ == DELETE && From 148cfbcdc76cb855158fb6e82d5352c00a710287 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Wed, 5 Jun 2024 10:04:49 +0800 Subject: [PATCH 08/10] rm logs, fix sca --- pkg/container/batch/batch.go | 4 ---- pkg/sql/colexec/mergeblock/types.go | 1 + pkg/vm/engine/disttae/txn.go | 18 +----------------- 3 files changed, 2 insertions(+), 21 deletions(-) diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go index 0e3330a5fe56..e45accbd13fd 100644 --- a/pkg/container/batch/batch.go +++ b/pkg/container/batch/batch.go @@ -177,10 +177,6 @@ func (bat *Batch) SetVector(pos int32, vec *vector.Vector) { } func (bat *Batch) GetVector(pos int32) *vector.Vector { - if len(bat.Vecs) == 0 { - x := 0 - x++ - } return bat.Vecs[pos] } diff --git a/pkg/sql/colexec/mergeblock/types.go b/pkg/sql/colexec/mergeblock/types.go index 28b460d0e39c..157f85f9f440 100644 --- a/pkg/sql/colexec/mergeblock/types.go +++ b/pkg/sql/colexec/mergeblock/types.go @@ -15,6 +15,7 @@ package mergeblock import ( "fmt" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/container/batch" diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index 52e53740bf17..535c9ac6919f 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -875,29 +875,14 @@ func (txn *Transaction) compactionBlksLocked() error { objStats = stats[objectio.SchemaTombstone] } - //if len(createdBlks) > 0 { bat := batch.NewWithSize(1) bat.Attrs = []string{catalog.ObjectMeta_ObjectStats} - //bat.SetVector(0, vector.NewVec(types.T_text.ToType())) bat.SetVector(0, vector.NewVec(types.T_binary.ToType())) - //for _, blkInfo := range createdBlks { - // vector.AppendBytes( - // bat.GetVector(0), - // objectio.EncodeBlockInfo(blkInfo), - // false, - // tbl.getTxn().proc.GetMPool()) - //} - - // append the object stats to bat - //for idx := 0; idx < len(stats); idx++ { - // if stats[idx].IsZero() { - // continue - // } + if err = vector.AppendBytes(bat.Vecs[0], objStats.Marshal(), false, tbl.getTxn().proc.GetMPool()); err != nil { return err } - //} bat.SetRowCount(1) defer func() { @@ -919,7 +904,6 @@ func (txn *Transaction) compactionBlksLocked() error { return err } } - //} //compaction for txn.writes for i, entry := range txn.writes { From 975073b6b88e33d4f8cc3b6187290f781c93284c Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Wed, 5 Jun 2024 19:05:40 +0800 Subject: [PATCH 09/10] left blkinfo in deletes tombstone --- pkg/vm/engine/tae/rpc/handle.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/vm/engine/tae/rpc/handle.go b/pkg/vm/engine/tae/rpc/handle.go index dbb4b881e54a..6ebf920d1347 100644 --- a/pkg/vm/engine/tae/rpc/handle.go +++ b/pkg/vm/engine/tae/rpc/handle.go @@ -267,13 +267,13 @@ func (h *Handle) HandlePreCommitWrite( PkCheck: db.PKCheckType(pe.GetPkCheckByTn()), } if req.FileName != "" { - //loc := req.Batch.Vecs[0] for i := 0; i < req.Batch.RowCount(); i++ { - stats := objectio.ObjectStats(req.Batch.Vecs[0].GetBytesAt(i)) if req.Type == db.EntryInsert { + stats := objectio.ObjectStats(req.Batch.Vecs[0].GetBytesAt(i)) req.MetaLocs = append(req.MetaLocs, stats.ObjectLocation().String()) } else { - req.DeltaLocs = append(req.DeltaLocs, stats.ObjectLocation().String()) + loc := req.Batch.Vecs[0] + req.DeltaLocs = append(req.DeltaLocs, loc.GetStringAt(i)) } } } From e9b2208dd3071330bc153796d442ff74b00f5187 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Tue, 11 Jun 2024 11:49:21 +0800 Subject: [PATCH 10/10] fix ut --- pkg/sql/colexec/mergeblock/mergeblock_test.go | 30 +++++++++++------ pkg/sql/colexec/mergeblock/types.go | 32 ++++++------------- pkg/vm/engine/tae/rpc/rpc_test.go | 30 ++++++----------- 3 files changed, 38 insertions(+), 54 deletions(-) diff --git a/pkg/sql/colexec/mergeblock/mergeblock_test.go b/pkg/sql/colexec/mergeblock/mergeblock_test.go index c2c2fded079a..3304f2ab39d9 100644 --- a/pkg/sql/colexec/mergeblock/mergeblock_test.go +++ b/pkg/sql/colexec/mergeblock/mergeblock_test.go @@ -52,9 +52,20 @@ func TestMergeBlock(t *testing.T) { name1 := objectio.BuildObjectName(segmentid, 0) name2 := objectio.BuildObjectName(segmentid, 1) name3 := objectio.BuildObjectName(segmentid, 2) - loc1 := blockio.EncodeLocation(name1, objectio.Extent{}, 15, 0) - loc2 := blockio.EncodeLocation(name2, objectio.Extent{}, 15, 0) - loc3 := blockio.EncodeLocation(name3, objectio.Extent{}, 15, 0) + + rowCnt := uint32(15) + loc1 := blockio.EncodeLocation(name1, objectio.Extent{}, rowCnt, 0) + loc2 := blockio.EncodeLocation(name2, objectio.Extent{}, rowCnt, 0) + loc3 := blockio.EncodeLocation(name3, objectio.Extent{}, rowCnt, 0) + + var stats1, stats2, stats3 objectio.ObjectStats + objectio.SetObjectStatsObjectName(&stats1, name1) + objectio.SetObjectStatsObjectName(&stats2, name2) + objectio.SetObjectStatsObjectName(&stats3, name3) + + objectio.SetObjectStatsRowCnt(&stats1, rowCnt) + objectio.SetObjectStatsRowCnt(&stats2, rowCnt) + objectio.SetObjectStatsRowCnt(&stats3, rowCnt) sid1 := loc1.Name().SegmentId() blkInfo1 := objectio.BlockInfo{ @@ -102,9 +113,9 @@ func TestMergeBlock(t *testing.T) { string(objectio.EncodeBlockInfo(blkInfo3))}, nil), testutil.MakeTextVector([]string{ - string(objectio.ZeroObjectStats[:]), - string(objectio.ZeroObjectStats[:]), - string(objectio.ZeroObjectStats[:])}, + string(stats1.Marshal()), + string(stats2.Marshal()), + string(stats3.Marshal())}, nil), }, Cnt: 1, @@ -133,20 +144,19 @@ func TestMergeBlock(t *testing.T) { // argument1.Prepare(proc) _, err := argument1.Call(proc) require.NoError(t, err) - require.Equal(t, uint64(15*3), argument1.affectedRows) + require.Equal(t, uint64(rowCnt*3), argument1.affectedRows) // Check Tbl { result := argument1.container.source.(*mockRelation).result // check attr names require.True(t, reflect.DeepEqual( - []string{catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats}, + []string{catalog.ObjectMeta_ObjectStats}, result.Attrs, )) // check vector - require.Equal(t, 2, len(result.Vecs)) + require.Equal(t, 1, len(result.Vecs)) //for i, vec := range result.Vecs { require.Equal(t, 3, result.Vecs[0].Length(), fmt.Sprintf("column number: %d", 0)) - require.Equal(t, 3, result.Vecs[1].Length(), fmt.Sprintf("column number: %d", 1)) //} } // Check UniqueTables diff --git a/pkg/sql/colexec/mergeblock/types.go b/pkg/sql/colexec/mergeblock/types.go index 157f85f9f440..e53f0509d1eb 100644 --- a/pkg/sql/colexec/mergeblock/types.go +++ b/pkg/sql/colexec/mergeblock/types.go @@ -124,39 +124,25 @@ func (arg *Argument) Free(proc *process.Process, pipelineFailed bool, err error) } func (arg *Argument) GetMetaLocBat(src *batch.Batch, proc *process.Process) { - var typs []types.Type - // exclude the table id column - attrs := src.Attrs[1:] - - for idx := 1; idx < len(src.Vecs); idx++ { - typs = append(typs, *src.Vecs[idx].GetType()) - } - - // src comes from old CN which haven't object stats column - if src.Attrs[len(src.Attrs)-1] != catalog.ObjectMeta_ObjectStats { - attrs = append(attrs, catalog.ObjectMeta_ObjectStats) - typs = append(typs, types.T_binary.ToType()) - } + // [idx | object stats] + // [idx | blk info] + // [idx | blk info | object stats] // If the target is a partition table if len(arg.container.partitionSources) > 0 { // 'i' aligns with partition number for i := range arg.container.partitionSources { - bat := batch.NewWithSize(len(attrs)) - bat.Attrs = attrs + bat := batch.NewWithSize(1) + bat.Attrs = []string{catalog.ObjectMeta_ObjectStats} bat.Cnt = 1 - for idx := 0; idx < len(attrs); idx++ { - bat.Vecs[idx] = proc.GetVector(typs[idx]) - } + bat.Vecs[0] = proc.GetVector(types.T_binary.ToType()) arg.container.mp[i] = bat } } else { - bat := batch.NewWithSize(len(attrs)) - bat.Attrs = attrs + bat := batch.NewWithSize(1) + bat.Attrs = []string{catalog.ObjectMeta_ObjectStats} bat.Cnt = 1 - for idx := 0; idx < len(attrs); idx++ { - bat.Vecs[idx] = proc.GetVector(typs[idx]) - } + bat.Vecs[0] = proc.GetVector(types.T_binary.ToType()) arg.container.mp[0] = bat } } diff --git a/pkg/vm/engine/tae/rpc/rpc_test.go b/pkg/vm/engine/tae/rpc/rpc_test.go index f30b96830f73..95cd124bdfcc 100644 --- a/pkg/vm/engine/tae/rpc/rpc_test.go +++ b/pkg/vm/engine/tae/rpc/rpc_test.go @@ -84,7 +84,7 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) { //moBats[3] = containers.CopyToCNBatch(taeBats[3]) var objNames []objectio.ObjectName - var blkMetas []string + //var blkMetas []string var stats []objectio.ObjectStats offset := 0 for i := 0; i < 100; i++ { @@ -98,19 +98,10 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) { //offset++ } offset += 50 - blocks, _, err := writer.Sync(context.Background()) + _, _, err = writer.Sync(context.Background()) + stats = append(stats, writer.GetObjectStats()[objectio.SchemaData]) assert.Nil(t, err) - assert.Equal(t, 50, len(blocks)) - for _, blk := range blocks { - metaLoc := blockio.EncodeLocation( - writer.GetName(), - blk.GetExtent(), - uint32(taeBats[0].Vecs[0].Length()), - blk.GetID()) - assert.Nil(t, err) - blkMetas = append(blkMetas, metaLoc.String()) - stats = append(stats, writer.GetObjectStats()[objectio.SchemaData]) - } + assert.Equal(t, 50, int(stats[len(stats)-1].BlkCnt())) } //create dbtest and tbtest; @@ -170,18 +161,15 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) { entries = append(entries, createTbEntries...) //add 100 * 50 blocks from S3 into "tbtest" table - attrs := []string{catalog2.BlockMeta_MetaLoc, catalog2.ObjectMeta_ObjectStats} - vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0)} + attrs := []string{catalog2.ObjectMeta_ObjectStats} + vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0)} vecOpts := containers.Options{} vecOpts.Capacity = 0 offset = 0 - for _, obj := range objNames { + for i, obj := range objNames { metaLocBat := containers.BuildBatch(attrs, vecTypes, vecOpts) - for i := 0; i < 50; i++ { - metaLocBat.Vecs[0].Append([]byte(blkMetas[offset+i]), false) - metaLocBat.Vecs[1].Append([]byte(stats[offset+i][:]), false) - } - offset += 50 + //metaLocBat.Vecs[0].Append([]byte(blkMetas[offset+i]), false) + metaLocBat.Vecs[0].Append([]byte(stats[i][:]), false) metaLocMoBat := containers.ToCNBatch(metaLocBat) addS3BlkEntry, err := makePBEntry(INSERT, dbTestID, tbTestID, dbName, schema.Name, obj.String(), metaLocMoBat)