Skip to content

Commit

Permalink
support restore progress (#354)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored May 30, 2024
1 parent 969cac7 commit 98aa172
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 279 deletions.
8 changes: 7 additions & 1 deletion core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,16 @@ func (b *BackupContext) GetRestore(ctx context.Context, request *backuppb.GetRes
}

task := b.meta.GetRestoreTask(request.GetId())
progress := int32(float32(task.GetRestoredSize()) * 100 / float32(task.GetToRestoreSize()))
// don't return zero
if progress == 0 {
progress = 1
}
task.Progress = progress
if task != nil {
resp.Code = backuppb.ResponseCode_Success
resp.Msg = "success"
resp.Data = UpdateRestoreBackupTask(task)
resp.Data = task
return resp
} else {
resp.Code = backuppb.ResponseCode_Fail
Expand Down
120 changes: 52 additions & 68 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
jsoniter "github.com/json-iterator/go"
gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand Down Expand Up @@ -97,32 +98,24 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res

backup := getResp.GetData()

id := utils.UUID()
var taskID string
if request.GetId() != "" {
taskID = request.GetId()
} else {
taskID = "restore_" + fmt.Sprint(time.Now().UTC().Format("2006_01_02_15_04_05_")) + fmt.Sprint(time.Now().Nanosecond())
}

task := &backuppb.RestoreBackupTask{
Id: id,
Id: taskID,
StateCode: backuppb.RestoreTaskStateCode_INITIAL,
StartTime: time.Now().Unix(),
Progress: 0,
}
// clean thread pool
defer func() {
b.cleanRestoreWorkerPool(id)
b.cleanRestoreWorkerPool(taskID)
}()

//go func() {
// ticker := time.NewTicker(5 * time.Second)
// defer ticker.Stop()
// for {
// select {
// case <-ctx.Done():
// log.Info("background checking channels loop quit")
// return
// case <-ticker.C:
// log.Info("restore worker pool", zap.Int32("jobs_num", b.getRestoreWorkerPool(id).JobNum()))
// }
// }
//}()

// 2, initial restoreCollectionTasks
toRestoreCollectionBackups := make([]*backuppb.CollectionBackupInfo, 0)

Expand Down Expand Up @@ -308,6 +301,7 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
task.CollectionRestoreTasks = restoreCollectionTasks
task.ToRestoreSize = task.GetToRestoreSize() + toRestoreSize
}
b.meta.AddRestoreTask(task)

if request.Async {
go b.executeRestoreBackupTask(ctx, backupBucketName, backupPath, backup, task)
Expand Down Expand Up @@ -345,17 +339,11 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck
log.Info("Start collection level restore pool", zap.Int("parallelism", b.params.BackupCfg.RestoreParallelism))

id := task.GetId()
b.meta.AddRestoreTask(task)
task.StateCode = backuppb.RestoreTaskStateCode_EXECUTING

b.meta.UpdateRestoreTask(id, setRestoreStateCode(backuppb.RestoreTaskStateCode_EXECUTING))
log.Info("executeRestoreBackupTask start",
zap.String("backup_name", backup.GetName()),
zap.String("backupBucketName", backupBucketName),
zap.String("backupPath", backupPath))
updateRestoreTaskFunc := func(id string, task *backuppb.RestoreBackupTask) {
b.meta.AddRestoreTask(task)
}
defer updateRestoreTaskFunc(id, task)

restoreCollectionTasks := task.GetCollectionRestoreTasks()

Expand All @@ -371,17 +359,11 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck
zap.Error(err))
return err
}
restoreCollectionTaskClone.StateCode = backuppb.RestoreTaskStateCode_SUCCESS
log.Info("finish restore collection",
zap.String("db_name", restoreCollectionTaskClone.GetTargetDbName()),
zap.String("collection_name", restoreCollectionTaskClone.GetTargetCollectionName()))
restoreCollectionTaskClone.StateCode = backuppb.RestoreTaskStateCode_SUCCESS
task.RestoredSize += endTask.RestoredSize
if task.GetToRestoreSize() == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.GetRestoredSize() / task.GetToRestoreSize())
}
updateRestoreTaskFunc(id, task)
zap.String("collection_name", restoreCollectionTaskClone.GetTargetCollectionName()),
zap.Int64("size", endTask.RestoredSize))
return nil
}
wp.Submit(job)
Expand All @@ -391,8 +373,7 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck
return task, err
}

