Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SearchValue/GetValue #942

Merged
merged 26 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions v2/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ type Backend interface {
// wasn't found or another error if any occurred. key won't contain the
// namespace prefix.
Fetch(ctx context.Context, key string) (any, error)

// Validate validates the given values and returns the index of the "best"
// value or an error and -1 if all values are invalid. If the method is used
// with a single value, it will return 0 and no error if it is valid or an
// error and -1 if it is invalid. For multiple values, it will select the
// "best" value based on user-defined logic and return its index in the
// original values list. If we receive a request for /ipns/$binary_id, the
// key parameter will be set to $binary_id. Decisions about which value is
// the "best" from the given list must be stable. So if there are two
// equally good values, the implementation must always return the same
// index.
dennis-tra marked this conversation as resolved.
Show resolved Hide resolved
Validate(ctx context.Context, key string, values ...any) (int, error)
}

// NewBackendIPNS initializes a new backend for the "ipns" namespace that can
Expand Down
39 changes: 38 additions & 1 deletion v2/backend_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,50 @@ func (p *ProvidersBackend) Fetch(ctx context.Context, key string) (any, error) {
out.addProvider(addrInfo, rec.expiry)
}

if len(out.providers) > 0 {
if len(out.providers) == 0 {
return nil, ds.ErrNotFound
dennis-tra marked this conversation as resolved.
Show resolved Hide resolved
} else {
p.cache.Add(qKey.String(), *out)
}

return out, nil
}

// Validate verifies that the given values are of type [peer.AddrInfo]. Then it
// decides based on the number of attached multi addresses which value is
// "better" than the other. If there is a tie, Validate will return the index
// of the earliest occurrence.
func (p *ProvidersBackend) Validate(ctx context.Context, key string, values ...any) (int, error) {
// short circuit if it's just a single value
if len(values) == 1 {
_, ok := values[0].(peer.AddrInfo)
if !ok {
return -1, fmt.Errorf("invalid type %T", values[0])
}
return 0, nil
}

bestIdx := -1
for i, value := range values {
addrInfo, ok := value.(peer.AddrInfo)
if !ok {
continue
}

if bestIdx == -1 {
bestIdx = i
} else if len(values[bestIdx].(peer.AddrInfo).Addrs) < len(addrInfo.Addrs) {
bestIdx = i
}
}

if bestIdx == -1 {
return -1, fmt.Errorf("no value of correct type")
}

return bestIdx, nil
}

// Close is here to implement the [io.Closer] interface. This will get called
// when the [DHT] "shuts down"/closes.
func (p *ProvidersBackend) Close() error {
Expand Down
63 changes: 63 additions & 0 deletions v2/backend_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"github.com/benbjohnson/clock"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
)

var devnull = slog.New(slog.NewTextHandler(io.Discard, nil))
Expand Down Expand Up @@ -115,3 +119,62 @@ func TestProvidersBackend_GarbageCollection_lifecycle_thread_safe(t *testing.T)
assert.Nil(t, b.gcCancel)
assert.Nil(t, b.gcDone)
}

func TestProvidersBackend_Validate(t *testing.T) {
ctx := kadtest.CtxShort(t)

b := newBackendProvider(t, nil)

pid := newPeerID(t)
peer1 := peer.AddrInfo{ID: pid, Addrs: make([]multiaddr.Multiaddr, 0)}
peer2 := peer.AddrInfo{ID: pid, Addrs: make([]multiaddr.Multiaddr, 1)}
peer3 := peer.AddrInfo{ID: pid, Addrs: make([]multiaddr.Multiaddr, 2)}

t.Run("no values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key")
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil, nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("single valid value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1)
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})

t.Run("increasing better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, peer2, peer3)
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("mixed better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, peer3, peer2)
assert.NoError(t, err)
assert.Equal(t, 1, idx)
})

t.Run("mixed invalid values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, nil, peer2, nil)
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("identically good values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, peer1)
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})
}
52 changes: 52 additions & 0 deletions v2/backend_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
"path"
"time"

"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -131,6 +132,57 @@
return rec, nil
}

func (r *RecordBackend) Validate(ctx context.Context, key string, values ...any) (int, error) {
k := "/" + path.Join(r.namespace, key)

// short circuit if it's just a single value
if len(values) == 1 {
data, ok := values[0].([]byte)
if !ok {
return -1, fmt.Errorf("value not byte slice")
}

if err := r.validator.Validate(k, data); err != nil {
return -1, err
}

Check warning on line 147 in v2/backend_record.go

View check run for this annotation

Codecov / codecov/patch

v2/backend_record.go#L146-L147

Added lines #L146 - L147 were not covered by tests

return 0, nil
}

// In case there are invalid values in the slice, we still want to return
// the index in the original list of values. The Select method below will
// return the index of the "best" value in the slice of valid values. This
// slice can have a different length and therefore that method will return
// an index that doesn't match the values slice that's passed into this
// method. origIdx stores the original index
origIdx := map[int]int{}
validValues := [][]byte{}
for i, value := range values {
data, ok := value.([]byte)
if !ok {
continue
}

if err := r.validator.Validate(k, data); err != nil {
continue
}

origIdx[len(validValues)] = i
validValues = append(validValues, data)
}

if len(validValues) == 0 {
return -1, fmt.Errorf("no valid values")
}

sel, err := r.validator.Select(k, validValues)
if err != nil {
return -1, err
}

Check warning on line 181 in v2/backend_record.go

View check run for this annotation

Codecov / codecov/patch

v2/backend_record.go#L180-L181

Added lines #L180 - L181 were not covered by tests

return origIdx[sel], nil
}

