Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RoutingDHT that implements the routing.Routing interface #947

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,33 +154,33 @@

func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
cfg = DefaultCoordinatorConfig()

Check warning on line 157 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L157

Added line #L157 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 160 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L159-L160

Added lines #L159 - L160 were not covered by tests

// initialize a new telemetry struct
tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
if err != nil {
return nil, fmt.Errorf("init telemetry: %w", err)
}

Check warning on line 166 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L165-L166

Added lines #L165 - L166 were not covered by tests

queryBehaviour, err := NewPooledQueryBehaviour(self, &cfg.Query)
if err != nil {
return nil, fmt.Errorf("query behaviour: %w", err)
}

Check warning on line 171 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L170-L171

Added lines #L170 - L171 were not covered by tests

routingBehaviour, err := NewRoutingBehaviour(self, rt, &cfg.Routing)
if err != nil {
return nil, fmt.Errorf("routing behaviour: %w", err)
}

Check warning on line 176 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L175-L176

Added lines #L175 - L176 were not covered by tests

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer)

b, err := brdcst.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, nil)
if err != nil {
return nil, fmt.Errorf("broadcast: %w", err)
}

Check warning on line 183 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L182-L183

Added lines #L182 - L183 were not covered by tests

brdcstBehaviour := NewPooledBroadcastBehaviour(b, cfg.Logger, tele.Tracer)

Expand Down Expand Up @@ -215,8 +215,8 @@
return nil
}

func (c *Coordinator) ID() kadt.PeerID {
return c.self

Check warning on line 219 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L218-L219

Added lines #L218 - L219 were not covered by tests
}

func (c *Coordinator) eventLoop(ctx context.Context) {
Expand All @@ -233,8 +233,8 @@
case <-ctx.Done():
// coordinator is closing
return
case <-c.networkBehaviour.Ready():
ev, ok = c.networkBehaviour.Perform(ctx)

Check warning on line 237 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L236-L237

Added lines #L236 - L237 were not covered by tests
case <-c.routingBehaviour.Ready():
ev, ok = c.routingBehaviour.Perform(ctx)
case <-c.queryBehaviour.Ready():
Expand All @@ -256,10 +256,10 @@
switch ev := ev.(type) {
case NetworkCommand:
c.networkBehaviour.Notify(ctx, ev)
case QueryCommand:
c.queryBehaviour.Notify(ctx, ev)
case BrdcstCommand:
c.brdcstBehaviour.Notify(ctx, ev)

Check warning on line 262 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L259-L262

Added lines #L259 - L262 were not covered by tests
case RoutingCommand:
c.routingBehaviour.Notify(ctx, ev)
case RoutingNotification:
Expand All @@ -267,8 +267,8 @@
rn := c.routingNotifier
c.routingNotifierMu.RUnlock()
rn.Notify(ctx, ev)
default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 271 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L270-L271

Added lines #L270 - L271 were not covered by tests
}
}

Expand All @@ -289,8 +289,8 @@

nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}

Check warning on line 293 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L292-L293

Added lines #L292 - L293 were not covered by tests
return nh, nil
}

Expand All @@ -303,8 +303,8 @@
for _, id := range closest {
nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}

Check warning on line 307 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L306-L307

Added lines #L306 - L307 were not covered by tests
nodes = append(nodes, nh)
}
return nodes, nil
Expand All @@ -312,14 +312,14 @@

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (coordt.Value, error) {
panic("not implemented")

Check warning on line 316 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L315-L316

Added lines #L315 - L316 were not covered by tests
}

// PutValue requests that the node stores a value to be associated with the supplied key.
// If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.
func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error {
panic("not implemented")

Check warning on line 322 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L321-L322

Added lines #L321 - L322 were not covered by tests
}

// QueryClosest starts a query that attempts to find the closest nodes to the target key.
Expand All @@ -343,8 +343,8 @@

seeds, err := c.GetClosestNodes(ctx, target, 20)
if err != nil {
return nil, coordt.QueryStats{}, err
}

Check warning on line 347 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L346-L347

Added lines #L346 - L347 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
Expand Down Expand Up @@ -383,21 +383,21 @@
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage")
defer span.End()
if msg == nil {
return coordt.QueryStats{}, fmt.Errorf("no message supplied for query")
return nil, coordt.QueryStats{}, fmt.Errorf("no message supplied for query")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this change? Was it not building before? Or is it a merge conflict?

}

