Skip to content

Commit

Permalink
add configurable dht fallback type
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Oct 6, 2023
1 parent 00fdfa1 commit fbe11ac
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 11 deletions.
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func main() {
Value: false,
Usage: "If using an Amino DHT client should the libp2p host be shared with the data downloading host",
},
&cli.StringFlag{
Name: "dht-fallback-type",
Value: "combined",
Usage: "the type of Amino client to be used as a fallback (standard, accelerated, combined)",

Check warning on line 77 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L66-L77

Added lines #L66 - L77 were not covered by tests
},
}

app.Name = "rainbow"
Expand Down
106 changes: 95 additions & 11 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ipfs/boxo/namesys"
routingv1client "github.com/ipfs/boxo/routing/http/client"
httpcontentrouter "github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
flatfs "github.com/ipfs/go-ds-flatfs"
levelds "github.com/ipfs/go-ds-leveldb"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -59,6 +61,14 @@ type Node struct {
bwc *metrics.BandwidthCounter
}

type DHTType int

const (
Combined DHTType = iota
Standard
Accelerated
)

type Config struct {
ListenAddrs []string
AnnounceAddrs []string
Expand All @@ -76,6 +86,7 @@ type Config struct {
RoutingV1 string
KuboRPCURLs []string
DHTSharedHost bool
DHTType DHTType
}

func Setup(ctx context.Context, cfg *Config) (*Node, error) {
Expand Down Expand Up @@ -165,18 +176,49 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) {
}

Check warning on line 176 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L164-L176

Added lines #L164 - L176 were not covered by tests
}

fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
var standardClient *dht.IpfsDHT
var fullRTClient *fullrt.FullRT

if cfg.DHTType == Combined || cfg.DHTType == Standard {
standardClient, err = dht.New(ctx, dhtHost,
dht.Datastore(memDS),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
dht.Mode(dht.ModeClient),
)
if err != nil {
return nil, err
}

Check warning on line 190 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L179-L190

Added lines #L179 - L190 were not covered by tests
}

if cfg.DHTType == Combined || cfg.DHTType == Accelerated {
fullRTClient, err = fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(memDS),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
}

Check warning on line 206 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L193-L206

Added lines #L193 - L206 were not covered by tests
}

var dhtRouter routing.Routing
switch cfg.DHTType {
case Combined:
dhtRouter = &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}
case Standard:
dhtRouter = standardClient
case Accelerated:
dhtRouter = fullRTClient
default:
return nil, fmt.Errorf("unsupported DHT type")

Check warning on line 221 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L209-L221

Added lines #L209 - L221 were not covered by tests
}

// we want to also use the default HTTP routers, so wrap the FullRT client
Expand All @@ -187,7 +229,7 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) {
}
routers := []*routinghelpers.ParallelRouter{
{
Router: fullRTClient,
Router: dhtRouter,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: false,
Expand Down Expand Up @@ -242,6 +284,48 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) {
}, nil
}

type bundledDHT struct {
standard *dht.IpfsDHT
fullRT *fullrt.FullRT
}

func (b *bundledDHT) getDHT() routing.Routing {
if b.fullRT.Ready() {
return b.fullRT
}
return b.standard

Check warning on line 296 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L292-L296

Added lines #L292 - L296 were not covered by tests
}

func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
return b.getDHT().Provide(ctx, c, brdcst)

Check warning on line 300 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L299-L300

Added lines #L299 - L300 were not covered by tests
}

func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo {
return b.getDHT().FindProvidersAsync(ctx, c, i)

Check warning on line 304 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L303-L304

Added lines #L303 - L304 were not covered by tests
}

func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
return b.getDHT().FindPeer(ctx, id)

Check warning on line 308 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L307-L308

Added lines #L307 - L308 were not covered by tests
}

func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error {
return b.getDHT().PutValue(ctx, k, v, option...)

Check warning on line 312 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L311-L312

Added lines #L311 - L312 were not covered by tests
}

func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
return b.getDHT().GetValue(ctx, s, option...)

Check warning on line 316 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L315-L316

Added lines #L315 - L316 were not covered by tests
}

func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
return b.getDHT().SearchValue(ctx, s, option...)

Check warning on line 320 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L319-L320

Added lines #L319 - L320 were not covered by tests
}

func (b *bundledDHT) Bootstrap(ctx context.Context) error {
return b.standard.Bootstrap(ctx)

Check warning on line 324 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L323-L324

Added lines #L323 - L324 were not covered by tests
}

var _ routing.Routing = (*bundledDHT)(nil)

func delegatedHTTPContentRouter(endpoint string, rv1Opts ...routingv1client.Option) (routing.Routing, error) {
// Increase per-host connection pool since we are making lots of concurrent requests.
transport := http.DefaultTransport.(*http.Transport).Clone()
Expand Down

0 comments on commit fbe11ac

Please sign in to comment.