// shouldReplaceExistingRecord returns true if the given record should replace any
// existing one in the local datastore. It queries the datastore, unmarshalls
// the record, validates it, and compares it to the incoming record. If the
Expand Down
114 changes: 114 additions & 0 deletions v2/backend_record_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package dht

import (
"fmt"
"strconv"
"strings"
"testing"

record "github.com/libp2p/go-libp2p-record"
"github.com/stretchr/testify/assert"

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
)

// testValidator is a validator that considers all values valid that have a
// "valid-" prefix. Then the suffix will determine which value is better. For
// example, "valid-2" is better than "valid-1".
type testValidator struct{}

var _ record.Validator = (*testValidator)(nil)

func (t testValidator) Validate(key string, value []byte) error {
if strings.HasPrefix(string(value), "valid-") {
return nil
}
return fmt.Errorf("invalid value")
}

func (t testValidator) Select(key string, values [][]byte) (int, error) {
idx := -1
best := -1
for i, val := range values {
if !strings.HasPrefix(string(val), "valid-") {
continue
}
newBest, err := strconv.Atoi(string(val)[6:])
if err != nil {
continue
}
if newBest > best {
idx = i
best = newBest
}
}

if idx == -1 {
return idx, fmt.Errorf("no valid value")
}

return idx, nil
}

func TestRecordBackend_Validate(t *testing.T) {
ctx := kadtest.CtxShort(t)

b := &RecordBackend{
namespace: "test",
validator: &testValidator{},
}

t.Run("no values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key")
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil, nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("single valid value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"))
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})

t.Run("increasing better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("valid-1"), []byte("valid-2"))
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("mixed better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("valid-2"), []byte("valid-1"))
assert.NoError(t, err)
assert.Equal(t, 1, idx)
})

t.Run("mixed invalid values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("invalid"), []byte("valid-2"), []byte("invalid"))
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("only invalid values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("invalid"), nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("identically good values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("valid-0"))
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})
}
15 changes: 15 additions & 0 deletions v2/backend_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ func (t *tracedBackend) Fetch(ctx context.Context, key string) (any, error) {
return result, err
}

func (t *tracedBackend) Validate(ctx context.Context, key string, values ...any) (int, error) {
ctx, span := t.tracer.Start(ctx, "Validate", t.traceAttributes(key))
defer span.End()

idx, err := t.backend.Validate(ctx, key, values...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
} else {
span.SetAttributes(attribute.Int("idx", idx))
}

return idx, err
}

// traceAttributes is a helper to build the trace attributes.
func (t *tracedBackend) traceAttributes(key string) trace.SpanStartEventOption {
return trace.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key))
Expand Down
16 changes: 16 additions & 0 deletions v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ type Config struct {
// used to filter out private addresses.
AddressFilter AddressFilter

// DefaultQuorum specifies the minimum number of identical responses before
// a SearchValue/GetValue operation returns. The responses must not only be
// identical, but the responses must also correspond to the "best" records
// we have observed in the network during the SearchValue/GetValue
// operation. A DefaultQuorum of 0 means that we search the network until
// we have exhausted the keyspace.
DefaultQuorum int // TODO: put on QueryConfig?
dennis-tra marked this conversation as resolved.
Show resolved Hide resolved

// MeterProvider provides access to named Meter instances. It's used to,
// e.g., expose prometheus metrics. Check out the [opentelemetry docs]:
//
Expand Down Expand Up @@ -201,6 +209,7 @@ func DefaultConfig() *Config {
Logger: slog.New(zapslog.NewHandler(logging.Logger("dht").Desugar().Core())),
TimeoutStreamIdle: time.Minute, // MAGIC
AddressFilter: AddrFilterPrivate,
DefaultQuorum: 0,
MeterProvider: otel.GetMeterProvider(),
TracerProvider: otel.GetTracerProvider(),
Query: DefaultQueryConfig(),
Expand Down Expand Up @@ -341,6 +350,13 @@ func (c *Config) Validate() error {
}
}

if c.DefaultQuorum < 0 {
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("default quorum must not be negative"),
}
}

return nil
}

Expand Down
Loading
Loading