From 01e5a47ff8bff10e033376bd7bebbd33dc4d2fee Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Tue, 3 Sep 2024 23:27:20 -0400 Subject: [PATCH] upgrade: dynamically adjust batch size for automatic descriptor repair Previously, an upgrade was added to repair any descriptor corruptions detected. The defaults for this upgrade attempted to fix 1,000 descriptors within a transaction. This could end up failing with retry errors if the batch encountered any competing high-priority transactions like leasing. To address this, this patch dynamically adjusts the batch size either when excessive retries are observed or the batch of updates takes too long. Once the batch is small enough, only high-priority transactions are used. Fixes: #129949 Release note (bug fix): Addressed a bug in the upgrade pre-condition for repairing descriptor corruption, which could lead to finalization being stuck --- pkg/upgrade/upgrades/BUILD.bazel | 4 + pkg/upgrade/upgrades/first_upgrade.go | 113 +++++++++---- pkg/upgrade/upgrades/first_upgrade_test.go | 179 ++++++++++++++++++++- 3 files changed, 268 insertions(+), 28 deletions(-) diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index ade1a3ba5a70..ad47ad26a26c 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "//pkg/sql/catalog/descidgen", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", + "//pkg/sql/catalog/lease", "//pkg/sql/catalog/nstree", "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller", "//pkg/sql/catalog/systemschema", @@ -58,6 +59,7 @@ go_library( "//pkg/util/envutil", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@com_github_kr_pretty//:pretty", @@ -124,6 +126,7 @@ go_test( "//pkg/sql/isql", "//pkg/sql/privilege", "//pkg/sql/sem/builtins/builtinconstants", + "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/types", "//pkg/testutils", @@ -133,6 +136,7 @@ go_test( "//pkg/testutils/testcluster", "//pkg/upgrade", "//pkg/upgrade/upgradebase", + "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/upgrade/upgrades/first_upgrade.go b/pkg/upgrade/upgrades/first_upgrade.go index 5eae657305dd..6a11d232bcb5 100644 --- a/pkg/upgrade/upgrades/first_upgrade.go +++ b/pkg/upgrade/upgrades/first_upgrade.go @@ -15,15 +15,18 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -54,25 +57,18 @@ func FirstUpgradeFromRelease( }); err != nil { return err } - var batch catalog.DescriptorIDSet - const batchSize = 1000 + var descsToUpdate catalog.DescriptorIDSet if err := all.ForEachDescriptor(func(desc catalog.Descriptor) error { changes := desc.GetPostDeserializationChanges() if !changes.HasChanges() || (changes.Len() == 1 && changes.Contains(catalog.SetModTimeToMVCCTimestamp)) { return nil } - batch.Add(desc.GetID()) - if batch.Len() >= batchSize { - if err := upgradeDescriptors(ctx, d, batch); err != nil { - return err - } - batch = catalog.MakeDescriptorIDSet() - } + descsToUpdate.Add(desc.GetID()) return nil }); err != nil { return err } - return upgradeDescriptors(ctx, d, batch) + return upgradeDescriptors(ctx, d, descsToUpdate) } // upgradeDescriptors round-trips the descriptor protobufs for the given keys @@ -83,18 +79,52 @@ func upgradeDescriptors( if ids.Empty() { return nil } - return d.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { - muts, err := txn.Descriptors().MutableByID(txn.KV()).Descs(ctx, ids.Ordered()) + batchSize := 100 + // Any batch size below this will use high priority. + const HighPriBatchSize = 25 + repairBatchTimeLimit := lease.LeaseDuration.Get(&d.Settings.SV) + currentIdx := 0 + idsToRewrite := ids.Ordered() + for currentIdx <= len(idsToRewrite) { + descBatch := idsToRewrite[currentIdx:min(currentIdx+batchSize, len(idsToRewrite))] + err := timeutil.RunWithTimeout(ctx, "repair-post-deserialization", repairBatchTimeLimit, func(ctx context.Context) error { + return d.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + if batchSize <= HighPriBatchSize { + if err := txn.KV().SetUserPriority(roachpb.MaxUserPriority); err != nil { + return err + } + } + muts, err := txn.Descriptors().MutableByID(txn.KV()).Descs(ctx, descBatch) + if err != nil { + return err + } + b := txn.KV().NewBatch() + for _, mut := range muts { + if !mut.GetPostDeserializationChanges().HasChanges() { + continue + } + key := catalogkeys.MakeDescMetadataKey(d.Codec, mut.GetID()) + b.CPut(key, mut.DescriptorProto(), mut.GetRawBytesInStorage()) + } + return txn.KV().Run(ctx, b) + }) + }) if err != nil { + // If either the operation hits the retry limit or + // times out, then reduce the batch size. + if kv.IsAutoRetryLimitExhaustedError(err) || + errors.HasType(err, (*timeutil.TimeoutError)(nil)) { + batchSize = max(batchSize/2, 1) + log.Infof(ctx, "reducing batch size of invalid_object repair query to %d (hipri=%t)", + batchSize, + batchSize <= HighPriBatchSize) + continue + } return err } - b := txn.KV().NewBatch() - for _, mut := range muts { - key := catalogkeys.MakeDescMetadataKey(d.Codec, mut.GetID()) - b.CPut(key, mut.DescriptorProto(), mut.GetRawBytesInStorage()) - } - return txn.KV().Run(ctx, b) - }) + currentIdx += batchSize + } + return nil } var firstUpgradePreconditionUsesAOST = true @@ -165,23 +195,52 @@ FROM FROM "".crdb_internal.kv_repairable_catalog_corruptions LIMIT - 1000 + $1 ) WHERE was_repaired` + batchSize := 100 + // Any batch size below this will use high priority. + const HighPriBatchSize = 25 + repairBatchTimeLimit := lease.LeaseDuration.Get(&d.Settings.SV) for { - row, err := d.InternalExecutor.QueryRow( - ctx, "repair-catalog-corruptions", nil /* txn */, repairQuery, - ) + var rowsUpdated tree.DInt + err := timeutil.RunWithTimeout(ctx, "descriptor-repair", repairBatchTimeLimit, func(ctx context.Context) error { + return d.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + if batchSize <= HighPriBatchSize { + if err = txn.KV().SetUserPriority(roachpb.MaxUserPriority); err != nil { + return err + } + } + row, err := txn.QueryRow( + ctx, "repair-catalog-corruptions", txn.KV() /* txn */, repairQuery, batchSize, + ) + if err != nil { + return err + } + rowsUpdated = tree.MustBeDInt(row[0]) + return nil + }) + }) if err != nil { + // If either the operation hits the retry limit or + // times out, then reduce the batch size. + if kv.IsAutoRetryLimitExhaustedError(err) || + errors.HasType(err, (*timeutil.TimeoutError)(nil)) { + batchSize = max(batchSize/2, 1) + log.Infof(ctx, "reducing batch size of invalid_object repair query to %d (hipri=%t)", + batchSize, + batchSize <= HighPriBatchSize) + continue + } + // Otherwise, return any unknown errors. return err } - c := tree.MustBeDInt(row[0]) - if c == 0 { + if rowsUpdated == 0 { break } - n += int(c) - log.Infof(ctx, "repaired %d catalog corruptions", c) + n += int(rowsUpdated) + log.Infof(ctx, "repaired %d catalog corruptions", rowsUpdated) } if n == 0 { log.Info(ctx, "no catalog corruptions found to repair during upgrade attempt") diff --git a/pkg/upgrade/upgrades/first_upgrade_test.go b/pkg/upgrade/upgrades/first_upgrade_test.go index ed157c11e55c..61bc544a1f05 100644 --- a/pkg/upgrade/upgrades/first_upgrade_test.go +++ b/pkg/upgrade/upgrades/first_upgrade_test.go @@ -12,7 +12,9 @@ package upgrades_test import ( "context" + "fmt" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -21,15 +23,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -154,10 +164,24 @@ func TestFirstUpgradeRepair(t *testing.T) { ctx := context.Background() settings := cluster.MakeTestingClusterSettingsWithVersions(v1, v0, false /* initializeVersion */) + // We intentionally create contention on the system.descriptor table by having + // a high priority txn that intentionally updates rows in this table. This txn + // also holds a lease on the system database descriptor, which we will wait to + // be released. Reducing the lease duration makes this part of the test speed + // up. + lease.LeaseDuration.Override(ctx, &settings.SV, time.Second*30) require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV)) + upgradePausePoint := make(chan struct{}) + upgradeResumePoint := make(chan struct{}) + upgradeCompleted := make(chan struct{}) testServer, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{ Settings: settings, Knobs: base.TestingKnobs{ + UpgradeManager: &upgradebase.TestingKnobs{ + InterlockPausePoint: upgradebase.AfterMigration, + InterlockReachedPausePointChannel: &upgradePausePoint, + InterlockResumeChannel: &upgradeResumePoint, + }, Server: &server.TestingKnobs{ DisableAutomaticVersionUpgrade: make(chan struct{}), ClusterVersionOverride: v0, @@ -269,13 +293,50 @@ func TestFirstUpgradeRepair(t *testing.T) { const qWaitForAOST = "SELECT count(*) FROM [SHOW DATABASES] AS OF SYSTEM TIME '-10s'" tdb.CheckQueryResultsRetry(t, qWaitForAOST, [][]string{{"5"}}) + // Intentionally block any attempt to upgrade by holding the descriptors + // with another txn. + tdb.Exec(t, "CREATE TABLE t1(n int)") + grp := ctxgroup.WithContext(ctx) + locksHeld := make(chan struct{}) + grp.GoCtx(func(ctx context.Context) error { + tx, err := sqlDB.BeginTx(ctx, nil) + if err != nil { + return err + } + for _, corruptDesc := range corruptDescs { + _, err = tx.Exec("SELECT crdb_internal.unsafe_upsert_descriptor($1, descriptor, true) FROM system.descriptor WHERE id=$1", corruptDesc.GetID()) + if err != nil { + return err + } + _, err = tx.Exec("INSERT INTO t1 VALUES(5)") + if err != nil { + return err + } + + } + close(locksHeld) + <-upgradePausePoint + doneUpgrade := false + for !doneUpgrade { + select { + case <-upgradePausePoint: + case <-upgradeCompleted: + doneUpgrade = true + case upgradeResumePoint <- struct{}{}: + } + } + return tx.Rollback() + }) + // Try upgrading the cluster version. // Precondition check should repair all corruptions and upgrade should succeed. const qUpgrade = "SET CLUSTER SETTING version = crdb_internal.node_executable_version()" + <-locksHeld tdb.Exec(t, qUpgrade) tdb.CheckQueryResults(t, qDetectCorruption, [][]string{{"0"}}) tdb.CheckQueryResults(t, qDetectRepairableCorruption, [][]string{{"0"}}) - + close(upgradeCompleted) + require.NoError(t, grp.Wait()) // Assert that a version upgrade is reflected for repaired descriptors (stricly one version upgrade). for _, d := range corruptDescs { descId := d.GetID() @@ -296,3 +357,119 @@ func TestFirstUpgradeRepair(t *testing.T) { tdb.CheckQueryResults(t, "SELECT * FROM test.public.foo", [][]string{{"1", "2", "42"}}) tdb.CheckQueryResults(t, "SELECT test.public.f()", [][]string{{"1"}}) } + +// TestFirstUpgradeRepairBatchSize validates that the maximum batch size +// used by the repair upgrade is sustainable. +func TestFirstUpgradeRepairBatchSize(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // This test is fairly slow because of the large number of + // descriptors. + skip.UnderDuress(t) + + var ( + v0 = clusterversion.MinSupported.Version() + v1 = clusterversion.Latest.Version() + ) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettingsWithVersions(v1, v0, false /* initializeVersion */) + // We intentionally create contention on the system.descriptor table by having + // a high priority txn that intentionally updates rows in this table. This txn + // also holds a lease on the system database descriptor, which we will wait to + // be released. Reducing the lease duration makes this part of the test speed + // up. + lease.LeaseDuration.Override(ctx, &settings.SV, time.Second*30) + require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV)) + testServer, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + ClusterVersionOverride: v0, + }, + SQLEvalContext: &eval.TestingKnobs{ + ForceProductionValues: true, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer testServer.Stopper().Stop(ctx) + + const totalDescriptorsToTest = 2500 + sqlRunner := sqlutils.MakeSQLRunner(sqlDB) + idb := testServer.InternalDB().(*sql.InternalDB) + tx := sqlRunner.Begin(t) + const batchSize = 100 + lastCommit := 0 + commitFn := func(startIdx int) { + require.NoError(t, tx.Commit()) + // Execute a txn to inject corruption + require.NoError(t, + idb.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + for i := lastCommit; i < startIdx; i++ { + db, err := txn.Descriptors().MutableByName(txn.KV()).Database(ctx, "defaultdb") + if err != nil { + return err + } + schema, err := txn.Descriptors().MutableByName(txn.KV()).Schema(ctx, db, "public") + if err != nil { + return err + } + tbl, err := txn.Descriptors().MutableByName(txn.KV()).Table(ctx, db, schema, fmt.Sprintf("t_%d", i)) + if err != nil { + return err + } + nonExistentRole := i%2 == 0 + if nonExistentRole { + // Forces an invalid_object error. + tbl.Privileges.Users = append(tbl.Privileges.Users, catpb.UserPrivileges{UserProto: "bobby"}) + } else { + // Forces a post deserialization change. + tbl.Indexes[0].Invisibility = 0.0 + tbl.Indexes[0].NotVisible = true + } + txn.Descriptors().SkipValidationOnWrite() + err = txn.Descriptors().WriteDesc(ctx, false, tbl, txn.KV()) + if err != nil { + return err + } + } + return nil + })) + + lastCommit = startIdx + if startIdx == totalDescriptorsToTest { + return + } + tx = sqlRunner.Begin(t) + } + for i := 0; i < totalDescriptorsToTest; i++ { + if i%batchSize == 0 { + commitFn(i) + } + _, err := tx.Exec(fmt.Sprintf("CREATE TABLE t_%d(n int, j int, index(j))", i)) + require.NoError(t, err) + _, err = tx.Exec(fmt.Sprintf("SELECT 't_%d'::REGCLASS::OID", i)) + require.NoError(t, err) + } + commitFn(totalDescriptorsToTest) + // AOST query is run 10 seconds in the past below so wait for the corruption + // to appear. + time.Sleep(time.Second * 10) + + // Confirm the repairable descriptors exist, we will avoid scanning all of + // them avoid hitting the memory limit in testing. + require.Equal(t, + [][]string{{"true"}}, + sqlRunner.QueryStr(t, "SELECT count(*)>0 FROM \"\".crdb_internal.kv_repairable_catalog_corruptions")) + // Upgrade to the latest version. + sqlRunner.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()") + // Confirm that all descriptors are repaired. + require.Equal(t, + [][]string{{"0"}}, + sqlRunner.QueryStr(t, "SELECT count(*) FROM \"\".crdb_internal.kv_repairable_catalog_corruptions"), + ) + +}