Skip to content

Commit

Permalink
feat: add test peering and cache peering
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Apr 8, 2024
1 parent 92332e9 commit 9f64e24
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 77 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/ipfs-shipyard/nopfs v0.0.12
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a
github.com/ipfs/boxo v0.18.1-0.20240408102328-6c1937446978
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger4 v0.1.5
Expand Down Expand Up @@ -90,7 +91,6 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
Expand Down
71 changes: 0 additions & 71 deletions handler_test.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,13 @@
package main

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"

chunker "github.com/ipfs/boxo/chunker"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
util "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = DHTStandard
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

sr := util.NewTimeSeededRand()
sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, sr)
require.NoError(t, err)

cdns := newCachedDNS(dnsCacheRefreshInterval)

t.Cleanup(func() {
_ = cdns.Close()
})

gnd, err := Setup(ctx, cfg, sk, cdns)
require.NoError(t, err)
return gnd
}

func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) {
gnd := mustTestNode(t, cfg)

handler, err := setupGatewayHandler(cfg, gnd)
if err != nil {
require.NoError(t, err)
}

ts := httptest.NewServer(handler)

return ts, gnd
}

func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid {
dsrv := merkledag.NewDAGService(gnd.bsrv)

// Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/
ufsImportParams := uih.DagBuilderParams{
Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block
RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper
CidBuilder: cid.V1Builder{ // Use CIDv1 for all links
Codec: uint64(multicodec.DagPb),
MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function
MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits)
},
Dagserv: dsrv,
NoCopy: false,
}
ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks
require.NoError(t, err)

nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout
require.NoError(t, err)

return nd.Cid()
}

func TestTrustless(t *testing.T) {
t.Parallel()

Expand Down
94 changes: 94 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"bytes"
"context"
"crypto/rand"
"net/http/httptest"
"testing"

chunker "github.com/ipfs/boxo/chunker"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
"github.com/ipfs/go-cid"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

func mustTestPeer(t *testing.T) (ic.PrivKey, peer.ID) {
sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, rand.Reader)
require.NoError(t, err)

pid, err := peer.IDFromPrivateKey(sk)
require.NoError(t, err)

return sk, pid
}

func mustTestNode(t *testing.T, cfg Config) *Node {
sk, _ := mustTestPeer(t)
return mustTestNodeWithKey(t, cfg, sk)
}

func mustTestNodeWithKey(t *testing.T, cfg Config, sk ic.PrivKey) *Node {
// Set necessary fields if not defined.
if cfg.DataDir == "" {
cfg.DataDir = t.TempDir()
}
if cfg.BlockstoreType == "" {
cfg.BlockstoreType = "flatfs"
}
if cfg.DHTRouting == "" {
cfg.DHTRouting = DHTOff
}

ctx := context.Background()
cdns := newCachedDNS(dnsCacheRefreshInterval)

t.Cleanup(func() {
_ = cdns.Close()
})

nd, err := Setup(ctx, cfg, sk, cdns)
require.NoError(t, err)
return nd
}

func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) {
nd := mustTestNode(t, cfg)

handler, err := setupGatewayHandler(cfg, nd)
if err != nil {
require.NoError(t, err)
}

ts := httptest.NewServer(handler)
return ts, nd
}

func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid {
dsrv := merkledag.NewDAGService(gnd.bsrv)

// Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/
ufsImportParams := uih.DagBuilderParams{
Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block
RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper
CidBuilder: cid.V1Builder{ // Use CIDv1 for all links
Codec: uint64(multicodec.DagPb),
MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function
MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits)
},
Dagserv: dsrv,
NoCopy: false,
}
ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks
require.NoError(t, err)

nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout
require.NoError(t, err)

return nd.Cid()
}
14 changes: 9 additions & 5 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,10 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}
}

if len(routingV1Routers) == 0 && dhtRouter == nil {
return nil, errors.New("no routers available")
}
// Default router is no routing at all: can be especially useful during tests.
router = &routinghelpers.Null{}

if len(routingV1Routers) == 0 {
if len(routingV1Routers) == 0 && dhtRouter != nil {
router = dhtRouter
} else {
var routers []*routinghelpers.ParallelRouter
Expand All @@ -301,7 +300,9 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
})
}

router = routinghelpers.NewComposableParallel(routers)
if len(routers) > 0 {
router = routinghelpers.NewComposableParallel(routers)
}
}