task.StateCode = backuppb.RestoreTaskStateCode_SUCCESS
task.EndTime = time.Now().Unix()
b.meta.UpdateRestoreTask(id, setRestoreStateCode(backuppb.RestoreTaskStateCode_SUCCESS), setRestoreEndTime(time.Now().Unix()))
return task, nil
}

Expand Down Expand Up @@ -572,6 +553,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}()

jobIds := make([]int64, 0)
restoredSize := atomic.Int64{}
for _, v := range task.GetCollBackup().GetPartitionBackups() {
partitionBackup := v
log.Info("start restore partition",
Expand All @@ -598,11 +580,15 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("partitionName", partitionBackup.GetPartitionName()))
}

restoreFileGroups := make([][]string, 0)
type restoreGroup struct {
files []string
size int64
}
restoreFileGroups := make([]restoreGroup, 0)
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
files, size, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
Expand All @@ -611,11 +597,11 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
restoreFileGroups = append(restoreFileGroups, files)
restoreFileGroups = append(restoreFileGroups, restoreGroup{files: files, size: size})
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
files, size, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
Expand All @@ -624,7 +610,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
restoreFileGroups = append(restoreFileGroups, files)
restoreFileGroups = append(restoreFileGroups, restoreGroup{files: files, size: size})
}
}

Expand Down Expand Up @@ -668,10 +654,17 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}

for _, value := range restoreFileGroups {
files := value
group := value
job := func(ctx context.Context) error {
// todo: progress
return copyAndBulkInsert(files)
err := copyAndBulkInsert(group.files)
if err != nil {
return err
} else {
b.meta.UpdateRestoreTask(parentTaskID, addCollectionRestoredSize(task.GetCollBackup().GetCollectionId(), group.size))
restoredSize.Add(group.size)
task.RestoredSize = restoredSize.Load()
return nil
}
}
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
jobIds = append(jobIds, jobId)
Expand All @@ -682,23 +675,6 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
return task, err
}

//func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targetCollectionName string,
// partitionBackup *backuppb.PartitionBackupInfo, task *backuppb.RestoreCollectionTask, isSameBucket bool, backupBucketName string, backupPath string, tempDir string) (*backuppb.RestoreCollectionTask, error) {
//
// if task.GetMetaOnly() {
// task.Progress = 100
// } else {
//
// task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
// if task.ToRestoreSize == 0 {
// task.Progress = 100
// } else {
// task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
// }
// }
// return task, nil
//}

