ignore the blk info when commit data object to tn (step 1) (#18858)
1. CN commits data object stats to tn only if data has actually been flushed. (step 1)
2. CN no longer needs to keep any block info.

Approved by: @XuPeng-SH
gouhongshen authored Sep 20, 2024
1 parent 6f08087 commit e1f1b09
Showing 5 changed files with 111 additions and 127 deletions.
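In short: for bulk-loaded inserts, the entry CN commits used to carry a block-meta-location column next to the object-stats column; after this change only the stats column is shipped, and TN works from the stats alone. Below is a minimal, self-contained sketch of that column-drop idea, using toy stand-in types rather than the engine's real batch and stats types (the real slicing happens in toPBEntry in pkg/vm/engine/disttae/tools.go, shown in the first diff):

package main

import "fmt"

// Batch is a stand-in for the engine's column batch: parallel slices of
// attribute names and opaque column payloads.
type Batch struct {
	Attrs []string
	Vecs  [][]byte
}

// dropLeadingBlockInfo mirrors the idea in toPBEntry: when the first
// attribute is the block-meta column, slice both the attribute and the
// vector lists so only the object-stats column is committed to TN.
func dropLeadingBlockInfo(b Batch, blockMetaAttr string) Batch {
	if len(b.Attrs) > 0 && b.Attrs[0] == blockMetaAttr {
		return Batch{Attrs: b.Attrs[1:], Vecs: b.Vecs[1:]}
	}
	return b
}

func main() {
	before := Batch{
		Attrs: []string{"block_meta_loc", "object_stats"},
		Vecs:  [][]byte{[]byte("block info"), []byte("stats blob")},
	}
	after := dropLeadingBlockInfo(before, "block_meta_loc")
	fmt.Println(after.Attrs) // [object_stats]
}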
5 changes: 5 additions & 0 deletions pkg/vm/engine/disttae/tools.go
@@ -135,6 +135,11 @@ func toPBEntry(e Entry) (*api.Entry, error) {
 	if e.bat.Attrs[0] == catalog.BlockMeta_MetaLoc {
 		ebat.Vecs = e.bat.Vecs
 		ebat.Attrs = e.bat.Attrs
+
+		// no need to commit the blk info to tn
+		ebat.Vecs = ebat.Vecs[1:]
+		ebat.Attrs = e.bat.Attrs[1:]
+		ebat.SetRowCount(ebat.Vecs[0].Length())
 	} else {
 		//e.bat.Vecs[0] is rowid vector
 		ebat.Vecs = e.bat.Vecs[1:]
3 changes: 2 additions & 1 deletion pkg/vm/engine/tae/db/operations.go
@@ -166,7 +166,8 @@ type WriteReq struct {
 	PkCheck PKCheckType
 	//S3 object file name
 	FileName string
-	MetaLocs []string
+	// cn flushed data object stats
+	DataObjectStats []objectio.ObjectStats
 	//for delete on S3
 	TombstoneStats []objectio.ObjectStats
 	//tasks for loading primary keys or deleted row ids
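For illustration, a toy model of the reshaped request and the routing done in HandlePreCommitWrite follows; every type below is a stand-in, and the 232-byte stats size is made up (the real objectio.ObjectStats is a fixed-size byte array whose length is defined in pkg/objectio):

package main

import "fmt"

// toyStats stands in for objectio.ObjectStats; 232 bytes is illustrative,
// not the engine's actual size.
type toyStats [232]byte

type entryType int

const (
	entryInsert entryType = iota
	entryDelete
)

// writeReq mirrors the reshaped db.WriteReq: the MetaLocs []string field is
// gone, and both the data and the tombstone paths carry decoded stats.
type writeReq struct {
	typ      entryType
	fileName string
	// cn flushed data object stats (replaces MetaLocs)
	dataObjectStats []toyStats
	// for delete on S3
	tombstoneStats []toyStats
}

// route mirrors the loop in HandlePreCommitWrite: each row of the stats
// column decodes to one stats value, appended to the matching slice.
func route(req *writeReq, rows [][]byte) {
	for _, raw := range rows {
		var s toyStats
		copy(s[:], raw)
		if req.typ == entryInsert {
			req.dataObjectStats = append(req.dataObjectStats, s)
		} else {
			req.tombstoneStats = append(req.tombstoneStats, s)
		}
	}
}

func main() {
	req := &writeReq{typ: entryInsert, fileName: "obj-a"}
	route(req, [][]byte{make([]byte, 232)})
	fmt.Println(len(req.dataObjectStats), len(req.tombstoneStats)) // 1 0
}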
42 changes: 21 additions & 21 deletions pkg/vm/engine/tae/rpc/handle.go
@@ -309,10 +309,10 @@ func (h *Handle) HandlePreCommitWrite(
 	if req.FileName != "" {
 		col := req.Batch.Vecs[0]
 		for i := 0; i < req.Batch.RowCount(); i++ {
+			stats := objectio.ObjectStats(col.GetBytesAt(i))
 			if req.Type == db.EntryInsert {
-				req.MetaLocs = append(req.MetaLocs, col.GetStringAt(i))
+				req.DataObjectStats = append(req.DataObjectStats, stats)
 			} else {
-				stats := objectio.ObjectStats(col.GetBytesAt(i))
 				req.TombstoneStats = append(req.TombstoneStats, stats)
 			}
 		}
@@ -718,31 +718,31 @@ func (h *Handle) HandleWrite(
 	if req.Type == db.EntryInsert {
 		//Add blocks which had been bulk-loaded into S3 into table.
 		if req.FileName != "" {
-			metalocations := make(map[string]struct{})
-			for _, metLoc := range req.MetaLocs {
-				location, err := blockio.EncodeLocationFromString(metLoc)
-				if err != nil {
-					return err
-				}
-				metalocations[location.Name().String()] = struct{}{}
-			}
-			statsVec := req.Batch.Vecs[1]
+			//metalocations := make(map[string]struct{})
+			//for _, metLoc := range req.MetaLocs {
+			//	location, err := blockio.EncodeLocationFromString(metLoc)
+			//	if err != nil {
+			//		return err
+			//	}
+			//	metalocations[location.Name().String()] = struct{}{}
+			//}
+			statsVec := req.Batch.Vecs[0]
 			for i := 0; i < statsVec.Length(); i++ {
 				s := objectio.ObjectStats(statsVec.GetBytesAt(i))
 				if !s.GetCNCreated() {
 					logutil.Fatal("the `CNCreated` mask not set")
 				}
-				delete(metalocations, s.ObjectName().String())
-			}
-			if len(metalocations) != 0 {
-				logutil.Warn(
-					"TAE-EMPTY-STATS",
-					zap.Any("locations", metalocations),
-					zap.String("table", req.TableName),
-				)
-				err = moerr.NewInternalError(ctx, "object stats doesn't match meta locations")
-				return
+				//delete(metalocations, s.ObjectName().String())
 			}
+			//if len(metalocations) != 0 {
+			//	logutil.Warn(
+			//		"TAE-EMPTY-STATS",
+			//		zap.Any("locations", metalocations),
+			//		zap.String("table", req.TableName),
+			//	)
+			//	err = moerr.NewInternalError(ctx, "object stats doesn't match meta locations")
+			//	return
+			//}
 			err = tb.AddObjsWithMetaLoc(
 				ctx,
 				containers.ToTNVector(statsVec, common.WorkspaceAllocator),
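With the meta-location cross-check commented out, the remaining guard in HandleWrite is the CNCreated mask on each decoded stats blob. A self-contained sketch of that validation, using a toy 8-byte blob whose first byte stands in for the flag (the real layout and the objectio accessors differ):

package main

import "fmt"

// toyStats is an 8-byte stand-in for objectio.ObjectStats; byte 0 plays
// the role of the CNCreated flag here.
type toyStats [8]byte

func (s toyStats) cnCreated() bool { return s[0] == 1 }

// checkStatsColumn mirrors the handler's loop: every row must decode to a
// stats blob with the CN-created mask set before the commit is accepted.
func checkStatsColumn(rows [][]byte) error {
	for i, raw := range rows {
		var s toyStats
		copy(s[:], raw)
		if !s.cnCreated() {
			return fmt.Errorf("row %d: the CNCreated mask not set", i)
		}
	}
	return nil
}

func main() {
	good := []byte{1, 0, 0, 0, 0, 0, 0, 0}
	bad := make([]byte, 8)
	fmt.Println(checkStatsColumn([][]byte{good}))      // <nil>
	fmt.Println(checkStatsColumn([][]byte{good, bad})) // row 1: the CNCreated mask not set
}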
105 changes: 26 additions & 79 deletions pkg/vm/engine/tae/rpc/rpc_test.go
@@ -25,7 +25,6 @@ import (
 	catalog2 "github.com/matrixorigin/matrixone/pkg/catalog"
 	"github.com/matrixorigin/matrixone/pkg/container/batch"
 	"github.com/matrixorigin/matrixone/pkg/container/types"
-	"github.com/matrixorigin/matrixone/pkg/container/vector"
 	"github.com/matrixorigin/matrixone/pkg/objectio"
 	"github.com/matrixorigin/matrixone/pkg/pb/api"
 	"github.com/matrixorigin/matrixone/pkg/pb/plan"
@@ -67,53 +66,28 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) {
 	defer taeBat.Close()
 	taeBats := taeBat.Split(100 * 50)
 
-	//taeBats[0] = taeBats[0].CloneWindow(0, 10)
-	//taeBats[1] = taeBats[1].CloneWindow(0, 10)
-	//taeBats[2] = taeBats[2].CloneWindow(0, 10)
-	//taeBats[3] = taeBats[3].CloneWindow(0, 10)
-
-	//sort by primary key
-	//_, err = mergesort.SortBlockColumns(taeBats[0].Vecs, 1)
-	//assert.Nil(t, err)
-	//_, err = mergesort.SortBlockColumns(taeBats[1].Vecs, 1)
-	//assert.Nil(t, err)
-	//_, err = mergesort.SortBlockColumns(taeBats[2].Vecs, 1)
-	//assert.Nil(t, err)
-
-	//moBats := make([]*batch.Batch, 4)
-	//moBats[0] = containers.CopyToCNBatch(taeBats[0])
-	//moBats[1] = containers.CopyToCNBatch(taeBats[1])
-	//moBats[2] = containers.CopyToCNBatch(taeBats[2])
-	//moBats[3] = containers.CopyToCNBatch(taeBats[3])
-
 	var objNames []objectio.ObjectName
-	var blkMetas []string
 	var stats []objectio.ObjectStats
 	offset := 0
 	for i := 0; i < 100; i++ {
 		name := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid())
 		objNames = append(objNames, name)
 		writer, err := blockio.NewBlockWriterNew(fs, objNames[i], 0, nil)
 		assert.Nil(t, err)
-		for i := 0; i < 50; i++ {
-			_, err := writer.WriteBatch(containers.ToCNBatch(taeBats[offset+i]))
+		for j := 0; j < 50; j++ {
+			_, err = writer.WriteBatch(containers.ToCNBatch(taeBats[offset+j]))
 			assert.Nil(t, err)
-			//offset++
 		}
 		offset += 50
 		blocks, _, err := writer.Sync(context.Background())
 		assert.Nil(t, err)
 		assert.Equal(t, 50, len(blocks))
-		for _, blk := range blocks {
-			metaLoc := blockio.EncodeLocation(
-				writer.GetName(),
-				blk.GetExtent(),
-				uint32(taeBats[0].Vecs[0].Length()),
-				blk.GetID())
-			assert.Nil(t, err)
-			blkMetas = append(blkMetas, metaLoc.String())
-			stats = append(stats, writer.GetObjectStats(objectio.WithCNCreated()))
-		}
+
+		ss := writer.GetObjectStats(objectio.WithCNCreated())
+		stats = append(stats, ss)
+
+		require.Equal(t, int(50), int(ss.BlkCnt()))
+		require.Equal(t, int(50*10), int(ss.Rows()))
 	}
 
 	//create dbtest and tbtest;
@@ -173,19 +147,13 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) {
 	entries = append(entries, createTbEntries...)
 
 	//add 100 * 50 blocks from S3 into "tbtest" table
-	attrs := []string{catalog2.BlockMeta_MetaLoc, catalog2.ObjectMeta_ObjectStats}
-	vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0)}
+	attrs := []string{catalog2.ObjectMeta_ObjectStats}
+	vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0)}
 	vecOpts := containers.Options{}
 	vecOpts.Capacity = 0
-	offset = 0
-	for _, obj := range objNames {
+	for i, obj := range objNames {
 		metaLocBat := containers.BuildBatch(attrs, vecTypes, vecOpts)
-		for i := 0; i < 50; i++ {
-			metaLocBat.Vecs[0].Append([]byte(blkMetas[offset+i]), false)
-			//stats[offset+i].SetCNCreated()
-			metaLocBat.Vecs[1].Append([]byte(stats[offset+i][:]), false)
-		}
-		offset += 50
+		metaLocBat.Vecs[0].Append([]byte(stats[i][:]), false)
 		metaLocMoBat := containers.ToCNBatch(metaLocBat)
 		addS3BlkEntry, err := makePBEntry(INSERT, dbTestID,
 			tbTestID, dbName, schema.Name, obj.String(), metaLocMoBat)
@@ -229,6 +197,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
 	schema.Extra.ObjectMaxBlocks = 2
 	taeBat := catalog.MockBatch(schema, 40)
 	defer taeBat.Close()
+
 	taeBats := taeBat.Split(4)
 	taeBats[0] = taeBats[0].CloneWindow(0, 10)
 	taeBats[1] = taeBats[1].CloneWindow(0, 10)
@@ -264,21 +233,9 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
 	blocks, _, err := writer.Sync(context.Background())
 	assert.Nil(t, err)
 	assert.Equal(t, 2, len(blocks))
-	metaLoc1 := blockio.EncodeLocation(
-		writer.GetName(),
-		blocks[0].GetExtent(),
-		uint32(taeBats[0].Vecs[0].Length()),
-		blocks[0].GetID(),
-	).String()
-	assert.Nil(t, err)
-	metaLoc2 := blockio.EncodeLocation(
-		writer.GetName(),
-		blocks[1].GetExtent(),
-		uint32(taeBats[1].Vecs[0].Length()),
-		blocks[1].GetID(),
-	).String()
-	assert.Nil(t, err)
 	stats1 := writer.GetObjectStats(objectio.WithCNCreated())
+	require.Equal(t, int(2), int(stats1.BlkCnt()))
+	require.Equal(t, int(20), int(stats1.Rows()))
 
 	//write taeBats[3] into file service
 	objName2 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid())
Expand All @@ -290,14 +247,10 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
blocks, _, err = writer.Sync(context.Background())
assert.Equal(t, 1, len(blocks))
assert.Nil(t, err)
metaLoc3 := blockio.EncodeLocation(
writer.GetName(),
blocks[0].GetExtent(),
uint32(taeBats[3].Vecs[0].Length()),
blocks[0].GetID(),
).String()

stats3 := writer.GetObjectStats(objectio.WithCNCreated())
assert.Nil(t, err)
require.Equal(t, int(1), int(stats3.BlkCnt()))
require.Equal(t, int(10), int(stats3.Rows()))

//create db;
dbName := "dbtest"
@@ -388,31 +341,25 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
 	entries = append(entries, insertEntry)
 
 	//add two non-appendable blocks from S3 into "tbtest" table
-	attrs := []string{catalog2.BlockMeta_MetaLoc, catalog2.ObjectMeta_ObjectStats}
-	vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0)}
+	attrs := []string{catalog2.ObjectMeta_ObjectStats}
+	vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0)}
 	vecOpts := containers.Options{}
 	vecOpts.Capacity = 0
+
 	metaLocBat1 := containers.BuildBatch(attrs, vecTypes, vecOpts)
-	metaLocBat1.Vecs[0].Append([]byte(metaLoc1), false)
-	metaLocBat1.Vecs[0].Append([]byte(metaLoc2), false)
-	//stats1.SetCNCreated()
-	metaLocBat1.Vecs[1].Append([]byte(stats1[:]), false)
-	metaLocBat1.Vecs[1].Append([]byte(stats1[:]), false)
+	metaLocBat1.Vecs[0].Append([]byte(stats1[:]), false)
+
 	metaLocMoBat1 := containers.ToCNBatch(metaLocBat1)
 	addS3BlkEntry1, err := makePBEntry(INSERT, dbTestID,
 		tbTestID, dbName, schema.Name, objName1.String(), metaLocMoBat1)
 	assert.NoError(t, err)
-	loc1 := vector.InefficientMustStrCol(metaLocMoBat1.GetVector(0))[0]
-	loc2 := vector.InefficientMustStrCol(metaLocMoBat1.GetVector(0))[1]
-	assert.Equal(t, metaLoc1, loc1)
-	assert.Equal(t, metaLoc2, loc2)
+
 	entries = append(entries, addS3BlkEntry1)
 
 	//add one non-appendable block from S3 into "tbtest" table
 	metaLocBat2 := containers.BuildBatch(attrs, vecTypes, vecOpts)
-	metaLocBat2.Vecs[0].Append([]byte(metaLoc3), false)
-	//stats3.SetCNCreated()
-	metaLocBat2.Vecs[1].Append([]byte(stats3[:]), false)
+	metaLocBat2.Vecs[0].Append([]byte(stats3[:]), false)
+
 	metaLocMoBat2 := containers.ToCNBatch(metaLocBat2)
 	addS3BlkEntry2, err := makePBEntry(INSERT, dbTestID,
 		tbTestID, dbName, schema.Name, objName2.String(), metaLocMoBat2)
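The rewritten tests assert per-object aggregates (ss.BlkCnt(), ss.Rows()) instead of comparing meta-location strings. A quick sketch of the arithmetic behind the new expectations, assuming the performance test's setup of 100 objects × 50 blocks × 10 rows:

package main

import "fmt"

func main() {
	objects, blksPerObj, rowsPerBlk := 100, 50, 10

	// per-object expectations, as in require.Equal(t, 50, ss.BlkCnt())
	// and require.Equal(t, 50*10, ss.Rows())
	fmt.Println("blocks per object:", blksPerObj)            // 50
	fmt.Println("rows per object:  ", blksPerObj*rowsPerBlk) // 500

	// across the whole load
	fmt.Println("total blocks:", objects*blksPerObj)            // 5000
	fmt.Println("total rows:  ", objects*blksPerObj*rowsPerBlk) // 50000
}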
83 changes: 57 additions & 26 deletions pkg/vm/engine/tae/rpc/utils.go
@@ -16,7 +16,7 @@ package rpc
 
 import (
 	"context"
-
+	"fmt"
 	"github.com/matrixorigin/matrixone/pkg/common/util"
 	"github.com/matrixorigin/matrixone/pkg/logutil"
 	"github.com/matrixorigin/matrixone/pkg/objectio"
@@ -79,37 +79,69 @@ func (h *Handle) prefetchDeleteRowID(_ context.Context, req *db.WriteReq) error
 			return err
 		}
 	}
+
+	logCNCommittedObjects(true, req.TableID, req.TableName, req.TombstoneStats)
+
 	return nil
 }
 
-func (h *Handle) prefetchMetadata(_ context.Context, req *db.WriteReq) (int, error) {
-	if len(req.MetaLocs) == 0 {
-		return 0, nil
+func (h *Handle) prefetchMetadata(_ context.Context, req *db.WriteReq) error {
+	if len(req.DataObjectStats) == 0 {
+		return nil
 	}
 
 	//start loading jobs asynchronously,should create a new root context.
-	objCnt := 0
-	var objectName objectio.ObjectNameShort
-	for _, meta := range req.MetaLocs {
-		loc, err := blockio.EncodeLocationFromString(meta)
+	for _, stats := range req.DataObjectStats {
+		location := stats.BlockLocation(uint16(0), objectio.BlockMaxRows)
+		err := blockio.PrefetchMeta(h.db.Opts.SID, h.db.Runtime.Fs.Service, location)
 		if err != nil {
-			return 0, err
-		}
-		if !objectio.IsSameObjectLocVsShort(loc, &objectName) {
-			err := blockio.PrefetchMeta(h.db.Opts.SID, h.db.Runtime.Fs.Service, loc)
-			if err != nil {
-				return 0, err
-			}
-			objCnt++
-			objectName = *loc.Name().Short()
+			return err
 		}
 	}
 
+	logCNCommittedObjects(false, req.TableID, req.TableName, req.DataObjectStats)
+
+	return nil
+}
+
+func logCNCommittedObjects(
+	isTombstone bool,
+	tableId uint64,
+	tableName string,
+	statsList []objectio.ObjectStats) {
+
+	totalBlkCnt := 0
+	totalRowCnt := 0
+	totalOSize := float64(0)
+	totalCSize := float64(0)
+	var objNames = make([]string, 0, len(statsList))
+	for _, stats := range statsList {
+		totalBlkCnt += int(stats.BlkCnt())
+		totalRowCnt += int(stats.Rows())
+		totalCSize += float64(stats.Size())
+		totalOSize += float64(stats.OriginSize())
+		objNames = append(objNames, stats.ObjectName().ObjectId().ShortStringEx())
+	}
+
+	totalCSize /= 1024.0 * 1024.0
+	totalOSize /= 1024.0 * 1024.0
+
+	hint := "CN-COMMIT-S3-Data-Object"
+	if isTombstone {
+		hint = "CN-COMMIT-S3-Tombstone-Object"
+	}
+
 	logutil.Info(
-		"CN-COMMIT-S3",
-		zap.Int("table-id", int(req.TableID)),
-		zap.String("table-name", req.TableName),
-		zap.Int("obj-cnt", objCnt),
+		hint,
+		zap.Int("table-id", int(tableId)),
+		zap.String("table-name", tableName),
+		zap.Int("obj-cnt", len(statsList)),
+		zap.String("obj-osize", fmt.Sprintf("%.6fmb", totalOSize)),
+		zap.String("obj-csize", fmt.Sprintf("%.6fmb", totalCSize)),
+		zap.Int("blk-cnt", totalBlkCnt),
+		zap.Int("row-cnt", totalRowCnt),
+		zap.Strings("names", objNames),
 	)
-	return objCnt, nil
 }
 
 // TryPrefetchTxn only prefetch data written by CN, do not change the state machine of TxnEngine.
Expand All @@ -133,16 +165,15 @@ func (h *Handle) TryPrefetchTxn(ctx context.Context, meta txn.TxnMeta) error {
if r, ok := e.(*db.WriteReq); ok && r.FileName != "" {
if r.Type == db.EntryDelete {
// start to load deleted row ids
deltaLocCnt += 1
deltaLocCnt += len(r.TombstoneStats)
if err := h.prefetchDeleteRowID(ctx, r); err != nil {
return err
}
} else if r.Type == db.EntryInsert {
objCnt, err := h.prefetchMetadata(ctx, r)
if err != nil {
metaLocCnt += len(r.DataObjectStats)
if err := h.prefetchMetadata(ctx, r); err != nil {
return err
}
metaLocCnt += objCnt
}
}
}
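The new logCNCommittedObjects helper sums per-object compressed and original sizes in bytes and logs them in MiB with %.6fmb formatting. A minimal sketch of just that aggregation, with made-up sizes:

package main

import "fmt"

type objSizes struct {
	oSizeBytes uint64 // original (uncompressed) size
	cSizeBytes uint64 // compressed size
}

func main() {
	statsList := []objSizes{
		{oSizeBytes: 4 << 20, cSizeBytes: 1 << 20},
		{oSizeBytes: 6 << 20, cSizeBytes: 2 << 20},
	}
	var totalOSize, totalCSize float64
	for _, s := range statsList {
		totalOSize += float64(s.oSizeBytes)
		totalCSize += float64(s.cSizeBytes)
	}
	// same conversion as the helper: bytes -> MiB
	totalOSize /= 1024.0 * 1024.0
	totalCSize /= 1024.0 * 1024.0
	fmt.Printf("obj-osize=%.6fmb obj-csize=%.6fmb\n", totalOSize, totalCSize)
	// obj-osize=10.000000mb obj-csize=3.000000mb
}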