Check warning on line 387 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L386-L387

Added lines #L386 - L387 were not covered by tests
c.cfg.Logger.Debug("starting query with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String()))

ctx, cancel := context.WithCancel(ctx)
defer cancel()

if numResults < 1 {
numResults = 20 // TODO: parameterize
}

Check warning on line 395 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L394-L395

Added lines #L394 - L395 were not covered by tests

seeds, err := c.GetClosestNodes(ctx, msg.Target(), numResults)
if err != nil {
return nil, coordt.QueryStats{}, err
}

Check warning on line 400 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L399-L400

Added lines #L399 - L400 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
Expand Down Expand Up @@ -427,8 +427,8 @@
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.BroadcastRecord")
defer span.End()
if msg == nil {
return fmt.Errorf("no message supplied for broadcast")
}

Check warning on line 431 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L430-L431

Added lines #L430 - L431 were not covered by tests
c.cfg.Logger.Debug("starting broadcast with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String()))

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -436,8 +436,8 @@

seedNodes, err := c.GetClosestNodes(ctx, msg.Target(), 20) // TODO: parameterize
if err != nil {
return err
}

Check warning on line 440 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L439-L440

Added lines #L439 - L440 were not covered by tests

seeds := make([]kadt.PeerID, 0, len(seedNodes))
for _, s := range seedNodes {
Expand Down Expand Up @@ -496,8 +496,8 @@
return nil, lastStats, ctx.Err()
case wev, more := <-waiter.Chan():
if !more {
return nil, lastStats, ctx.Err()
}

Check warning on line 500 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L499-L500

Added lines #L499 - L500 were not covered by tests
ctx, ev := wev.Ctx, wev.Event
switch ev := ev.(type) {
case *EventQueryProgressed:
Expand All @@ -510,9 +510,9 @@
}
nh, err := c.networkBehaviour.getNodeHandler(ctx, ev.NodeID)
if err != nil {
// ignore unknown node
c.cfg.Logger.Debug("node handler not found", "query_id", queryID, tele.LogAttrError, err)
break

Check warning on line 515 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L513-L515

Added lines #L513 - L515 were not covered by tests
}

err = fn(ctx, nh.ID(), ev.Response, lastStats)
Expand All @@ -523,10 +523,10 @@
return nil, lastStats, nil
}
if err != nil {
// user defined error that terminates the query
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return nil, lastStats, err
}

Check warning on line 529 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L526-L529

Added lines #L526 - L529 were not covered by tests

case *EventQueryFinished:
// query is done
Expand All @@ -534,8 +534,8 @@
c.cfg.Logger.Debug("query ran to exhaustion", "query_id", queryID, slog.Duration("elapsed", ev.Stats.End.Sub(ev.Stats.Start)), slog.Int("requests", ev.Stats.Requests), slog.Int("failures", ev.Stats.Failure))
return ev.ClosestNodes, lastStats, nil

default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 538 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L537-L538

Added lines #L537 - L538 were not covered by tests
}
}
}
Expand All @@ -552,16 +552,16 @@
return nil, nil, ctx.Err()
case wev, more := <-waiter.Chan():
if !more {
return nil, nil, ctx.Err()
}

Check warning on line 556 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L555-L556

Added lines #L555 - L556 were not covered by tests

switch ev := wev.Event.(type) {
case *EventQueryProgressed:
case *EventBroadcastFinished:
return ev.Contacted, ev.Errors, nil

default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 564 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L563-L564

Added lines #L563 - L564 were not covered by tests
}
}
}
Expand All @@ -575,8 +575,8 @@
defer span.End()
for _, id := range ids {
if id.Equal(c.self) {
// skip self
continue

Check warning on line 579 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L578-L579

Added lines #L578 - L579 were not covered by tests
}

c.routingBehaviour.Notify(ctx, &EventAddNode{
Expand Down Expand Up @@ -614,14 +614,14 @@

// NotifyNonConnectivity notifies the coordinator that a peer has failed a connectivity check
// which means it is not connected and/or it doesn't support finding closer nodes
func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
defer span.End()

c.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(id), "source", "notify")
c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{
NodeID: id,
})

Check warning on line 624 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L617-L624

Added lines #L617 - L624 were not covered by tests
}

