Commit

go fmt + prov fix

aschmahmann committed Apr 13, 2021
1 parent 2247085 commit 5305ca3
Showing 2 changed files with 83 additions and 74 deletions.
153 changes: 81 additions & 72 deletions fullrt/dht.go
@@ -55,8 +55,8 @@ type FullRT struct {
h host.Host

crawlerInterval time.Duration
crawler *crawler.Crawler
protoMessenger *dht_pb.ProtocolMessenger

filterFromTable kaddht.QueryFilterFunc
rtLk sync.RWMutex
@@ -74,15 +74,15 @@ type FullRT struct {

triggerRefresh chan struct{}

waitFrac float64
timeoutPerOp time.Duration

provideManyParallelism int
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
// For example, the protocol /ipfs/kad/1.0.0 has the prefix /ipfs.
- func NewFullRT(ctx context.Context, h host.Host, protocolPrefix protocol.ID, opts ... Option) (*FullRT, error) {
+ func NewFullRT(ctx context.Context, h host.Host, protocolPrefix protocol.ID, opts ...Option) (*FullRT, error) {
cfg := &config{}
for _, o := range opts {
if err := o(cfg); err != nil {
@@ -133,7 +133,7 @@ func NewFullRT(ctx context.Context, h host.Host, protocolPrefix protocol.ID, opt

triggerRefresh: make(chan struct{}),

waitFrac: 0.3,
timeoutPerOp: 5 * time.Second,

crawlerInterval: time.Minute * 60,
@@ -748,6 +748,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e

func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID) int {
putctx, cancel := context.WithCancel(ctx)
+ defer cancel()

waitAllCh := make(chan struct{}, len(peers))
numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac)
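The added defer cancel() releases the derived putctx however execOnMany returns, and the two lines below it state the policy the helper implements: wait until a waitFrac fraction of the peers has answered, then give the stragglers one timeoutPerOp before moving on. The rest of the function is collapsed in this view, so the following is only a simplified, self-contained sketch of that fan-out shape (execOnManySketch is a made-up name and plain strings stand in for peer.IDs), not the package's actual implementation:

package main

import (
	"context"
	"fmt"
	"time"
)

// execOnManySketch fans fn out to every peer, waits until a waitFrac
// fraction of the calls has returned, then gives the remaining calls a
// single timeoutPerOp grace period. It reports how many calls succeeded.
func execOnManySketch(ctx context.Context, fn func(context.Context, string) error,
	peers []string, waitFrac float64, timeoutPerOp time.Duration) int {
	qctx, cancel := context.WithCancel(ctx)
	defer cancel() // release qctx no matter how we return

	done := make(chan error, len(peers)) // buffered: senders never block
	for _, p := range peers {
		go func(p string) { done <- fn(qctx, p) }(p)
	}

	successes, completed := 0, 0
	waitFor := int(float64(len(peers)) * waitFrac)

	// Phase 1: wait for the target fraction of calls to finish.
	for completed < waitFor {
		select {
		case err := <-done:
			completed++
			if err == nil {
				successes++
			}
		case <-ctx.Done():
			return successes
		}
	}

	// Phase 2: give the stragglers one operation's worth of extra time.
	grace := time.After(timeoutPerOp)
	for completed < len(peers) {
		select {
		case err := <-done:
			completed++
			if err == nil {
				successes++
			}
		case <-grace:
			return successes
		case <-ctx.Done():
			return successes
		}
	}
	return successes
}

func main() {
	peers := []string{"peerA", "peerB", "peerC", "peerD"}
	n := execOnManySketch(context.Background(), func(ctx context.Context, p string) error {
		time.Sleep(10 * time.Millisecond) // stand-in for a network round trip
		return nil
	}, peers, 0.3, 5*time.Second)
	fmt.Println("successful calls:", n)
}
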
@@ -790,7 +791,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
return routing.ErrNotSupported
}

- keysAsPeerIDs := make([]peer.ID, len(keys))
+ keysAsPeerIDs := make([]peer.ID, 0, len(keys))
for _, k := range keys {
keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k))
}
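The one-character change above is part of the "prov fix": make([]peer.ID, len(keys)) creates a slice that already holds len(keys) empty IDs, so the append calls in the loop push the real keys after those empty entries. make([]peer.ID, 0, len(keys)) only reserves capacity. A tiny standalone illustration, with strings standing in for peer IDs:

package main

import "fmt"

func main() {
	keys := []string{"a", "b", "c"}

	// Bug: length 3 up front, so appends land after the three zero values.
	wrong := make([]string, len(keys))
	for _, k := range keys {
		wrong = append(wrong, k)
	}
	fmt.Printf("%q\n", wrong) // ["" "" "" "a" "b" "c"]

	// Fix: length 0, capacity 3, appends fill the slice as intended.
	right := make([]string, 0, len(keys))
	for _, k := range keys {
		right = append(right, k)
	}
	fmt.Printf("%q\n", right) // ["a" "b" "c"]
}
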
@@ -814,19 +815,28 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)

wg := sync.WaitGroup{}
wg.Add(dht.provideManyParallelism)
- chunkSize := len(sortedKeys)/dht.provideManyParallelism
+ chunkSize := len(sortedKeys) / dht.provideManyParallelism
for i := 0; i < dht.provideManyParallelism; i++ {
var chunk []peer.ID
- if i == dht.provideManyParallelism - 1 {
+ end := (i + 1) * chunkSize
+ if end > len(sortedKeys) {
chunk = sortedKeys[i*chunkSize:]
} else {
- chunk = sortedKeys[i*chunkSize:i*(chunkSize+1)]
+ chunk = sortedKeys[i*chunkSize : end]
}

+ loopIndex := i

go func() {
defer wg.Done()
- for _, key := range chunk {
+ for ki, key := range chunk {
+ if loopIndex == 0 {
+ if ki%100 == 0 {
+ logger.Infof("reprovide goroutine: %v pct done - %d/%d done - %d total", (ki*100)/len(chunk), ki, len(chunk), len(sortedKeys))
+ }
+ }
if err := fn(key); err != nil {
- logger.Infow("failed to complete provide of key :%v. %w", internal.LoggableProviderRecordBytes(key), err)
+ logger.Infof("failed to complete provide of key :%v. %v", internal.LoggableProviderRecordBytes(key), err)
} else {
atomic.CompareAndSwapUint64(&anyProvidesSuccessful, 0, 1)
}
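The chunking change is the heart of the provide fix: the old expression sortedKeys[i*chunkSize : i*(chunkSize+1)] does not advance by chunkSize per worker — for i == 0 it is the empty slice [0:0], and worker i otherwise got only i keys — so most keys were never handed to a goroutine. Below is a standalone sketch of contiguous chunking with the corrected arithmetic; ints stand in for the hashed keys, and the tail handling is simplified relative to the committed code:

package main

import "fmt"

// splitChunks divides keys into parallelism contiguous chunks; the last
// chunk absorbs any remainder left over by the integer division.
func splitChunks(keys []int, parallelism int) [][]int {
	chunkSize := len(keys) / parallelism
	chunks := make([][]int, 0, parallelism)
	for i := 0; i < parallelism; i++ {
		start := i * chunkSize
		end := (i + 1) * chunkSize
		if i == parallelism-1 {
			end = len(keys) // last worker takes the tail
		}
		// The pre-fix expression was keys[start : i*(chunkSize+1)],
		// which is empty for i == 0 and far too short afterwards.
		chunks = append(chunks, keys[start:end])
	}
	return chunks
}

func main() {
	keys := make([]int, 25)
	for i := range keys {
		keys[i] = i
	}
	for i, c := range splitChunks(keys, 4) {
		fmt.Printf("worker %d: %d keys %v\n", i, len(c), c)
	}
}

The adjacent logging change is related cleanup: Infow is the structured, key/value logging variant and does not do printf-style formatting, and %w is only meaningful to fmt.Errorf, so the call becomes Infof with %v.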
@@ -920,49 +930,48 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
queryctx, cancelquery := context.WithCancel(ctx)
defer cancelquery()


fn := func(ctx context.Context, p peer.ID) error {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})

provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
if err != nil {
return err
}

logger.Debugf("%d provider entries", len(provs))

// Add unique providers from request, up to 'count'
for _, prov := range provs {
dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
logger.Debugf("got provider: %s", prov)
if ps.TryAdd(prov.ID) {
logger.Debugf("using provider: %s", prov)
select {
case peerOut <- *prov:
case <-ctx.Done():
logger.Debug("context timed out sending more providers")
return ctx.Err()
}
}
if !findAll && ps.Size() >= count {
logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
cancelquery()
return nil
logger.Debugf("%d provider entries", len(provs))

// Add unique providers from request, up to 'count'
for _, prov := range provs {
dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
logger.Debugf("got provider: %s", prov)
if ps.TryAdd(prov.ID) {
logger.Debugf("using provider: %s", prov)
select {
case peerOut <- *prov:
case <-ctx.Done():
logger.Debug("context timed out sending more providers")
return ctx.Err()
}
}
if !findAll && ps.Size() >= count {
logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
cancelquery()
return nil
}
}

// Give closer peers back to the query to be queried
logger.Debugf("got closer peers: %d %s", len(closest), closest)

routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: closest,
})
return nil
}
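Both this provider loop and the FindPeer closure further down forward results with the same select on the outbound channel and on ctx.Done(), so a consumer that stops reading can never wedge the query goroutine. The generic helper below is just that idiom lifted out on its own; it is not part of this codebase:

package main

import (
	"context"
	"errors"
	"fmt"
)

// sendOrBail forwards one value to out unless the context is cancelled
// first — the same select shape used around peerOut and addrsCh here.
func sendOrBail[T any](ctx context.Context, out chan<- T, v T) error {
	select {
	case out <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // nobody is listening any more

	ch := make(chan string) // unbuffered and never read from
	err := sendOrBail(ctx, ch, "provider-record")
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
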

@@ -1024,36 +1033,36 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, e
}()

fn := func(ctx context.Context, p peer.ID) error {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})

peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, id)
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
return err
}

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})

for _, a := range peers {
if a.ID == id {
select {
case addrsCh <- a:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
return nil
}
return nil
}

dht.execOnMany(queryctx, fn, peers)
4 changes: 2 additions & 2 deletions fullrt/options.go
@@ -7,8 +7,8 @@ import (
)

type config struct {
validator record.Validator
datastore ds.Batching
bootstrapPeers []peer.AddrInfo
}
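fullrt/options.go only picks up the same gofmt alignment, but this config struct is the target of the opts ...Option parameter that NewFullRT loops over above (each option is applied to the *config and may fail). A hypothetical option constructor to illustrate that functional-options shape — the field and constructor names below are made up, not the package's real ones:

package main

import "fmt"

// config and Option mirror the shape NewFullRT consumes: an option is a
// function that mutates the config and may return an error.
type config struct {
	bootstrapAddrs []string // hypothetical field, for illustration only
}

type Option func(*config) error

// WithBootstrapAddrs is a made-up constructor showing the pattern.
func WithBootstrapAddrs(addrs ...string) Option {
	return func(c *config) error {
		if len(addrs) == 0 {
			return fmt.Errorf("at least one bootstrap address is required")
		}
		c.bootstrapAddrs = append(c.bootstrapAddrs, addrs...)
		return nil
	}
}

func newConfig(opts ...Option) (*config, error) {
	cfg := &config{}
	for _, o := range opts {
		if err := o(cfg); err != nil {
			return nil, err
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := newConfig(WithBootstrapAddrs("/dnsaddr/bootstrap.libp2p.io"))
	fmt.Println(cfg.bootstrapAddrs, err)
}
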