func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 {
dict := make(map[int64]bool)
res := make([]int64, 0)
Expand Down Expand Up @@ -786,7 +762,7 @@ func (b *BackupContext) watchBulkInsertState(ctx context.Context, taskId int64,
return errors.New("import task timeout")
}

func (b *BackupContext) getBackupPartitionPaths(ctx context.Context, bucketName string, backupPath string, partition *backuppb.PartitionBackupInfo) ([]string, error) {
func (b *BackupContext) getBackupPartitionPaths(ctx context.Context, bucketName string, backupPath string, partition *backuppb.PartitionBackupInfo) ([]string, int64, error) {
log.Info("getBackupPartitionPaths",
zap.String("bucketName", bucketName),
zap.String("backupPath", backupPath),
Expand All @@ -798,19 +774,19 @@ func (b *BackupContext) getBackupPartitionPaths(ctx context.Context, bucketName
exist, err := b.getStorageClient().Exist(ctx, bucketName, deltaPath)
if err != nil {
log.Warn("check binlog exist fail", zap.Error(err))
return []string{}, err
return []string{}, 0, err
}
log.Debug("check delta log exist",
zap.Int64("partitionID", partition.PartitionId),
zap.String("deltaPath", deltaPath),
zap.Bool("exist", exist))
if !exist {
return []string{insertPath, ""}, nil
return []string{insertPath, ""}, partition.GetSize(), nil
}
return []string{insertPath, deltaPath}, nil
return []string{insertPath, deltaPath}, partition.GetSize(), nil
}

func (b *BackupContext) getBackupPartitionPathsWithGroupID(ctx context.Context, bucketName string, backupPath string, partition *backuppb.PartitionBackupInfo, groupId int64) ([]string, error) {
func (b *BackupContext) getBackupPartitionPathsWithGroupID(ctx context.Context, bucketName string, backupPath string, partition *backuppb.PartitionBackupInfo, groupId int64) ([]string, int64, error) {
log.Info("getBackupPartitionPaths",
zap.String("bucketName", bucketName),
zap.String("backupPath", backupPath),
Expand All @@ -820,13 +796,21 @@ func (b *BackupContext) getBackupPartitionPathsWithGroupID(ctx context.Context,
insertPath := fmt.Sprintf("%s/%s/%s/%v/%v/%d/", backupPath, BINGLOG_DIR, INSERT_LOG_DIR, partition.GetCollectionId(), partition.GetPartitionId(), groupId)
deltaPath := fmt.Sprintf("%s/%s/%s/%v/%v/%d/", backupPath, BINGLOG_DIR, DELTA_LOG_DIR, partition.GetCollectionId(), partition.GetPartitionId(), groupId)

var totalSize int64
for _, seg := range partition.GetSegmentBackups() {
if seg.GetGroupId() == groupId {
totalSize += seg.GetSize()
}
}

exist, err := b.getStorageClient().Exist(ctx, bucketName, deltaPath)
if err != nil {
log.Warn("check binlog exist fail", zap.Error(err))
return []string{}, err
return []string{}, 0, err
}
if !exist {
return []string{insertPath, ""}, nil
return []string{insertPath, ""}, totalSize, nil
}
return []string{insertPath, deltaPath}, nil

return []string{insertPath, deltaPath}, totalSize, nil
}
29 changes: 3 additions & 26 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func SimpleBackupResponse(input *backuppb.BackupInfoResponse) *backuppb.BackupIn
func SimpleRestoreResponse(input *backuppb.RestoreBackupResponse) *backuppb.RestoreBackupResponse {
restore := input.GetData()

simpleRestore := proto.Clone(restore).(*backuppb.RestoreBackupTask)

collectionRestores := make([]*backuppb.RestoreCollectionTask, 0)
for _, coll := range restore.GetCollectionRestoreTasks() {
collectionRestores = append(collectionRestores, &backuppb.RestoreCollectionTask{
Expand All @@ -343,15 +345,7 @@ func SimpleRestoreResponse(input *backuppb.RestoreBackupResponse) *backuppb.Rest
})
}

simpleRestore := &backuppb.RestoreBackupTask{
Id: restore.GetId(),
StateCode: restore.GetStateCode(),
ErrorMessage: restore.GetErrorMessage(),
StartTime: restore.GetStartTime(),
EndTime: restore.GetEndTime(),
CollectionRestoreTasks: collectionRestores,
Progress: restore.GetProgress(),
}
simpleRestore.CollectionRestoreTasks = collectionRestores

return &backuppb.RestoreBackupResponse{
RequestId: input.GetRequestId(),
Expand All @@ -361,21 +355,4 @@ func SimpleRestoreResponse(input *backuppb.RestoreBackupResponse) *backuppb.Rest
}
}

// UpdateRestoreBackupTask recomputes input.Progress from the restored sizes of
// its collection restore tasks and returns the same (mutated) task.
//
// When ToRestoreSize is zero the percentage cannot be derived from sizes, so
// progress is 100 for a task in the SUCCESS state and 0 otherwise.
func UpdateRestoreBackupTask(input *backuppb.RestoreBackupTask) *backuppb.RestoreBackupTask {
	// Total bytes restored so far across every collection-level task.
	var restored int64
	for _, collTask := range input.GetCollectionRestoreTasks() {
		restored += collTask.GetRestoredSize()
	}

	switch {
	case input.ToRestoreSize != 0:
		input.Progress = int32(restored * 100 / input.ToRestoreSize)
	case input.StateCode == backuppb.RestoreTaskStateCode_SUCCESS:
		input.Progress = 100
	default:
		input.Progress = 0
	}
	return input
}

// DbCollections is an alias for map[string][]string.
// NOTE(review): presumably keyed by database name with the collection names
// of that database as the value — confirm against callers.
type DbCollections = map[string][]string
56 changes: 56 additions & 0 deletions core/backup_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,62 @@ func (meta *MetaManager) GetFullMeta(id string) *backuppb.BackupInfo {
return cloneBackup
}

// RestoreTaskOpt is a mutation applied to a restore task. UpdateRestoreTask
// runs each option against a clone of the stored task before storing it back.
type RestoreTaskOpt func(task *backuppb.RestoreBackupTask)

// setRestoreStateCode returns an option that sets the task's StateCode to
// stateCode.
func setRestoreStateCode(stateCode backuppb.RestoreTaskStateCode) RestoreTaskOpt {
	return func(t *backuppb.RestoreBackupTask) { t.StateCode = stateCode }
}

// setRestoreErrorMessage returns an option that sets the task's ErrorMessage
// to errorMessage.
func setRestoreErrorMessage(errorMessage string) RestoreTaskOpt {
	return func(t *backuppb.RestoreBackupTask) { t.ErrorMessage = errorMessage }
}

// setRestoreStartTime returns an option that sets the task's StartTime to
// startTime.
func setRestoreStartTime(startTime int64) RestoreTaskOpt {
	return func(t *backuppb.RestoreBackupTask) { t.StartTime = startTime }
}

// setRestoreEndTime returns an option that sets the task's EndTime to endTime.
func setRestoreEndTime(endTime int64) RestoreTaskOpt {
	return func(t *backuppb.RestoreBackupTask) { t.EndTime = endTime }
}

// addRestoreRestoredSize returns an option that increases the task's
// RestoredSize by restoredSize.
func addRestoreRestoredSize(restoredSize int64) RestoreTaskOpt {
	return func(t *backuppb.RestoreBackupTask) {
		t.RestoredSize += restoredSize
	}
}

// addCollectionRestoredSize returns an option that adds restoredSize to the
// task-level RestoredSize and to the RestoredSize of every collection restore
// task whose backup has the given collectionID.
//
// Collection tasks that are nil or carry no CollBackup are skipped rather than
// dereferenced, so a partially populated task cannot panic the updater.
func addCollectionRestoredSize(collectionID, restoredSize int64) RestoreTaskOpt {
	return func(task *backuppb.RestoreBackupTask) {
		task.RestoredSize += restoredSize
		for _, coll := range task.GetCollectionRestoreTasks() {
			// GetCollBackup is nil-safe: it returns nil for a nil coll as well
			// as for a coll without backup info.
			collBackup := coll.GetCollBackup()
			if collBackup == nil {
				continue
			}
			if collBackup.GetCollectionId() == collectionID {
				coll.RestoredSize += restoredSize
			}
		}
	}
}

// UpdateRestoreTask applies opts, in order, to a clone of the restore task
// stored under restoreID and replaces the stored task with the mutated clone.
// The whole read-modify-write runs while holding meta.mu.
//
// If no task is registered under restoreID the call is a no-op: there is
// nothing to clone, and applying options to a nil task would panic.
func (meta *MetaManager) UpdateRestoreTask(restoreID string, opts ...RestoreTaskOpt) {
	meta.mu.Lock()
	defer meta.mu.Unlock()

	task, ok := meta.restoreTasks[restoreID]
	if !ok || task == nil {
		return
	}

	// Mutate a copy so pointers handed out earlier are not changed in place.
	updated := proto.Clone(task).(*backuppb.RestoreBackupTask)
	for _, opt := range opts {
		opt(updated)
	}
	// Write back under the key that was looked up.
	// NOTE(review): assumes restoreTasks is keyed by the task's Id — confirm in AddRestoreTask.
	meta.restoreTasks[restoreID] = updated
}

//CollectionRestoreTasks []*RestoreCollectionTask `protobuf:"bytes,6,rep,name=collection_restore_tasks,json=collectionRestoreTasks,proto3" json:"collection_restore_tasks,omitempty"`

func (meta *MetaManager) AddRestoreTask(task *backuppb.RestoreBackupTask) {
meta.mu.Lock()
defer meta.mu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ message RestoreBackupRequest {
bool dropExistIndex = 14;
// if true, will skip collection, use when collection exist, restore index or data
bool skipCreateCollection = 15;
string id = 16;
}

message RestorePartitionTask {
Expand Down
Loading

0 comments on commit 98aa172

Please sign in to comment.