experimental dht: add stored addresses to peerstore temporarily during GetClosestPeers calls. Only keep the backup addresses for peers found during a crawl that we actually connected with. Properly clear out peermap between crawls
aschmahmann committed Apr 30, 2021
1 parent 7e79983 commit d543baa
Showing 2 changed files with 52 additions and 9 deletions.
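Note: the recurring mechanism in this commit is the peerstore's per-address TTL. A minimal sketch of that pattern outside the diff (the helper name is illustrative and not from this commit; Peerstore.AddAddrs and peerstore.TempAddrTTL are existing go-libp2p-core APIs):

package sketch

import (
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

// addBackupAddrs (hypothetical helper) re-adds previously crawled
// addresses for p with a short TTL so that an imminent dial can use
// them without leaving long-lived entries in the peerstore.
func addBackupAddrs(ps peerstore.Peerstore, p peer.ID, backup []ma.Multiaddr) {
	ps.AddAddrs(p, backup, peerstore.TempAddrTTL)
}

This is the same shape as the GetClosestPeers change below, which re-adds dht.peerAddrs[p] under peerstore.TempAddrTTL.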
26 changes: 20 additions & 6 deletions crawler/crawler.go
@@ -119,6 +119,8 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
 // HandleQueryFail is a callback on failed peer query
 type HandleQueryFail func(p peer.ID, err error)
 
+const startAddressDur time.Duration = time.Minute * 30
+
 // Run crawls dht peers from an initial seed of `startingPeers`
 func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
 	jobs := make(chan peer.ID, 1)
@@ -140,15 +142,27 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
 	defer wg.Wait()
 	defer close(jobs)
 
-	toDial := make([]*peer.AddrInfo, 0, len(startingPeers))
+	var toDial []*peer.AddrInfo
 	peersSeen := make(map[peer.ID]struct{})
 
+	numSkipped := 0
 	for _, ai := range startingPeers {
+		extendAddrs := c.host.Peerstore().Addrs(ai.ID)
+		if len(ai.Addrs) > 0 {
+			extendAddrs = append(extendAddrs, ai.Addrs...)
+			c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, startAddressDur)
+		}
+		if len(extendAddrs) == 0 {
+			numSkipped++
+			continue
+		}
+
 		toDial = append(toDial, ai)
 		peersSeen[ai.ID] = struct{}{}
-		extendAddrs := c.host.Peerstore().Addrs(ai.ID)
-		extendAddrs = append(extendAddrs, ai.Addrs...)
-		c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, time.Hour)
 	}
 
+	if numSkipped > 0 {
+		logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
+	}
+
 	numQueried := 0
@@ -168,7 +182,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
 				logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
 				rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
 				for p, ai := range res.data {
-					c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
+					c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Minute*30)
 					if _, ok := peersSeen[p]; !ok {
 						peersSeen[p] = struct{}{}
 						toDial = append(toDial, ai)
@@ -208,7 +222,7 @@ func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
 	defer cancel()
 	err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
 	if err != nil {
-		logger.Infof("could not connect to peer %v: %v", nextPeer, err)
+		logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
 		return &queryResult{nextPeer, nil, err}
 	}
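For context, Run takes a success and a failure callback with the HandleQueryResult and HandleQueryFail signatures shown above. A hypothetical caller, using only the signatures visible in this diff (crawler construction and seed addresses elided):

c.Run(ctx, seeds,
	func(p peer.ID, rtPeers []*peer.AddrInfo) {
		// success: p answered the query and reported rtPeers from its routing table
		logger.Infof("queried %v: %d routing table peers", p, len(rtPeers))
	},
	func(p peer.ID, err error) {
		// failure: p could not be connected to or queried
		logger.Debugf("crawl of %v failed: %v", p, err)
	},
)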
35 changes: 32 additions & 3 deletions fullrt/dht.go
@@ -213,6 +213,10 @@ func (dht *FullRT) Ready() bool {
 	return rtSize > len(dht.bootstrapPeers)+1
 }
 
+func (dht *FullRT) Host() host.Host {
+	return dht.h
+}
+
 func (dht *FullRT) runCrawler(ctx context.Context) {
 	t := time.NewTicker(dht.crawlerInterval)
 
@@ -233,17 +237,37 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
 
 		var addrs []*peer.AddrInfo
 		dht.peerAddrsLk.Lock()
-		for k, v := range m {
-			addrs = append(addrs, &peer.AddrInfo{ID: k, Addrs: v.addrs})
+		for k := range m {
+			addrs = append(addrs, &peer.AddrInfo{ID: k}) // Addrs: v.addrs
 		}
 
 		addrs = append(addrs, dht.bootstrapPeers...)
 		dht.peerAddrsLk.Unlock()
 
+		for k := range m {
+			delete(m, k)
+		}
+
 		start := time.Now()
 		dht.crawler.Run(ctx, addrs,
 			func(p peer.ID, rtPeers []*peer.AddrInfo) {
-				addrs := dht.h.Peerstore().Addrs(p)
+				conns := dht.h.Network().ConnsToPeer(p)
+				var addrs []multiaddr.Multiaddr
+				for _, conn := range conns {
+					addr := conn.RemoteMultiaddr()
+					addrs = append(addrs, addr)
+				}
+
+				if len(addrs) == 0 {
+					logger.Debugf("no connections to %v after successful query. keeping addresses from the peerstore", p)
+					addrs = dht.h.Peerstore().Addrs(p)
+				}
+
+				keep := kaddht.PublicRoutingTableFilter(dht, p)
+				if !keep {
+					return
+				}
+
 				mxLk.Lock()
 				defer mxLk.Unlock()
 				m[p] = &crawlVal{
@@ -356,6 +380,11 @@ func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
 			logger.Errorf("key not found in map")
 		}
 		dht.kMapLk.RUnlock()
+		dht.peerAddrsLk.RLock()
+		peerAddrs := dht.peerAddrs[p]
+		dht.peerAddrsLk.RUnlock()
+
+		dht.h.Peerstore().AddAddrs(p, peerAddrs, peerstore.TempAddrTTL)
 		peers = append(peers, p)
 	}
 	return peers, nil
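The runCrawler callback above implements the "only keep the backup addresses for peers we actually connected with" part of the commit message: it records the remote multiaddr of each live connection and only falls back to the peerstore when no connection survived the query. The same logic as a standalone sketch (the function name is illustrative; Network().ConnsToPeer and RemoteMultiaddr are existing go-libp2p APIs):

package sketch

import (
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	ma "github.com/multiformats/go-multiaddr"
)

// connectedAddrs (hypothetical helper) returns the remote addresses of
// live connections to p, falling back to the peerstore if none remain.
func connectedAddrs(h host.Host, p peer.ID) []ma.Multiaddr {
	var addrs []ma.Multiaddr
	for _, conn := range h.Network().ConnsToPeer(p) {
		addrs = append(addrs, conn.RemoteMultiaddr())
	}
	if len(addrs) == 0 {
		// no live connection: use whatever the peerstore still has
		addrs = h.Peerstore().Addrs(p)
	}
	return addrs
}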