func (c *Coordinator) newOperationID() coordt.QueryID {
Expand Down Expand Up @@ -669,8 +669,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for event %T", expected)

Check warning on line 673 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L672-L673

Added lines #L672 - L673 were not covered by tests
case <-w.signal:
}
}
Expand All @@ -695,8 +695,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing updated event")

Check warning on line 699 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L698-L699

Added lines #L698 - L699 were not covered by tests
case <-w.signal:
}
}
Expand All @@ -721,8 +721,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing removed event")

Check warning on line 725 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L724-L725

Added lines #L724 - L725 were not covered by tests
case <-w.signal:
}
}
Expand Down
4 changes: 2 additions & 2 deletions v2/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestRTAdditionOnSuccessfulQuery(t *testing.T) {
require.ErrorIs(t, err, coordt.ErrNodeNotFound)

// // but when d3 queries d2, d1 and d3 discover each other
_, _ = d3.FindPeer(ctx, "something")
_, _ = NewRouting(d3).FindPeer(ctx, "something")
// ignore the error

// d3 should update its routing table to include d1 during the query
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestRTEvictionOnFailedQuery(t *testing.T) {
require.NoError(t, err)

// failed queries should remove the queried peers from the routing table
_, _ = d1.FindPeer(ctx, "test")
_, _ = NewRouting(d1).FindPeer(ctx, "test")

// d1 should update its routing table to remove d2 because of the failure
_, err = top.ExpectRoutingRemoved(ctx, d1, d2.host.ID())
Expand Down
42 changes: 29 additions & 13 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,25 @@
"github.com/libp2p/go-libp2p-kad-dht/v2/pb"
)

var _ routing.Routing = (*DHT)(nil)
// RoutingDHT is a wrapper around the [DHT] struct that implements the
// [routing.Routing] interface. As people have raised concerns about the
// interface, we decided to not "pollute" the DHTs public API surface with
// interface methods that we can already foresee will eventually change.
// Use the [NewRouting] convenience method to create a new RoutingDHT.
type RoutingDHT struct {
*DHT // the wrapped DHT
}

var _ routing.Routing = (*RoutingDHT)(nil)

// NewRouting wraps the given [DHT] in a [RoutingDHT] that implements the
// [routing.Routing] interface. See [RoutingDHT]'s documentation for more
// information.
func NewRouting(d *DHT) *RoutingDHT {
return &RoutingDHT{DHT: d}
}

func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
func (d *RoutingDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.FindPeer")
defer span.End()

Expand Down Expand Up @@ -64,7 +80,7 @@
return d.host.Peerstore().PeerInfo(foundPeer), nil
}

func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
func (d *RoutingDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.Provide", otel.WithAttributes(attribute.String("cid", c.String())))
defer span.End()

Expand Down Expand Up @@ -109,13 +125,13 @@
return d.kad.BroadcastRecord(ctx, msg)
}

func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo {
func (d *RoutingDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo {
peerOut := make(chan peer.AddrInfo)
go d.findProvidersAsyncRoutine(ctx, c, count, peerOut)
return peerOut
}

func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan<- peer.AddrInfo) {
func (d *RoutingDHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan<- peer.AddrInfo) {
_, span := d.tele.Tracer.Start(ctx, "DHT.findProvidersAsyncRoutine", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count)))
defer span.End()

Expand Down Expand Up @@ -147,10 +163,10 @@

ps, ok := stored.(*providerSet)
if !ok {
span.RecordError(err)
d.log.Warn("Stored value is not a provider set", slog.String("cid", c.String()), slog.String("type", fmt.Sprintf("%T", stored)))
return
}

Check warning on line 169 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L166-L169

Added lines #L166 - L169 were not covered by tests

for _, provider := range ps.providers {
providers[provider.ID] = struct{}{}
Expand Down Expand Up @@ -187,8 +203,8 @@

// actually send the provider information to the user
select {
case <-ctx.Done():
return coordt.ErrSkipRemaining

Check warning on line 207 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L206-L207

Added lines #L206 - L207 were not covered by tests
case out <- provider:
}

Expand All @@ -205,10 +221,10 @@

_, _, err = d.kad.QueryMessage(ctx, msg, fn, d.cfg.BucketSize)
if err != nil {
span.RecordError(err)
d.log.Warn("Failed querying", slog.String("cid", c.String()), slog.String("err", err.Error()))
return
}

Check warning on line 227 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L224-L227

Added lines #L224 - L227 were not covered by tests
}

// PutValue satisfies the [routing.Routing] interface and will add the given
Expand All @@ -216,7 +232,7 @@
// format `/$namespace/$binary_id`. Namespace examples are `pk` or `ipns`. To
// identify the closest peers to keyStr, that complete string will be SHA256
// hashed.
func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error {
func (d *RoutingDHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValue")
defer span.End()

Expand Down Expand Up @@ -247,15 +263,15 @@
// finally, find the closest peers to the target key.
err := d.kad.BroadcastRecord(ctx, msg)
if err != nil {
return fmt.Errorf("query error: %w", err)
}

Check warning on line 267 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L266-L267

Added lines #L266 - L267 were not covered by tests

return nil
}

// putValueLocal stores a value in the local datastore without reaching out to
// the network.
func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error {
func (d *RoutingDHT) putValueLocal(ctx context.Context, key string, value []byte) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValueLocal")
defer span.End()

Expand All @@ -280,14 +296,14 @@
return nil
}

func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
func (d *RoutingDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.GetValue")
defer span.End()

valueChan, err := d.SearchValue(ctx, key, opts...)
if err != nil {
return nil, err
}

Check warning on line 306 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L305-L306

Added lines #L305 - L306 were not covered by tests

var best []byte
for val := range valueChan {
Expand All @@ -307,7 +323,7 @@

// SearchValue will search in the DHT for keyStr. keyStr must have the form
// `/$namespace/$binary_id`
func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) {
func (d *RoutingDHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) {
_, span := d.tele.Tracer.Start(ctx, "DHT.SearchValue")
defer span.End()

Expand All @@ -330,7 +346,7 @@
val, err := b.Fetch(ctx, path)
if err != nil {
if !errors.Is(err, ds.ErrNotFound) {
return nil, fmt.Errorf("fetch from backend: %w", err)

Check warning on line 349 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L349

Added line #L349 was not covered by tests
}

if rOpt.Offline {
Expand All @@ -344,8 +360,8 @@

rec, ok := val.(*recpb.Record)
if !ok {
return nil, fmt.Errorf("expected *recpb.Record from backend, got: %T", val)
}

Check warning on line 364 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L363-L364

Added lines #L363 - L364 were not covered by tests

if rOpt.Offline {
out := make(chan []byte, 1)
Expand All @@ -363,7 +379,7 @@
return out, nil
}

func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) {
func (d *RoutingDHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) {
_, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine")
defer span.End()
defer close(out)
Expand Down Expand Up @@ -396,8 +412,8 @@
}

if !bytes.Equal(routingKey, rec.GetKey()) {
return nil
}

Check warning on line 416 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L415-L416

Added lines #L415 - L416 were not covered by tests

idx, _ := backend.Validate(ctx, path, best, rec.GetValue())
switch idx {
Expand All @@ -424,8 +440,8 @@
case -1: // "best" and rec.GetValue() are both invalid
return nil

default:
d.log.Warn("unexpected validate index", slog.Int("idx", idx))

Check warning on line 444 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L443-L444

Added lines #L443 - L444 were not covered by tests
}

// Check if we have reached the quorum
Expand All @@ -438,7 +454,7 @@

_, _, err := d.kad.QueryMessage(ctx, req, fn, d.cfg.BucketSize)
if err != nil {
d.logErr(err, "Search value query failed")
d.warnErr(err, "Search value query failed")
return
}

Expand Down Expand Up @@ -487,7 +503,7 @@

// getQuorum extracts the quorum value from the given routing options and
// returns [Config.DefaultQuorum] if no quorum value is present.
func (d *DHT) getQuorum(opts *routing.Options) int {
func (d *RoutingDHT) getQuorum(opts *routing.Options) int {
quorum, ok := opts.Other[quorumOptionKey{}].(int)
if !ok {
quorum = d.cfg.Query.DefaultQuorum
Expand All @@ -496,7 +512,7 @@
return quorum
}

func (d *DHT) Bootstrap(ctx context.Context) error {
func (d *RoutingDHT) Bootstrap(ctx context.Context) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.Bootstrap")
defer span.End()
d.log.Info("Starting bootstrap")
Expand Down
Loading
Loading