Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

feat!: IPNS routing based on IPIP-351 or IPIP-379 #185

Merged
merged 5 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions blockstore_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (

const (
EnvProxyGateway = "PROXY_GATEWAY_URL"

DefaultProxyGateway = "http://127.0.0.1:8080"
DefaultKuboRPC = "http://127.0.0.1:5001"
)

type proxyBlockStore struct {
Expand Down
25 changes: 17 additions & 8 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@

### `KUBO_RPC_URL`

Default: see `DefaultKuboRPC`

Single URL or a comma separated list of RPC endpoints that provide `/api/v0` from Kubo.

We use this as temporary solution for IPNS Record routing until [IPIP-351](https://github.com/ipfs/specs/pull/351) ships with Kubo 0.19,
and we also redirect some legacy `/api/v0` commands that need to be handled on `ipfs.io`.
This is used to redirect legacy `/api/v0` commands that need to be handled on `ipfs.io`.
If this is not set, the redirects are not set up.

### `BLOCK_CACHE_SIZE`

Expand Down Expand Up @@ -59,9 +56,21 @@ in the near feature.

### `PROXY_GATEWAY_URL`

Single URL or a comma separated list of Gateway endpoints that support `?format=block|car`
responses. This is used by default with `http://127.0.0.1:8080` unless `STRN_ORCHESTRATOR_URL`
is set.
Single URL or a comma separated list of Gateway endpoints that support `?format=block|car|ipns-record`
responses. Either this variable or `STRN_ORCHESTRATOR_URL` must be set.

If this gateway does not support `application/vnd.ipfs.ipns-record`, you can use `IPNS_RECORD_GATEWAY_URL`
to override the gateway address from which to retrieve IPNS Records from.

### `IPNS_RECORD_GATEWAY_URL`

Single URL or a comma separated list of Gateway endpoints that support requests for `application/vnd.ipfs.ipns-record`.
This is used for IPNS Record routing.

`IPNS_RECORD_GATEWAY_URL` also supports [Routing V1 HTTP API](https://specs.ipfs.tech/routing/http-routing-v1/)
for IPNS Record routing ([IPIP-379](https://specs.ipfs.tech/ipips/ipip-0379/)). To use it, the provided URL must end with `/routing/v1`.

If not set, the IPNS records will be fetched from `KUBO_RPC_URL`.

## Saturn Backend

Expand Down
26 changes: 19 additions & 7 deletions handlers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
"fmt"
"math/rand"
"net/http"
Expand All @@ -12,6 +13,7 @@
_ "net/http/pprof"

"github.com/ipfs/bifrost-gateway/lib"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/filecoin-saturn/caboose"
Expand Down Expand Up @@ -68,9 +70,16 @@
})
}

func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) {
// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway.
routing := newProxyRouting(kuboRPC, cdns)
func makeGatewayHandler(bs bstore.Blockstore, kuboRPC, ipnsRecordGateways []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) {
// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway or kubo RPC.
var routing routing.ValueStore
if len(ipnsRecordGateways) != 0 {
routing = newProxyRouting(ipnsRecordGateways, cdns)
} else if len(kuboRPC) != 0 {
routing = newRPCProxyRouting(kuboRPC, cdns)
} else {
return nil, errors.New("either KUBO_RPC_URL, IPNS_RECORD_GATEWAY_URL or PROXY_GATEWAY_URL with support for application/vnd.ipfs.ipns-record must be provided in order to delegate IPNS routing")
}

Check warning on line 82 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L73-L82

Added lines #L73 - L82 were not covered by tests

// Sets up a cache to store blocks in
cacheBlockStore, err := lib.NewCacheBlockStore(blockCacheSize)
Expand Down Expand Up @@ -157,10 +166,13 @@
mux := http.NewServeMux()
mux.Handle("/ipfs/", ipfsHandler)
mux.Handle("/ipns/", ipnsHandler)
// TODO: below is legacy which we want to remove, measuring this separately
// allows us to decide when is the time to do it.
legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc")
mux.Handle("/api/v0/", legacyKuboRpcHandler)

if len(kuboRPC) != 0 {
// TODO: below is legacy which we want to remove, measuring this separately
// allows us to decide when is the time to do it.
legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc")
mux.Handle("/api/v0/", legacyKuboRpcHandler)
}

Check warning on line 175 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L169-L175

Added lines #L169 - L175 were not covered by tests

// Construct the HTTP handler for the gateway.
handler := withConnect(mux)
Expand Down
26 changes: 19 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
}

const (
EnvKuboRPC = "KUBO_RPC_URL"
EnvBlockCacheSize = "BLOCK_CACHE_SIZE"
EnvGraphBackend = "GRAPH_BACKEND"
RequestIDHeader = "X-Bfid"
EnvKuboRPC = "KUBO_RPC_URL"
hacdias marked this conversation as resolved.
Show resolved Hide resolved
EnvIPNSRecordGateway = "IPNS_RECORD_GATEWAY_URL"
EnvBlockCacheSize = "BLOCK_CACHE_SIZE"
EnvGraphBackend = "GRAPH_BACKEND"
RequestIDHeader = "X-Bfid"
)

func init() {
Expand All @@ -57,7 +58,7 @@
// Get env variables.
saturnOrchestrator := getEnv(EnvSaturnOrchestrator, "")
proxyGateway := getEnvs(EnvProxyGateway, "")
kuboRPC := getEnvs(EnvKuboRPC, DefaultKuboRPC)
kuboRPC := getEnvs(EnvKuboRPC, "")

Check warning on line 61 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L61

Added line #L61 was not covered by tests

blockCacheSize, err := getEnvInt(EnvBlockCacheSize, lib.DefaultCacheBlockStoreSize)
if err != nil {
Expand Down Expand Up @@ -107,7 +108,15 @@
log.Fatalf("Unable to start. bifrost-gateway requires either PROXY_GATEWAY_URL or STRN_ORCHESTRATOR_URL to be set.\n\nRead docs at https://github.com/ipfs/bifrost-gateway/blob/main/docs/environment-variables.md\n\n")
}

gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, gatewayPort, blockCacheSize, cdns, useGraphBackend)
// Prefer IPNS_RECORD_GATEWAY_URL when an explicit URL for IPNS routing is set
ipnsRecordGateway := getEnvs(EnvIPNSRecordGateway, "")
if len(ipnsRecordGateway) == 0 {
// Fallback to PROXY_GATEWAY_URL, assuming it is modern
// enough to support application/vnd.ipfs.ipns-record responses
ipnsRecordGateway = proxyGateway
}

Check warning on line 117 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L112-L117

Added lines #L112 - L117 were not covered by tests

gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, ipnsRecordGateway, gatewayPort, blockCacheSize, cdns, useGraphBackend)

Check warning on line 119 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L119

Added line #L119 was not covered by tests
if err != nil {
return err
}
Expand All @@ -128,7 +137,10 @@
log.Printf("%s: %d", EnvBlockCacheSize, blockCacheSize)
log.Printf("%s: %t", EnvGraphBackend, useGraphBackend)

log.Printf("Legacy RPC at /api/v0 (%s) provided by %s", EnvKuboRPC, strings.Join(kuboRPC, " "))
if len(kuboRPC) != 0 {
log.Printf("Legacy RPC at /api/v0 (%s) provided by %s", EnvKuboRPC, strings.Join(kuboRPC, " "))
}

Check warning on line 142 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L140-L142

Added lines #L140 - L142 were not covered by tests

log.Printf("Path gateway listening on http://127.0.0.1:%d", gatewayPort)
log.Printf(" Smoke test (JPG): http://127.0.0.1:%d/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", gatewayPort)
log.Printf("Subdomain gateway configured on dweb.link and http://localhost:%d", gatewayPort)
Expand Down
140 changes: 132 additions & 8 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type proxyRouting struct {
type rpcProxyRouting struct {
kuboRPC []string
hacdias marked this conversation as resolved.
Show resolved Hide resolved
httpClient *http.Client
rand *rand.Rand
}

func newProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore {
func newRPCProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore {

Check warning on line 27 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L27

Added line #L27 was not covered by tests
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

return &proxyRouting{
return &rpcProxyRouting{

Check warning on line 31 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L31

Added line #L31 was not covered by tests
kuboRPC: kuboRPC,
httpClient: &http.Client{
Transport: otelhttp.NewTransport(&customTransport{
Expand All @@ -48,15 +48,15 @@
}
}

func (ps *proxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error {
func (ps *rpcProxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error {

Check warning on line 51 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L51

Added line #L51 was not covered by tests
return routing.ErrNotSupported
}

func (ps *proxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) {
func (ps *rpcProxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) {

Check warning on line 55 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L55

Added line #L55 was not covered by tests
return ps.fetch(ctx, k)
}

func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) {
func (ps *rpcProxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) {

Check warning on line 59 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L59

Added line #L59 was not covered by tests
if !strings.HasPrefix(k, "/ipns/") {
return nil, routing.ErrNotSupported
}
Expand All @@ -76,7 +76,7 @@
return ch, nil
}

func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) {
func (ps *rpcProxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) {

Check warning on line 79 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L79

Added line #L79 was not covered by tests
name, err := ipns.NameFromRoutingKey([]byte(key))
if err != nil {
return nil, err
Expand Down Expand Up @@ -151,6 +151,130 @@
return rb, nil
}

func (ps *proxyRouting) getRandomKuboURL() string {
func (ps *rpcProxyRouting) getRandomKuboURL() string {

Check warning on line 154 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L154

Added line #L154 was not covered by tests
return ps.kuboRPC[ps.rand.Intn(len(ps.kuboRPC))]
}

type proxyRouting struct {
ipnsRecordGateways []string
httpClient *http.Client
rand *rand.Rand
}

func newProxyRouting(ipnsRecordGateways []string, cdns *cachedDNS) routing.ValueStore {
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

return &proxyRouting{
ipnsRecordGateways: ipnsRecordGateways,
httpClient: &http.Client{
Transport: otelhttp.NewTransport(&customTransport{
// RoundTripper with increased defaults than http.Transport such that retrieving
// multiple lookups concurrently is fast.
RoundTripper: &http.Transport{
MaxIdleConns: 1000,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DialContext: cdns.dialWithCachedDNS,
ForceAttemptHTTP2: true,
},
}),
},
rand: rand,
}

Check warning on line 185 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L164-L185

Added lines #L164 - L185 were not covered by tests
}

func (ps *proxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error {
return routing.ErrNotSupported

Check warning on line 189 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L188-L189

Added lines #L188 - L189 were not covered by tests
}

func (ps *proxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) {
if !strings.HasPrefix(k, "/ipns/") {
return nil, routing.ErrNotSupported
}

Check warning on line 195 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L192-L195

Added lines #L192 - L195 were not covered by tests

name, err := ipns.NameFromRoutingKey([]byte(k))
if err != nil {
return nil, err
}

Check warning on line 200 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L197-L200

Added lines #L197 - L200 were not covered by tests

return ps.fetch(ctx, name)

Check warning on line 202 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L202

Added line #L202 was not covered by tests
}

func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) {
if !strings.HasPrefix(k, "/ipns/") {
return nil, routing.ErrNotSupported
}

Check warning on line 208 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L205-L208

Added lines #L205 - L208 were not covered by tests

name, err := ipns.NameFromRoutingKey([]byte(k))
if err != nil {
return nil, err
}

Check warning on line 213 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L210-L213

Added lines #L210 - L213 were not covered by tests

ch := make(chan []byte)

go func() {
v, err := ps.fetch(ctx, name)
if err != nil {
close(ch)
} else {
ch <- v
close(ch)
}

Check warning on line 224 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L215-L224

Added lines #L215 - L224 were not covered by tests
}()

return ch, nil

Check warning on line 227 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L227

Added line #L227 was not covered by tests
}

func (ps *proxyRouting) fetch(ctx context.Context, name ipns.Name) ([]byte, error) {
urlStr := fmt.Sprintf("%s/ipns/%s", ps.getRandomGatewayURL(), name.String())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/vnd.ipfs.ipns-record")

goLog.Debugw("routing proxy fetch", "key", name.String(), "from", req.URL.String())
defer func() {
if err != nil {
goLog.Debugw("routing proxy fetch error", "key", name.String(), "from", req.URL.String(), "error", err.Error())
}

Check warning on line 242 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L230-L242

Added lines #L230 - L242 were not covered by tests
}()

resp, err := ps.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status from remote gateway: %s", resp.Status)
}

Check warning on line 258 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L245-L258

Added lines #L245 - L258 were not covered by tests

rb, err := io.ReadAll(io.LimitReader(resp.Body, int64(ipns.MaxRecordSize)))
if err != nil {
return nil, err
}

Check warning on line 263 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L260-L263

Added lines #L260 - L263 were not covered by tests

rec, err := ipns.UnmarshalRecord(rb)
if err != nil {
return nil, err
}

Check warning on line 268 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L265-L268

Added lines #L265 - L268 were not covered by tests

err = ipns.ValidateWithName(rec, name)
if err != nil {
return nil, err
}

Check warning on line 273 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L270-L273

Added lines #L270 - L273 were not covered by tests

return rb, nil

Check warning on line 275 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L275

Added line #L275 was not covered by tests
}

func (ps *proxyRouting) getRandomGatewayURL() string {
return strings.TrimSuffix(ps.ipnsRecordGateways[ps.rand.Intn(len(ps.ipnsRecordGateways))], "/")

Check warning on line 279 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L278-L279

Added lines #L278 - L279 were not covered by tests
}
Loading