diff --git a/blockstore_proxy.go b/blockstore_proxy.go index deab4fb..8e78c84 100644 --- a/blockstore_proxy.go +++ b/blockstore_proxy.go @@ -24,9 +24,6 @@ import ( const ( EnvProxyGateway = "PROXY_GATEWAY_URL" - - DefaultProxyGateway = "http://127.0.0.1:8080" - DefaultKuboRPC = "http://127.0.0.1:5001" ) type proxyBlockStore struct { diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 321987c..da7b5eb 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -26,12 +26,9 @@ ### `KUBO_RPC_URL` -Default: see `DefaultKuboRPC` - Single URL or a comma separated list of RPC endpoints that provide `/api/v0` from Kubo. - -We use this as temporary solution for IPNS Record routing until [IPIP-351](https://github.com/ipfs/specs/pull/351) ships with Kubo 0.19, -and we also redirect some legacy `/api/v0` commands that need to be handled on `ipfs.io`. +This is used to redirect legacy `/api/v0` commands that need to be handled on `ipfs.io`. +If this is not set, the redirects are not set up. ### `BLOCK_CACHE_SIZE` @@ -59,9 +56,21 @@ in the near feature. ### `PROXY_GATEWAY_URL` -Single URL or a comma separated list of Gateway endpoints that support `?format=block|car` -responses. This is used by default with `http://127.0.0.1:8080` unless `STRN_ORCHESTRATOR_URL` -is set. +Single URL or a comma separated list of Gateway endpoints that support `?format=block|car|ipns-record` +responses. Either this variable or `STRN_ORCHESTRATOR_URL` must be set. + +If this gateway does not support `application/vnd.ipfs.ipns-record`, you can use `IPNS_RECORD_GATEWAY_URL` +to override the gateway address from which to retrieve IPNS Records from. + +### `IPNS_RECORD_GATEWAY_URL` + +Single URL or a comma separated list of Gateway endpoints that support requests for `application/vnd.ipfs.ipns-record`. +This is used for IPNS Record routing. + +`IPNS_RECORD_GATEWAY_URL` also supports [Routing V1 HTTP API](https://specs.ipfs.tech/routing/http-routing-v1/) +for IPNS Record routing ([IPIP-379](https://specs.ipfs.tech/ipips/ipip-0379/)). To use it, the provided URL must end with `/routing/v1`. + +If not set, the IPNS records will be fetched from `KUBO_RPC_URL`. ## Saturn Backend diff --git a/handlers.go b/handlers.go index fb844dd..d64a8dd 100644 --- a/handlers.go +++ b/handlers.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "math/rand" "net/http" @@ -12,6 +13,7 @@ import ( _ "net/http/pprof" "github.com/ipfs/bifrost-gateway/lib" + "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/filecoin-saturn/caboose" @@ -68,9 +70,16 @@ func withRequestLogger(next http.Handler) http.Handler { }) } -func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) { - // Sets up the routing system, which will proxy the IPNS routing requests to the given gateway. - routing := newProxyRouting(kuboRPC, cdns) +func makeGatewayHandler(bs bstore.Blockstore, kuboRPC, ipnsRecordGateways []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) { + // Sets up the routing system, which will proxy the IPNS routing requests to the given gateway or kubo RPC. + var routing routing.ValueStore + if len(ipnsRecordGateways) != 0 { + routing = newProxyRouting(ipnsRecordGateways, cdns) + } else if len(kuboRPC) != 0 { + routing = newRPCProxyRouting(kuboRPC, cdns) + } else { + return nil, errors.New("either KUBO_RPC_URL, IPNS_RECORD_GATEWAY_URL or PROXY_GATEWAY_URL with support for application/vnd.ipfs.ipns-record must be provided in order to delegate IPNS routing") + } // Sets up a cache to store blocks in cacheBlockStore, err := lib.NewCacheBlockStore(blockCacheSize) @@ -157,10 +166,13 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC mux := http.NewServeMux() mux.Handle("/ipfs/", ipfsHandler) mux.Handle("/ipns/", ipnsHandler) - // TODO: below is legacy which we want to remove, measuring this separately - // allows us to decide when is the time to do it. - legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") - mux.Handle("/api/v0/", legacyKuboRpcHandler) + + if len(kuboRPC) != 0 { + // TODO: below is legacy which we want to remove, measuring this separately + // allows us to decide when is the time to do it. + legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") + mux.Handle("/api/v0/", legacyKuboRpcHandler) + } // Construct the HTTP handler for the gateway. handler := withConnect(mux) diff --git a/main.go b/main.go index 58b35f3..ffd27db 100644 --- a/main.go +++ b/main.go @@ -31,10 +31,11 @@ func main() { } const ( - EnvKuboRPC = "KUBO_RPC_URL" - EnvBlockCacheSize = "BLOCK_CACHE_SIZE" - EnvGraphBackend = "GRAPH_BACKEND" - RequestIDHeader = "X-Bfid" + EnvKuboRPC = "KUBO_RPC_URL" + EnvIPNSRecordGateway = "IPNS_RECORD_GATEWAY_URL" + EnvBlockCacheSize = "BLOCK_CACHE_SIZE" + EnvGraphBackend = "GRAPH_BACKEND" + RequestIDHeader = "X-Bfid" ) func init() { @@ -57,7 +58,7 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`, // Get env variables. saturnOrchestrator := getEnv(EnvSaturnOrchestrator, "") proxyGateway := getEnvs(EnvProxyGateway, "") - kuboRPC := getEnvs(EnvKuboRPC, DefaultKuboRPC) + kuboRPC := getEnvs(EnvKuboRPC, "") blockCacheSize, err := getEnvInt(EnvBlockCacheSize, lib.DefaultCacheBlockStoreSize) if err != nil { @@ -107,7 +108,15 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`, log.Fatalf("Unable to start. bifrost-gateway requires either PROXY_GATEWAY_URL or STRN_ORCHESTRATOR_URL to be set.\n\nRead docs at https://github.com/ipfs/bifrost-gateway/blob/main/docs/environment-variables.md\n\n") } - gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, gatewayPort, blockCacheSize, cdns, useGraphBackend) + // Prefer IPNS_RECORD_GATEWAY_URL when an explicit URL for IPNS routing is set + ipnsRecordGateway := getEnvs(EnvIPNSRecordGateway, "") + if len(ipnsRecordGateway) == 0 { + // Fallback to PROXY_GATEWAY_URL, assuming it is modern + // enough to support application/vnd.ipfs.ipns-record responses + ipnsRecordGateway = proxyGateway + } + + gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, ipnsRecordGateway, gatewayPort, blockCacheSize, cdns, useGraphBackend) if err != nil { return err } @@ -128,7 +137,10 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`, log.Printf("%s: %d", EnvBlockCacheSize, blockCacheSize) log.Printf("%s: %t", EnvGraphBackend, useGraphBackend) - log.Printf("Legacy RPC at /api/v0 (%s) provided by %s", EnvKuboRPC, strings.Join(kuboRPC, " ")) + if len(kuboRPC) != 0 { + log.Printf("Legacy RPC at /api/v0 (%s) provided by %s", EnvKuboRPC, strings.Join(kuboRPC, " ")) + } + log.Printf("Path gateway listening on http://127.0.0.1:%d", gatewayPort) log.Printf(" Smoke test (JPG): http://127.0.0.1:%d/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", gatewayPort) log.Printf("Subdomain gateway configured on dweb.link and http://localhost:%d", gatewayPort) diff --git a/routing.go b/routing.go index 286c5dd..17d9f88 100644 --- a/routing.go +++ b/routing.go @@ -18,17 +18,17 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) -type proxyRouting struct { +type rpcProxyRouting struct { kuboRPC []string httpClient *http.Client rand *rand.Rand } -func newProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore { +func newRPCProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore { s := rand.NewSource(time.Now().Unix()) rand := rand.New(s) - return &proxyRouting{ + return &rpcProxyRouting{ kuboRPC: kuboRPC, httpClient: &http.Client{ Transport: otelhttp.NewTransport(&customTransport{ @@ -48,15 +48,15 @@ func newProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore { } } -func (ps *proxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error { +func (ps *rpcProxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error { return routing.ErrNotSupported } -func (ps *proxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) { +func (ps *rpcProxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) { return ps.fetch(ctx, k) } -func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) { +func (ps *rpcProxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) { if !strings.HasPrefix(k, "/ipns/") { return nil, routing.ErrNotSupported } @@ -76,7 +76,7 @@ func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routi return ch, nil } -func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) { +func (ps *rpcProxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) { name, err := ipns.NameFromRoutingKey([]byte(key)) if err != nil { return nil, err @@ -151,6 +151,130 @@ func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err e return rb, nil } -func (ps *proxyRouting) getRandomKuboURL() string { +func (ps *rpcProxyRouting) getRandomKuboURL() string { return ps.kuboRPC[ps.rand.Intn(len(ps.kuboRPC))] } + +type proxyRouting struct { + ipnsRecordGateways []string + httpClient *http.Client + rand *rand.Rand +} + +func newProxyRouting(ipnsRecordGateways []string, cdns *cachedDNS) routing.ValueStore { + s := rand.NewSource(time.Now().Unix()) + rand := rand.New(s) + + return &proxyRouting{ + ipnsRecordGateways: ipnsRecordGateways, + httpClient: &http.Client{ + Transport: otelhttp.NewTransport(&customTransport{ + // RoundTripper with increased defaults than http.Transport such that retrieving + // multiple lookups concurrently is fast. + RoundTripper: &http.Transport{ + MaxIdleConns: 1000, + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + DialContext: cdns.dialWithCachedDNS, + ForceAttemptHTTP2: true, + }, + }), + }, + rand: rand, + } +} + +func (ps *proxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error { + return routing.ErrNotSupported +} + +func (ps *proxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) { + if !strings.HasPrefix(k, "/ipns/") { + return nil, routing.ErrNotSupported + } + + name, err := ipns.NameFromRoutingKey([]byte(k)) + if err != nil { + return nil, err + } + + return ps.fetch(ctx, name) +} + +func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) { + if !strings.HasPrefix(k, "/ipns/") { + return nil, routing.ErrNotSupported + } + + name, err := ipns.NameFromRoutingKey([]byte(k)) + if err != nil { + return nil, err + } + + ch := make(chan []byte) + + go func() { + v, err := ps.fetch(ctx, name) + if err != nil { + close(ch) + } else { + ch <- v + close(ch) + } + }() + + return ch, nil +} + +func (ps *proxyRouting) fetch(ctx context.Context, name ipns.Name) ([]byte, error) { + urlStr := fmt.Sprintf("%s/ipns/%s", ps.getRandomGatewayURL(), name.String()) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/vnd.ipfs.ipns-record") + + goLog.Debugw("routing proxy fetch", "key", name.String(), "from", req.URL.String()) + defer func() { + if err != nil { + goLog.Debugw("routing proxy fetch error", "key", name.String(), "from", req.URL.String(), "error", err.Error()) + } + }() + + resp, err := ps.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status from remote gateway: %s", resp.Status) + } + + rb, err := io.ReadAll(io.LimitReader(resp.Body, int64(ipns.MaxRecordSize))) + if err != nil { + return nil, err + } + + rec, err := ipns.UnmarshalRecord(rb) + if err != nil { + return nil, err + } + + err = ipns.ValidateWithName(rec, name) + if err != nil { + return nil, err + } + + return rb, nil +} + +func (ps *proxyRouting) getRandomGatewayURL() string { + return strings.TrimSuffix(ps.ipnsRecordGateways[ps.rand.Intn(len(ps.ipnsRecordGateways))], "/") +}