Skip to content

Commit

Permalink
Merge #130064
Browse files Browse the repository at this point in the history
130064: upgrade: dynamically adjust batch size for automatic descriptor repair r=fqazi a=fqazi

Previously, an upgrade was added to repair any descriptor corruptions detected. The defaults for this upgrade attempted to fix 1,000 descriptors within a transaction. This could end up failing with retry errors if the batch encountered any competing high-priority transactions like leasing. To address this, this patch dynamically adjusts the batch size either when excessive retries are observed or the batch of updates takes too long. Once the batch is small enough, only high-priority transactions are used.

Fixes: #129949

Release note (bug fix): Addressed a bug in the upgrade pre-condition for repairing descriptor corruption, which could lead to finalization being stuck

Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Sep 10, 2024
2 parents c63a63b + 01e5a47 commit 1b0a374
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 28 deletions.
4 changes: 4 additions & 0 deletions pkg/upgrade/upgrades/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//pkg/sql/catalog/descidgen",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/schematelemetry/schematelemetrycontroller",
"//pkg/sql/catalog/systemschema",
Expand All @@ -59,6 +60,7 @@ go_library(
"//pkg/util/envutil",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_kr_pretty//:pretty",
Expand Down Expand Up @@ -129,6 +131,7 @@ go_test(
"//pkg/sql/isql",
"//pkg/sql/privilege",
"//pkg/sql/sem/builtins/builtinconstants",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/sqlliveness/slstorage",
Expand All @@ -141,6 +144,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/upgrade",
"//pkg/upgrade/upgradebase",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
113 changes: 86 additions & 27 deletions pkg/upgrade/upgrades/first_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -54,25 +57,18 @@ func FirstUpgradeFromRelease(
}); err != nil {
return err
}
var batch catalog.DescriptorIDSet
const batchSize = 1000
var descsToUpdate catalog.DescriptorIDSet
if err := all.ForEachDescriptor(func(desc catalog.Descriptor) error {
changes := desc.GetPostDeserializationChanges()
if !changes.HasChanges() || (changes.Len() == 1 && changes.Contains(catalog.SetModTimeToMVCCTimestamp)) {
return nil
}
batch.Add(desc.GetID())
if batch.Len() >= batchSize {
if err := upgradeDescriptors(ctx, d, batch); err != nil {
return err
}
batch = catalog.MakeDescriptorIDSet()
}
descsToUpdate.Add(desc.GetID())
return nil
}); err != nil {
return err
}
return upgradeDescriptors(ctx, d, batch)
return upgradeDescriptors(ctx, d, descsToUpdate)
}

// upgradeDescriptors round-trips the descriptor protobufs for the given keys
Expand All @@ -83,18 +79,52 @@ func upgradeDescriptors(
if ids.Empty() {
return nil
}
return d.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
muts, err := txn.Descriptors().MutableByID(txn.KV()).Descs(ctx, ids.Ordered())
batchSize := 100
// Any batch size below this will use high priority.
const HighPriBatchSize = 25
repairBatchTimeLimit := lease.LeaseDuration.Get(&d.Settings.SV)
currentIdx := 0
idsToRewrite := ids.Ordered()
for currentIdx <= len(idsToRewrite) {
descBatch := idsToRewrite[currentIdx:min(currentIdx+batchSize, len(idsToRewrite))]
err := timeutil.RunWithTimeout(ctx, "repair-post-deserialization", repairBatchTimeLimit, func(ctx context.Context) error {
return d.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
if batchSize <= HighPriBatchSize {
if err := txn.KV().SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
}
muts, err := txn.Descriptors().MutableByID(txn.KV()).Descs(ctx, descBatch)
if err != nil {
return err
}
b := txn.KV().NewBatch()
for _, mut := range muts {
if !mut.GetPostDeserializationChanges().HasChanges() {
continue
}
key := catalogkeys.MakeDescMetadataKey(d.Codec, mut.GetID())
b.CPut(key, mut.DescriptorProto(), mut.GetRawBytesInStorage())
}
return txn.KV().Run(ctx, b)
})
})
if err != nil {
// If either the operation hits the retry limit or
// times out, then reduce the batch size.
if kv.IsAutoRetryLimitExhaustedError(err) ||
errors.HasType(err, (*timeutil.TimeoutError)(nil)) {
batchSize = max(batchSize/2, 1)
log.Infof(ctx, "reducing batch size of invalid_object repair query to %d (hipri=%t)",
batchSize,
batchSize <= HighPriBatchSize)
continue
}
return err
}
b := txn.KV().NewBatch()
for _, mut := range muts {
key := catalogkeys.MakeDescMetadataKey(d.Codec, mut.GetID())
b.CPut(key, mut.DescriptorProto(), mut.GetRawBytesInStorage())
}
return txn.KV().Run(ctx, b)
})
currentIdx += batchSize
}
return nil
}

var firstUpgradePreconditionUsesAOST = true
Expand Down Expand Up @@ -165,23 +195,52 @@ FROM
FROM
"".crdb_internal.kv_repairable_catalog_corruptions
LIMIT
1000
$1
)
WHERE
was_repaired`
batchSize := 100
// Any batch size below this will use high priority.
const HighPriBatchSize = 25
repairBatchTimeLimit := lease.LeaseDuration.Get(&d.Settings.SV)
for {
row, err := d.InternalExecutor.QueryRow(
ctx, "repair-catalog-corruptions", nil /* txn */, repairQuery,
)
var rowsUpdated tree.DInt
err := timeutil.RunWithTimeout(ctx, "descriptor-repair", repairBatchTimeLimit, func(ctx context.Context) error {
return d.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
if batchSize <= HighPriBatchSize {
if err = txn.KV().SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
}
row, err := txn.QueryRow(
ctx, "repair-catalog-corruptions", txn.KV() /* txn */, repairQuery, batchSize,
)
if err != nil {
return err
}
rowsUpdated = tree.MustBeDInt(row[0])
return nil
})
})
if err != nil {
// If either the operation hits the retry limit or
// times out, then reduce the batch size.
if kv.IsAutoRetryLimitExhaustedError(err) ||
errors.HasType(err, (*timeutil.TimeoutError)(nil)) {
batchSize = max(batchSize/2, 1)
log.Infof(ctx, "reducing batch size of invalid_object repair query to %d (hipri=%t)",
batchSize,
batchSize <= HighPriBatchSize)
continue
}
// Otherwise, return any unknown errors.
return err
}
c := tree.MustBeDInt(row[0])
if c == 0 {
if rowsUpdated == 0 {
break
}
n += int(c)
log.Infof(ctx, "repaired %d catalog corruptions", c)
n += int(rowsUpdated)
log.Infof(ctx, "repaired %d catalog corruptions", rowsUpdated)
}
if n == 0 {
log.Info(ctx, "no catalog corruptions found to repair during upgrade attempt")
Expand Down
Loading

0 comments on commit 1b0a374

Please sign in to comment.