Skip to content

Commit

Permalink
dht: switch GetClosestPeers to return a slice of peers instead of a c…
Browse files Browse the repository at this point in the history
…hannel
  • Loading branch information
aschmahmann committed Apr 30, 2021
1 parent d543baa commit ee4a44e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 22 deletions.
18 changes: 8 additions & 10 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func testFindPeerQuery(t *testing.T,
require.NoError(t, err)

var outpeers []peer.ID
for p := range out {
for _, p := range out {
outpeers = append(outpeers, p)
}

Expand Down Expand Up @@ -1521,7 +1521,7 @@ func TestFindClosestPeers(t *testing.T) {
}

var out []peer.ID
for p := range peers {
for _, p := range peers {
out = append(out, p)
}

Expand Down Expand Up @@ -2112,18 +2112,16 @@ func TestPreconnectedNodes(t *testing.T) {
}

// See if it works
peerCh, err := d2.GetClosestPeers(ctx, "testkey")
peers, err := d2.GetClosestPeers(ctx, "testkey")
if err != nil {
t.Fatal(err)
}

select {
case p := <-peerCh:
if p == h1.ID() {
break
}
if len(peers) != 1 {
t.Fatal("why is there more than one peer?")
}

if peers[0] != h1.ID() {
t.Fatal("could not find peer")
case <-ctx.Done():
t.Fatal("test hung")
}
}
11 changes: 2 additions & 9 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
//
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
Expand Down Expand Up @@ -51,17 +51,10 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
return nil, err
}

out := make(chan peer.ID, dht.bucketSize)
defer close(out)

for _, p := range lookupRes.peers {
out <- p
}

if ctx.Err() == nil && lookupRes.completed {
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
}

return out, ctx.Err()
return lookupRes.peers, ctx.Err()
}
6 changes: 3 additions & 3 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
return err
}

pchan, err := dht.GetClosestPeers(ctx, key)
peers, err := dht.GetClosestPeers(ctx, key)
if err != nil {
return err
}

wg := sync.WaitGroup{}
for p := range pchan {
for _, p := range peers {
wg.Add(1)
go func(p peer.ID) {
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -446,7 +446,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
}

wg := sync.WaitGroup{}
for p := range peers {
for _, p := range peers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
Expand Down

0 comments on commit ee4a44e

Please sign in to comment.