Support restore L0 segments (#357)
* Support L0 segments with no specific partition

Signed-off-by: wayblink <[email protected]>

* Support partition L0 segments

Signed-off-by: wayblink <[email protected]>

* Remove useless code

Signed-off-by: wayblink <[email protected]>

* fix missed backup of newly flushed L0 segments

---------

Signed-off-by: wayblink <[email protected]>
wayblink authored Jun 13, 2024
1 parent aa4084a commit aaadb04
Showing 20 changed files with 562 additions and 857 deletions.
8 changes: 5 additions & 3 deletions core/backup_context.go
@@ -26,6 +26,8 @@ const (
COLLECTION_RENAME_SUFFIX = "COLLECTION_RENAME_SUFFIX"
RPS = 1000
BackupSegmentGroupMaxSizeInMB = 256

GC_Warn_Message = "This warning won't fail the backup process. Pausing GC protects data from being GCed during backup, which is necessary when backing up very large data (taking more than an hour)."
)

// makes sure BackupContext implements `Backup`
@@ -270,15 +272,15 @@ func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBack
zap.String("backupName", request.GetBackupName()),
zap.String("backupId", request.GetBackupId()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()),
zap.Any("resp", resp))
zap.String("path", request.GetPath()))
} else {
log.Info("finish GetBackupRequest",
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.String("backupId", request.GetBackupId()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()))
zap.String("path", request.GetPath()),
zap.Any("resp", resp))
}

return resp
183 changes: 100 additions & 83 deletions core/backup_impl_create_backup.go

Large diffs are not rendered by default.

183 changes: 116 additions & 67 deletions core/backup_impl_restore_backup.go
@@ -10,6 +10,7 @@ import (
jsoniter "github.com/json-iterator/go"
gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"

@@ -381,9 +382,12 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
targetDBName := task.GetTargetDbName()
targetCollectionName := task.GetTargetCollectionName()
task.StateCode = backuppb.RestoreTaskStateCode_EXECUTING
log := log.With(
zap.String("backup_db_name", task.GetCollBackup().DbName),
zap.String("backup_collection_name", task.GetCollBackup().DbName),
zap.String("target_db_name", targetDBName),
zap.String("target_collection_name", targetCollectionName))
log.Info("start restore",
zap.String("db_name", targetDBName),
zap.String("collection_name", targetCollectionName),
zap.String("backupBucketName", backupBucketName),
zap.String("backupPath", backupPath))
// create collection
@@ -466,13 +470,9 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
return task, err
}
log.Info("create collection",
zap.String("database", targetDBName),
zap.String("collectionName", targetCollectionName),
zap.Bool("hasPartitionKey", hasPartitionKey))
} else {
log.Info("skip create collection",
zap.String("database", targetDBName),
zap.String("collectionName", targetCollectionName),
zap.Bool("hasPartitionKey", hasPartitionKey))
}

@@ -484,7 +484,6 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
strings.HasPrefix(err.Error(), "index doesn't exist") {
// todo
log.Info("field has no index",
zap.String("collection_name", targetCollectionName),
zap.String("field_name", field.Name))
continue
} else {
@@ -496,13 +495,10 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
err = b.milvusClient.DropIndex(ctx, targetDBName, targetCollectionName, fieldIndex.Name())
if err != nil {
log.Warn("Fail to drop index",
zap.String("db", targetDBName),
zap.String("collection", targetCollectionName),
zap.Error(err))
return task, err
}
log.Info("drop index",
zap.String("collection_name", targetCollectionName),
zap.String("field_name", field.Name),
zap.String("index_name", fieldIndex.Name()))
}
@@ -563,15 +559,55 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}
}()

// bulk insert
copyAndBulkInsert := func(dbName, collectionName, partitionName string, files []string, isL0 bool) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
log.Debug("Copy temporary restore file", zap.String("from", file), zap.String("to", tempDir+file))
err := retry.Do(ctx, func() error {
return b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
}, retry.Sleep(2*time.Second), retry.Attempts(5))
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket after retry", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
}
}
} else {
realFiles = files
}

err := b.executeBulkInsert(ctx, dbName, collectionName, partitionName, realFiles, int64(task.GetCollBackup().BackupTimestamp), isL0)
if err != nil {
log.Error("fail to bulk insert to partition",
zap.String("partition", partitionName),
zap.Error(err))
return err
}
return nil
}

jobIds := make([]int64, 0)
restoredSize := atomic.Int64{}

type partitionL0Segment struct {
collectionID int64
partitionName string
partitionID int64
segmentID int64
}
partitionL0Segments := make([]partitionL0Segment, 0)
for _, v := range task.GetCollBackup().GetPartitionBackups() {
partitionBackup := v
log.Info("start restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
log.Info("start restore partition", zap.String("partition", partitionBackup.GetPartitionName()))
// pre-check whether partition exist, if not create it
exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
if err != nil {
@@ -586,25 +622,28 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
log.Error("fail to create partition", zap.Error(err))
return task, err
}
log.Info("create partition",
zap.String("collectionName", targetCollectionName),
zap.String("partitionName", partitionBackup.GetPartitionName()))
log.Info("create partition", zap.String("partitionName", partitionBackup.GetPartitionName()))
}

type restoreGroup struct {
files []string
size int64
}
restoreFileGroups := make([]restoreGroup, 0)
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())

var l0Segments []*backuppb.SegmentBackupInfo = lo.Filter(partitionBackup.GetSegmentBackups(), func(segment *backuppb.SegmentBackupInfo, _ int) bool {
return segment.IsL0
})
notl0Segments := lo.Filter(partitionBackup.GetSegmentBackups(), func(segment *backuppb.SegmentBackupInfo, _ int) bool {
return !segment.IsL0
})
groupIds := collectGroupIdsFromSegments(notl0Segments)
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, size, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
@@ -616,58 +655,17 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
restoreFileGroups = append(restoreFileGroups, restoreGroup{files: files, size: size})
}
}

// bulk insert
copyAndBulkInsert := func(files []string) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
log.Debug("Copy temporary restore file", zap.String("from", file), zap.String("to", tempDir+file))
err := retry.Do(ctx, func() error {
return b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
}, retry.Sleep(2*time.Second), retry.Attempts(5))
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket after retry", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
}
}
} else {
realFiles = files
}

err := b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp))
if err != nil {
log.Error("fail to bulk insert to partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()),
zap.Error(err))
return err
}
return nil
}

for _, value := range restoreFileGroups {
group := value
job := func(ctx context.Context) error {
err := copyAndBulkInsert(group.files)
err := copyAndBulkInsert(targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), group.files, false)
if err != nil {
return err
} else {
@@ -680,9 +678,52 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
jobIds = append(jobIds, jobId)
}

if len(l0Segments) > 0 {
for _, segment := range l0Segments {
partitionL0Segments = append(partitionL0Segments, partitionL0Segment{
collectionID: segment.CollectionId,
partitionName: partitionBackup.GetPartitionName(),
partitionID: segment.GetPartitionId(),
segmentID: segment.GetSegmentId(),
})
}
}
}

err := b.getRestoreWorkerPool(parentTaskID).WaitJobs(jobIds)
if err != nil {
return task, err
}

// restore l0 segments
l0JobIds := make([]int64, 0)
log.Info("start restore l0 segments", zap.Int("global_l0_segment_num", len(task.GetCollBackup().GetL0Segments())), zap.Int("partition_l0_segment_num", len(partitionL0Segments)))
for _, v := range partitionL0Segments {
segmentBackup := v
job := func(ctx context.Context) error {
l0Files := fmt.Sprintf("%s/%s/%s/%d/%d/%d", backupPath, BINGLOG_DIR, DELTA_LOG_DIR, segmentBackup.collectionID, segmentBackup.partitionID, segmentBackup.segmentID)
log.Info("restore l0 segment ", zap.String("files", l0Files))
return copyAndBulkInsert(targetDBName, targetCollectionName, segmentBackup.partitionName, []string{l0Files}, true)
}
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
l0JobIds = append(l0JobIds, jobId)
}

if len(task.GetCollBackup().GetL0Segments()) > 0 {
for _, v := range task.GetCollBackup().GetL0Segments() {
segment := v
job := func(ctx context.Context) error {
l0Files := fmt.Sprintf("%s/%s/%s/%d/%d/%d", backupPath, BINGLOG_DIR, DELTA_LOG_DIR, task.CollBackup.CollectionId, -1, segment.GetSegmentId())
log.Info("restore l0 segment ", zap.String("files", l0Files))
return copyAndBulkInsert(targetDBName, targetCollectionName, "", []string{l0Files}, true)
}
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
l0JobIds = append(l0JobIds, jobId)
}
}
err = b.getRestoreWorkerPool(parentTaskID).WaitJobs(l0JobIds)

return task, err
}
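L0 segments hold only delete records (delta logs) that Milvus has not yet compacted into sealed segments. The restore path above handles them in two flavors: partition-scoped L0 segments, gathered into partitionL0Segments while iterating the partition backups, and collection-scoped ones from task.GetCollBackup().GetL0Segments(), which were flushed with no specific partition, are stored under partition ID -1, and are restored with an empty partition name. A sketch of the delta-log path the fmt.Sprintf calls assemble; the constant values are assumptions based on the rest of the repository, not shown in this diff:

    // Assumed layout: <backupPath>/binlogs/deltalog/<collectionID>/<partitionID>/<segmentID>
    // Collection-scoped L0 segments use -1 as the partition ID:
    //                 <backupPath>/binlogs/deltalog/<collectionID>/-1/<segmentID>
    l0Files := fmt.Sprintf("%s/%s/%s/%d/%d/%d",
        backupPath, BINGLOG_DIR, DELTA_LOG_DIR, collectionID, partitionID, segmentID)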

Expand All @@ -698,7 +739,7 @@ func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64
return res
}

func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string, partition string, files []string, endTime int64) error {
func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string, partition string, files []string, endTime int64, isL0 bool) error {
log.Info("execute bulk insert",
zap.String("db", db),
zap.String("collection", coll),
@@ -707,10 +748,18 @@ func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string,
zap.Int64("endTime", endTime))
var taskId int64
var err error
if endTime == 0 {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup())
if isL0 {
if endTime == 0 {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsL0())
} else {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsL0(), gomilvus.WithEndTs(endTime))
}
} else {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime))
if endTime == 0 {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup())
} else {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime))
}
}
if err != nil {
log.Error("fail to bulk insert",
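executeBulkInsert now takes an isL0 flag and picks gomilvus.IsL0() for delta-only imports or gomilvus.IsBackup() for regular ones, adding gomilvus.WithEndTs only when the backup carries a timestamp. The same four-way branch can be written as option composition; a minimal sketch, assuming the client wrapper accepts a variadic gomilvus.BulkInsertOption (the type name is an assumption, the diff only shows the option helpers):

    // Build the option list once, then make a single BulkInsert call.
    opts := make([]gomilvus.BulkInsertOption, 0, 2)
    if isL0 {
        opts = append(opts, gomilvus.IsL0()) // import delete-only delta logs
    } else {
        opts = append(opts, gomilvus.IsBackup()) // regular backup-style import
    }
    if endTime != 0 {
        opts = append(opts, gomilvus.WithEndTs(endTime)) // replay only up to endTime
    }
    taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, opts...)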
16 changes: 1 addition & 15 deletions core/backup_meta.go
@@ -76,21 +76,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
}

collectionBack.Size = collectionSize
cloneCollectionBackup := &backuppb.CollectionBackupInfo{
CollectionId: collectionBack.GetCollectionId(),
DbName: collectionBack.GetDbName(),
CollectionName: collectionBack.GetCollectionName(),
Schema: collectionBack.GetSchema(),
ShardsNum: collectionBack.GetShardsNum(),
ConsistencyLevel: collectionBack.GetConsistencyLevel(),
BackupTimestamp: collectionBack.GetBackupTimestamp(),
Size: collectionBack.GetSize(),
HasIndex: collectionBack.GetHasIndex(),
IndexInfos: collectionBack.GetIndexInfos(),
LoadState: collectionBack.GetLoadState(),
BackupPhysicalTimestamp: collectionBack.GetBackupPhysicalTimestamp(),
ChannelCheckpoints: collectionBack.GetChannelCheckpoints(),
}
cloneCollectionBackup := proto.Clone(collectionBack).(*backuppb.CollectionBackupInfo)
collections = append(collections, cloneCollectionBackup)
backupSize = backupSize + collectionSize
}
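Replacing the hand-maintained field list with proto.Clone removes a maintenance hazard: every field later added to CollectionBackupInfo (such as the L0 segment metadata introduced in this PR) is copied without touching this code. A minimal sketch of the pattern, assuming the standard protobuf proto package:

    // proto.Clone returns a deep copy as proto.Message; the type assertion
    // recovers the concrete type. Mutating the clone leaves the source intact.
    clone := proto.Clone(collectionBack).(*backuppb.CollectionBackupInfo)
    clone.Size = 0 // collectionBack.Size is unchanged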
22 changes: 22 additions & 0 deletions core/backup_meta_manager.go
@@ -217,6 +217,12 @@ func setCollectionSize(size int64) CollectionOpt {
}
}

func setL0Segments(segments []*backuppb.SegmentBackupInfo) CollectionOpt {
return func(collection *backuppb.CollectionBackupInfo) {
collection.L0Segments = segments
}
}

func incCollectionSize(size int64) CollectionOpt {
return func(collection *backuppb.CollectionBackupInfo) {
collection.Size = collection.Size + size
@@ -361,6 +367,12 @@ func setSegmentSize(size int64) SegmentOpt {
}
}

func setSegmentL0(isL0 bool) SegmentOpt {
return func(segment *backuppb.SegmentBackupInfo) {
segment.IsL0 = isL0
}
}

func setGroupID(groupID int64) SegmentOpt {
return func(segment *backuppb.SegmentBackupInfo) {
segment.GroupId = groupID
@@ -426,6 +438,16 @@ func (meta *MetaManager) GetBackupBySegmentID(segmentID int64) *backuppb.BackupI
return meta.backups[backupID]
}

func (meta *MetaManager) GetBackupByCollectionID(collectionID int64) *backuppb.BackupInfo {
meta.mu.Lock()
defer meta.mu.Unlock()
backupID, exist := meta.collectionBackupReverse[collectionID]
if !exist {
return nil
}
return meta.backups[backupID]
}

func (meta *MetaManager) GetFullMeta(id string) *backuppb.BackupInfo {
meta.mu.Lock()
defer meta.mu.Unlock()
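setL0Segments and setSegmentL0 above follow the file's existing functional-option pattern: each setter returns a closure that the meta manager applies to the target message under its lock. A hypothetical usage sketch; the UpdateCollection and UpdateSegment applier names are assumptions based on that pattern, not shown in this diff:

    // Apply several options in one metadata update (applier names assumed).
    meta.UpdateCollection(backupID, collectionID,
        setCollectionSize(totalSize),
        setL0Segments(l0Segs))
    meta.UpdateSegment(backupID, collectionID, segmentID, setSegmentL0(true))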
