diff --git a/main.go b/main.go index e6ae228..3c5fa94 100644 --- a/main.go +++ b/main.go @@ -71,6 +71,11 @@ func main() { Value: false, Usage: "If using an Amino DHT client should the libp2p host be shared with the data downloading host", }, + &cli.StringFlag{ + Name: "dht-fallback-type", + Value: "combined", + Usage: "the type of Amino client to be used as a fallback (standard, accelerated, combined)", + }, } app.Name = "rainbow" diff --git a/setup.go b/setup.go index f4c30ad..05b86a0 100644 --- a/setup.go +++ b/setup.go @@ -17,6 +17,7 @@ import ( "github.com/ipfs/boxo/namesys" routingv1client "github.com/ipfs/boxo/routing/http/client" httpcontentrouter "github.com/ipfs/boxo/routing/http/contentrouter" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" flatfs "github.com/ipfs/go-ds-flatfs" levelds "github.com/ipfs/go-ds-leveldb" @@ -30,6 +31,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/multiformats/go-multiaddr" @@ -59,6 +61,14 @@ type Node struct { bwc *metrics.BandwidthCounter } +type DHTType int + +const ( + Combined DHTType = iota + Standard + Accelerated +) + type Config struct { ListenAddrs []string AnnounceAddrs []string @@ -76,6 +86,7 @@ type Config struct { RoutingV1 string KuboRPCURLs []string DHTSharedHost bool + DHTType DHTType } func Setup(ctx context.Context, cfg *Config) (*Node, error) { @@ -165,18 +176,49 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) { } } - fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix, - fullrt.DHTOption( - dht.Validator(record.NamespacedValidator{ - "pk": record.PublicKeyValidator{}, - "ipns": ipns.Validator{KeyBook: h.Peerstore()}, - }), + var standardClient *dht.IpfsDHT + var fullRTClient *fullrt.FullRT + + if cfg.DHTType == Combined || cfg.DHTType == Standard { + standardClient, err = dht.New(ctx, dhtHost, dht.Datastore(memDS), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), - dht.BucketSize(20), - )) - if err != nil { - return nil, err + dht.Mode(dht.ModeClient), + ) + if err != nil { + return nil, err + } + } + + if cfg.DHTType == Combined || cfg.DHTType == Accelerated { + fullRTClient, err = fullrt.NewFullRT(dhtHost, dht.DefaultPrefix, + fullrt.DHTOption( + dht.Validator(record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + "ipns": ipns.Validator{KeyBook: h.Peerstore()}, + }), + dht.Datastore(memDS), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.BucketSize(20), + )) + if err != nil { + return nil, err + } + } + + var dhtRouter routing.Routing + switch cfg.DHTType { + case Combined: + dhtRouter = &bundledDHT{ + standard: standardClient, + fullRT: fullRTClient, + } + case Standard: + dhtRouter = standardClient + case Accelerated: + dhtRouter = fullRTClient + default: + return nil, fmt.Errorf("unsupported DHT type") } // we want to also use the default HTTP routers, so wrap the FullRT client @@ -187,7 +229,7 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) { } routers := []*routinghelpers.ParallelRouter{ { - Router: fullRTClient, + Router: dhtRouter, ExecuteAfter: 0, DoNotWaitForSearchValue: true, IgnoreError: false, @@ -242,6 +284,48 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) { }, nil } +type bundledDHT struct { + standard *dht.IpfsDHT + fullRT *fullrt.FullRT +} + +func (b *bundledDHT) getDHT() routing.Routing { + if b.fullRT.Ready() { + return b.fullRT + } + return b.standard +} + +func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { + return b.getDHT().Provide(ctx, c, brdcst) +} + +func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo { + return b.getDHT().FindProvidersAsync(ctx, c, i) +} + +func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + return b.getDHT().FindPeer(ctx, id) +} + +func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error { + return b.getDHT().PutValue(ctx, k, v, option...) +} + +func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) { + return b.getDHT().GetValue(ctx, s, option...) +} + +func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) { + return b.getDHT().SearchValue(ctx, s, option...) +} + +func (b *bundledDHT) Bootstrap(ctx context.Context) error { + return b.standard.Bootstrap(ctx) +} + +var _ routing.Routing = (*bundledDHT)(nil) + func delegatedHTTPContentRouter(endpoint string, rv1Opts ...routingv1client.Option) (routing.Routing, error) { // Increase per-host connection pool since we are making lots of concurrent requests. transport := http.DefaultTransport.(*http.Transport).Clone()