From 71598922a0596aee42afd06b048450429a14d3fa Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Thu, 22 Apr 2021 14:58:20 -0700
Subject: [PATCH] Add basic crawler (#663)

* feat: added a simple dht crawler

Co-authored-by: Will Scott
---
 crawler/crawler.go | 240 +++++++++++++++++++++++++++++++++++++++++++++
 crawler/options.go |  60 ++++++++++++
 2 files changed, 300 insertions(+)
 create mode 100644 crawler/crawler.go
 create mode 100644 crawler/options.go

diff --git a/crawler/crawler.go b/crawler/crawler.go
new file mode 100644
index 000000000..5f4ec7732
--- /dev/null
+++ b/crawler/crawler.go
@@ -0,0 +1,240 @@
+package crawler
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/protocol"
+
+	logging "github.com/ipfs/go-log"
+	"github.com/libp2p/go-msgio/protoio"
+
+	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
+	kbucket "github.com/libp2p/go-libp2p-kbucket"
+)
+
+var logger = logging.Logger("dht-crawler")
+
+// Crawler connects to hosts in the DHT to track routing tables of peers.
+type Crawler struct {
+	parallelism    int
+	connectTimeout time.Duration
+	host           host.Host
+	dhtRPC         *pb.ProtocolMessenger
+}
+
+// New creates a new Crawler
+func New(host host.Host, opts ...Option) (*Crawler, error) {
+	o := new(options)
+	if err := defaults(o); err != nil {
+		return nil, err
+	}
+	for _, opt := range opts {
+		if err := opt(o); err != nil {
+			return nil, err
+		}
+	}
+
+	pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
+	if err != nil {
+		return nil, err
+	}
+
+	return &Crawler{
+		parallelism:    o.parallelism,
+		connectTimeout: o.connectTimeout,
+		host:           host,
+		dhtRPC:         pm,
+	}, nil
+}
+
+// MessageSender handles sending wire protocol messages to a given peer
+type messageSender struct {
+	h         host.Host
+	protocols []protocol.ID
+	timeout   time.Duration
+}
+
+// SendRequest sends a peer a message and waits for its response
+func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
+	s, err := ms.h.NewStream(ctx, p, ms.protocols...)
+	if err != nil {
+		return nil, err
+	}
+
+	w := protoio.NewDelimitedWriter(s)
+	if err := w.WriteMsg(pmes); err != nil {
+		return nil, err
+	}
+
+	r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
+	tctx, cancel := context.WithTimeout(ctx, ms.timeout)
+	defer cancel()
+	defer func() { _ = s.Close() }()
+
+	msg := new(pb.Message)
+	if err := ctxReadMsg(tctx, r, msg); err != nil {
+		_ = s.Reset()
+		return nil, err
+	}
+
+	return msg, nil
+}
+
+func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error {
+	errc := make(chan error, 1)
+	go func(r protoio.ReadCloser) {
+		defer close(errc)
+		err := r.ReadMsg(mes)
+		errc <- err
+	}(rc)
+
+	select {
+	case err := <-errc:
+		return err
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// SendMessage sends a peer a message without waiting on a response
+func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
+	s, err := ms.h.NewStream(ctx, p, ms.protocols...)
+	if err != nil {
+		return err
+	}
+	defer func() { _ = s.Close() }()
+
+	w := protoio.NewDelimitedWriter(s)
+	return w.WriteMsg(pmes)
+}
+
+// HandleQueryResult is a callback on successful peer query
+type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
+
+// HandleQueryFail is a callback on failed peer query
+type HandleQueryFail func(p peer.ID, err error)
+
+// Run crawls dht peers from an initial seed of `startingPeers`
+func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
+	jobs := make(chan peer.ID, 1)
+	results := make(chan *queryResult, 1)
+
+	// Start worker goroutines
+	var wg sync.WaitGroup
+	wg.Add(c.parallelism)
+	for i := 0; i < c.parallelism; i++ {
+		go func() {
+			defer wg.Done()
+			for p := range jobs {
+				res := c.queryPeer(ctx, p)
+				results <- res
+			}
+		}()
+	}
+
+	defer wg.Wait()
+	defer close(jobs)
+
+	toDial := make([]*peer.AddrInfo, 0, len(startingPeers))
+	peersSeen := make(map[peer.ID]struct{})
+
+	for _, ai := range startingPeers {
+		toDial = append(toDial, ai)
+		peersSeen[ai.ID] = struct{}{}
+		extendAddrs := c.host.Peerstore().Addrs(ai.ID)
+		extendAddrs = append(extendAddrs, ai.Addrs...)
+		c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, time.Hour)
+	}
+
+	numQueried := 0
+	outstanding := 0
+
+	for len(toDial) > 0 || outstanding > 0 {
+		var jobCh chan peer.ID
+		var nextPeerID peer.ID
+		if len(toDial) > 0 {
+			jobCh = jobs
+			nextPeerID = toDial[0].ID
+		}
+
+		select {
+		case res := <-results:
+			if len(res.data) > 0 {
+				logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
+				rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
+				for p, ai := range res.data {
+					c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
+					if _, ok := peersSeen[p]; !ok {
+						peersSeen[p] = struct{}{}
+						toDial = append(toDial, ai)
+					}
+					rtPeers = append(rtPeers, ai)
+				}
+				if handleSuccess != nil {
+					handleSuccess(res.peer, rtPeers)
+				}
+			} else if handleFail != nil {
+				handleFail(res.peer, res.err)
+			}
+			outstanding--
+		case jobCh <- nextPeerID:
+			outstanding++
+			numQueried++
+			toDial = toDial[1:]
+			logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
+		}
+	}
+}
+
+type queryResult struct {
+	peer peer.ID
+	data map[peer.ID]*peer.AddrInfo
+	err  error
+}
+
+func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
+	tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
+	if err != nil {
+		logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
+		return &queryResult{nextPeer, nil, err}
+	}
+
+	connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
+	defer cancel()
+	err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
+	if err != nil {
+		logger.Infof("could not connect to peer %v: %v", nextPeer, err)
+		return &queryResult{nextPeer, nil, err}
+	}
+
+	localPeers := make(map[peer.ID]*peer.AddrInfo)
+	var retErr error
+	for cpl := 0; cpl <= 15; cpl++ {
+		generatePeer, err := tmpRT.GenRandPeerID(uint(cpl))
+		if err != nil {
+			panic(err)
+		}
+		peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
+		if err != nil {
+			logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
+			retErr = err
+			break
+		}
+		for _, ai := range peers {
+			if _, ok := localPeers[ai.ID]; !ok {
+				localPeers[ai.ID] = ai
+			}
+		}
+	}
+
+	if retErr != nil {
+		return &queryResult{nextPeer, nil, retErr}
+	}
+
+	return &queryResult{nextPeer, localPeers, retErr}
+}
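
Not part of the patch: a short sketch of the caller side of the HandleQueryResult and HandleQueryFail callback types defined above. The crawlReport type and its newCrawlReport, onSuccess, onFail, and summary names are hypothetical, chosen only for illustration; the callback signatures are the ones from crawler.go.

package main

import (
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p-core/peer"
)

// crawlReport records which peers answered the crawl and which did not.
// Crawler.Run invokes the callbacks sequentially from its own goroutine, so
// the mutex only matters if the maps are read while the crawl is still running.
type crawlReport struct {
	mu        sync.Mutex
	reachable map[peer.ID][]*peer.AddrInfo
	failed    map[peer.ID]error
}

func newCrawlReport() *crawlReport {
	return &crawlReport{
		reachable: make(map[peer.ID][]*peer.AddrInfo),
		failed:    make(map[peer.ID]error),
	}
}

// onSuccess matches the HandleQueryResult signature.
func (r *crawlReport) onSuccess(p peer.ID, rtPeers []*peer.AddrInfo) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.reachable[p] = rtPeers
}

// onFail matches the HandleQueryFail signature.
func (r *crawlReport) onFail(p peer.ID, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.failed[p] = err
}

// summary gives a one-line view of the crawl outcome.
func (r *crawlReport) summary() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return fmt.Sprintf("%d peers responded, %d failed", len(r.reachable), len(r.failed))
}
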
diff --git a/crawler/options.go b/crawler/options.go
new file mode 100644
index 000000000..b33175009
--- /dev/null
+++ b/crawler/options.go
@@ -0,0 +1,60 @@
+package crawler
+
+import (
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/protocol"
+)
+
+// Option DHT Crawler option type.
+type Option func(*options) error
+
+type options struct {
+	protocols      []protocol.ID
+	parallelism    int
+	connectTimeout time.Duration
+	perMsgTimeout  time.Duration
+}
+
+// defaults are the default crawler options. This option will be automatically
+// prepended to any options you pass to the crawler constructor.
+var defaults = func(o *options) error {
+	o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
+	o.parallelism = 1000
+	o.connectTimeout = time.Second * 5
+	o.perMsgTimeout = time.Second * 5
+
+	return nil
+}
+
+// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes
+func WithProtocols(protocols []protocol.ID) Option {
+	return func(o *options) error {
+		o.protocols = append([]protocol.ID{}, protocols...)
+		return nil
+	}
+}
+
+// WithParallelism defines the number of queries that can be issued in parallel
+func WithParallelism(parallelism int) Option {
+	return func(o *options) error {
+		o.parallelism = parallelism
+		return nil
+	}
+}
+
+// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed
+func WithMsgTimeout(timeout time.Duration) Option {
+	return func(o *options) error {
+		o.perMsgTimeout = timeout
+		return nil
+	}
+}
+
+// WithConnectTimeout defines the time for peer connection before timing out
+func WithConnectTimeout(timeout time.Duration) Option {
+	return func(o *options) error {
+		o.connectTimeout = timeout
+		return nil
+	}
+}
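
Also not part of the patch: a minimal end-to-end sketch of driving the crawler with these options. The host construction, the seed address, the option values, and the callback bodies are illustrative assumptions; only crawler.New, the With* options, and Crawler.Run are defined by this change.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/multiformats/go-multiaddr"

	"github.com/libp2p/go-libp2p-kad-dht/crawler"
)

func main() {
	ctx := context.Background()

	// A plain libp2p host is enough; the crawler opens its own DHT streams.
	// (go-libp2p releases contemporary with this patch take a context here;
	// newer releases drop that parameter.)
	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}
	defer h.Close()

	c, err := crawler.New(h,
		crawler.WithParallelism(200),               // fewer workers than the 1000 default
		crawler.WithConnectTimeout(10*time.Second), // more generous dial timeout than the 5s default
	)
	if err != nil {
		panic(err)
	}

	// Seed the crawl from a known peer; this dnsaddr is just an example seed.
	seedAddr, err := multiaddr.NewMultiaddr("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
	if err != nil {
		panic(err)
	}
	seed, err := peer.AddrInfoFromP2pAddr(seedAddr)
	if err != nil {
		panic(err)
	}

	c.Run(ctx, []*peer.AddrInfo{seed},
		func(p peer.ID, rtPeers []*peer.AddrInfo) {
			fmt.Printf("%s returned %d routing table peers\n", p, len(rtPeers))
		},
		func(p peer.ID, err error) {
			fmt.Printf("query to %s failed: %v\n", p, err)
		},
	)
}
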