Skip to content

Commit

Permalink
Support pass database in create and restore request (#155)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Jul 12, 2023
1 parent ff36331 commit e9e5c7d
Show file tree
Hide file tree
Showing 12 changed files with 482 additions and 167 deletions.
22 changes: 21 additions & 1 deletion cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
Expand All @@ -14,6 +15,8 @@ import (
// Flag values for the create command; they are bound to the command's flags
// in init via Flags().StringVarP.
var (
	// backupName holds --name/-n, collectionNames holds --colls
	// (a ','-separated list of collection names).
	backupName, collectionNames string

	// databases holds --databases/-d (a ','-separated list of database names);
	// dbCollections holds --database_collections/-f, a JSON object that maps
	// a database name to the collections to back up in it.
	databases, dbCollections string
)

var createBackupCmd = &cobra.Command{
Expand All @@ -35,17 +38,34 @@ var createBackupCmd = &cobra.Command{
} else {
collectionNameArr = strings.Split(collectionNames, ",")
}

if dbCollections == "" && databases != "" {
dbCollectionDict := make(map[string][]string)
splits := strings.Split(databases, ",")
for _, db := range splits {
dbCollectionDict[db] = []string{}
}
completeDbCollections, err := jsoniter.MarshalToString(dbCollectionDict)
dbCollections = completeDbCollections
if err != nil {
fmt.Println("illegal databases input")
return
}
}
resp := backupContext.CreateBackup(context, &backuppb.CreateBackupRequest{
BackupName: backupName,
CollectionNames: collectionNameArr,
DbCollections: dbCollections,
})
fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
},
}

// init binds the create command's flags to their package-level variables and
// attaches the command to rootCmd.
func init() {
	createBackupCmd.Flags().StringVarP(&backupName, "name", "n", "", "backup name, if unset will generate a name automatically")
	// "colls" must be registered exactly once: defining the same flag name twice
	// on one flag set panics. Only the variant with the "c" shorthand is kept.
	createBackupCmd.Flags().StringVarP(&collectionNames, "colls", "c", "", "collectionNames to backup, use ',' to connect multiple collections")
	createBackupCmd.Flags().StringVarP(&databases, "databases", "d", "", "databases to backup")
	createBackupCmd.Flags().StringVarP(&dbCollections, "database_collections", "f", "", "databases and collections to backup, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")

	rootCmd.AddCommand(createBackupCmd)
}
27 changes: 23 additions & 4 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
Expand All @@ -15,10 +16,12 @@ import (
)

// Flag values for the restore command. Each identifier is declared exactly
// once; declaring the same package-level name twice does not compile.
var (
	// restoreBackupName is the name of the backup to restore
	// (its flag registration is not visible here — TODO confirm the flag name).
	restoreBackupName string
	// restoreCollectionNames holds --collections/-c.
	restoreCollectionNames string
	// renameSuffix holds --suffix/-s, a suffix added to restored collection names.
	renameSuffix string
	// renameCollectionNames holds --rename/-r.
	renameCollectionNames string
	// restoreDatabases holds --databases/-d, a ','-separated list of database names.
	restoreDatabases string
	// restoreDatabaseCollections holds --database_collections/-f, a JSON object
	// that maps a database name to the collections to restore from it.
	restoreDatabaseCollections string
)

var restoreBackupCmd = &cobra.Command{
Expand Down Expand Up @@ -50,11 +53,25 @@ var restoreBackupCmd = &cobra.Command{
}
}

if restoreDatabaseCollections == "" && restoreDatabases != "" {
dbCollectionDict := make(map[string][]string)
splits := strings.Split(restoreDatabases, ",")
for _, db := range splits {
dbCollectionDict[db] = []string{}
}
completeDbCollections, err := jsoniter.MarshalToString(dbCollectionDict)
restoreDatabaseCollections = completeDbCollections
if err != nil {
fmt.Println("illegal databases input")
return
}
}
resp := backupContext.RestoreBackup(context, &backuppb.RestoreBackupRequest{
BackupName: restoreBackupName,
CollectionNames: collectionNameArr,
CollectionSuffix: renameSuffix,
CollectionRenames: renameMap,
DbCollections: restoreDatabaseCollections,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
Expand All @@ -66,6 +83,8 @@ func init() {
restoreBackupCmd.Flags().StringVarP(&restoreCollectionNames, "collections", "c", "", "collectionNames to restore")
restoreBackupCmd.Flags().StringVarP(&renameSuffix, "suffix", "s", "", "add a suffix to collection name to restore")
restoreBackupCmd.Flags().StringVarP(&renameCollectionNames, "rename", "r", "", "rename collections to new names")
restoreBackupCmd.Flags().StringVarP(&restoreDatabases, "databases", "d", "", "databases to restore, if not set, restore all databases")
restoreBackupCmd.Flags().StringVarP(&restoreDatabaseCollections, "database_collections", "f", "", "databases and collections to restore, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")

rootCmd.AddCommand(restoreBackupCmd)
}
39 changes: 39 additions & 0 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"go.uber.org/zap"

Expand All @@ -25,6 +26,7 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.Strings("collections", request.GetCollectionNames()),
zap.String("databaseCollections", request.GetDbCollections()),
zap.Bool("async", request.GetAsync()))

resp := &backuppb.BackupInfoResponse{
Expand Down Expand Up @@ -130,11 +132,48 @@ type collection struct {
collectionName string
}

// parseBackupCollections resolves the collections to back up.
// For backward compatibility:
// 1. dbCollections takes priority when it is set;
// 2. otherwise collectionNames is used.
func (b BackupContext) parseBackupCollections(request *backuppb.CreateBackupRequest) ([]collection, error) {
log.Debug("Request collection names",
zap.Strings("request_collection_names", request.GetCollectionNames()),
zap.Int("length", len(request.GetCollectionNames())))
var toBackupCollections []collection

// first priority: dbCollections
if request.GetDbCollections() != "" {
var dbCollections DbCollections
err := jsoniter.UnmarshalFromString(request.GetDbCollections(), &dbCollections)
if err != nil {
log.Error("fail in unmarshal dbCollections in CreateBackupRequest", zap.String("dbCollections", request.GetDbCollections()), zap.Error(err))
return nil, err
}
for db, collections := range dbCollections {
if len(collections) == 0 {
err := b.milvusClient.UsingDatabase(b.ctx, db)
if err != nil {
log.Error("fail to call SDK use database", zap.Error(err))
return nil, err
}
collections, err := b.milvusClient.ListCollections(b.ctx)
if err != nil {
log.Error("fail in ListCollections", zap.Error(err))
return nil, err
}
for _, coll := range collections {
toBackupCollections = append(toBackupCollections, collection{db, coll.Name})
}
} else {
for _, coll := range collections {
toBackupCollections = append(toBackupCollections, collection{db, coll})
}
}
}
return toBackupCollections, nil
}

if request.GetCollectionNames() == nil || len(request.GetCollectionNames()) == 0 {
dbs, err := b.milvusClient.ListDatabases(b.ctx)
if err != nil {
Expand Down
36 changes: 34 additions & 2 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/cockroachdb/errors"
jsoniter "github.com/json-iterator/go"
gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"go.uber.org/zap"
Expand All @@ -29,7 +30,8 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest
zap.Any("CollectionRenames", request.GetCollectionRenames()),
zap.Bool("async", request.GetAsync()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()))
zap.String("path", request.GetPath()),
zap.String("databaseCollections", request.GetDbCollections()))

resp := &backuppb.RestoreBackupResponse{
RequestId: request.GetRequestId(),
Expand Down Expand Up @@ -99,7 +101,36 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest

// 2, initial restoreCollectionTasks
toRestoreCollectionBackups := make([]*backuppb.CollectionBackupInfo, 0)
if len(request.GetCollectionNames()) == 0 {

if request.GetDbCollections() != "" {
var dbCollections DbCollections
err := jsoniter.UnmarshalFromString(request.GetDbCollections(), &dbCollections)
if err != nil {
log.Error("fail in unmarshal dbCollections in RestoreBackupRequest", zap.String("dbCollections", request.GetDbCollections()), zap.Error(err))
errorMsg := fmt.Sprintf("fail in unmarshal dbCollections in RestoreBackupRequest, dbCollections: %s, err: %s", request.GetDbCollections(), err)
log.Error(errorMsg)
resp.Code = backuppb.ResponseCode_Fail
resp.Msg = errorMsg
return resp
}
for db, collections := range dbCollections {
if len(collections) == 0 {
for _, collectionBackup := range backup.GetCollectionBackups() {
if collectionBackup.GetDbName() == db {
toRestoreCollectionBackups = append(toRestoreCollectionBackups, collectionBackup)
}
}
} else {
for _, coll := range collections {
for _, collectionBackup := range backup.GetCollectionBackups() {
if collectionBackup.GetDbName() == db && collectionBackup.CollectionName == coll {
toRestoreCollectionBackups = append(toRestoreCollectionBackups, collectionBackup)
}
}
}
}
}
} else if len(request.GetCollectionNames()) == 0 {
toRestoreCollectionBackups = backup.GetCollectionBackups()
} else {
collectionNameDict := make(map[string]bool)
Expand Down Expand Up @@ -139,6 +170,7 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest
targetCollectionName = backupCollectionName
}

b.milvusClient.UsingDatabase(ctx, restoreCollection.DbName)
exist, err := b.milvusClient.HasCollection(ctx, targetCollectionName)
if err != nil {
errorMsg := fmt.Sprintf("fail to check whether the collection is exist, collection_name: %s, err: %s", targetCollectionName, err)
Expand Down
4 changes: 4 additions & 0 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ func SimpleBackupResponse(input *backuppb.BackupInfoResponse) *backuppb.BackupIn
collections = append(collections, &backuppb.CollectionBackupInfo{
StateCode: coll.GetStateCode(),
ErrorMessage: coll.GetErrorMessage(),
DbName: coll.GetDbName(),
CollectionName: coll.GetCollectionName(),
CollectionId: coll.GetCollectionId(),
BackupTimestamp: coll.GetBackupTimestamp(),
HasIndex: coll.GetHasIndex(),
IndexInfos: coll.GetIndexInfos(),
Expand Down Expand Up @@ -338,3 +340,5 @@ func UpdateRestoreBackupTask(input *backuppb.RestoreBackupTask) *backuppb.Restor
}
return input
}

// DbCollections maps a database name to the collection names selected in that
// database. It is the decoded form of the JSON string carried in the
// db_collections request field, e.g. {"db1":["c1","c2"],"db2":[]}.
// An empty list selects every collection of that database.
type DbCollections = map[string][]string
92 changes: 92 additions & 0 deletions core/backup_meta_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package core

import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"testing"

jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/assert"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand Down Expand Up @@ -139,3 +144,90 @@ func TestBackupSerialize(t *testing.T) {
deserBackup, err := deserialize(serData)
log.Info(deserBackup.String())
}

// TestDbCollectionJson checks that a DbCollections value survives a JSON
// round trip through jsoniter unchanged.
func TestDbCollectionJson(t *testing.T) {
	dbCollection := DbCollections{"db1": {"coll1", "coll2"}, "db2": {"coll3", "coll4"}}
	jsonStr, err := jsoniter.MarshalToString(dbCollection)
	assert.NoError(t, err)
	t.Log(jsonStr)

	var dbCollection2 DbCollections
	// The decode error used to be dropped, so a broken round trip went unnoticed.
	err = jsoniter.UnmarshalFromString(jsonStr, &dbCollection2)
	assert.NoError(t, err)
	assert.Equal(t, dbCollection, dbCollection2)
}

// readBackup loads backup_meta.json, collection_meta.json, partition_meta.json
// and segment_meta.json from backupDir and deserializes them into a single
// BackupInfo. It returns the first read error, or the error from deserialize.
func readBackup(backupDir string) (*backuppb.BackupInfo, error) {
	// readByteFunc returns the full content of the file at filepath.
	readByteFunc := func(filepath string) ([]byte, error) {
		// The file is only read, so open it read-only.
		file, err := os.Open(filepath)
		if err != nil {
			fmt.Println("Open file error!", err)
			return nil, err
		}
		// Release the descriptor on every return path.
		defer file.Close()

		// Get the file size
		stat, err := file.Stat()
		if err != nil {
			fmt.Println(err)
			return nil, err
		}

		bs := make([]byte, stat.Size())
		// A single Read may return fewer bytes than requested; ReadFull keeps
		// reading until bs is filled. For an empty file it returns (0, nil).
		if _, err = io.ReadFull(bufio.NewReader(file), bs); err != nil {
			fmt.Println(err)
			return nil, err
		}
		return bs, nil
	}

	backupPath := backupDir + "/backup_meta.json"
	collectionPath := backupDir + "/collection_meta.json"
	partitionPath := backupDir + "/partition_meta.json"
	segmentPath := backupDir + "/segment_meta.json"

	backupMetaBytes, err := readByteFunc(backupPath)
	if err != nil {
		return nil, err
	}
	collectionBackupMetaBytes, err := readByteFunc(collectionPath)
	if err != nil {
		return nil, err
	}
	partitionBackupMetaBytes, err := readByteFunc(partitionPath)
	if err != nil {
		return nil, err
	}
	segmentBackupMetaBytes, err := readByteFunc(segmentPath)
	if err != nil {
		return nil, err
	}

	completeBackupMetas := &BackupMetaBytes{
		BackupMetaBytes:     backupMetaBytes,
		CollectionMetaBytes: collectionBackupMetaBytes,
		PartitionMetaBytes:  partitionBackupMetaBytes,
		SegmentMetaBytes:    segmentBackupMetaBytes,
	}

	return deserialize(completeBackupMetas)
}

// TestReadBackupFile reads backup meta files from a local directory, converts
// the result with treeToLevel and serializes it again. The directory is a
// developer-local fixture, so the test is skipped when it does not exist.
func TestReadBackupFile(t *testing.T) {
	filepath := "/tmp/hxs_meta"
	if _, err := os.Stat(filepath); err != nil {
		t.Skipf("backup meta fixture %s is not available: %v", filepath, err)
	}

	backupInfo, err := readBackup(filepath)
	if !assert.NoError(t, err) {
		return
	}

	levelBackupInfo, err := treeToLevel(backupInfo)
	assert.NoError(t, err)
	assert.NotNil(t, levelBackupInfo)

	// The serialize error used to be discarded; stop here on failure rather
	// than dereferencing its result.
	output, err := serialize(backupInfo)
	if !assert.NoError(t, err) {
		return
	}
	// Log the metas with an explicit format verb; the previous fmt.Sprintf calls
	// used the data as the format string and threw the result away.
	t.Logf("backup meta: %s", output.BackupMetaBytes)
	t.Logf("segment meta: %s", output.SegmentMetaBytes)
}
4 changes: 4 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ message CreateBackupRequest {
repeated string collection_names = 3;
// async or not
bool async = 4;
// databases and collections to back up, as a JSON string, e.g. {"db1":["c1","c2"],"db2":[]}. Added 2023.7.7 for database support.
string db_collections = 5;
}

/**
Expand Down Expand Up @@ -239,6 +241,8 @@ message RestoreBackupRequest {
string bucket_name = 7;
// if bucket_name and path is set. will override bucket/path in config.
string path = 8;
// databases and collections to restore, as a JSON string, e.g. {"db1":["c1","c2"],"db2":[]}. Added 2023.7.7 for database support.
string db_collections = 9;
}

message RestorePartitionTask {
Expand Down
Loading

0 comments on commit e9e5c7d

Please sign in to comment.