diff --git a/test/cli/harness/harness.go b/test/cli/harness/harness.go
index a35fead3512..e68116b5efc 100644
--- a/test/cli/harness/harness.go
+++ b/test/cli/harness/harness.go
@@ -11,6 +11,8 @@ import (
 
 	logging "github.com/ipfs/go-log/v2"
 	. "github.com/ipfs/kubo/test/cli/testutils"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-multiaddr"
 )
 
 // Harness tracks state for a test, such as temp dirs and IPFS nodes, and cleans them up after the test.
@@ -188,3 +190,22 @@ func (h *Harness) Cleanup() {
 		log.Panicf("removing temp dir %s: %s", h.Dir, err)
 	}
 }
+
+// ExtractPeerID extracts a peer ID from the given multiaddr, and panics if it does not contain a peer ID.
+func (h *Harness) ExtractPeerID(m multiaddr.Multiaddr) peer.ID {
+	var peerIDStr string
+	multiaddr.ForEach(m, func(c multiaddr.Component) bool {
+		if c.Protocol().Code == multiaddr.P_P2P {
+			peerIDStr = c.Value()
+		}
+		return true
+	})
+	if peerIDStr == "" {
+		panic(multiaddr.ErrProtocolNotFound)
+	}
+	peerID, err := peer.Decode(peerIDStr)
+	if err != nil {
+		panic(err)
+	}
+	return peerID
+}
diff --git a/test/cli/harness/log.go b/test/cli/harness/log.go
new file mode 100644
index 00000000000..d76bb2747c1
--- /dev/null
+++ b/test/cli/harness/log.go
@@ -0,0 +1,155 @@
+package harness
+
+import (
+	"fmt"
+	"path/filepath"
+	"runtime"
+	"sort"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+)
+
+type event struct {
+	timestamp time.Time
+	msg       string
+}
+
+type events []*event
+
+func (e events) Len() int           { return len(e) }
+func (e events) Less(i, j int) bool { return e[i].timestamp.Before(e[j].timestamp) }
+func (e events) Swap(i, j int)      { e[i], e[j] = e[j], e[i] }
+
+// TestLogger is a logger for tests.
+// It buffers output and only writes the output if the test fails or output is explicitly turned on.
+// The purpose of this logger is to allow Go test to run with the verbose flag without printing logs.
+// The verbose flag is useful since it streams test progress, but printing logs as well makes the output too verbose.
+//
+// You can also add prefixes that are prepended to each log message, for extra logging context.
+//
+// This is implemented as a hierarchy of loggers, with children flushing log entries back to parents.
+// This works because t.Cleanup() processes entries in LIFO order, so children always flush first.
+//
+// Obviously this logger should never be used in production systems.
+type TestLogger struct {
+	parent        *TestLogger
+	children      []*TestLogger
+	prefixes      []string
+	prefixesIface []any
+	t             *testing.T
+	buf           events
+	m             sync.Mutex
+	logsEnabled   bool
+}
+
+func NewTestLogger(t *testing.T) *TestLogger {
+	l := &TestLogger{t: t, buf: make(events, 0)}
+	t.Cleanup(l.flush)
+	return l
+}
+
+func (t *TestLogger) buildPrefix(timestamp time.Time) string {
+	d := timestamp.Format("2006-01-02T15:04:05.999999")
+	_, file, lineno, _ := runtime.Caller(2)
+	file = filepath.Base(file)
+	caller := fmt.Sprintf("%s:%d", file, lineno)
+
+	if len(t.prefixes) == 0 {
+		return fmt.Sprintf("%s\t%s\t", d, caller)
+	}
+
+	prefixes := strings.Join(t.prefixes, ":")
+	return fmt.Sprintf("%s\t%s\t%s: ", d, caller, prefixes)
+}
+
+func (t *TestLogger) Log(args ...any) {
+	timestamp := time.Now()
+	e := t.buildPrefix(timestamp) + fmt.Sprint(args...)
+	t.add(&event{timestamp: timestamp, msg: e})
+}
+
+func (t *TestLogger) Logf(format string, args ...any) {
+	timestamp := time.Now()
+	e := t.buildPrefix(timestamp) + fmt.Sprintf(format, args...)
+	t.add(&event{timestamp: timestamp, msg: e})
+}
+
+func (t *TestLogger) Fatal(args ...any) {
+	timestamp := time.Now()
+	e := t.buildPrefix(timestamp) + fmt.Sprint(append([]any{"fatal: "}, args...)...)
+	t.add(&event{timestamp: timestamp, msg: e})
+	t.t.FailNow()
+}
+
+func (t *TestLogger) Fatalf(format string, args ...any) {
+	timestamp := time.Now()
+	e := t.buildPrefix(timestamp) + fmt.Sprintf(fmt.Sprintf("fatal: %s", format), args...)
+	t.add(&event{timestamp: timestamp, msg: e})
+	t.t.FailNow()
+}
+
+func (t *TestLogger) add(e *event) {
+	t.m.Lock()
+	defer t.m.Unlock()
+	t.buf = append(t.buf, e)
+}
+
+func (t *TestLogger) AddPrefix(prefix string) *TestLogger {
+	l := &TestLogger{
+		prefixes:      append(t.prefixes, prefix),
+		prefixesIface: append(t.prefixesIface, prefix),
+		t:             t.t,
+		parent:        t,
+		logsEnabled:   t.logsEnabled,
+	}
+	t.m.Lock()
+	defer t.m.Unlock()
+
+	t.children = append(t.children, l)
+	t.t.Cleanup(l.flush)
+
+	return l
+}
+
+func (t *TestLogger) EnableLogs() {
+	t.m.Lock()
+	defer t.m.Unlock()
+	t.logsEnabled = true
+	if t.parent != nil {
+		if !t.parent.logsEnabled {
+			t.parent.EnableLogs()
+		}
+	}
+	fmt.Printf("enabling %d children\n", len(t.children))
+	for _, c := range t.children {
+		if !c.logsEnabled {
+			c.EnableLogs()
+		}
+	}
+}
+
+func (t *TestLogger) flush() {
+	if t.t.Failed() || t.logsEnabled {
+		t.m.Lock()
+		defer t.m.Unlock()
+		// if this is a child, send the events to the parent
+		// the root parent will print all the events in sorted order
+		if t.parent != nil {
+			for _, e := range t.buf {
+				t.parent.add(e)
+			}
+		} else {
+			// we're the root, sort all the events and then print them
+			sort.Sort(t.buf)
+			fmt.Println()
+			fmt.Printf("Logs for test %q:\n\n", t.t.Name())
+			for _, e := range t.buf {
+				fmt.Println(e.msg)
+			}
+			fmt.Println()
+		}
+		t.buf = nil
+	}
+}
diff --git a/test/cli/harness/node.go b/test/cli/harness/node.go
index cc251e11b0f..f740ab1b19f 100644
--- a/test/cli/harness/node.go
+++ b/test/cli/harness/node.go
@@ -453,9 +453,8 @@ func (n *Node) Peers() []multiaddr.Multiaddr {
 		Path: n.IPFSBin,
 		Args: []string{"swarm", "peers"},
 	})
-	lines := strings.Split(strings.TrimSpace(res.Stdout.String()), "\n")
 	var addrs []multiaddr.Multiaddr
-	for _, line := range lines {
+	for _, line := range res.Stdout.Lines() {
 		ma, err := multiaddr.NewMultiaddr(line)
 		if err != nil {
 			panic(err)
@@ -465,6 +464,28 @@ func (n *Node) Peers() []multiaddr.Multiaddr {
 	return addrs
 }
 
+func (n *Node) PeerWith(other *Node) {
+	n.UpdateConfig(func(cfg *config.Config) {
+		var addrs []multiaddr.Multiaddr
+		for _, addrStr := range other.ReadConfig().Addresses.Swarm {
+			ma, err := multiaddr.NewMultiaddr(addrStr)
+			if err != nil {
+				panic(err)
+			}
+			addrs = append(addrs, ma)
+		}
+
+		cfg.Peering.Peers = append(cfg.Peering.Peers, peer.AddrInfo{
+			ID:    other.PeerID(),
+			Addrs: addrs,
+		})
+	})
+}
+
+func (n *Node) Disconnect(other *Node) {
+	n.IPFS("swarm", "disconnect", "/p2p/"+other.PeerID().String())
+}
+
 // GatewayURL waits for the gateway file and then returns its contents or times out.
 func (n *Node) GatewayURL() string {
 	timer := time.NewTimer(1 * time.Second)
diff --git a/test/cli/harness/nodes.go b/test/cli/harness/nodes.go
index 872d7767913..78662afbbea 100644
--- a/test/cli/harness/nodes.go
+++ b/test/cli/harness/nodes.go
@@ -3,6 +3,7 @@ package harness
 import (
 	"sync"
 
+	. "github.com/ipfs/kubo/test/cli/testutils"
 	"github.com/multiformats/go-multiaddr"
 	"golang.org/x/sync/errgroup"
 )
@@ -11,9 +12,7 @@ import (
 type Nodes []*Node
 
 func (n Nodes) Init(args ...string) Nodes {
-	for _, node := range n {
-		node.Init()
-	}
+	ForEachPar(n, func(node *Node) { node.Init(args...) })
 	return n
 }
 
@@ -59,22 +58,11 @@ func (n Nodes) Connect() Nodes {
 }
 
 func (n Nodes) StartDaemons() Nodes {
-	wg := sync.WaitGroup{}
-	for _, node := range n {
-		wg.Add(1)
-		node := node
-		go func() {
-			defer wg.Done()
-			node.StartDaemon()
-		}()
-	}
-	wg.Wait()
+	ForEachPar(n, func(node *Node) { node.StartDaemon() })
 	return n
 }
 
 func (n Nodes) StopDaemons() Nodes {
-	for _, node := range n {
-		node.StopDaemon()
-	}
+	ForEachPar(n, func(node *Node) { node.StopDaemon() })
 	return n
 }
diff --git a/test/cli/peering_test.go b/test/cli/peering_test.go
new file mode 100644
index 00000000000..f3e797fae80
--- /dev/null
+++ b/test/cli/peering_test.go
@@ -0,0 +1,141 @@
+package cli
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/ipfs/kubo/config"
+	"github.com/ipfs/kubo/test/cli/harness"
+	. "github.com/ipfs/kubo/test/cli/testutils"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestPeering(t *testing.T) {
+	t.Parallel()
+
+	type peering struct {
+		from int
+		to   int
+	}
+
+	newRandPort := func() int {
+		n := rand.Int()
+		return 3000 + (n % 1000)
+	}
+
+	containsPeerID := func(p peer.ID, peers []peer.ID) bool {
+		for _, peerID := range peers {
+			if p == peerID {
+				return true
+			}
+		}
+		return false
+	}
+
+	assertPeered := func(h *harness.Harness, from *harness.Node, to *harness.Node) {
+		assert.Eventuallyf(t, func() bool {
+			fromPeers := from.Peers()
+			if len(fromPeers) == 0 {
+				return false
+			}
+			var fromPeerIDs []peer.ID
+			for _, p := range fromPeers {
+				fromPeerIDs = append(fromPeerIDs, h.ExtractPeerID(p))
+			}
+			return containsPeerID(to.PeerID(), fromPeerIDs)
+		}, 20*time.Second, 10*time.Millisecond, "%d -> %d not peered", from.ID, to.ID)
+	}
+
+	assertNotPeered := func(h *harness.Harness, from *harness.Node, to *harness.Node) {
+		assert.Eventuallyf(t, func() bool {
+			fromPeers := from.Peers()
+			if len(fromPeers) == 0 {
+				return false
+			}
+			var fromPeerIDs []peer.ID
+			for _, p := range fromPeers {
+				fromPeerIDs = append(fromPeerIDs, h.ExtractPeerID(p))
+			}
+			return !containsPeerID(to.PeerID(), fromPeerIDs)
+		}, 20*time.Second, 10*time.Millisecond, "%d -> %d peered", from.ID, to.ID)
+	}
+
+	assertPeerings := func(h *harness.Harness, nodes []*harness.Node, peerings []peering) {
+		ForEachPar(peerings, func(peering peering) {
+			assertPeered(h, nodes[peering.from], nodes[peering.to])
+		})
+	}
+
+	createNodes := func(t *testing.T, n int, peerings []peering) (*harness.Harness, harness.Nodes) {
+		h := harness.NewT(t)
+		nodes := h.NewNodes(n).Init()
+		nodes.ForEachPar(func(node *harness.Node) {
+			node.UpdateConfig(func(cfg *config.Config) {
+				cfg.Routing.Type = config.NewOptionalString("none")
+				cfg.Addresses.Swarm = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", newRandPort())}
+			})
+
+		})
+
+		for _, peering := range peerings {
+			nodes[peering.from].PeerWith(nodes[peering.to])
+		}
+
+		return h, nodes
+	}
+
+	t.Run("bidirectional peering should work (simultaneous connect)", func(t *testing.T) {
+		t.Parallel()
+		peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}}
+		h, nodes := createNodes(t, 3, peerings)
+
+		nodes.StartDaemons()
+		assertPeerings(h, nodes, peerings)
+
+		nodes[0].Disconnect(nodes[1])
+		assertPeerings(h, nodes, peerings)
+	})
+
+	t.Run("1 should reconnect to 2 when 2 disconnects from 1", func(t *testing.T) {
+		t.Parallel()
+		peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}}
+		h, nodes := createNodes(t, 3, peerings)
+
+		nodes.StartDaemons()
+		assertPeerings(h, nodes, peerings)
+
+		nodes[2].Disconnect(nodes[1])
+		assertPeerings(h, nodes, peerings)
+	})
+
+	t.Run("1 will peer with 2 when it comes online", func(t *testing.T) {
+		t.Parallel()
+		peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}}
+		h, nodes := createNodes(t, 3, peerings)
+
+		nodes[0].StartDaemon()
+		nodes[1].StartDaemon()
+		assertPeerings(h, nodes, []peering{{from: 0, to: 1}, {from: 1, to: 0}})
+
+		nodes[2].StartDaemon()
+		assertPeerings(h, nodes, peerings)
+	})
+
+	t.Run("1 will re-peer with 2 when it disconnects and then comes back online", func(t *testing.T) {
+		t.Parallel()
+		peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}}
+		h, nodes := createNodes(t, 3, peerings)
+
+		nodes.StartDaemons()
+		assertPeerings(h, nodes, peerings)
+
+		nodes[2].StopDaemon()
+		assertNotPeered(h, nodes[1], nodes[2])
+
+		nodes[2].StartDaemon()
+		assertPeerings(h, nodes, peerings)
+	})
+}
diff --git a/test/cli/testutils/strings.go b/test/cli/testutils/strings.go
index 1fb1512485e..110051e679f 100644
--- a/test/cli/testutils/strings.go
+++ b/test/cli/testutils/strings.go
@@ -7,6 +7,7 @@ import (
 	"net/netip"
 	"net/url"
 	"strings"
+	"sync"
 
 	"github.com/multiformats/go-multiaddr"
 	manet "github.com/multiformats/go-multiaddr/net"
@@ -75,3 +76,16 @@ func URLStrToMultiaddr(u string) multiaddr.Multiaddr {
 	}
 	return ma
 }
+
+// ForEachPar invokes f in a new goroutine for each element of s and waits for all to complete.
+func ForEachPar[T any](s []T, f func(T)) {
+	wg := sync.WaitGroup{}
+	wg.Add(len(s))
+	for _, x := range s {
+		go func(x T) {
+			defer wg.Done()
+			f(x)
+		}(x)
+	}
+	wg.Wait()
+}
diff --git a/test/sharness/t0171-peering.sh b/test/sharness/t0171-peering.sh
deleted file mode 100755
index 207b279803a..00000000000
--- a/test/sharness/t0171-peering.sh
+++ /dev/null
@@ -1,127 +0,0 @@
-#!/usr/bin/env bash
-
-test_description="Test peering service"
-
-. lib/test-lib.sh
-
-NUM_NODES=3
-
-test_expect_success 'init iptb' '
-  rm -rf .iptb/ &&
-  iptb testbed create -type localipfs -count $NUM_NODES -init
-'
-
-test_expect_success 'disabling routing' '
-  iptb run -- ipfs config Routing.Type none
-'
-
-for i in $(seq 0 2); do
-  ADDR="$(printf '["/ip4/127.0.0.1/tcp/%s"]' "$(( 3000 + ( RANDOM % 1000 ) ))")"
-  test_expect_success "configuring node $i to listen on $ADDR" '
-    ipfsi "$i" config --json Addresses.Swarm "$ADDR"
-  '
-done
-
-peer_id() {
-  ipfsi "$1" config Identity.PeerID
-}
-
-peer_addrs() {
-  ipfsi "$1" config Addresses.Swarm
-}
-
-peer() {
-  PEER1="$1" &&
-  PEER2="$2" &&
-  PEER_LIST="$(ipfsi "$PEER1" config Peering.Peers || true)" &&
-  { [[ "$PEER_LIST" == "null" ]] || PEER_LIST_INNER="${PEER_LIST:1:-1}"; } &&
-  ADDR_INFO="$(printf '[%s{"ID": "%s", "Addrs": %s}]' \
-    "${PEER_LIST_INNER:+${PEER_LIST_INNER},}" \
-    "$(peer_id "$PEER2")" \
-    "$(peer_addrs "$PEER2")")" &&
-  ipfsi "$PEER1" config --json Peering.Peers "${ADDR_INFO}"
-}
-
-# Peer:
-# - 0 <-> 1
-# - 1 -> 2
-test_expect_success 'configure peering' '
-  peer 0 1 &&
-  peer 1 0 &&
-  peer 1 2
-'
-
-list_peers() {
-  ipfsi "$1" swarm peers | sed 's|.*/p2p/\([^/]*\)$|\1|' | sort -u
-}
-
-check_peers() {
-  sleep 20 # give it some time to settle.
-  test_expect_success 'verifying peering for peer 0' '
-    list_peers 0 > peers_0_actual &&
-    peer_id 1 > peers_0_expected &&
-    test_cmp peers_0_expected peers_0_actual
-  '
-
-  test_expect_success 'verifying peering for peer 1' '
-    list_peers 1 > peers_1_actual &&
-    { peer_id 0 && peer_id 2 ; } | sort -u > peers_1_expected &&
-    test_cmp peers_1_expected peers_1_actual
-  '
-
-  test_expect_success 'verifying peering for peer 2' '
-    list_peers 2 > peers_2_actual &&
-    peer_id 1 > peers_2_expected &&
-    test_cmp peers_2_expected peers_2_actual
-  '
-}
-
-test_expect_success 'startup cluster' '
-  iptb start -wait &&
-  iptb run -- ipfs log level peering debug
-'
-
-check_peers
-
-disconnect() {
-  ipfsi "$1" swarm disconnect "/p2p/$(peer_id "$2")"
-}
-
-# Bidirectional peering shouldn't cause problems (e.g., simultaneous connect
-# issues).
-test_expect_success 'disconnecting 0->1' '
-  disconnect 0 1
-'
-
-check_peers
-
-# 1 should reconnect to 2 when 2 disconnects from 1.
-test_expect_success 'disconnecting 2->1' '
-  disconnect 2 1
-'
-
-check_peers
-
-# 2 isn't peering. This test ensures that 1 will re-peer with 2 when it comes
-# back online.
-test_expect_success 'stopping 2' '
-  iptb stop 2
-'
-
-# Wait to disconnect
-sleep 30
-
-test_expect_success 'starting 2' '
-  iptb start 2
-'
-
-# Wait for backoff
-sleep 30
-
-check_peers
-
-test_expect_success "stop testbed" '
-  iptb stop
-'
-
-test_done
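
Note: the TestLogger added in test/cli/harness/log.go has no caller in the hunks above. The snippet below is a minimal usage sketch, not part of the change itself; the test name, prefix, and log messages are hypothetical, and it only assumes a test drives the logger directly from its *testing.T using the NewTestLogger, AddPrefix, Logf, and EnableLogs functions defined in this diff.

    package cli

    import (
    	"testing"

    	"github.com/ipfs/kubo/test/cli/harness"
    )

    func TestLoggerUsageSketch(t *testing.T) {
    	log := harness.NewTestLogger(t)   // entries are buffered; printed only if the test fails or logs are enabled
    	nodeLog := log.AddPrefix("node0") // child logger; its entries flush back to the root for sorted output
    	nodeLog.Logf("daemon listening on port %d", 4001)
    	log.EnableLogs() // force output even when the test passes
    }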