Skip to content

Commit

Permalink
crosscluster/logical: notify sql stats of ldr immediate writes
Browse files Browse the repository at this point in the history
Release note (enterprise change): Rows replicated by LDR in immediate mode are now factored into the determination of when to recompute SQL table statistics.

Epic: CRDB-40872.
  • Loading branch information
dt committed Oct 25, 2024
1 parent 07624d1 commit 9d42c6c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/stats",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
"//pkg/util/admission/admissionpb",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -552,6 +553,10 @@ func (lrw *logicalReplicationWriterProcessor) checkpoint(
// have exited based on an error.
return nil
}

for _, p := range lrw.bh {
p.ReportMutations(lrw.FlowCtx.Cfg.StatsRefresher)
}
lrw.metrics.CheckpointEvents.Inc(1)
lrw.debug.RecordCheckpoint(lrw.frontier.Frontier().GoTime())
return nil
Expand Down Expand Up @@ -957,6 +962,7 @@ type BatchHandler interface {
HandleBatch(context.Context, []streampb.StreamEvent_KV) (batchStats, error)
GetLastRow() cdcevent.Row
SetSyntheticFailurePercent(uint32)
ReportMutations(*stats.Refresher)
Close(context.Context)
}

Expand All @@ -968,6 +974,7 @@ type RowProcessor interface {
ProcessRow(context.Context, isql.Txn, roachpb.KeyValue, roachpb.Value) (batchStats, error)
GetLastRow() cdcevent.Row
SetSyntheticFailurePercent(uint32)
ReportMutations(*stats.Refresher)
Close(context.Context)
}

Expand Down Expand Up @@ -1029,6 +1036,10 @@ func (t *txnBatch) SetSyntheticFailurePercent(rate uint32) {
t.rp.SetSyntheticFailurePercent(rate)
}

func (t *txnBatch) ReportMutations(s *stats.Refresher) {
t.rp.ReportMutations(s)
}

func (t *txnBatch) Close(ctx context.Context) {
t.rp.Close(ctx)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_kv_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -114,6 +115,17 @@ func (p *kvRowProcessor) ProcessRow(

}

func (p *kvRowProcessor) ReportMutations(refresher *stats.Refresher) {
for _, w := range p.writers {
if w.unreportedMutations > 0 && w.leased != nil {
if desc := w.leased.Underlying(); desc != nil {
refresher.NotifyMutation(desc.(catalog.TableDescriptor), w.unreportedMutations)
w.unreportedMutations = 0
}
}
}
}

// maxRefreshCount is the maximum number of times we will retry a KV batch that has failed with a
// ConditionFailedError with HadNewerOriginTimetamp=true.
const maxRefreshCount = 10
Expand Down Expand Up @@ -253,6 +265,9 @@ func (p *kvRowProcessor) addToBatch(
}
}

// Note that we should report this mutation to sql stats refresh later.
w.unreportedMutations++

return nil
}

Expand Down Expand Up @@ -317,6 +332,10 @@ type kvTableWriter struct {
ru row.Updater
ri row.Inserter
rd row.Deleter

// Mutations to the table this writer wraps that should be reported to sql
// stats at some point.
unreportedMutations int
}

func newKVTableWriter(
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
Expand Down Expand Up @@ -232,6 +233,11 @@ func makeSQLProcessorFromQuerier(
}, nil
}

// ReportMutations implements the RowProcessor interface, but is a no-op for
// sqlRowProcessor because its mutations are already reported by the queries it
// runs when they are run.
func (sqlRowProcessor) ReportMutations(_ *stats.Refresher) {}

func (*sqlRowProcessor) Close(ctx context.Context) {}

var errInjected = errors.New("injected synthetic error")
Expand Down

0 comments on commit 9d42c6c

Please sign in to comment.