Ignore the blk info when committing data objects to TN (step 1) #18858

Merged
5 changes: 5 additions & 0 deletions pkg/vm/engine/disttae/tools.go
@@ -135,6 +135,11 @@ func toPBEntry(e Entry) (*api.Entry, error) {
if e.bat.Attrs[0] == catalog.BlockMeta_MetaLoc {
ebat.Vecs = e.bat.Vecs
ebat.Attrs = e.bat.Attrs

// no need to commit the blk info to tn
ebat.Vecs = ebat.Vecs[1:]
ebat.Attrs = e.bat.Attrs[1:]
ebat.SetRowCount(ebat.Vecs[0].Length())
} else {
//e.bat.Vecs[0] is rowid vector
ebat.Vecs = e.bat.Vecs[1:]
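
With the blk-info column gone from the committed entry, TN receives a one-column batch of object stats. A minimal sketch of the column-dropping move above, using hypothetical stand-in types rather than the real ones in `pkg/container`:

```go
package main

import "fmt"

// Stand-ins for the engine's batch and vector types (illustrative only;
// the real ones live in pkg/container/batch and pkg/container/vector).
type Vector struct {
	Name string
	Rows int
}

func (v *Vector) Length() int { return v.Rows }

type Batch struct {
	Attrs    []string
	Vecs     []*Vector
	rowCount int
}

func (b *Batch) SetRowCount(n int) { b.rowCount = n }

func main() {
	// A CN-flushed S3 entry: column 0 carries the legacy blk meta
	// location, column 1 the object stats that TN actually needs.
	ebat := &Batch{
		Attrs: []string{"meta_loc", "object_stats"},
		Vecs:  []*Vector{{Name: "meta_loc", Rows: 50}, {Name: "object_stats", Rows: 50}},
	}

	// Mirror toPBEntry: drop the leading blk-info column so only the
	// object stats are committed to TN.
	ebat.Vecs = ebat.Vecs[1:]
	ebat.Attrs = ebat.Attrs[1:]
	ebat.SetRowCount(ebat.Vecs[0].Length())

	fmt.Println(ebat.Attrs, ebat.rowCount) // [object_stats] 50
}
```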
3 changes: 2 additions & 1 deletion pkg/vm/engine/tae/db/operations.go
@@ -166,7 +166,8 @@ type WriteReq struct {
PkCheck PKCheckType
//S3 object file name
FileName string
MetaLocs []string
// cn flushed data object stats
DataObjectStats []objectio.ObjectStats
//for delete on S3
TombstoneStats []objectio.ObjectStats
//tasks for loading primary keys or deleted row ids
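
For orientation, here is a trimmed sketch of the `WriteReq` shape after this change. The field set is abridged and `ObjectStats` is reduced to a stand-in byte array; the real type is `objectio.ObjectStats`:

```go
package main

import "fmt"

// ObjectStats stands in for objectio.ObjectStats: a fixed-size,
// byte-serialized summary of one S3 object (block count, row count,
// sizes, ...); the real layout lives in pkg/objectio.
type ObjectStats [64]byte

// WriteReq, abridged to the fields touched by this change: CN now hands
// TN decoded object stats instead of the MetaLocs []string it used to carry.
type WriteReq struct {
	// S3 object file name
	FileName string
	// cn flushed data object stats
	DataObjectStats []ObjectStats
	// for delete on S3
	TombstoneStats []ObjectStats
}

func main() {
	req := WriteReq{FileName: "obj-0001"}
	req.DataObjectStats = append(req.DataObjectStats, ObjectStats{})
	fmt.Println(len(req.DataObjectStats)) // 1
}
```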
42 changes: 21 additions & 21 deletions pkg/vm/engine/tae/rpc/handle.go
Original file line number Diff line number Diff line change
@@ -309,10 +309,10 @@ func (h *Handle) HandlePreCommitWrite(
if req.FileName != "" {
col := req.Batch.Vecs[0]
for i := 0; i < req.Batch.RowCount(); i++ {
stats := objectio.ObjectStats(col.GetBytesAt(i))
if req.Type == db.EntryInsert {
req.MetaLocs = append(req.MetaLocs, col.GetStringAt(i))
req.DataObjectStats = append(req.DataObjectStats, stats)
} else {
stats := objectio.ObjectStats(col.GetBytesAt(i))
req.TombstoneStats = append(req.TombstoneStats, stats)
}
}
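
Both branches now decode an `ObjectStats` straight from the row bytes, so the insert path no longer round-trips through meta-location strings. The conversion itself is an ordinary Go slice-to-array conversion; a self-contained sketch with a shortened stand-in type:

```go
package main

import "fmt"

// objectStats stands in for objectio.ObjectStats, shortened to 8 bytes
// here; the real type is a longer fixed-size byte array.
type objectStats [8]byte

func main() {
	// Rows of a bytes column, each one a serialized ObjectStats, as
	// col.GetBytesAt(i) would hand them to HandlePreCommitWrite.
	rows := [][]byte{
		{1, 2, 3, 4, 5, 6, 7, 8},
		{9, 10, 11, 12, 13, 14, 15, 16},
	}

	var dataObjectStats []objectStats
	for _, raw := range rows {
		// The same move as objectio.ObjectStats(col.GetBytesAt(i)):
		// a direct slice-to-array conversion (Go 1.20+; panics if raw
		// is shorter than the array).
		dataObjectStats = append(dataObjectStats, objectStats(raw))
	}
	fmt.Println(len(dataObjectStats)) // 2
}
```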
@@ -718,31 +718,31 @@ func (h *Handle) HandleWrite(
if req.Type == db.EntryInsert {
//Add blocks which had been bulk-loaded into S3 into table.
if req.FileName != "" {
metalocations := make(map[string]struct{})
for _, metLoc := range req.MetaLocs {
location, err := blockio.EncodeLocationFromString(metLoc)
if err != nil {
return err
}
metalocations[location.Name().String()] = struct{}{}
}
statsVec := req.Batch.Vecs[1]
//metalocations := make(map[string]struct{})
//for _, metLoc := range req.MetaLocs {
// location, err := blockio.EncodeLocationFromString(metLoc)
// if err != nil {
// return err
// }
// metalocations[location.Name().String()] = struct{}{}
//}
statsVec := req.Batch.Vecs[0]
for i := 0; i < statsVec.Length(); i++ {
s := objectio.ObjectStats(statsVec.GetBytesAt(i))
if !s.GetCNCreated() {
logutil.Fatal("the `CNCreated` mask not set")
}
delete(metalocations, s.ObjectName().String())
}
if len(metalocations) != 0 {
logutil.Warn(
"TAE-EMPTY-STATS",
zap.Any("locations", metalocations),
zap.String("table", req.TableName),
)
err = moerr.NewInternalError(ctx, "object stats doesn't match meta locations")
return
//delete(metalocations, s.ObjectName().String())
}
//if len(metalocations) != 0 {
// logutil.Warn(
// "TAE-EMPTY-STATS",
// zap.Any("locations", metalocations),
// zap.String("table", req.TableName),
// )
// err = moerr.NewInternalError(ctx, "object stats doesn't match meta locations")
// return
//}
err = tb.AddObjsWithMetaLoc(
ctx,
containers.ToTNVector(statsVec, common.WorkspaceAllocator),
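
With the meta locations gone, `HandleWrite` reads the stats vector from column 0 and rejects any object whose `CNCreated` flag is unset, instead of cross-checking stats against decoded locations. A sketch of that flag check, assuming a one-bit mask in a reserved byte (the layout is illustrative, not the real `pkg/objectio` encoding):

```go
package main

import "fmt"

// objectStats and the bit layout below are illustrative; the real flag
// encoding lives in pkg/objectio.
type objectStats [16]byte

const cnCreatedBit = 1 << 0

// GetCNCreated reports whether the object was flushed by a CN.
func (s objectStats) GetCNCreated() bool { return s[0]&cnCreatedBit != 0 }

// withCNCreated mimics what the objectio.WithCNCreated() writer option
// arranges at flush time.
func withCNCreated(s *objectStats) { s[0] |= cnCreatedBit }

func main() {
	statsVec := make([]objectStats, 3)
	for i := range statsVec {
		withCNCreated(&statsVec[i])
	}

	for _, s := range statsVec {
		if !s.GetCNCreated() {
			// the real handler calls logutil.Fatal here
			panic("the `CNCreated` mask not set")
		}
	}
	fmt.Println("all objects are CN-created; commit proceeds")
}
```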
105 changes: 26 additions & 79 deletions pkg/vm/engine/tae/rpc/rpc_test.go
@@ -25,7 +25,6 @@ import (
catalog2 "github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
@@ -67,53 +66,28 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) {
defer taeBat.Close()
taeBats := taeBat.Split(100 * 50)

//taeBats[0] = taeBats[0].CloneWindow(0, 10)
//taeBats[1] = taeBats[1].CloneWindow(0, 10)
//taeBats[2] = taeBats[2].CloneWindow(0, 10)
//taeBats[3] = taeBats[3].CloneWindow(0, 10)

//sort by primary key
//_, err = mergesort.SortBlockColumns(taeBats[0].Vecs, 1)
//assert.Nil(t, err)
//_, err = mergesort.SortBlockColumns(taeBats[1].Vecs, 1)
//assert.Nil(t, err)
//_, err = mergesort.SortBlockColumns(taeBats[2].Vecs, 1)
//assert.Nil(t, err)

//moBats := make([]*batch.Batch, 4)
//moBats[0] = containers.CopyToCNBatch(taeBats[0])
//moBats[1] = containers.CopyToCNBatch(taeBats[1])
//moBats[2] = containers.CopyToCNBatch(taeBats[2])
//moBats[3] = containers.CopyToCNBatch(taeBats[3])

var objNames []objectio.ObjectName
var blkMetas []string
var stats []objectio.ObjectStats
offset := 0
for i := 0; i < 100; i++ {
name := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid())
objNames = append(objNames, name)
writer, err := blockio.NewBlockWriterNew(fs, objNames[i], 0, nil)
assert.Nil(t, err)
for i := 0; i < 50; i++ {
_, err := writer.WriteBatch(containers.ToCNBatch(taeBats[offset+i]))
for j := 0; j < 50; j++ {
_, err = writer.WriteBatch(containers.ToCNBatch(taeBats[offset+j]))
assert.Nil(t, err)
//offset++
}
offset += 50
blocks, _, err := writer.Sync(context.Background())
assert.Nil(t, err)
assert.Equal(t, 50, len(blocks))
for _, blk := range blocks {
metaLoc := blockio.EncodeLocation(
writer.GetName(),
blk.GetExtent(),
uint32(taeBats[0].Vecs[0].Length()),
blk.GetID())
assert.Nil(t, err)
blkMetas = append(blkMetas, metaLoc.String())
stats = append(stats, writer.GetObjectStats(objectio.WithCNCreated()))
}

ss := writer.GetObjectStats(objectio.WithCNCreated())
stats = append(stats, ss)

require.Equal(t, int(50), int(ss.BlkCnt()))
require.Equal(t, int(50*10), int(ss.Rows()))
}

//create dbtest and tbtest;
@@ -173,19 +147,13 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) {
entries = append(entries, createTbEntries...)

//add 100 * 50 blocks from S3 into "tbtest" table
attrs := []string{catalog2.BlockMeta_MetaLoc, catalog2.ObjectMeta_ObjectStats}
vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0)}
attrs := []string{catalog2.ObjectMeta_ObjectStats}
vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0)}
vecOpts := containers.Options{}
vecOpts.Capacity = 0
offset = 0
for _, obj := range objNames {
for i, obj := range objNames {
metaLocBat := containers.BuildBatch(attrs, vecTypes, vecOpts)
for i := 0; i < 50; i++ {
metaLocBat.Vecs[0].Append([]byte(blkMetas[offset+i]), false)
//stats[offset+i].SetCNCreated()
metaLocBat.Vecs[1].Append([]byte(stats[offset+i][:]), false)
}
offset += 50
metaLocBat.Vecs[0].Append([]byte(stats[i][:]), false)
metaLocMoBat := containers.ToCNBatch(metaLocBat)
addS3BlkEntry, err := makePBEntry(INSERT, dbTestID,
tbTestID, dbName, schema.Name, obj.String(), metaLocMoBat)
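
Dropping the blk-info column also shrinks the commit payload in this test from one meta-location row per block to one stats row per object. A sketch of that before/after, with a hypothetical `bytesColumn` in place of the engine's vector:

```go
package main

import "fmt"

// bytesColumn is a hypothetical append-only column; the test uses
// containers.BuildBatch and Vecs[0].Append for the same purpose.
type bytesColumn struct{ rows [][]byte }

func (c *bytesColumn) Append(b []byte) { c.rows = append(c.rows, b) }

func main() {
	const objects, blocksPerObject = 100, 50

	statsCol := &bytesColumn{}
	for i := 0; i < objects; i++ {
		serializedStats := make([]byte, 64) // stand-in for stats[i][:]
		statsCol.Append(serializedStats)
	}

	// New scheme: one stats row per object.
	fmt.Println(len(statsCol.rows)) // 100
	// Old scheme: one metaLoc row per block.
	fmt.Println(objects * blocksPerObject) // 5000
}
```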
Expand Down Expand Up @@ -229,6 +197,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
schema.Extra.ObjectMaxBlocks = 2
taeBat := catalog.MockBatch(schema, 40)
defer taeBat.Close()

taeBats := taeBat.Split(4)
taeBats[0] = taeBats[0].CloneWindow(0, 10)
taeBats[1] = taeBats[1].CloneWindow(0, 10)
Expand Down Expand Up @@ -264,21 +233,9 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
blocks, _, err := writer.Sync(context.Background())
assert.Nil(t, err)
assert.Equal(t, 2, len(blocks))
metaLoc1 := blockio.EncodeLocation(
writer.GetName(),
blocks[0].GetExtent(),
uint32(taeBats[0].Vecs[0].Length()),
blocks[0].GetID(),
).String()
assert.Nil(t, err)
metaLoc2 := blockio.EncodeLocation(
writer.GetName(),
blocks[1].GetExtent(),
uint32(taeBats[1].Vecs[0].Length()),
blocks[1].GetID(),
).String()
assert.Nil(t, err)
stats1 := writer.GetObjectStats(objectio.WithCNCreated())
require.Equal(t, int(2), int(stats1.BlkCnt()))
require.Equal(t, int(20), int(stats1.Rows()))

//write taeBats[3] into file service
objName2 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid())
@@ -290,14 +247,10 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
blocks, _, err = writer.Sync(context.Background())
assert.Equal(t, 1, len(blocks))
assert.Nil(t, err)
metaLoc3 := blockio.EncodeLocation(
writer.GetName(),
blocks[0].GetExtent(),
uint32(taeBats[3].Vecs[0].Length()),
blocks[0].GetID(),
).String()

stats3 := writer.GetObjectStats(objectio.WithCNCreated())
assert.Nil(t, err)
require.Equal(t, int(1), int(stats3.BlkCnt()))
require.Equal(t, int(10), int(stats3.Rows()))

//create db;
dbName := "dbtest"
@@ -388,31 +341,25 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) {
entries = append(entries, insertEntry)

//add two non-appendable blocks from S3 into "tbtest" table
attrs := []string{catalog2.BlockMeta_MetaLoc, catalog2.ObjectMeta_ObjectStats}
vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0)}
attrs := []string{catalog2.ObjectMeta_ObjectStats}
vecTypes := []types.Type{types.New(types.T_varchar, types.MaxVarcharLen, 0)}
vecOpts := containers.Options{}
vecOpts.Capacity = 0

metaLocBat1 := containers.BuildBatch(attrs, vecTypes, vecOpts)
metaLocBat1.Vecs[0].Append([]byte(metaLoc1), false)
metaLocBat1.Vecs[0].Append([]byte(metaLoc2), false)
//stats1.SetCNCreated()
metaLocBat1.Vecs[1].Append([]byte(stats1[:]), false)
metaLocBat1.Vecs[1].Append([]byte(stats1[:]), false)
metaLocBat1.Vecs[0].Append([]byte(stats1[:]), false)

metaLocMoBat1 := containers.ToCNBatch(metaLocBat1)
addS3BlkEntry1, err := makePBEntry(INSERT, dbTestID,
tbTestID, dbName, schema.Name, objName1.String(), metaLocMoBat1)
assert.NoError(t, err)
loc1 := vector.InefficientMustStrCol(metaLocMoBat1.GetVector(0))[0]
loc2 := vector.InefficientMustStrCol(metaLocMoBat1.GetVector(0))[1]
assert.Equal(t, metaLoc1, loc1)
assert.Equal(t, metaLoc2, loc2)

entries = append(entries, addS3BlkEntry1)

//add one non-appendable block from S3 into "tbtest" table
metaLocBat2 := containers.BuildBatch(attrs, vecTypes, vecOpts)
metaLocBat2.Vecs[0].Append([]byte(metaLoc3), false)
//stats3.SetCNCreated()
metaLocBat2.Vecs[1].Append([]byte(stats3[:]), false)
metaLocBat2.Vecs[0].Append([]byte(stats3[:]), false)

metaLocMoBat2 := containers.ToCNBatch(metaLocBat2)
addS3BlkEntry2, err := makePBEntry(INSERT, dbTestID,
tbTestID, dbName, schema.Name, objName2.String(), metaLocMoBat2)
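
The deleted assertions compared meta-location strings read back from the batch; the new ones validate the stats themselves. A sketch of the accounting behind the expected `BlkCnt`/`Rows` values, assuming each written batch becomes one block:

```go
package main

import "fmt"

// flush models per-object accounting: every written batch becomes one
// block, and rows accumulate. The real numbers come from the
// objectio.ObjectStats accessors BlkCnt() and Rows().
func flush(batchRows []int) (blkCnt, rows int) {
	for _, n := range batchRows {
		blkCnt++
		rows += n
	}
	return
}

func main() {
	// objName1 received two 10-row batches -> BlkCnt 2, Rows 20.
	b1, r1 := flush([]int{10, 10})
	fmt.Println(b1, r1) // 2 20

	// objName2 received one 10-row batch (taeBats[3]) -> BlkCnt 1, Rows 10.
	b3, r3 := flush([]int{10})
	fmt.Println(b3, r3) // 1 10
}
```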
83 changes: 57 additions & 26 deletions pkg/vm/engine/tae/rpc/utils.go
@@ -16,7 +16,7 @@ package rpc

import (
"context"

"fmt"
"github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
@@ -79,37 +79,69 @@ func (h *Handle) prefetchDeleteRowID(_ context.Context, req *db.WriteReq) error
return err
}
}

logCNCommittedObjects(true, req.TableID, req.TableName, req.TombstoneStats)

return nil
}

func (h *Handle) prefetchMetadata(_ context.Context, req *db.WriteReq) (int, error) {
if len(req.MetaLocs) == 0 {
return 0, nil
func (h *Handle) prefetchMetadata(_ context.Context, req *db.WriteReq) error {
if len(req.DataObjectStats) == 0 {
return nil
}

//start loading jobs asynchronously, should create a new root context.
objCnt := 0
var objectName objectio.ObjectNameShort
for _, meta := range req.MetaLocs {
loc, err := blockio.EncodeLocationFromString(meta)
for _, stats := range req.DataObjectStats {
location := stats.BlockLocation(uint16(0), objectio.BlockMaxRows)
err := blockio.PrefetchMeta(h.db.Opts.SID, h.db.Runtime.Fs.Service, location)
if err != nil {
return 0, err
}
if !objectio.IsSameObjectLocVsShort(loc, &objectName) {
err := blockio.PrefetchMeta(h.db.Opts.SID, h.db.Runtime.Fs.Service, loc)
if err != nil {
return 0, err
}
objCnt++
objectName = *loc.Name().Short()
return err
}
}

logCNCommittedObjects(false, req.TableID, req.TableName, req.DataObjectStats)

return nil
}

func logCNCommittedObjects(
isTombstone bool,
tableId uint64,
tableName string,
statsList []objectio.ObjectStats) {

totalBlkCnt := 0
totalRowCnt := 0
totalOSize := float64(0)
totalCSize := float64(0)
var objNames = make([]string, 0, len(statsList))
for _, stats := range statsList {
totalBlkCnt += int(stats.BlkCnt())
totalRowCnt += int(stats.Rows())
totalCSize += float64(stats.Size())
totalOSize += float64(stats.OriginSize())
objNames = append(objNames, stats.ObjectName().ObjectId().ShortStringEx())
}

totalCSize /= 1024.0 * 1024.0
totalOSize /= 1024.0 * 1024.0

hint := "CN-COMMIT-S3-Data-Object"
if isTombstone {
hint = "CN-COMMIT-S3-Tombstone-Object"
}

logutil.Info(
"CN-COMMIT-S3",
zap.Int("table-id", int(req.TableID)),
zap.String("table-name", req.TableName),
zap.Int("obj-cnt", objCnt),
hint,
zap.Int("table-id", int(tableId)),
zap.String("table-name", tableName),
zap.Int("obj-cnt", len(statsList)),
zap.String("obj-osize", fmt.Sprintf("%.6fmb", totalOSize)),
zap.String("obj-csize", fmt.Sprintf("%.6fmb", totalCSize)),
zap.Int("blk-cnt", totalBlkCnt),
zap.Int("row-cnt", totalRowCnt),
zap.Strings("names", objNames),
)
return objCnt, nil
}

// TryPrefetchTxn only prefetch data written by CN, do not change the state machine of TxnEngine.
@@ -133,16 +165,15 @@ func (h *Handle) TryPrefetchTxn(ctx context.Context, meta txn.TxnMeta) error {
if r, ok := e.(*db.WriteReq); ok && r.FileName != "" {
if r.Type == db.EntryDelete {
// start to load deleted row ids
deltaLocCnt += 1
deltaLocCnt += len(r.TombstoneStats)
if err := h.prefetchDeleteRowID(ctx, r); err != nil {
return err
}
} else if r.Type == db.EntryInsert {
objCnt, err := h.prefetchMetadata(ctx, r)
if err != nil {
metaLocCnt += len(r.DataObjectStats)
if err := h.prefetchMetadata(ctx, r); err != nil {
return err
}
metaLocCnt += objCnt
}
}
}
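
`logCNCommittedObjects` folds the per-object stats into a single commit log line for either data or tombstone objects. A runnable sketch of the aggregation and the bytes-to-MB conversion, with stand-in stats values in place of the `objectio.ObjectStats` accessors:

```go
package main

import "fmt"

// stat carries stand-in per-object values; the real ones come from
// objectio.ObjectStats (BlkCnt, Rows, Size, OriginSize).
type stat struct {
	blkCnt, rows     int
	size, originSize int // bytes: compressed and original
}

func main() {
	statsList := []stat{
		{blkCnt: 50, rows: 500, size: 1 << 20, originSize: 4 << 20},
		{blkCnt: 50, rows: 500, size: 2 << 20, originSize: 8 << 20},
	}

	totalBlkCnt, totalRowCnt := 0, 0
	totalCSize, totalOSize := float64(0), float64(0)
	for _, s := range statsList {
		totalBlkCnt += s.blkCnt
		totalRowCnt += s.rows
		totalCSize += float64(s.size)
		totalOSize += float64(s.originSize)
	}

	// Same unit conversion as logCNCommittedObjects: bytes -> MB.
	totalCSize /= 1024.0 * 1024.0
	totalOSize /= 1024.0 * 1024.0

	fmt.Printf("obj-cnt=%d blk-cnt=%d row-cnt=%d obj-osize=%.6fmb obj-csize=%.6fmb\n",
		len(statsList), totalBlkCnt, totalRowCnt, totalOSize, totalCSize)
}
```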