Skip to content

Commit

Permalink
Support skipImportDiskQuotaCheck option in restore (#394)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Aug 14, 2024
1 parent 1e1fc88 commit cd087e4
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 224 deletions.
33 changes: 14 additions & 19 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
zap.Bool("async", request.GetAsync()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()),
zap.String("databaseCollections", utils.GetRestoreDBCollections(request)))
zap.String("databaseCollections", utils.GetRestoreDBCollections(request)),
zap.Bool("skipDiskQuotaCheck", request.GetSkipImportDiskQuotaCheck()))

resp := &backuppb.RestoreBackupResponse{
RequestId: request.GetRequestId(),
Expand Down Expand Up @@ -297,6 +298,7 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
DropExistCollection: request.GetDropExistCollection(),
DropExistIndex: request.GetDropExistIndex(),
SkipCreateCollection: request.GetSkipCreateCollection(),
SkipDiskQuotaCheck: request.GetSkipImportDiskQuotaCheck(),
}
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
task.CollectionRestoreTasks = restoreCollectionTasks
Expand Down Expand Up @@ -386,7 +388,8 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("backup_db_name", task.GetCollBackup().DbName),
zap.String("backup_collection_name", task.GetCollBackup().DbName),
zap.String("target_db_name", targetDBName),
zap.String("target_collection_name", targetCollectionName))
zap.String("target_collection_name", targetCollectionName),
zap.Bool("skipDiskQuotaCheck", task.GetSkipDiskQuotaCheck()))
log.Info("start restore",
zap.String("backupBucketName", backupBucketName),
zap.String("backupPath", backupPath))
Expand Down Expand Up @@ -560,7 +563,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}()

// bulk insert
copyAndBulkInsert := func(dbName, collectionName, partitionName string, files []string, isL0 bool) error {
copyAndBulkInsert := func(dbName, collectionName, partitionName string, files []string, isL0 bool, skipDiskQuotaCheck bool) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
Expand All @@ -585,7 +588,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
realFiles = files
}

err := b.executeBulkInsert(ctx, dbName, collectionName, partitionName, realFiles, int64(task.GetCollBackup().BackupTimestamp), isL0)
err := b.executeBulkInsert(ctx, dbName, collectionName, partitionName, realFiles, int64(task.GetCollBackup().BackupTimestamp), isL0, skipDiskQuotaCheck)
if err != nil {
log.Error("fail to bulk insert to partition",
zap.String("partition", partitionName),
Expand Down Expand Up @@ -665,7 +668,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
for _, value := range restoreFileGroups {
group := value
job := func(ctx context.Context) error {
err := copyAndBulkInsert(targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), group.files, false)
err := copyAndBulkInsert(targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), group.files, false, task.GetSkipDiskQuotaCheck())
if err != nil {
return err
} else {
Expand Down Expand Up @@ -704,7 +707,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
job := func(ctx context.Context) error {
l0Files := fmt.Sprintf("%s/%s/%s/%d/%d/%d", backupPath, BINGLOG_DIR, DELTA_LOG_DIR, segmentBackup.collectionID, segmentBackup.partitionID, segmentBackup.segmentID)
log.Info("restore l0 segment ", zap.String("files", l0Files))
return copyAndBulkInsert(targetDBName, targetCollectionName, segmentBackup.partitionName, []string{l0Files}, true)
return copyAndBulkInsert(targetDBName, targetCollectionName, segmentBackup.partitionName, []string{l0Files}, true, task.GetSkipDiskQuotaCheck())
}
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
l0JobIds = append(l0JobIds, jobId)
Expand All @@ -716,7 +719,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
job := func(ctx context.Context) error {
l0Files := fmt.Sprintf("%s/%s/%s/%d/%d/%d", backupPath, BINGLOG_DIR, DELTA_LOG_DIR, task.CollBackup.CollectionId, -1, segment.GetSegmentId())
log.Info("restore l0 segment ", zap.String("files", l0Files))
return copyAndBulkInsert(targetDBName, targetCollectionName, "", []string{l0Files}, true)
return copyAndBulkInsert(targetDBName, targetCollectionName, "", []string{l0Files}, true, task.GetSkipDiskQuotaCheck())
}
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
l0JobIds = append(l0JobIds, jobId)
Expand All @@ -739,7 +742,7 @@ func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64
return res
}

func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string, partition string, files []string, endTime int64, isL0 bool) error {
func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string, partition string, files []string, endTime int64, isL0 bool, skipDiskQuotaCheck bool) error {
log.Info("execute bulk insert",
zap.String("db", db),
zap.String("collection", coll),
Expand All @@ -748,18 +751,10 @@ func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string,
zap.Int64("endTime", endTime))
var taskId int64
var err error
if isL0 {
if endTime == 0 {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsL0())
} else {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsL0(), gomilvus.WithEndTs(endTime))
}
if endTime == 0 {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup(), gomilvus.IsL0(isL0), gomilvus.SkipDiskQuotaCheck(skipDiskQuotaCheck))
} else {
if endTime == 0 {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup())
} else {
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime))
}
taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup(), gomilvus.IsL0(isL0), gomilvus.SkipDiskQuotaCheck(skipDiskQuotaCheck), gomilvus.WithEndTs(endTime))
}
if err != nil {
log.Error("fail to bulk insert",
Expand Down
3 changes: 3 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ message RestoreBackupRequest {
// if true, skip creating the collection; use when the collection already exists and only index or data need restoring
bool skipCreateCollection = 15;
string id = 16;
// if true, skip the disk quota check during bulk-insert import
bool skipImportDiskQuotaCheck = 17;
}

message RestorePartitionTask {
Expand Down Expand Up @@ -313,6 +315,7 @@ message RestoreCollectionTask {
bool dropExistIndex = 17;
// if true will skip create collections
bool skipCreateCollection = 18;
bool skipDiskQuotaCheck = 19;
}

message RestoreBackupTask {
Expand Down
Loading

0 comments on commit cd087e4

Please sign in to comment.