return router, nil
Expand Down Expand Up @@ -329,11 +330,13 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

pbrf = func(p peer.ID, c cid.Cid) bool {
fmt.Println("MAYBE", p, c)
_, ok := peers[p]
return ok
}
} else {
pbrf = func(p peer.ID, c cid.Cid) bool {
fmt.Println("ALWAYS FALSE", p, c)
return false
}
}
Expand All @@ -353,6 +356,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached

// ---- Server Options
bitswap.WithPeerBlockRequestFilter(pbrf),
bitswap.ProvideEnabled(true),
)
bn.Start(bswap)

Expand Down
150 changes: 150 additions & 0 deletions setup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"context"
"fmt"
"net"
"testing"
"time"

blocks "github.com/ipfs/go-block-format"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func mustFreePort(t *testing.T) (int, *net.TCPListener) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
require.NoError(t, err)

l, err := net.ListenTCP("tcp", addr)
require.NoError(t, err)

return l.Addr().(*net.TCPAddr).Port, l
}

func mustFreePorts(t *testing.T, n int) []int {
ports := make([]int, 0)
for i := 0; i < n; i++ {
port, listener := mustFreePort(t)
defer listener.Close()
ports = append(ports, port)
}

return ports
}

func mustListenAddrWithPort(t *testing.T, port int) multiaddr.Multiaddr {
ma, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
require.NoError(t, err)
return ma
}

// mustPeeredNodes creates a network of [Node]s with the given configuration.
// The configuration contains as many elements as there are nodes. Each element
// indicates to which other nodes it is connected.
//
// Example configuration: [][]int{
// {1, 2},
// {0},
// {0},
// }
//
// - Node 0 is connected to nodes 1 and 2.
// - Node 1 is connected to node 0.
// - Node 2 is connected to node 1.
func mustPeeredNodes(t *testing.T, configuration [][]int, peeringCache bool) []*Node {
n := len(configuration)

// Generate ports, secrets keys, peer IDs and multiaddresses.
ports := mustFreePorts(t, n)
keys := make([]ic.PrivKey, n)
pids := make([]peer.ID, n)
mas := make([]multiaddr.Multiaddr, n)
addrInfos := make([]peer.AddrInfo, n)

for i := 0; i < n; i++ {
keys[i], pids[i] = mustTestPeer(t)
mas[i] = mustListenAddrWithPort(t, ports[i])
addrInfos[i] = peer.AddrInfo{
ID: pids[i],
Addrs: []multiaddr.Multiaddr{mas[i]},
}
}

cfgs := make([]Config, n)
nodes := make([]*Node, n)
for i := 0; i < n; i++ {
cfgs[i] = Config{
DHTRouting: DHTOff,
RoutingV1Endpoints: []string{},
ListenAddrs: []string{mas[i].String()},
Peering: []peer.AddrInfo{},
PeeringCache: peeringCache,
}

for _, j := range configuration[i] {
cfgs[i].Peering = append(cfgs[i].Peering, addrInfos[j])
}

nodes[i] = mustTestNodeWithKey(t, cfgs[i], keys[i])

t.Log("Node", i, "Addresses", nodes[i].host.Addrs(), "Peering", cfgs[i].Peering)
}

require.Eventually(t, func() bool {
for i, node := range nodes {
for _, peer := range cfgs[i].Peering {
if node.host.Network().Connectedness(peer.ID) != network.Connected {
t.Log(node.host.Network().Connectedness(peer.ID))
return false
}
}
}

return true
}, time.Second*30, time.Millisecond*100)

return nodes
}

func TestPeering(t *testing.T) {
_ = mustPeeredNodes(t, [][]int{
{1, 2},
{0, 2},
{0, 1},
}, false)
}

func TestPeeringCache(t *testing.T) {
nodes := mustPeeredNodes(t, [][]int{
{1},
{0},
{},
}, true)

bl := blocks.NewBlock([]byte(string("peering-cache-test")))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

checkBitswap := func(i int, success bool) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

_, err := nodes[i].bsrv.GetBlock(ctx, bl.Cid())
if success {
require.NoError(t, err)
} else {
require.Error(t, err)
}
}

err := nodes[0].bsrv.AddBlock(ctx, bl)
require.NoError(t, err)

checkBitswap(1, true)
checkBitswap(2, false)
}

0 comments on commit 9f64e24

Please sign in to comment.