Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: purge peer connections and information #194

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The following emojis are used to highlight certain changes:

### Added

- Added endpoints to show and purge connected peers [#194](https://github.com/ipfs/rainbow/pull/194)

### Changed

### Removed
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@ possible to dynamically modify the logging at runtime.
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/level?subsystem=<system name or * for all system>&level=<level>` will set the logging level for a subsystem
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/ls` will return a comma separated list of available logging subsystems

## Purging Peer Connections

Connections to a specific peer, or to all peers, can be closed and the peer information removed from the peer store. This can be useful to help determine if the presence/absence of a connection to a peer is affecting behavior. Be aware that purging a connection is inherently racey as it is possible for the peer to reestablish a connection at any time following a purge.

gammazero marked this conversation as resolved.
Show resolved Hide resolved
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=<peer_id>` purges connection and info for peer identifid by peer_id
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=all` purges connections and info for all peers
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/peers` returns a list of currently connected peers

Example cURL commmand to show connected peers and purge peer connection:

curl http://127.0.0.1:8091/mgr/peers
curl http://127.0.0.1:8091/mgr/purge?peer=QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC

## Tracing

See [docs/tracing.md](docs/tracing.md).
Expand Down
109 changes: 108 additions & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"github.com/ipfs/boxo/blockstore"
leveldb "github.com/ipfs/go-ds-leveldb"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

_ "embed"
_ "net/http/pprof"
Expand Down Expand Up @@ -72,7 +74,7 @@
})
}

func GCHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) {
func gcHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) {

Check warning on line 77 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L77

Added line #L77 was not covered by tests
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

Expand All @@ -92,6 +94,111 @@
}
}

func purgePeerHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

q := r.URL.Query()
peerIDStr := q.Get("peer")
if peerIDStr == "" {
http.Error(w, "missing peer id", http.StatusBadRequest)
return
}

Check warning on line 106 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L97-L106

Added lines #L97 - L106 were not covered by tests

if peerIDStr == "all" {
purgeCount, err := purgeAllConnections(p2pHost)
if err != nil {
goLog.Errorw("Error closing all libp2p connections", "err", err)
http.Error(w, "error closing connections", http.StatusInternalServerError)
return
}
goLog.Infow("Purged connections", "count", purgeCount)

h := w.Header()
h.Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintln(w, "Peer connections purged:", purgeCount)
return

Check warning on line 120 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L108-L120

Added lines #L108 - L120 were not covered by tests
}

peerID, err := peer.Decode(peerIDStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

Check warning on line 127 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L123-L127

Added lines #L123 - L127 were not covered by tests

err = purgeConnection(p2pHost, peerID)
if err != nil {
goLog.Errorw("Error closing libp2p connection", "err", err, "peer", peerID)
http.Error(w, "error closing connection", http.StatusInternalServerError)
return
}
goLog.Infow("Purged connection", "peer", peerID)

h := w.Header()
h.Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintln(w, "Purged connection to peer", peerID)

Check warning on line 139 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L129-L139

Added lines #L129 - L139 were not covered by tests
}
}

func purgeConnection(p2pHost host.Host, peerID peer.ID) error {
peerStore := p2pHost.Peerstore()
if peerStore != nil {
peerStore.RemovePeer(peerID)
peerStore.ClearAddrs(peerID)
}
return p2pHost.Network().ClosePeer(peerID)

Check warning on line 149 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L143-L149

Added lines #L143 - L149 were not covered by tests
}

func purgeAllConnections(p2pHost host.Host) (int, error) {
net := p2pHost.Network()
peers := net.Peers()

peerStore := p2pHost.Peerstore()
if peerStore != nil {
for _, peerID := range peers {
peerStore.RemovePeer(peerID)
peerStore.ClearAddrs(peerID)
}

Check warning on line 161 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L152-L161

Added lines #L152 - L161 were not covered by tests
}

var errCount, purgeCount int
for _, peerID := range peers {
err := net.ClosePeer(peerID)
if err != nil {
goLog.Errorw("Closing libp2p connection", "err", err, "peer", peerID)
errCount++
} else {
purgeCount++
}

Check warning on line 172 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L164-L172

Added lines #L164 - L172 were not covered by tests
}

if errCount != 0 {
return 0, fmt.Errorf("error closing connections to %d peers", errCount)
}

Check warning on line 177 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L175-L177

Added lines #L175 - L177 were not covered by tests

return purgeCount, nil

Check warning on line 179 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L179

Added line #L179 was not covered by tests
}

func showPeersHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

h := w.Header()
h.Set("Content-Type", "text/plain; charset=utf-8")

peers := p2pHost.Network().Peers()
if len(peers) == 0 {
fmt.Fprintln(w, "no connected peers")
return
}

Check warning on line 193 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L182-L193

Added lines #L182 - L193 were not covered by tests

fmt.Fprintln(w, "Connected peers:", len(peers))
gammazero marked this conversation as resolved.
Show resolved Hide resolved
for _, peerID := range peers {
fmt.Fprintln(w, peerID.String())
}

Check warning on line 198 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L195-L198

Added lines #L195 - L198 were not covered by tests
}
}

func withConnect(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// ServeMux does not support requests with CONNECT method,
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@
otel.SetTextMapPropagator(autoprop.NewTextMapPropagator())

apiMux := makeMetricsAndDebuggingHandler()
apiMux.HandleFunc("/mgr/gc", GCHandler(gnd))
apiMux.HandleFunc("/mgr/gc", gcHandler(gnd))
apiMux.HandleFunc("/mgr/purge", purgePeerHandler(gnd.host))
apiMux.HandleFunc("/mgr/peers", showPeersHandler(gnd.host))

Check warning on line 594 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L592-L594

Added lines #L592 - L594 were not covered by tests
addLogHandlers(apiMux)

apiSrv := &http.Server{
Expand Down
Loading