Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: shared cache for manual peering #114

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- [`RAINBOW_GC_INTERVAL`](#rainbow_gc_interval)
- [`RAINBOW_GC_THRESHOLD`](#rainbow_gc_threshold)
- [`RAINBOW_IPNS_MAX_CACHE_TTL`](#rainbow_ipns_max_cache_ttl)
- [`RAINBOW_PEERING`](#rainbow_peering)
- [`RAINBOW_PEERING_SHARED_CACHE`](#rainbow_peering_shared_cache)
- [Logging](#logging)
- [`GOLOG_LOG_LEVEL`](#golog_log_level)
- [`GOLOG_LOG_FMT`](#golog_log_fmt)
Expand Down Expand Up @@ -90,6 +92,33 @@ with [DNSLink](https://dnslink.dev/).

Default: No upper bound, [TTL from IPNS Record](https://specs.ipfs.tech/ipns/ipns-record/#ttl-uint64) or [TTL from DNSLink](https://datatracker.ietf.org/doc/html/rfc2181#section-8) used as-is.

### `RAINBOW_PEERING`

A comma-separated list of [multiaddresses](https://docs.libp2p.io/concepts/fundamentals/addressing/) of peers to stay connected to.


If `RAINBOW_SEED` is set and `/p2p/rainbow-seed/N` value is found here, Rainbow
will replace it with a valid `/p2p/` for a peer ID generated from same seed
and index `N`.

Default: not set (no peering)


### `RAINBOW_PEERING_SHARED_CACHE`

Enable sharing of local cache to peers safe-listed with `RAINBOW_PEERING`.

Once enabled, Rainbow will respond to [Bitswap](https://docs.ipfs.tech/concepts/bitswap/)
queries from these safelisted peers, serving locally cached blocks if requested.

The main use case for this feature is scaling and load balancing acrosss a
fleet of rainbow, or other bitswap-capable IPFS services. Cache sharing allows
clustered services to check if any of the other instances has a requested CID.
This saves resources as data cached on other instance can be fetched internally
(e.g. LAN) rather than externally (WAN, p2p).

Default: `false` (no cache sharing)

## Logging

### `GOLOG_LOG_LEVEL`
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/ipfs-shipyard/nopfs v0.0.12
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a
github.com/ipfs/boxo v0.19.1-0.20240418055150-eeea41458735
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger4 v0.1.5
Expand Down Expand Up @@ -90,18 +91,19 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-blockservice v0.5.2 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-cbor v0.1.0 // indirect
github.com/ipfs/go-ipld-format v0.6.0 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/go-verifcid v0.0.3 // indirect
github.com/ipld/go-car v0.6.2 // indirect
github.com/ipld/go-car/v2 v2.13.1 // indirect
Expand Down
71 changes: 0 additions & 71 deletions handler_test.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,13 @@
package main

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"

chunker "github.com/ipfs/boxo/chunker"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
util "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = DHTStandard
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

sr := util.NewTimeSeededRand()
sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, sr)
require.NoError(t, err)

cdns := newCachedDNS(dnsCacheRefreshInterval)

t.Cleanup(func() {
_ = cdns.Close()
})

gnd, err := Setup(ctx, cfg, sk, cdns)
require.NoError(t, err)
return gnd
}

func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) {
gnd := mustTestNode(t, cfg)

handler, err := setupGatewayHandler(cfg, gnd)
if err != nil {
require.NoError(t, err)
}

ts := httptest.NewServer(handler)

return ts, gnd
}

func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid {
dsrv := merkledag.NewDAGService(gnd.bsrv)

// Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/
ufsImportParams := uih.DagBuilderParams{
Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block
RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper
CidBuilder: cid.V1Builder{ // Use CIDv1 for all links
Codec: uint64(multicodec.DagPb),
MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function
MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits)
},
Dagserv: dsrv,
NoCopy: false,
}
ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks
require.NoError(t, err)

nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout
require.NoError(t, err)

return nd.Cid()
}

func TestTrustless(t *testing.T) {
t.Parallel()

Expand Down
5 changes: 5 additions & 0 deletions keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"io"

libp2p "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -43,3 +44,7 @@ func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) {
key := ed25519.NewKeyFromSeed(keySeed)
return libp2p.UnmarshalEd25519PrivateKey(key)
}

func deriveKeyInfo(index int) []byte {
return []byte(fmt.Sprintf("rainbow-%d", index))
}
51 changes: 48 additions & 3 deletions main.go
hacdias marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
"os"
"os/signal"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -205,6 +207,12 @@
EnvVars: []string{"RAINBOW_PEERING"},
Usage: "Multiaddresses of peers to stay connected to (comma-separated)",
},
&cli.BoolFlag{
Name: "peering-shared-cache",
Value: false,
EnvVars: []string{"RAINBOW_PEERING_SHARED_CACHE"},
Usage: "Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
},
&cli.StringFlag{
Name: "blockstore",
Value: "flatfs",
Expand Down Expand Up @@ -250,6 +258,8 @@
}

app.Action = func(cctx *cli.Context) error {
fmt.Printf("Starting %s %s\n", name, version)

Check warning on line 261 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L261

Added line #L261 was not covered by tests

ddir := cctx.String("datadir")
cdns := newCachedDNS(dnsCacheRefreshInterval)
defer cdns.Close()
Expand Down Expand Up @@ -280,8 +290,8 @@

index := cctx.Int("seed-index")
if len(seed) > 0 && index >= 0 {
fmt.Println("Deriving identity from seed")
priv, err = deriveKey(seed, []byte(fmt.Sprintf("rainbow-%d", index)))
fmt.Printf("Deriving identity from seed[%d]\n", index)
priv, err = deriveKey(seed, deriveKeyInfo(index))

Check warning on line 294 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L293-L294

Added lines #L293 - L294 were not covered by tests
} else {
fmt.Println("Setting identity from libp2p.key")
keyFile := filepath.Join(secretsDir, "libp2p.key")
Expand All @@ -293,6 +303,15 @@

var peeringAddrs []peer.AddrInfo
for _, maStr := range cctx.StringSlice("peering") {
if len(seed) > 0 && index >= 0 {
maStr, err = replaceRainbowSeedWithPeer(maStr, seed)
if err != nil {
return err

Check warning on line 309 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L306-L309

Added lines #L306 - L309 were not covered by tests
}
} else if rainbowSeedRegex.MatchString(maStr) {
return fmt.Errorf("unable to peer with %q without defining --seed-index of this instance first", maStr)

Check warning on line 312 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L311-L312

Added lines #L311 - L312 were not covered by tests
}

ai, err := peer.AddrInfoFromString(maStr)
if err != nil {
return err
Expand All @@ -318,6 +337,7 @@
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
PeeringCache: cctx.Bool("peering-shared-cache"),

Check warning on line 340 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L340

Added line #L340 was not covered by tests
GCInterval: cctx.Duration("gc-interval"),
GCThreshold: cctx.Float64("gc-threshold"),
}
Expand All @@ -342,7 +362,6 @@
Handler: handler,
}

fmt.Printf("Starting %s %s\n", name, version)
pid, err := peer.IDFromPublicKey(priv.GetPublic())
if err != nil {
return err
Expand Down Expand Up @@ -484,3 +503,29 @@
fmt.Printf(message+"%v\n", strings.Join(list, ", "))
}
}

var rainbowSeedRegex = regexp.MustCompile(`/p2p/rainbow-seed/(\d+)`)

func replaceRainbowSeedWithPeer(addr string, seed string) (string, error) {
match := rainbowSeedRegex.FindStringSubmatch(addr)
if len(match) != 2 {
return addr, nil
}

index, err := strconv.Atoi(match[1])
if err != nil {
return "", err

Check warning on line 517 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L517

Added line #L517 was not covered by tests
}

priv, err := deriveKey(seed, deriveKeyInfo(index))
if err != nil {
return "", err

Check warning on line 522 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L522

Added line #L522 was not covered by tests
}

pid, err := peer.IDFromPublicKey(priv.GetPublic())
if err != nil {
return "", err

Check warning on line 527 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L527

Added line #L527 was not covered by tests
}

return strings.Replace(addr, match[0], "/p2p/"+pid.String(), 1), nil
}
Loading
Loading