From ee70d5d0a9dcd97410e50ad3ec3d0dbc3cadf227 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Mon, 8 Apr 2024 15:42:21 +0200 Subject: [PATCH] feat: add test peering --- go.mod | 2 +- handler_test.go | 71 ----------------------- main_test.go | 94 ++++++++++++++++++++++++++++++ setup.go | 25 +++++--- setup_test.go | 150 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 261 insertions(+), 81 deletions(-) create mode 100644 main_test.go create mode 100644 setup_test.go diff --git a/go.mod b/go.mod index 65c30a0..77c1e38 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/ipfs-shipyard/nopfs v0.0.12 github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a github.com/ipfs/boxo v0.18.1-0.20240408102328-6c1937446978 + github.com/ipfs/go-block-format v0.2.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger4 v0.1.5 @@ -90,7 +91,6 @@ require ( github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect - github.com/ipfs/go-block-format v0.2.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect diff --git a/handler_test.go b/handler_test.go index f51d2d3..120d775 100644 --- a/handler_test.go +++ b/handler_test.go @@ -1,84 +1,13 @@ package main import ( - "bytes" - "context" "net/http" - "net/http/httptest" "testing" - chunker "github.com/ipfs/boxo/chunker" - "github.com/ipfs/boxo/ipld/merkledag" - "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" - uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - util "github.com/ipfs/boxo/util" - "github.com/ipfs/go-cid" - ic "github.com/libp2p/go-libp2p/core/crypto" - "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func mustTestNode(t *testing.T, cfg Config) *Node { - cfg.DataDir = t.TempDir() - cfg.BlockstoreType = "flatfs" - cfg.DHTRouting = DHTStandard - cfg.RoutingV1Endpoints = []string{cidContactEndpoint} - - ctx := context.Background() - - sr := util.NewTimeSeededRand() - sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, sr) - require.NoError(t, err) - - cdns := newCachedDNS(dnsCacheRefreshInterval) - - t.Cleanup(func() { - _ = cdns.Close() - }) - - gnd, err := Setup(ctx, cfg, sk, cdns) - require.NoError(t, err) - return gnd -} - -func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) { - gnd := mustTestNode(t, cfg) - - handler, err := setupGatewayHandler(cfg, gnd) - if err != nil { - require.NoError(t, err) - } - - ts := httptest.NewServer(handler) - - return ts, gnd -} - -func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid { - dsrv := merkledag.NewDAGService(gnd.bsrv) - - // Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/ - ufsImportParams := uih.DagBuilderParams{ - Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block - RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper - CidBuilder: cid.V1Builder{ // Use CIDv1 for all links - Codec: uint64(multicodec.DagPb), - MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function - MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits) - }, - Dagserv: dsrv, - NoCopy: false, - } - ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks - require.NoError(t, err) - - nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout - require.NoError(t, err) - - return nd.Cid() -} - func TestTrustless(t *testing.T) { t.Parallel() diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..211c726 --- /dev/null +++ b/main_test.go @@ -0,0 +1,94 @@ +package main + +import ( + "bytes" + "context" + "crypto/rand" + "net/http/httptest" + "testing" + + chunker "github.com/ipfs/boxo/chunker" + "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" + uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" + "github.com/ipfs/go-cid" + ic "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multicodec" + "github.com/stretchr/testify/require" +) + +func mustTestPeer(t *testing.T) (ic.PrivKey, peer.ID) { + sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, rand.Reader) + require.NoError(t, err) + + pid, err := peer.IDFromPrivateKey(sk) + require.NoError(t, err) + + return sk, pid +} + +func mustTestNode(t *testing.T, cfg Config) *Node { + sk, _ := mustTestPeer(t) + return mustTestNodeWithKey(t, cfg, sk) +} + +func mustTestNodeWithKey(t *testing.T, cfg Config, sk ic.PrivKey) *Node { + // Set necessary fields if not defined. + if cfg.DataDir == "" { + cfg.DataDir = t.TempDir() + } + if cfg.BlockstoreType == "" { + cfg.BlockstoreType = "flatfs" + } + if cfg.DHTRouting == "" { + cfg.DHTRouting = DHTOff + } + + ctx := context.Background() + cdns := newCachedDNS(dnsCacheRefreshInterval) + + t.Cleanup(func() { + _ = cdns.Close() + }) + + nd, err := Setup(ctx, cfg, sk, cdns) + require.NoError(t, err) + return nd +} + +func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) { + nd := mustTestNode(t, cfg) + + handler, err := setupGatewayHandler(cfg, nd) + if err != nil { + require.NoError(t, err) + } + + ts := httptest.NewServer(handler) + return ts, nd +} + +func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid { + dsrv := merkledag.NewDAGService(gnd.bsrv) + + // Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/ + ufsImportParams := uih.DagBuilderParams{ + Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block + RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper + CidBuilder: cid.V1Builder{ // Use CIDv1 for all links + Codec: uint64(multicodec.DagPb), + MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function + MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits) + }, + Dagserv: dsrv, + NoCopy: false, + } + ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks + require.NoError(t, err) + + nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout + require.NoError(t, err) + + return nd.Cid() +} diff --git a/setup.go b/setup.go index c4c65b3..5553163 100644 --- a/setup.go +++ b/setup.go @@ -273,11 +273,10 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } } - if len(routingV1Routers) == 0 && dhtRouter == nil { - return nil, errors.New("no routers available") - } + // Default router is no routing at all: can be especially useful during tests. + router = &routinghelpers.Null{} - if len(routingV1Routers) == 0 { + if len(routingV1Routers) == 0 && dhtRouter != nil { router = dhtRouter } else { var routers []*routinghelpers.ParallelRouter @@ -301,7 +300,9 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached }) } - router = routinghelpers.NewComposableParallel(routers) + if len(routers) > 0 { + router = routinghelpers.NewComposableParallel(routers) + } } return router, nil @@ -321,19 +322,24 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } } - var pbrf bsserver.PeerBlockRequestFilter + var ( + provideEnabled bool + peerBlockRequestFilter bsserver.PeerBlockRequestFilter + ) if cfg.PeeringCache && len(cfg.Peering) > 0 { peers := make(map[peer.ID]struct{}, len(cfg.Peering)) for _, a := range cfg.Peering { peers[a.ID] = struct{}{} } - pbrf = func(p peer.ID, c cid.Cid) bool { + provideEnabled = true + peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { _, ok := peers[p] return ok } } else { - pbrf = func(p peer.ID, c cid.Cid) bool { + provideEnabled = false + peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { return false } } @@ -352,7 +358,8 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached bitswap.WithoutDuplicatedBlockStats(), // ---- Server Options - bitswap.WithPeerBlockRequestFilter(pbrf), + bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter), + bitswap.ProvideEnabled(provideEnabled), ) bn.Start(bswap) diff --git a/setup_test.go b/setup_test.go new file mode 100644 index 0000000..abd7ac2 --- /dev/null +++ b/setup_test.go @@ -0,0 +1,150 @@ +package main + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + blocks "github.com/ipfs/go-block-format" + ic "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func mustFreePort(t *testing.T) (int, *net.TCPListener) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + require.NoError(t, err) + + l, err := net.ListenTCP("tcp", addr) + require.NoError(t, err) + + return l.Addr().(*net.TCPAddr).Port, l +} + +func mustFreePorts(t *testing.T, n int) []int { + ports := make([]int, 0) + for i := 0; i < n; i++ { + port, listener := mustFreePort(t) + defer listener.Close() + ports = append(ports, port) + } + + return ports +} + +func mustListenAddrWithPort(t *testing.T, port int) multiaddr.Multiaddr { + ma, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + require.NoError(t, err) + return ma +} + +// mustPeeredNodes creates a network of [Node]s with the given configuration. +// The configuration contains as many elements as there are nodes. Each element +// indicates to which other nodes it is connected. +// +// Example configuration: [][]int{ +// {1, 2}, +// {0}, +// {0}, +// } +// +// - Node 0 is connected to nodes 1 and 2. +// - Node 1 is connected to node 0. +// - Node 2 is connected to node 1. +func mustPeeredNodes(t *testing.T, configuration [][]int, peeringCache bool) []*Node { + n := len(configuration) + + // Generate ports, secrets keys, peer IDs and multiaddresses. + ports := mustFreePorts(t, n) + keys := make([]ic.PrivKey, n) + pids := make([]peer.ID, n) + mas := make([]multiaddr.Multiaddr, n) + addrInfos := make([]peer.AddrInfo, n) + + for i := 0; i < n; i++ { + keys[i], pids[i] = mustTestPeer(t) + mas[i] = mustListenAddrWithPort(t, ports[i]) + addrInfos[i] = peer.AddrInfo{ + ID: pids[i], + Addrs: []multiaddr.Multiaddr{mas[i]}, + } + } + + cfgs := make([]Config, n) + nodes := make([]*Node, n) + for i := 0; i < n; i++ { + cfgs[i] = Config{ + DHTRouting: DHTOff, + RoutingV1Endpoints: []string{}, + ListenAddrs: []string{mas[i].String()}, + Peering: []peer.AddrInfo{}, + PeeringCache: peeringCache, + } + + for _, j := range configuration[i] { + cfgs[i].Peering = append(cfgs[i].Peering, addrInfos[j]) + } + + nodes[i] = mustTestNodeWithKey(t, cfgs[i], keys[i]) + + t.Log("Node", i, "Addresses", nodes[i].host.Addrs(), "Peering", cfgs[i].Peering) + } + + require.Eventually(t, func() bool { + for i, node := range nodes { + for _, peer := range cfgs[i].Peering { + if node.host.Network().Connectedness(peer.ID) != network.Connected { + t.Log(node.host.Network().Connectedness(peer.ID)) + return false + } + } + } + + return true + }, time.Second*30, time.Millisecond*100) + + return nodes +} + +func TestPeering(t *testing.T) { + _ = mustPeeredNodes(t, [][]int{ + {1, 2}, + {0, 2}, + {0, 1}, + }, false) +} + +func TestPeeringCache(t *testing.T) { + nodes := mustPeeredNodes(t, [][]int{ + {1}, + {0}, + {}, + }, true) + + bl := blocks.NewBlock([]byte(string("peering-cache-test"))) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + checkBitswap := func(i int, success bool) { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + _, err := nodes[i].bsrv.GetBlock(ctx, bl.Cid()) + if success { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } + + err := nodes[0].bsrv.AddBlock(ctx, bl) + require.NoError(t, err) + + checkBitswap(1, true) + checkBitswap(2, false) +}