Do not record block info after CN has written to S3 #16649

6 changes: 3 additions & 3 deletions pkg/sql/colexec/insert/insert.go
@@ -247,12 +247,12 @@ func (arg *Argument) insert_table(proc *process.Process) (vm.CallResult, error)

// Collect all partition subtables' s3writers metaLoc information and output it
func collectAndOutput(proc *process.Process, s3Writers []*colexec.S3Writer, result *vm.CallResult) (err error) {
attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats}
attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.ObjectMeta_ObjectStats}
res := batch.NewWithSize(len(attrs))
res.SetAttributes(attrs)
res.Vecs[0] = proc.GetVector(types.T_int16.ToType())
res.Vecs[1] = proc.GetVector(types.T_text.ToType())
res.Vecs[2] = proc.GetVector(types.T_binary.ToType())
//res.Vecs[1] = proc.GetVector(types.T_text.ToType())
res.Vecs[1] = proc.GetVector(types.T_binary.ToType())

for _, w := range s3Writers {
//deep copy.
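For readers following the insert.go change above: the result batch handed downstream now carries only the partition index and the marshaled object stats, with the blk info column dropped. The sketch below shows one way the full collectAndOutput loop could look under that layout. It is an illustration only: getStats is a hypothetical accessor standing in for whatever the collapsed loop body reads from each S3 writer, and the imports are assumed to be the same ones insert.go already uses.

// Sketch: assemble the [table index | object stats] batch that collectAndOutput
// now emits. getStats is hypothetical; everything else mirrors the diff above.
func buildStatsOutput(proc *process.Process, s3Writers []*colexec.S3Writer,
	getStats func(*colexec.S3Writer) objectio.ObjectStats) (*batch.Batch, error) {

	attrs := []string{catalog.BlockMeta_TableIdx_Insert, catalog.ObjectMeta_ObjectStats}
	res := batch.NewWithSize(len(attrs))
	res.SetAttributes(attrs)
	res.Vecs[0] = proc.GetVector(types.T_int16.ToType())  // partition / sub-table index
	res.Vecs[1] = proc.GetVector(types.T_binary.ToType()) // marshaled objectio.ObjectStats

	for i, w := range s3Writers {
		stats := getStats(w) // hypothetical: the real loop body is collapsed above
		if err := vector.AppendFixed(res.Vecs[0], int16(i), false, proc.GetMPool()); err != nil {
			return nil, err
		}
		if err := vector.AppendBytes(res.Vecs[1], stats.Marshal(), false, proc.GetMPool()); err != nil {
			return nil, err
		}
	}
	res.SetRowCount(res.Vecs[0].Length())
	return res, nil
}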
30 changes: 20 additions & 10 deletions pkg/sql/colexec/mergeblock/mergeblock_test.go
@@ -52,9 +52,20 @@ func TestMergeBlock(t *testing.T) {
name1 := objectio.BuildObjectName(segmentid, 0)
name2 := objectio.BuildObjectName(segmentid, 1)
name3 := objectio.BuildObjectName(segmentid, 2)
loc1 := blockio.EncodeLocation(name1, objectio.Extent{}, 15, 0)
loc2 := blockio.EncodeLocation(name2, objectio.Extent{}, 15, 0)
loc3 := blockio.EncodeLocation(name3, objectio.Extent{}, 15, 0)

rowCnt := uint32(15)
loc1 := blockio.EncodeLocation(name1, objectio.Extent{}, rowCnt, 0)
loc2 := blockio.EncodeLocation(name2, objectio.Extent{}, rowCnt, 0)
loc3 := blockio.EncodeLocation(name3, objectio.Extent{}, rowCnt, 0)

var stats1, stats2, stats3 objectio.ObjectStats
objectio.SetObjectStatsObjectName(&stats1, name1)
objectio.SetObjectStatsObjectName(&stats2, name2)
objectio.SetObjectStatsObjectName(&stats3, name3)

objectio.SetObjectStatsRowCnt(&stats1, rowCnt)
objectio.SetObjectStatsRowCnt(&stats2, rowCnt)
objectio.SetObjectStatsRowCnt(&stats3, rowCnt)

sid1 := loc1.Name().SegmentId()
blkInfo1 := objectio.BlockInfo{
@@ -102,9 +113,9 @@ func TestMergeBlock(t *testing.T) {
string(objectio.EncodeBlockInfo(blkInfo3))},
nil),
testutil.MakeTextVector([]string{
string(objectio.ZeroObjectStats[:]),
string(objectio.ZeroObjectStats[:]),
string(objectio.ZeroObjectStats[:])},
string(stats1.Marshal()),
string(stats2.Marshal()),
string(stats3.Marshal())},
nil),
},
Cnt: 1,
@@ -133,20 +144,19 @@ func TestMergeBlock(t *testing.T) {
// argument1.Prepare(proc)
_, err := argument1.Call(proc)
require.NoError(t, err)
require.Equal(t, uint64(15*3), argument1.affectedRows)
require.Equal(t, uint64(rowCnt*3), argument1.affectedRows)
// Check Tbl
{
result := argument1.container.source.(*mockRelation).result
// check attr names
require.True(t, reflect.DeepEqual(
[]string{catalog.BlockMeta_BlockInfo, catalog.ObjectMeta_ObjectStats},
[]string{catalog.ObjectMeta_ObjectStats},
result.Attrs,
))
// check vector
require.Equal(t, 2, len(result.Vecs))
require.Equal(t, 1, len(result.Vecs))
//for i, vec := range result.Vecs {
require.Equal(t, 3, result.Vecs[0].Length(), fmt.Sprintf("column number: %d", 0))
require.Equal(t, 3, result.Vecs[1].Length(), fmt.Sprintf("column number: %d", 1))
//}
}
// Check UniqueTables
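The test now populates real ObjectStats instead of ZeroObjectStats because affectedRows is derived from the stats' row count rather than from decoded blk info. Below is a minimal sketch of that round trip, using only the objectio calls visible in the diff; objectio.NewSegmentid is assumed here just to make the snippet stand alone, since the test's segmentid is created above the visible hunk.

// CN side: fill in the object name and row count, then marshal.
segmentid := objectio.NewSegmentid() // assumed helper; the test builds segmentid earlier
name := objectio.BuildObjectName(segmentid, 0)

var stats objectio.ObjectStats
objectio.SetObjectStatsObjectName(&stats, name)
objectio.SetObjectStatsRowCnt(&stats, 15)

payload := stats.Marshal() // what testutil.MakeTextVector carries per object

// TN / mergeblock side: convert the bytes back and read the row count.
decoded := objectio.ObjectStats(payload)
rows := decoded.Rows() // 15; summed per object into affectedRows (15*3 in this test)
_ = rows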
124 changes: 69 additions & 55 deletions pkg/sql/colexec/mergeblock/types.go
@@ -14,6 +14,8 @@
package mergeblock

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/container/batch"
@@ -122,89 +124,107 @@ func (arg *Argument) Free(proc *process.Process, pipelineFailed bool, err error)
}

func (arg *Argument) GetMetaLocBat(src *batch.Batch, proc *process.Process) {
var typs []types.Type
// exclude the table id column
attrs := src.Attrs[1:]

for idx := 1; idx < len(src.Vecs); idx++ {
typs = append(typs, *src.Vecs[idx].GetType())
}

// src comes from old CN which haven't object stats column
if src.Attrs[len(src.Attrs)-1] != catalog.ObjectMeta_ObjectStats {
attrs = append(attrs, catalog.ObjectMeta_ObjectStats)
typs = append(typs, types.T_binary.ToType())
}
// [idx | object stats]
// [idx | blk info]
// [idx | blk info | object stats]

// If the target is a partition table
if len(arg.container.partitionSources) > 0 {
// 'i' aligns with partition number
for i := range arg.container.partitionSources {
bat := batch.NewWithSize(len(attrs))
bat.Attrs = attrs
bat := batch.NewWithSize(1)
bat.Attrs = []string{catalog.ObjectMeta_ObjectStats}
bat.Cnt = 1
for idx := 0; idx < len(attrs); idx++ {
bat.Vecs[idx] = proc.GetVector(typs[idx])
}
bat.Vecs[0] = proc.GetVector(types.T_binary.ToType())
arg.container.mp[i] = bat
}
} else {
bat := batch.NewWithSize(len(attrs))
bat.Attrs = attrs
bat := batch.NewWithSize(1)
bat.Attrs = []string{catalog.ObjectMeta_ObjectStats}
bat.Cnt = 1
for idx := 0; idx < len(attrs); idx++ {
bat.Vecs[idx] = proc.GetVector(typs[idx])
}
bat.Vecs[0] = proc.GetVector(types.T_binary.ToType())
arg.container.mp[0] = bat
}
}

func splitObjectStats(arg *Argument, proc *process.Process,
bat *batch.Batch, blkVec *vector.Vector, tblIdx []int16,
) error {
// bat comes from old CN, no object stats vec in it.
func splitObjectStats(arg *Argument, proc *process.Process, bat *batch.Batch) error {
var (
err error
fs fileservice.FileService
needLoad bool
statsIdx int
objStats objectio.ObjectStats
blkVec *vector.Vector
tblIdx []int16
statsVec *vector.Vector
batLen = len(bat.Attrs)
objDataMeta objectio.ObjectDataMeta
)

tblIdx = vector.MustFixedCol[int16](bat.GetVector(0))

// bat may come from an old CN and thus may not have an object stats vec in it.
// to ensure all bats the TN receives contain the object stats column, we
// construct the object stats from block info here.
needLoad := bat.Attrs[len(bat.Attrs)-1] != catalog.ObjectMeta_ObjectStats

fs, err := fileservice.Get[fileservice.FileService](proc.FileService, defines.SharedFileServiceName)
if err != nil {
logutil.Error("get fs failed when split object stats. ", zap.Error(err))
return err
//
// bat may come from different CN versions:
// V1 [table index | blk info]
// V2 [table index | blk info | object stats]
// V3 [table index | object stats]
// only V1 needs the object stats constructed from block info
//
if bat.Attrs[batLen-1] == catalog.ObjectMeta_ObjectStats {
// V2, V3
statsVec = bat.GetVector(int32(batLen - 1))
} else if bat.Attrs[batLen-1] == catalog.BlockMeta_MetaLoc {
// V1
needLoad = true
blkVec = bat.GetVector(int32(batLen - 1))
objDataMeta = objectio.BuildObjectMeta(uint16(blkVec.Length()))
fs, err = fileservice.Get[fileservice.FileService](proc.FileService, defines.SharedFileServiceName)
if err != nil {
logutil.Error("get fs failed when split object stats. ", zap.Error(err))
return err
}
} else {
panic(fmt.Sprintf("splitObjectStats got wrong bat: [attrs=%s, rows=%d, cnt=%d]\n",
bat.Attrs, bat.RowCount(), bat.Cnt))
}

objDataMeta := objectio.BuildObjectMeta(uint16(blkVec.Length()))
var objStats objectio.ObjectStats
statsVec := bat.Vecs[2]
statsIdx := 0

for idx := 0; idx < len(tblIdx); idx++ {
if tblIdx[idx] < 0 {
// will the data and blk infos be mixed together in one batch?
// batch [ data | data | blk info | blk info | .... ]
continue
}

blkInfo := objectio.DecodeBlockInfo(blkVec.GetBytesAt(idx))
if objectio.IsSameObjectLocVsMeta(blkInfo.MetaLocation(), objDataMeta) {
continue
}

destVec := arg.container.mp[int(tblIdx[idx])].Vecs[1]

affectedRows := uint32(0)
destVec := arg.container.mp[int(tblIdx[idx])].Vecs[0]
// comes from an old-version CN without object stats
if needLoad {
// comes from old version cn
blkInfo := objectio.DecodeBlockInfo(blkVec.GetBytesAt(idx))
if objectio.IsSameObjectLocVsMeta(blkInfo.MetaLocation(), objDataMeta) {
continue
}

objStats, objDataMeta, err = disttae.ConstructObjStatsByLoadObjMeta(proc.Ctx, blkInfo.MetaLocation(), fs)
if err != nil {
return err
}

affectedRows = objStats.Rows()
vector.AppendBytes(destVec, objStats.Marshal(), false, proc.GetMPool())
} else {
// with object stats in it
// does not come from an old-version CN
vector.AppendBytes(destVec, statsVec.GetBytesAt(statsIdx), false, proc.GetMPool())
objDataMeta.BlockHeader().SetBlockID(&blkInfo.BlockID)
stats := objectio.ObjectStats(statsVec.GetBytesAt(statsIdx))
vector.AppendBytes(destVec, stats.Marshal(), false, proc.GetMPool())
statsIdx++
affectedRows = uint32(stats.Rows())
}

if arg.AddAffectedRows {
arg.affectedRows += uint64(affectedRows)
}
}

@@ -220,12 +240,6 @@ func (arg *Argument) Split(proc *process.Process, bat *batch.Batch) error {
hasObject := false
for i := range tblIdx { // append s3 writer returned blk info
if tblIdx[i] >= 0 {
if arg.AddAffectedRows {
blkInfo := objectio.DecodeBlockInfo(blkInfosVec.GetBytesAt(i))
arg.affectedRows += uint64(blkInfo.MetaLocation().Rows())
}
vector.AppendBytes(arg.container.mp[int(tblIdx[i])].Vecs[0],
blkInfosVec.GetBytesAt(i), false, proc.GetMPool())
hasObject = true
} else { // append data
idx := int(-(tblIdx[i] + 1))
Expand All @@ -243,7 +257,7 @@ func (arg *Argument) Split(proc *process.Process, bat *batch.Batch) error {

// exist blk info, split it
if hasObject {
if err := splitObjectStats(arg, proc, bat, blkInfosVec, tblIdx); err != nil {
if err := splitObjectStats(arg, proc, bat); err != nil {
return err
}
}
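To summarize the compatibility handling introduced in splitObjectStats: the last attribute of the incoming batch tells the TN whether it came from an old CN (V1, blk info only) or a new one (V2/V3, object stats present), and only the V1 path pays the cost of loading object meta from S3 via disttae.ConstructObjStatsByLoadObjMeta. A small sketch of that decision, using only identifiers confirmed by the diff:

// needLoadFromBlockMeta reports whether a mergeblock input batch predates the
// object stats column (the V1 layout) and therefore needs its ObjectStats
// rebuilt from block meta stored on S3. Sketch only; the real check lives in
// splitObjectStats above.
func needLoadFromBlockMeta(attrs []string) (bool, error) {
	switch attrs[len(attrs)-1] {
	case catalog.ObjectMeta_ObjectStats:
		return false, nil // V2 / V3: the CN already shipped marshaled ObjectStats
	case catalog.BlockMeta_MetaLoc:
		return true, nil // V1: old CN, only blk info is present
	default:
		return false, fmt.Errorf("unexpected mergeblock batch layout: %v", attrs)
	}
}

Branching on the trailing attribute keeps rolling upgrades safe: batches produced by old and new CNs can arrive at the same TN during the transition, and both are normalized to the object-stats-only layout before being handed to the table writers.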