Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize restore account with cluster level pitr #19382

Merged
merged 8 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 47 additions & 23 deletions pkg/frontend/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (e

// get pitr name
pitrName := string(stmt.Name)
srcAccountName := string(stmt.AccountName)
accountName := string(stmt.AccountName)
dbName := string(stmt.DatabaseName)
tblName := string(stmt.TableName)

Expand Down Expand Up @@ -768,55 +768,79 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (e
return err
}

if len(srcAccountName) > 0 {
if len(accountName) > 0 {
restoreOtherAccount := func() (rtnErr error) {
fromAccount := string(stmt.SrcAccountName)
var fromAccountId uint32
var toAccountId uint32
if len(fromAccount) == 0 {
fromAccount = pitr.accountName
fromAccountId = uint32(pitr.accountId)
if fromAccount == accountName {
// restore to the same account
toAccountId = fromAccountId
} else {
// restore to new account
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
return rtnErr
}
}
} else {
if fromAccount == accountName {
// restore to the same account
fromAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
if rtnErr != nil {
return rtnErr
}
toAccountId = fromAccountId
} else {
// restore to new account
fromAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
if rtnErr != nil {
return rtnErr
}
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
return rtnErr
}
}
}
toAccountId := uint32(pitr.accountId)

// check account exists or not
var accountExist bool
if accountExist, rtnErr = doCheckAccountExistsInPitrRestore(ctx, ses.GetService(), bh, pitrName, ts, fromAccount, toAccountId); rtnErr != nil {
if accountExist, rtnErr = doCheckAccountExistsInPitrRestore(ctx, ses.GetService(), bh, pitrName, ts, fromAccount, ses.GetAccountId()); rtnErr != nil {
return rtnErr
}
if !accountExist {
return moerr.NewInternalErrorf(ctx, "account `%s` does not exists at timestamp: %v", tenantInfo.GetTenant(), nanoTimeFormat(ts))
}
// mock snapshot
var snapshotName string
snapshotName, rtnErr = insertSnapshotRecord(ctx, ses.GetService(), bh, pitrName, ts, uint64(toAccountId), fromAccount)
snapshotName, rtnErr = insertSnapshotRecord(ctx, ses.GetService(), bh, pitrName, ts, uint64(fromAccountId), fromAccount)
defer func() {
deleteSnapshotRecord(ctx, ses.GetService(), bh, pitrName, snapshotName)
}()
if rtnErr != nil {
return rtnErr
}

restoreAccount := toAccountId

if srcAccountName != pitr.accountName {
// restore account to other account
toAccountId, rtnErr = getAccountId(ctx, bh, string(stmt.AccountName))
if rtnErr != nil {
return rtnErr
}
}

restoreAccount := fromAccountId
// drop foreign key related tables first
if err = deleteCurFkTables(ctx, ses.GetService(), bh, dbName, tblName, toAccountId); err != nil {
rtnErr = deleteCurFkTables(ctx, ses.GetService(), bh, dbName, tblName, toAccountId)
if err != nil {
return
}

// get topo sorted tables with foreign key
sortedFkTbls, err := fkTablesTopoSort(ctx, bh, snapshotName, dbName, tblName)
if err != nil {
sortedFkTbls, rtnErr := fkTablesTopoSort(ctx, bh, snapshotName, dbName, tblName)
if rtnErr != nil {
return
}

// get foreign key table infos
fkTableMap, err := getTableInfoMap(ctx, ses.GetService(), bh, snapshotName, dbName, tblName, sortedFkTbls)
if err != nil {
fkTableMap, rtnErr := getTableInfoMap(ctx, ses.GetService(), bh, snapshotName, dbName, tblName, sortedFkTbls)
if rtnErr != nil {
return
}

Expand All @@ -829,18 +853,18 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (e
}

if len(fkTableMap) > 0 {
if err = restoreTablesWithFk(ctx, ses.GetService(), bh, snapshotName, sortedFkTbls, fkTableMap, toAccountId, ts); err != nil {
if rtnErr = restoreTablesWithFk(ctx, ses.GetService(), bh, snapshotName, sortedFkTbls, fkTableMap, toAccountId, ts); rtnErr != nil {
return
}
}

if len(viewMap) > 0 {
if err = restoreViews(ctx, ses, bh, snapshotName, viewMap, toAccountId); err != nil {
if rtnErr = restoreViews(ctx, ses, bh, snapshotName, viewMap, toAccountId); rtnErr != nil {
return
}
}
// checks if the given context has been canceled.
if err = CancelCheck(ctx); err != nil {
if rtnErr = CancelCheck(ctx); rtnErr != nil {
return
}
return rtnErr
Expand Down
Loading
Loading