Add force backup option (#195)
Signed-off-by: wayblink <[email protected]>
wayblink authored and yelusion2 committed Oct 16, 2023
1 parent 6d4dbbf commit b616e60
Showing 5 changed files with 256 additions and 224 deletions.
5 changes: 4 additions & 1 deletion cmd/create.go
@@ -18,6 +18,7 @@ var (
    collectionNames string
    databases       string
    dbCollections   string
    force           bool
)

var createBackupCmd = &cobra.Command{
@@ -57,6 +58,7 @@ var createBackupCmd = &cobra.Command{
            BackupName:      backupName,
            CollectionNames: collectionNameArr,
            DbCollections:   utils.WrapDBCollections(dbCollections),
            Force:           force,
        })
        fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
    },
@@ -66,7 +68,8 @@ func init() {
    createBackupCmd.Flags().StringVarP(&backupName, "name", "n", "", "backup name; if unset, a name is generated automatically")
    createBackupCmd.Flags().StringVarP(&collectionNames, "colls", "c", "", "collection names to back up, separate multiple collections with ','")
    createBackupCmd.Flags().StringVarP(&databases, "databases", "d", "", "databases to back up")
    createBackupCmd.Flags().StringVarP(&dbCollections, "database_collections", "a", "", "databases and collections to back up, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")
    createBackupCmd.Flags().BoolVarP(&force, "force", "f", false, "force backup, skipping flush; make sure data has been persisted to disk before using it")

    rootCmd.AddCommand(createBackupCmd)
}
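Note that the -f shorthand previously held by database_collections now triggers force, and database_collections moved to -a. With these flags registered, a force backup can be invoked from the command line roughly as follows (a sketch; the binary name is assumed from this repository's build output, not taken from the diff):

    ./milvus-backup create -n my_backup --force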
116 changes: 64 additions & 52 deletions core/backup_impl_create_backup.go
@@ -390,64 +390,76 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
            }
        }

        // fill segments
        filledSegments := make([]*entity.Segment, 0)
        if !request.GetForce() {
            // Flush
            segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
            if err != nil {
                return backupInfo, err
            }
            log.Info("GetPersistentSegmentInfo before flush from milvus",
                zap.String("collectionName", collection.GetCollectionName()),
                zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)))
            newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collection.GetCollectionName(), false)
            if err != nil {
                log.Error(fmt.Sprintf("fail to flush the collection: %s", collection.GetCollectionName()))
                return backupInfo, err
            }
            log.Info("flush segments",
                zap.String("collectionName", collection.GetCollectionName()),
                zap.Int64s("newSealedSegmentIDs", newSealedSegmentIDs),
                zap.Int64s("flushedSegmentIDs", flushedSegmentIDs),
                zap.Int64("timeOfSeal", timeOfSeal))
            collection.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0)
            collection.BackupPhysicalTimestamp = uint64(timeOfSeal)

            flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...)
            segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
            if err != nil {
                return backupInfo, err
            }
            log.Info("GetPersistentSegmentInfo after flush from milvus",
                zap.String("collectionName", collection.GetCollectionName()),
                zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)),
                zap.Int("segmentNumAfterFlush", len(segmentEntitiesAfterFlush)))
            segmentDict := utils.ArrayToMap(flushSegmentIDs)
            for _, seg := range segmentEntitiesAfterFlush {
                sid := seg.ID
                if _, ok := segmentDict[sid]; ok {
                    delete(segmentDict, sid)
                    filledSegments = append(filledSegments, seg)
                } else {
                    log.Debug("this may be a new segment created after flush, skip it", zap.Int64("id", sid))
                }
            }
            for _, seg := range segmentEntitiesBeforeFlush {
                sid := seg.ID
                if _, ok := segmentDict[sid]; ok {
                    delete(segmentDict, sid)
                    filledSegments = append(filledSegments, seg)
                } else {
                    log.Debug("this may be an old segment from before the flush, skip it", zap.Int64("id", sid))
                }
            }
            if len(segmentDict) > 0 {
                // very rare situation: segments returned by Flush exist in neither segmentEntitiesBeforeFlush nor segmentEntitiesAfterFlush
                errorMsg := "Segments returned by Flush do not exist in GetPersistentSegmentInfo. segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict))
                log.Warn(errorMsg)
            }
        } else {
            // force mode: skip flush and back up whatever segments are already persisted
            segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
            if err != nil {
                return backupInfo, err
            }
            log.Info("GetPersistentSegmentInfo from milvus",
                zap.String("collectionName", collection.GetCollectionName()),
                zap.Int("segmentNum", len(segmentEntitiesBeforeFlush)))
            for _, seg := range segmentEntitiesBeforeFlush {
                filledSegments = append(filledSegments, seg)
            }
        }

        if err != nil {
            collection.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
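The BackupTimestamp set above is a Milvus hybrid timestamp composed from the flush's timeOfSeal. A minimal, self-contained sketch of how such a timestamp is packed, assuming utils.ComposeTS follows Milvus's usual TSO layout (18 logical bits) — the helper below is illustrative, not the repository's actual implementation:

    package main

    import "fmt"

    // composeTS packs a physical time (milliseconds) and a logical counter
    // into a single uint64: high bits physical, low 18 bits logical.
    // Assumption: this mirrors utils.ComposeTS(timeOfSeal, 0) in the diff.
    func composeTS(physical, logical int64) uint64 {
        const logicalBits = 18
        return uint64(physical)<<logicalBits | uint64(logical)
    }

    func main() {
        timeOfSeal := int64(1697414400000) // flush time in ms (example value)
        fmt.Println(composeTS(timeOfSeal, 0))
    }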
8 changes: 7 additions & 1 deletion core/backup_impl_restore_backup.go
@@ -560,7 +560,13 @@ func (b *BackupContext) executeBulkInsert(ctx context.Context, coll string, part
zap.String("partition", partition),
zap.Strings("files", files),
zap.Int64("endTime", endTime))
taskId, err := b.getMilvusClient().BulkInsert(ctx, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime))
var taskId int64
var err error
if endTime == 0 {
taskId, err = b.getMilvusClient().BulkInsert(ctx, coll, partition, files, gomilvus.IsBackup())
} else {
taskId, err = b.getMilvusClient().BulkInsert(ctx, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime))
}
if err != nil {
log.Error("fail to bulk insert",
zap.Error(err),
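An equivalent way to express the branch above is to build the option list conditionally. A sketch reusing the variables from the surrounding function, and assuming the client wrapper accepts gomilvus's variadic BulkInsertOption (the option type name is an assumption about the SDK version in use):

    // build options up front instead of duplicating the BulkInsert call
    opts := []gomilvus.BulkInsertOption{gomilvus.IsBackup()}
    if endTime > 0 {
        // only constrain the import window when an end timestamp exists
        opts = append(opts, gomilvus.WithEndTs(endTime))
    }
    taskId, err = b.getMilvusClient().BulkInsert(ctx, coll, partition, files, opts...)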
2 changes: 2 additions & 0 deletions core/proto/backup.proto
@@ -147,6 +147,8 @@ message CreateBackupRequest {
  bool async = 4;
  // databases and collections to back up, a JSON string. Added to support databases. 2023.7.7
  google.protobuf.Value db_collections = 5;
  // force backup, skipping flush; make sure the data has been persisted to disk before using it
  bool force = 6;
}

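API callers can set the new field directly in the request body. For example (the endpoint path and port are assumptions about the server's default setup, not taken from this diff):

    curl -X POST http://localhost:8080/api/v1/create \
      -H "Content-Type: application/json" \
      -d '{"backup_name": "my_backup", "force": true}'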
