Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
132603: allocatorimpl: vlog on all excl. repl due to catchup conditions  r=sumeerbhola a=kvoli

Previously, we would only `VEventf(ctx, 5, ...)` when a replica was
being excluded due to having a send queue, or not being in
`StateReplicate`.

Now also:
- `V(4)` log when a replica is included due to missing stats.
- `V(6)` log when a replica is included due to passing stats.

Also, stop shadowing the range stat declaration with the replica
stat declaration within the loop.

Part of: #123509
Release note: None

133144: log: add unsafeWrapper in log package r=aa-joshi a=aa-joshi

Previously, we are using `redact.Unsafe()` to mark a particular parameter as
unsafe. We are updating log redaction flow as part of CRDB-37533. This change
introduces `unsafeWrapper` which implements `SafeFormatter`. The wrapper is
introduced so that future work can properly infer whether objects which are
being logged are safe or unsafe. This is a `no-op` change which is necessary in
order to mark redaction operation from within CRDB.

Epic: CRDB-37533
Part of: CRDB-42344
Release note: None

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Akshay Joshi <[email protected]>
  • Loading branch information
3 people committed Oct 24, 2024
3 parents ce067c6 + 2bacf24 + 1869a99 commit 1256394
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ go_test(
"//pkg/storage/enginepb",
"//pkg/testutils/echotest",
"//pkg/util/buildutil",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -85,7 +86,7 @@ func TestReplicaUnavailableError(t *testing.T) {
func TestAmbiguousResultError(t *testing.T) {
ctx := context.Background()

wrapped := errors.Errorf("boom with a %s", redact.Unsafe("secret"))
wrapped := errors.Errorf("boom with a %s", encoding.Unsafe("secret"))
var err error = kvpb.NewAmbiguousResultError(wrapped)
err = errors.DecodeError(ctx, errors.EncodeError(ctx, err))
require.True(t, errors.Is(err, wrapped), "%+v", err)
Expand Down
19 changes: 14 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3084,14 +3084,23 @@ func excludeReplicasInNeedOfCatchup(
sendStreamStats(stats)
filled := 0
for _, repl := range replicas {
if stats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID); ok &&
(!stats.IsStateReplicate || stats.HasSendQueue) {
if replicaSendStreamStats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID); ok &&
(!replicaSendStreamStats.IsStateReplicate || replicaSendStreamStats.HasSendQueue) {
log.KvDistribution.VEventf(ctx, 5,
"not considering %s as a potential candidate for a lease transfer "+
"not considering %v as a potential candidate for a lease transfer "+
"because the replica requires catchup: "+
"[is_state_replicate=%v has_send_queue=%v]",
repl, stats.IsStateReplicate, stats.HasSendQueue)
"replica=(%v) range=%v",
repl, replicaSendStreamStats, stats)
continue
} else if ok {
log.KvDistribution.VEventf(ctx, 6,
"replica %v is up-to-date and does not require catchup "+
"replica=(%v) range=%v",
repl, replicaSendStreamStats, stats)
} else {
log.KvDistribution.VEventf(ctx, 4,
"replica %v is not in the send stream stats range=%v",
repl, stats)
}
// We are also not excluding any replicas which weren't included in the
// stats here. If they weren't included it indicates that they were either
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/queue",
Expand Down
37 changes: 36 additions & 1 deletion pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -194,6 +195,22 @@ type RangeSendStreamStats struct {
internal []ReplicaSendStreamStats
}

func (s *RangeSendStreamStats) String() string {
return redact.StringWithoutMarkers(s)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (s *RangeSendStreamStats) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("[")
for i := range s.internal {
if i > 0 {
w.Printf(", ")
}
w.Printf("r%v=(%v)", s.internal[i].ReplicaID, s.internal[i])
}
w.Printf("]")
}

// Clear clears the stats for all replica send streams so that the underlying
// memory can be reused.
func (s *RangeSendStreamStats) Clear() {
Expand Down Expand Up @@ -298,6 +315,15 @@ type ReplicaSendStreamStats struct {
ReplicaSendQueueStats
}

func (rsss ReplicaSendStreamStats) String() string {
return redact.StringWithoutMarkers(rsss)
}

func (rsss ReplicaSendStreamStats) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("is_state_replicate=%v has_send_queue=%v %v",
rsss.IsStateReplicate, rsss.HasSendQueue, rsss.ReplicaSendQueueStats)
}

// ReplicaSendQueueStats contains the size and count of the send stream queue
// for a replica.
type ReplicaSendQueueStats struct {
Expand All @@ -308,6 +334,16 @@ type ReplicaSendQueueStats struct {
SendQueueCount int64
}

func (rsqs ReplicaSendQueueStats) String() string {
return redact.StringWithoutMarkers(rsqs)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (rsqs ReplicaSendQueueStats) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("send_queue_size=%v / %v entries",
humanizeutil.IBytes(rsqs.SendQueueBytes), rsqs.SendQueueCount)
}

// RaftEvent carries a RACv2-relevant subset of raft state sent to storage.
type RaftEvent struct {
// MsgAppMode is the current mode. This is only relevant on the leader.
Expand Down Expand Up @@ -1455,7 +1491,6 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
if len(statsToSet.internal) != 0 {
panic(errors.AssertionFailedf("statsToSet is non-empty %v", statsToSet.internal))
}
statsToSet.Clear()
rc.mu.RLock()
defer rc.mu.RUnlock()

Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2283,3 +2283,36 @@ func TestConstructRaftEventForReplica(t *testing.T) {
})
}
}

func TestRangeSendStreamStatsString(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

stats := RangeSendStreamStats{
internal: []ReplicaSendStreamStats{
{
IsStateReplicate: false,
HasSendQueue: true,
ReplicaSendQueueStats: ReplicaSendQueueStats{
ReplicaID: 1,
SendQueueCount: 10,
SendQueueBytes: 100,
},
},
{
IsStateReplicate: true,
HasSendQueue: false,
ReplicaSendQueueStats: ReplicaSendQueueStats{
ReplicaID: 2,
SendQueueCount: 0,
SendQueueBytes: 0,
},
},
},
}

require.Equal(t,
"[r1=(is_state_replicate=false has_send_queue=true send_queue_size=100 B / 10 entries), "+
"r2=(is_state_replicate=true has_send_queue=false send_queue_size=0 B / 0 entries)]",
stats.String())
}
2 changes: 1 addition & 1 deletion pkg/sql/inverted/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func formatSpan(span Span, redactable bool) string {
}
output := fmt.Sprintf("[%s, %s%c", start, end, spanEndOpenOrClosed)
if redactable {
output = string(redact.Sprintf("%s", redact.Unsafe(output)))
output = string(redact.Sprintf("%s", encoding.Unsafe(output)))
}
return output
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/cat/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/types",
"//pkg/util/encoding",
"//pkg/util/treeprinter",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/opt/cat/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/treeprinter"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -324,7 +325,7 @@ func formatFamily(family Family, buf *bytes.Buffer) {
// markRedactable is true.
func MaybeMarkRedactable(unsafe string, markRedactable bool) string {
if markRedactable {
return string(redact.Sprintf("%s", redact.Unsafe(unsafe)))
return string(redact.Sprintf("%s", encoding.Unsafe(unsafe)))
}
return unsafe
}
16 changes: 16 additions & 0 deletions pkg/util/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3749,3 +3749,19 @@ func BytesPrevish(b []byte, length int) []byte {
copy(buf[bLen:], bytes.Repeat([]byte{0xff}, length-bLen))
return buf
}

// unsafeWrapper is implementation of SafeFormatter. This is used to mark
// arguments as unsafe for redaction. This would make sure that redact.Unsafe() is implementing SafeFormatter interface
// without affecting invocations.
// TODO(aa-joshi): This is a temporary solution to mark arguments as unsafe. We should move/update this into cockroachdb/redact package.
type unsafeWrapper struct {
a any
}

func (uw unsafeWrapper) SafeFormat(w redact.SafePrinter, _ rune) {
w.Print(redact.Unsafe(uw.a))
}

func Unsafe(args any) any {
return unsafeWrapper{a: args}
}
2 changes: 1 addition & 1 deletion pkg/util/log/redact.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func maybeRedactEntry(payload entryPayload, editor redactEditor) (res entryPaylo

func init() {
// We consider booleans and numeric values to be always safe for
// reporting. A log call can opt out by using redact.Unsafe() around
// reporting. A log call can opt out by using encoding.Unsafe() around
// a value that would be otherwise considered safe.
redact.RegisterSafeType(reflect.TypeOf(true)) // bool
redact.RegisterSafeType(reflect.TypeOf(123)) // int
Expand Down

0 comments on commit 1256394

Please sign in to comment.