
Commit

Support CDC (#340)
Signed-off-by: wayblink <[email protected]>
wayblink authored May 14, 2024
1 parent db2cee5 commit d6b9fcb
Showing 8 changed files with 401 additions and 199 deletions.
54 changes: 48 additions & 6 deletions core/backup_impl_create_backup.go
@@ -2,6 +2,7 @@ package core

 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -357,22 +358,37 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, levelInfo *
zap.String("databaseName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)))
newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), false)
newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, channelCPs, err := b.getMilvusClient().FlushV2(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), false)
if err != nil {
log.Error("fail to flush the collection",
zap.String("databaseName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Error(err))
return nil, err
}

//collectionBackup.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0)
collectionBackup.BackupPhysicalTimestamp = uint64(timeOfSeal)
channelCheckpoints := make(map[string]string, 0)
var maxChannelBackupTimeStamp uint64 = 0
for vch, checkpoint := range channelCPs {
channelCheckpoints[vch] = utils.Base64MsgPosition(&checkpoint)
if maxChannelBackupTimeStamp == 0 {
maxChannelBackupTimeStamp = checkpoint.GetTimestamp()
} else if maxChannelBackupTimeStamp < checkpoint.GetTimestamp() {
maxChannelBackupTimeStamp = checkpoint.GetTimestamp()
}
}
collectionBackup.ChannelCheckpoints = channelCheckpoints
collectionBackup.BackupTimestamp = maxChannelBackupTimeStamp
log.Info("flush segments",
zap.String("databaseName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int64s("newSealedSegmentIDs", newSealedSegmentIDs),
zap.Int64s("flushedSegmentIDs", flushedSegmentIDs),
zap.Int64("timeOfSeal", timeOfSeal))
collectionBackup.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0)
collectionBackup.BackupPhysicalTimestamp = uint64(timeOfSeal)
zap.Int64("timeOfSeal", timeOfSeal),
zap.Uint64("BackupTimestamp", collectionBackup.BackupTimestamp),
zap.Any("channelCPs", channelCPs))

flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...)
segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName())
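
Note the design change in this hunk: BackupTimestamp is now the newest channel checkpoint timestamp, while the physical seal time moves to BackupPhysicalTimestamp. Each checkpoint is persisted as an opaque base64 string via utils.Base64MsgPosition. A minimal sketch of what that helper is assumed to do (proto-marshal the MsgPosition, then base64-encode it; the actual helper lives in this repo's utils package):

package utils

import (
	"encoding/base64"

	"github.com/golang/protobuf/proto"
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
)

// Base64MsgPosition serializes a channel checkpoint into a plain string
// so it can be stored in the backup meta (assumed implementation).
func Base64MsgPosition(position *msgpb.MsgPosition) string {
	positionBytes, err := proto.Marshal(position)
	if err != nil {
		return ""
	}
	return base64.StdEncoding.EncodeToString(positionBytes)
}
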
@@ -666,7 +682,12 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
 	b.updateBackup(levelInfo, backupInfo)
 
 	// 7, write meta data
-	b.writeBackupInfoMeta(ctx, levelInfo, backupInfo.GetName())
+	err = b.writeBackupInfoMeta(ctx, levelInfo, backupInfo.GetName())
+	if err != nil {
+		backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
+		backupInfo.ErrorMessage = err.Error()
+		return backupInfo, err
+	}
 	log.Info("finish executeCreateBackup",
 		zap.String("requestId", request.GetRequestId()),
 		zap.String("backupName", request.GetBackupName()),
@@ -675,7 +696,7 @@
 	return backupInfo, nil
 }
 
-func (b *BackupContext) writeBackupInfoMeta(ctx context.Context, levelBackupInfo *LeveledBackupInfo, path string) {
+func (b *BackupContext) writeBackupInfoMeta(ctx context.Context, levelBackupInfo *LeveledBackupInfo, path string) error {
 	backupInfo, _ := levelToTree(levelBackupInfo)
 	log.Info("Final backupInfo", zap.String("backupInfo", backupInfo.String()))
 	output, _ := serialize(backupInfo)
@@ -684,16 +705,37 @@ func (b *BackupContext) writeBackupInfoMeta(ctx context.Context, levelBackupInfo
log.Debug("partition meta", zap.String("value", string(output.PartitionMetaBytes)))
log.Debug("segment meta", zap.String("value", string(output.SegmentMetaBytes)))

collectionBackups := backupInfo.GetCollectionBackups()
collectionPositions := make(map[string][]*backuppb.ChannelPosition, 0)
for _, collectionBackup := range collectionBackups {
collectionCPs := make([]*backuppb.ChannelPosition, 0)
for vCh, position := range collectionBackup.GetChannelCheckpoints() {
pCh := strings.Split(vCh, "_")[0] + "_" + strings.Split(vCh, "_")[1]
collectionCPs = append(collectionCPs, &backuppb.ChannelPosition{
Name: pCh,
Position: position,
})
}
collectionPositions[collectionBackup.GetCollectionName()] = collectionCPs
}
channelCPsBytes, err := json.Marshal(collectionPositions)
if err != nil {
return err
}
log.Debug("channel cp meta", zap.String("value", string(channelCPsBytes)))

b.getStorageClient().Write(ctx, b.backupBucketName, BackupMetaPath(b.backupRootPath, path), output.BackupMetaBytes)
b.getStorageClient().Write(ctx, b.backupBucketName, CollectionMetaPath(b.backupRootPath, path), output.CollectionMetaBytes)
b.getStorageClient().Write(ctx, b.backupBucketName, PartitionMetaPath(b.backupRootPath, path), output.PartitionMetaBytes)
b.getStorageClient().Write(ctx, b.backupBucketName, SegmentMetaPath(b.backupRootPath, path), output.SegmentMetaBytes)
b.getStorageClient().Write(ctx, b.backupBucketName, FullMetaPath(b.backupRootPath, path), output.FullMetaBytes)
b.getStorageClient().Write(ctx, b.backupBucketName, ChannelCPMetaPath(b.backupRootPath, backupInfo.GetName()), channelCPsBytes)

log.Info("finish writeBackupInfoMeta",
zap.String("path", path),
zap.String("backupName", backupInfo.GetName()),
zap.String("backup meta", string(output.BackupMetaBytes)))
return nil
}

func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, levelInfo *LeveledBackupInfo) error {
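The pCh derivation in writeBackupInfoMeta relies on Milvus's channel naming convention, where a virtual channel name carries its physical channel as the first two underscore-separated parts. A self-contained sketch (the channel name is illustrative):

package main

import (
	"fmt"
	"strings"
)

// physicalChannel keeps everything up to the second underscore,
// mirroring the pCh computation in writeBackupInfoMeta.
func physicalChannel(vCh string) string {
	parts := strings.Split(vCh, "_")
	return parts[0] + "_" + parts[1] // assumes at least two underscores
}

func main() {
	// Illustrative vchannel: "<pchannel>_<collectionID>v<shard index>"
	fmt.Println(physicalChannel("by-dev-rootcoord-dml_1_449883653353464321v0"))
	// Output: by-dev-rootcoord-dml_1
}
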
6 changes: 6 additions & 0 deletions core/backup_meta.go
@@ -15,6 +15,7 @@ const (
 	PARTITION_META_FILE = "partition_meta.json"
 	SEGMENT_META_FILE   = "segment_meta.json"
 	FULL_META_FILE      = "full_meta.json"
+	CP_META_FILE        = "channel_cp_meta.json"
 	SEPERATOR           = "/"
 
 	BINGLOG_DIR = "binlogs"
@@ -101,6 +102,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
 			IndexInfos:              collectionBack.GetIndexInfos(),
 			LoadState:               collectionBack.GetLoadState(),
 			BackupPhysicalTimestamp: collectionBack.GetBackupPhysicalTimestamp(),
+			ChannelCheckpoints:      collectionBack.GetChannelCheckpoints(),
 		}
 		collections = append(collections, cloneCollectionBackup)
 		backupSize = backupSize + collectionSize
@@ -273,6 +275,10 @@ func FullMetaPath(backupRootPath, backupName string) string {
 	return BackupMetaDirPath(backupRootPath, backupName) + SEPERATOR + FULL_META_FILE
 }
 
+func ChannelCPMetaPath(backupRootPath, backupName string) string {
+	return BackupMetaDirPath(backupRootPath, backupName) + SEPERATOR + CP_META_FILE
+}
+
 func BackupBinlogDirPath(backupRootPath, backupName string) string {
 	return backupRootPath + SEPERATOR + backupName + SEPERATOR + BINGLOG_DIR
 }
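Assuming BackupMetaDirPath resolves to <backupRootPath>/<backupName>/meta, as it does for the sibling meta files, the new checkpoint meta lands alongside them, e.g. backup/my_backup/meta/channel_cp_meta.json (root path and backup name illustrative).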
5 changes: 3 additions & 2 deletions core/milvus_sdk_wrapper.go
@@ -2,6 +2,7 @@ package core

 import (
 	"context"
+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
 	"github.com/milvus-io/milvus-sdk-go/v2/entity"
 	"github.com/zilliztech/milvus-backup/internal/util/retry"
@@ -81,12 +82,12 @@ func (m *MilvusClient) GetPersistentSegmentInfo(ctx context.Context, db, collName
 	return m.client.GetPersistentSegmentInfo(ctx, collName)
 }
 
-func (m *MilvusClient) FlushV2(ctx context.Context, db, collName string, async bool) ([]int64, []int64, int64, error) {
+func (m *MilvusClient) FlushV2(ctx context.Context, db, collName string, async bool) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	err := m.client.UsingDatabase(ctx, db)
 	if err != nil {
-		return nil, nil, 0, err
+		return nil, nil, 0, nil, err
 	}
 	return m.client.FlushV2(ctx, collName, async)
 }
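For reference, a sketch of a caller consuming the widened FlushV2 signature (database and collection names are illustrative):

package core

import (
	"context"
	"fmt"
)

// demoFlush sketches how a caller unpacks the new channel-checkpoint
// return value alongside the existing segment IDs and seal time.
func demoFlush(ctx context.Context, mc *MilvusClient) error {
	newSealedIDs, flushedIDs, timeOfSeal, channelCPs, err := mc.FlushV2(ctx, "default", "hello_milvus", false)
	if err != nil {
		return err
	}
	fmt.Println("sealed:", newSealedIDs, "flushed:", flushedIDs, "sealTime:", timeOfSeal)
	for vch, cp := range channelCPs {
		// Each MsgPosition records how far the channel had been consumed at flush time.
		fmt.Printf("%s -> ts=%d\n", vch, cp.GetTimestamp())
	}
	return nil
}
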
13 changes: 13 additions & 0 deletions core/proto/backup.proto
@@ -63,6 +63,7 @@ message CollectionBackupInfo {
   string load_state = 18;
   // physical unix time of backup
   uint64 backup_physical_timestamp = 19;
+  map<string, string> channel_checkpoints = 20;
 }
 
 message PartitionBackupInfo {
@@ -456,3 +457,15 @@ message CheckResponse {
   // error msg if fail
   string msg = 2;
 }
+
+message MsgPosition {
+  string channel_name = 1;
+  bytes msgID = 2;
+  string msgGroup = 3;
+  uint64 timestamp = 4;
+}
+
+message ChannelPosition {
+  string name = 1;
+  string position = 2;
+}