Skip to content

Commit

Permalink
support maxShardNum parameter in restore request (#428)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Sep 20, 2024
1 parent ab69696 commit 2906920
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 222 deletions.
4 changes: 2 additions & 2 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
}

newSegIDs := lo.Map(unfilledSegments, func(segment *entity.Segment, _ int) int64 { return segment.ID })
log.Info("Finished fill segment",
log.Debug("Finished fill segment",
zap.String("databaseName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int64s("segments", newSegIDs))
Expand Down Expand Up @@ -741,7 +741,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
backupInfo.ErrorMessage = err.Error()
return err
}
log.Info("finish executeCreateBackup",
log.Info("finish backup all collections",
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.Strings("collections", request.GetCollectionNames()),
Expand Down
28 changes: 23 additions & 5 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()),
zap.String("databaseCollections", utils.GetRestoreDBCollections(request)),
zap.Bool("skipDiskQuotaCheck", request.GetSkipImportDiskQuotaCheck()))
zap.Bool("skipDiskQuotaCheck", request.GetSkipImportDiskQuotaCheck()),
zap.Int32("maxShardNum", request.GetMaxShardNum()))

resp := &backuppb.RestoreBackupResponse{
RequestId: request.GetRequestId(),
Expand Down Expand Up @@ -311,6 +312,7 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
DropExistIndex: request.GetDropExistIndex(),
SkipCreateCollection: request.GetSkipCreateCollection(),
SkipDiskQuotaCheck: request.GetSkipImportDiskQuotaCheck(),
MaxShardNum: request.GetMaxShardNum(),
}
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
task.CollectionRestoreTasks = restoreCollectionTasks
Expand Down Expand Up @@ -388,7 +390,15 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck
return task, err
}

b.meta.UpdateRestoreTask(id, setRestoreStateCode(backuppb.RestoreTaskStateCode_SUCCESS), setRestoreEndTime(time.Now().Unix()))
endTime := time.Now().Unix()
task.EndTime = endTime
b.meta.UpdateRestoreTask(id, setRestoreStateCode(backuppb.RestoreTaskStateCode_SUCCESS), setRestoreEndTime(endTime))

log.Info("finish restore all collections",
zap.String("backupName", backup.GetName()),
zap.Int("collections", len(backup.GetCollectionBackups())),
zap.String("taskID", task.GetId()),
zap.Int64("duration in seconds", task.GetEndTime()-task.GetStartTime()))
return task, nil
}

Expand All @@ -401,7 +411,8 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("backup_collection_name", task.GetCollBackup().DbName),
zap.String("target_db_name", targetDBName),
zap.String("target_collection_name", targetCollectionName),
zap.Bool("skipDiskQuotaCheck", task.GetSkipDiskQuotaCheck()))
zap.Bool("skipDiskQuotaCheck", task.GetSkipDiskQuotaCheck()),
zap.Int32("maxShardNum", task.GetMaxShardNum()))
log.Info("start restore",
zap.String("backupBucketName", backupBucketName),
zap.String("backupPath", backupPath))
Expand Down Expand Up @@ -460,21 +471,28 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
//so here it is necessary to be compatible with the situation where SkipCreateCollection and DropExistCollection are enabled at the same time.
if !task.GetSkipCreateCollection() || task.GetDropExistCollection() {
err := retry.Do(ctx, func() error {
// overwrite shardNum by request parameter
shardNum := task.GetCollBackup().GetShardsNum()
if shardNum > task.GetMaxShardNum() && task.GetMaxShardNum() != 0 {
shardNum = task.GetMaxShardNum()
log.Info("overwrite shardNum by request parameter", zap.Int32("oldShardNum", task.GetCollBackup().GetShardsNum()), zap.Int32("newShardNum", shardNum))

}
if hasPartitionKey {
partitionNum := len(task.GetCollBackup().GetPartitionBackups())
return b.getMilvusClient().CreateCollection(
ctx,
targetDBName,
collectionSchema,
task.GetCollBackup().GetShardsNum(),
shardNum,
gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel())),
gomilvus.WithPartitionNum(int64(partitionNum)))
}
return b.getMilvusClient().CreateCollection(
ctx,
targetDBName,
collectionSchema,
task.GetCollBackup().GetShardsNum(),
shardNum,
gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel())))
}, retry.Attempts(10), retry.Sleep(1*time.Second))
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ message RestoreBackupRequest {
bool skipImportDiskQuotaCheck = 17;
// whether restore RBAC
bool rbac = 18;
// target max shard number
int32 maxShardNum = 19;
}

message RestorePartitionTask {
Expand Down Expand Up @@ -321,6 +323,8 @@ message RestoreCollectionTask {
// if true will skip create collections
bool skipCreateCollection = 18;
bool skipDiskQuotaCheck = 19;
// target max shard number
int32 maxShardNum = 20;
}

message RestoreBackupTask {
Expand Down
Loading

0 comments on commit 2906920

Please sign in to comment.