Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: purge peer connections and information #194

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The following emojis are used to highlight certain changes:

### Added

- Added endpoints to show and purge connected peers [#194](https://github.com/ipfs/rainbow/pull/194)

### Changed

### Removed
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ possible to dynamically modify the logging at runtime.
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/level?subsystem=<system name or * for all system>&level=<level>` will set the logging level for a subsystem
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/ls` will return a comma separated list of available logging subsystems

## Purging Peer Connections

Connections to a specific peer, or to all peers, can be closed and the peer information removed from the peer store. This can be useful to help determine if the presence/absence of a connection to a peer is affecting behavior. Be aware that purging a connection is inherently racey as it is possible for the peer to reestablish a connection at any time following a purge.

gammazero marked this conversation as resolved.
Show resolved Hide resolved
If `RAINBOW_DHT_SHARED_HOST=false` this endpoint will not show peers connected to DHT host, and only list ones used for Bitswap.

- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=<peer_id>` purges connection and info for peer identifid by peer_id
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=all` purges connections and info for all peers
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/peers` returns a list of currently connected peers

Example cURL commmand to show connected peers and purge peer connection:

curl http://127.0.0.1:8091/mgr/peers
curl http://127.0.0.1:8091/mgr/purge?peer=QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC

## Tracing

See [docs/tracing.md](docs/tracing.md).
Expand Down
117 changes: 116 additions & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"github.com/ipfs/boxo/blockstore"
leveldb "github.com/ipfs/go-ds-leveldb"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

_ "embed"
_ "net/http/pprof"
Expand Down Expand Up @@ -72,7 +74,7 @@
})
}

func GCHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) {
func gcHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) {

Check warning on line 77 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L77

Added line #L77 was not covered by tests
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

Expand All @@ -92,6 +94,119 @@
}
}

func purgePeerHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

q := r.URL.Query()
peerIDStr := q.Get("peer")
if peerIDStr == "" {
http.Error(w, "missing peer id", http.StatusBadRequest)
return
}

Check warning on line 106 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L97-L106

Added lines #L97 - L106 were not covered by tests

if peerIDStr == "all" {
purgeCount, err := purgeAllConnections(p2pHost)
if err != nil {
goLog.Errorw("Error closing all libp2p connections", "err", err)
http.Error(w, "error closing connections", http.StatusInternalServerError)
return
}
goLog.Infow("Purged connections", "count", purgeCount)

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintln(w, "Peer connections purged:", purgeCount)
return

Check warning on line 119 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L108-L119

Added lines #L108 - L119 were not covered by tests
}

peerID, err := peer.Decode(peerIDStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

Check warning on line 126 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L122-L126

Added lines #L122 - L126 were not covered by tests

err = purgeConnection(p2pHost, peerID)
if err != nil {
goLog.Errorw("Error closing libp2p connection", "err", err, "peer", peerID)
http.Error(w, "error closing connection", http.StatusInternalServerError)
return
}
goLog.Infow("Purged connection", "peer", peerID)

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintln(w, "Purged connection to peer", peerID)

Check warning on line 137 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L128-L137

Added lines #L128 - L137 were not covered by tests
}
}

func purgeConnection(p2pHost host.Host, peerID peer.ID) error {
peerStore := p2pHost.Peerstore()
if peerStore != nil {
peerStore.RemovePeer(peerID)
peerStore.ClearAddrs(peerID)
}
return p2pHost.Network().ClosePeer(peerID)

Check warning on line 147 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L141-L147

Added lines #L141 - L147 were not covered by tests
}

func purgeAllConnections(p2pHost host.Host) (int, error) {
net := p2pHost.Network()
peers := net.Peers()

peerStore := p2pHost.Peerstore()
if peerStore != nil {
for _, peerID := range peers {
peerStore.RemovePeer(peerID)
peerStore.ClearAddrs(peerID)
}

Check warning on line 159 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L150-L159

Added lines #L150 - L159 were not covered by tests
}

var errCount, purgeCount int
for _, peerID := range peers {
err := net.ClosePeer(peerID)
if err != nil {
goLog.Errorw("Closing libp2p connection", "err", err, "peer", peerID)
errCount++
} else {
purgeCount++
}

Check warning on line 170 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L162-L170

Added lines #L162 - L170 were not covered by tests
}

if errCount != 0 {
return 0, fmt.Errorf("error closing connections to %d peers", errCount)
}

Check warning on line 175 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L173-L175

Added lines #L173 - L175 were not covered by tests

return purgeCount, nil

Check warning on line 177 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L177

Added line #L177 was not covered by tests
}

func showPeersHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

w.Header().Set("Content-Type", "application/json; charset=utf-8")

peers := p2pHost.Network().Peers()
body := struct {
Count int64
Peers []string
}{
Count: int64(len(peers)),
}

if len(peers) != 0 {
peerStrs := make([]string, len(peers))
for i, peerID := range peers {
peerStrs[i] = peerID.String()
}
body.Peers = peerStrs

Check warning on line 199 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L180-L199

Added lines #L180 - L199 were not covered by tests
}

enc := json.NewEncoder(w)
if err := enc.Encode(body); err != nil {
goLog.Errorw("cannot write response", "err", err)
http.Error(w, "", http.StatusInternalServerError)
}

Check warning on line 206 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L202-L206

Added lines #L202 - L206 were not covered by tests
}
}

func withConnect(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// ServeMux does not support requests with CONNECT method,
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@
otel.SetTextMapPropagator(autoprop.NewTextMapPropagator())

apiMux := makeMetricsAndDebuggingHandler()
apiMux.HandleFunc("/mgr/gc", GCHandler(gnd))
apiMux.HandleFunc("/mgr/gc", gcHandler(gnd))
apiMux.HandleFunc("/mgr/purge", purgePeerHandler(gnd.host))
apiMux.HandleFunc("/mgr/peers", showPeersHandler(gnd.host))

Check warning on line 594 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L592-L594

Added lines #L592 - L594 were not covered by tests
addLogHandlers(apiMux)

apiSrv := &http.Server{
Expand Down
Loading