Skip to content

Commit

Permalink
feat!: independent dht and routing v1 flags (#113)
Browse files Browse the repository at this point in the history
- Independent DHT and Routing V1 flags
- Fixed typo in 'dht-shared-host' flag
- Uses StringsSlice instead of custom comma-separated code
  • Loading branch information
hacdias authored Apr 9, 2024
1 parent 19723fe commit 51c9228
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gateway-conformance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
kuboNodeMultiaddr=$(ipfs --api=/ip4/127.0.0.1/tcp/5001 swarm addrs local --id | head -n 1)
# run gw
./rainbow --routing=http://127.0.0.1:8080 --peering=$kuboNodeMultiaddr &
./rainbow --http-routers=http://127.0.0.1:8080 --dht-routing=off --peering=$kuboNodeMultiaddr &
working-directory: rainbow

# 6. Run the gateway-conformance tests
Expand Down
2 changes: 2 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = DHTStandard
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

Expand Down
81 changes: 43 additions & 38 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,31 @@ Generate an identity seed and launch a gateway:
Name: "seed",
Value: "",
EnvVars: []string{"RAINBOW_SEED"},
Usage: "Seed to derive peerID from. Generate with gen-seed. Needs --seed-index. Best to use $CREDENTIALS_DIRECTORY/seed or $RAINBOW_DATADIR/seed.",
Usage: "Seed to derive peerID from. Generate with gen-seed. Needs --seed-index. Best to use $CREDENTIALS_DIRECTORY/seed or $RAINBOW_DATADIR/seed",
},
&cli.IntFlag{
Name: "seed-index",
Value: -1,
EnvVars: []string{"RAINBOW_SEED_INDEX"},
Usage: "Index to derivate the peerID (needs --seed)",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "gateway-domains",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_GATEWAY_DOMAINS"},
Usage: "Domains with flat path gateway, no Origin isolation. Comma-separated list.",
Usage: "Domains with flat path gateway, no Origin isolation (comma-separated)",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "subdomain-gateway-domains",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_SUBDOMAIN_GATEWAY_DOMAINS"},
Usage: "Domains with subdomain-based Origin isolation. Comma-separated list.",
Usage: "Domains with subdomain-based Origin isolation (comma-separated)",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "trustless-gateway-domains",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_TRUSTLESS_GATEWAY_DOMAINS"},
Usage: "Domains limited to trustless, verifiable response types. Comma-separated list.",
Usage: "Domains limited to trustless, verifiable response types (comma-separated)",
},
&cli.StringFlag{
Name: "gateway-listen-address",
Expand All @@ -123,13 +123,13 @@ Generate an identity seed and launch a gateway:
Name: "gc-interval",
Value: time.Minute * 60,
EnvVars: []string{"RAINBOW_GC_INTERVAL"},
Usage: "The interval between automatic GC runs. Set 0 to disable.",
Usage: "The interval between automatic GC runs. Set 0 to disable",
},
&cli.Float64Flag{
Name: "gc-threshold",
Value: 0.3,
EnvVars: []string{"RAINBOW_GC_THRESHOLD"},
Usage: "Percentage of how much of the disk free space must be available.",
Usage: "Percentage of how much of the disk free space must be available",
},
&cli.IntFlag{
Name: "connmgr-low",
Expand Down Expand Up @@ -167,26 +167,41 @@ Generate an identity seed and launch a gateway:
EnvVars: []string{"RAINBOW_MAX_FD"},
Usage: "Maximum number of file descriptors. Defaults to 50% of the process' limit",
},
&cli.StringSliceFlag{
Name: "http-routers",
Value: cli.NewStringSlice(cidContactEndpoint),
EnvVars: []string{"RAINBOW_HTTP_ROUTERS"},
Usage: "HTTP servers with /routing/v1 endpoints to use for delegated routing (comma-separated)",
},
&cli.StringFlag{
Name: "routing",
Value: "",
Usage: "RoutingV1 Endpoint (otherwise Amino DHT and cid.contact is used)",
Name: "dht-routing",
Value: "accelerated",
EnvVars: []string{"RAINBOW_DHT_ROUTING"},
Usage: "Use the Amino DHT for routing. Options are 'accelerated', 'standard' and 'off'",
Action: func(ctx *cli.Context, s string) error {
switch DHTRouting(s) {
case DHTAccelerated, DHTStandard, DHTOff:
return nil
default:
return errors.New("invalid value for --dht-routing: use 'accelerated', 'standard' or 'off'")
}
},
},
&cli.BoolFlag{
Name: "dht-share-host",
Name: "dht-shared-host",
Value: false,
EnvVars: []string{"RAINBOW_DHT_SHARED_HOST"},
Usage: "If false, DHT operations are run using an ephemeral peer, separate from the main one",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "denylists",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_DENYLISTS"},
Usage: "Denylist HTTP subscriptions (comma-separated). Must be append-only denylists",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "peering",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_PEERING"},
Usage: "Multiaddresses of peers to stay connected to (comma-separated)",
},
Expand All @@ -200,7 +215,7 @@ Generate an identity seed and launch a gateway:
Name: "ipns-max-cache-ttl",
Value: 0,
EnvVars: []string{"RAINBOW_IPNS_MAX_CACHE_TTL"},
Usage: "Optional cap on caching duration for IPNS/DNSLink lookups. Set to 0 to respect original TTLs.",
Usage: "Optional cap on caching duration for IPNS/DNSLink lookups. Set to 0 to respect original TTLs",
},
}

Expand Down Expand Up @@ -277,7 +292,7 @@ share the same seed as long as the indexes are different.
}

var peeringAddrs []peer.AddrInfo
for _, maStr := range getCommaSeparatedList(cctx.String("peering")) {
for _, maStr := range cctx.StringSlice("peering") {
ai, err := peer.AddrInfoFromString(maStr)
if err != nil {
return err
Expand All @@ -288,19 +303,20 @@ share the same seed as long as the indexes are different.
cfg := Config{
DataDir: ddir,
BlockstoreType: cctx.String("blockstore"),
GatewayDomains: getCommaSeparatedList(cctx.String("gateway-domains")),
SubdomainGatewayDomains: getCommaSeparatedList(cctx.String("subdomain-gateway-domains")),
TrustlessGatewayDomains: getCommaSeparatedList(cctx.String("trustless-gateway-domains")),
GatewayDomains: cctx.StringSlice("gateway-domains"),
SubdomainGatewayDomains: cctx.StringSlice("subdomain-gateway-domains"),
TrustlessGatewayDomains: cctx.StringSlice("trustless-gateway-domains"),
ConnMgrLow: cctx.Int("connmgr-low"),
ConnMgrHi: cctx.Int("connmgr-high"),
ConnMgrGrace: cctx.Duration("connmgr-grace"),
MaxMemory: cctx.Uint64("max-memory"),
MaxFD: cctx.Int("max-fd"),
InMemBlockCache: cctx.Int64("inmem-block-cache"),
RoutingV1: cctx.String("routing"),
RoutingV1Endpoints: cctx.StringSlice("http-routers"),
DHTRouting: DHTRouting(cctx.String("dht-routing")),
DHTSharedHost: cctx.Bool("dht-shared-host"),
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: getCommaSeparatedList(cctx.String("denylists")),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
GCInterval: cctx.Duration("gc-interval"),
GCThreshold: cctx.Float64("gc-threshold"),
Expand Down Expand Up @@ -463,17 +479,6 @@ func writeAllGoroutineStacks(w io.Writer) error {
return err
}

func getCommaSeparatedList(val string) []string {
if val == "" {
return nil
}
items := strings.Split(val, ",")
for i, item := range items {
items[i] = strings.TrimSpace(item)
}
return items
}

func printIfListConfigured(message string, list []string) {
if len(list) > 0 {
fmt.Printf(message+"%v\n", strings.Join(list, ", "))
Expand Down
114 changes: 66 additions & 48 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ func init() {
}
}

const ipniFallbackEndpoint = "https://cid.contact"
const cidContactEndpoint = "https://cid.contact"

type DHTRouting string

const (
DHTAccelerated DHTRouting = "accelerated"
DHTStandard DHTRouting = "standard"
DHTOff DHTRouting = "off"
)

type Node struct {
vs routing.ValueStore
Expand Down Expand Up @@ -97,7 +105,8 @@ type Config struct {
GatewayDomains []string
SubdomainGatewayDomains []string
TrustlessGatewayDomains []string
RoutingV1 string
RoutingV1Endpoints []string
DHTRouting DHTRouting
DHTSharedHost bool
IpnsMaxCacheTTL time.Duration

Expand Down Expand Up @@ -176,9 +185,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
)
blkst = blockstore.NewIdStore(blkst)

var pr routing.PeerRouting
var vs routing.ValueStore
var cr routing.ContentRouting
var router routing.Routing

// Increase per-host connection pool since we are making lots of concurrent requests.
httpClient := &http.Client{
Expand All @@ -201,17 +208,21 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
if cfg.RoutingV1 != "" {
routingClient, err := delegatedHTTPContentRouter(cfg.RoutingV1, routingv1client.WithStreamResultsRequired(), routingv1client.WithHTTPClient(httpClient))
var routingV1Routers []routing.Routing
for _, endpoint := range cfg.RoutingV1Endpoints {
rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)}
if endpoint != cidContactEndpoint {
rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired())
}
httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...)
if err != nil {
return nil, err
}
pr = routingClient
vs = routingClient
cr = routingClient
} else {
// If there are no delegated routing endpoints run an accelerated Amino DHT client and send IPNI requests to cid.contact
routingV1Routers = append(routingV1Routers, httpClient)
}

var dhtRouter routing.Routing
if cfg.DHTRouting != DHTOff {
var dhtHost host.Host
if cfg.DHTSharedHost {
dhtHost = h
Expand All @@ -237,54 +248,61 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(ds),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
if cfg.DHTRouting == DHTAccelerated {
fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(ds),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
}
dhtRouter = &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}
} else {
dhtRouter = standardClient
}
}

dhtRouter := &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}
if len(routingV1Routers) == 0 && dhtRouter == nil {
return nil, errors.New("no routers configured: enable dht and/or configure /routing/v1 http endpoint")
}

// we want to also use the default HTTP routers, so wrap the FullRT client
// in a parallel router that calls them in parallel
httpRouters, err := delegatedHTTPContentRouter(ipniFallbackEndpoint, routingv1client.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}
routers := []*routinghelpers.ParallelRouter{
{
if len(routingV1Routers) == 0 {
router = dhtRouter
} else {
var routers []*routinghelpers.ParallelRouter

if dhtRouter != nil {
routers = append(routers, &routinghelpers.ParallelRouter{
Router: dhtRouter,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: false,
},
{
})
}

for _, routingV1Router := range routingV1Routers {
routers = append(routers, &routinghelpers.ParallelRouter{
Timeout: 15 * time.Second,
Router: httpRouters,
Router: routingV1Router,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: true,
},
})
}
router := routinghelpers.NewComposableParallel(routers)

pr = router
vs = router
cr = router
router = routinghelpers.NewComposableParallel(routers)
}

return pr, nil
return router, nil
}))
h, err := libp2p.New(opts...)
if err != nil {
Expand All @@ -302,7 +320,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
bn := bsnet.NewFromIpfsHost(h, router)
bswap := bsclient.New(bsctx, bn, blkst,
// default is 1 minute to search for a random live-want (1
// CID). I think we want to search for random live-wants more
Expand Down Expand Up @@ -357,7 +375,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
if cfg.IpnsMaxCacheTTL > 0 {
nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL))
}
ns, err := namesys.NewNameSystem(vs, nsOptions...)
ns, err := namesys.NewNameSystem(router, nsOptions...)
if err != nil {
return nil, err
}
Expand All @@ -376,7 +394,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
datastore: ds,
bsClient: bswap,
ns: ns,
vs: vs,
vs: router,
bsrv: bsrv,
resolver: r,
bwc: bwc,
Expand Down

0 comments on commit 51c9228

Please sign in to comment.