Skip to content

Commit

Permalink
[client] Fix controller re-connection (#2758)
Browse files Browse the repository at this point in the history
Rethink the peer reconnection implementation
  • Loading branch information
pappz authored Oct 24, 2024
1 parent 869537c commit 4e918e5
Show file tree
Hide file tree
Showing 29 changed files with 814 additions and 524 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/golang-test-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ jobs:
- name: check git status
run: git --no-pager diff --exit-code

- name: Generate Iface Test bin
run: CGO_ENABLED=0 go test -c -o iface-testing.bin ./client/iface/

- name: Generate Shared Sock Test bin
run: CGO_ENABLED=0 go test -c -o sharedsock-testing.bin ./sharedsock

Expand All @@ -98,15 +95,15 @@ jobs:
run: CGO_ENABLED=1 go test -c -o engine-testing.bin ./client/internal

- name: Generate Peer Test bin
run: CGO_ENABLED=0 go test -c -o peer-testing.bin ./client/internal/peer/...
run: CGO_ENABLED=0 go test -c -o peer-testing.bin ./client/internal/peer/

- run: chmod +x *testing.bin

- name: Run Shared Sock tests in docker
run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/sharedsock --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/sharedsock-testing.bin -test.timeout 5m -test.parallel 1

- name: Run Iface tests in docker
run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/iface --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/iface-testing.bin -test.timeout 5m -test.parallel 1
run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/netbird -v /tmp/cache:/tmp/cache -v /tmp/modcache:/tmp/modcache -w /netbird -e GOCACHE=/tmp/cache -e GOMODCACHE=/tmp/modcache -e CGO_ENABLED=0 golang:1.23-alpine go test -test.timeout 5m -test.parallel 1 ./client/iface/...

- name: Run RouteManager tests in docker
run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/client/internal/routemanager --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/routemanager-testing.bin -test.timeout 5m -test.parallel 1
Expand Down
1 change: 0 additions & 1 deletion client/iface/bind/ice_bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (b *ICEBind) Send(bufs [][]byte, ep wgConn.Endpoint) error {
conn, ok := b.endpoints[ep.DstIP()]
b.endpointsMu.Unlock()
if !ok {
log.Infof("failed to find endpoint for %s", ep.DstIP())
return b.StdNetBind.Send(bufs, ep)
}

Expand Down
4 changes: 2 additions & 2 deletions client/iface/wgproxy/ebpf/portlookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"net"
)

const (
var (
portRangeStart = 3128
portRangeEnd = 3228
portRangeEnd = portRangeStart + 100
)

type portLookup struct {
Expand Down
3 changes: 3 additions & 0 deletions client/iface/wgproxy/ebpf/portlookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func Test_portLookup_searchFreePort(t *testing.T) {
func Test_portLookup_on_allocated(t *testing.T) {
pl := portLookup{}

portRangeStart = 4128
portRangeEnd = portRangeStart + 100

allocatedPort, err := allocatePort(portRangeStart)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 2 additions & 0 deletions client/iface/wgproxy/factory_kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ func NewKernelFactory(wgPort int) *KernelFactory {

ebpfProxy := ebpf.NewWGEBPFProxy(wgPort)
if err := ebpfProxy.Listen(); err != nil {
log.Infof("WireGuard Proxy Factory will produce UDP proxy")
log.Warnf("failed to initialize ebpf proxy, fallback to user space proxy: %s", err)
return f
}
log.Infof("WireGuard Proxy Factory will produce eBPF proxy")
f.ebpfProxy = ebpfProxy
return f
}
Expand Down
3 changes: 3 additions & 0 deletions client/iface/wgproxy/factory_kernel_freebsd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package wgproxy

import (
log "github.com/sirupsen/logrus"

udpProxy "github.com/netbirdio/netbird/client/iface/wgproxy/udp"
)

Expand All @@ -10,6 +12,7 @@ type KernelFactory struct {
}

func NewKernelFactory(wgPort int) *KernelFactory {
log.Infof("WireGuard Proxy Factory will produce UDP proxy")
f := &KernelFactory{
wgPort: wgPort,
}
Expand Down
3 changes: 3 additions & 0 deletions client/iface/wgproxy/factory_usp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package wgproxy

import (
log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/client/iface/bind"
proxyBind "github.com/netbirdio/netbird/client/iface/wgproxy/bind"
)
Expand All @@ -10,6 +12,7 @@ type USPFactory struct {
}

func NewUSPFactory(iceBind *bind.ICEBind) *USPFactory {
log.Infof("WireGuard Proxy Factory will produce bind proxy")
f := &USPFactory{
bind: iceBind,
}
Expand Down
28 changes: 20 additions & 8 deletions client/iface/wgproxy/udp/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package udp

import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"

"github.com/hashicorp/go-multierror"
log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/client/errors"
cerrors "github.com/netbirdio/netbird/client/errors"
)

// WGUDPProxy proxies
Expand Down Expand Up @@ -121,7 +123,7 @@ func (p *WGUDPProxy) close() error {
if err := p.localConn.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("local conn: %s", err))
}
return errors.FormatErrorOrNil(result)
return cerrors.FormatErrorOrNil(result)
}

// proxyToRemote proxies from Wireguard to the RemoteKey
Expand Down Expand Up @@ -160,18 +162,16 @@ func (p *WGUDPProxy) proxyToRemote(ctx context.Context) {
func (p *WGUDPProxy) proxyToLocal(ctx context.Context) {
defer func() {
if err := p.close(); err != nil {
log.Warnf("error in proxy to local loop: %s", err)
if !errors.Is(err, io.EOF) {
log.Warnf("error in proxy to local loop: %s", err)
}
}
}()

buf := make([]byte, 1500)
for {
n, err := p.remoteConn.Read(buf)
n, err := p.remoteConnRead(ctx, buf)
if err != nil {
if ctx.Err() != nil {
return
}
log.Errorf("failed to read from remote conn: %s, %s", p.remoteConn.RemoteAddr(), err)
return
}

Expand All @@ -193,3 +193,15 @@ func (p *WGUDPProxy) proxyToLocal(ctx context.Context) {
}
}
}

func (p *WGUDPProxy) remoteConnRead(ctx context.Context, buf []byte) (n int, err error) {
n, err = p.remoteConn.Read(buf)
if err != nil {
if ctx.Err() != nil {
return
}
log.Errorf("failed to read from remote conn: %s, %s", p.remoteConn.LocalAddr(), err)
return
}
return
}
23 changes: 21 additions & 2 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/networkmonitor"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/peer/guard"
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
"github.com/netbirdio/netbird/client/internal/relay"
"github.com/netbirdio/netbird/client/internal/rosenpass"
"github.com/netbirdio/netbird/client/internal/routemanager"
Expand Down Expand Up @@ -168,6 +170,7 @@ type Engine struct {

relayManager *relayClient.Manager
stateManager *statemanager.Manager
srWatcher *guard.SRWatcher
}

// Peer is an instance of the Connection Peer
Expand Down Expand Up @@ -263,6 +266,10 @@ func (e *Engine) Stop() error {
e.routeManager.Stop(e.stateManager)
}

if e.srWatcher != nil {
e.srWatcher.Close()
}

err := e.removeAllPeers()
if err != nil {
return fmt.Errorf("failed to remove all peers: %s", err)
Expand Down Expand Up @@ -389,6 +396,18 @@ func (e *Engine) Start() error {
return fmt.Errorf("initialize dns server: %w", err)
}

iceCfg := icemaker.Config{
StunTurn: &e.stunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
UDPMux: e.udpMux.UDPMuxDefault,
UDPMuxSrflx: e.udpMux,
NATExternalIPs: e.parseNATExternalIPMappings(),
}

e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
e.srWatcher.Start()

e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveProbeEvents()
Expand Down Expand Up @@ -971,7 +990,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e
LocalWgPort: e.config.WgPort,
RosenpassPubKey: e.getRosenpassPubKey(),
RosenpassAddr: e.getRosenpassAddr(),
ICEConfig: peer.ICEConfig{
ICEConfig: icemaker.Config{
StunTurn: &e.stunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
Expand All @@ -981,7 +1000,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e
},
}

peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager)
peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions client/internal/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/peer/guard"
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
"github.com/netbirdio/netbird/client/internal/routemanager"
"github.com/netbirdio/netbird/client/ssh"
"github.com/netbirdio/netbird/client/system"
Expand Down Expand Up @@ -258,6 +260,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
}
engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
engine.ctx = ctx
engine.srWatcher = guard.NewSRWatcher(nil, nil, nil, icemaker.Config{})

type testCase struct {
name string
Expand Down
Loading

0 comments on commit 4e918e5

Please sign in to comment.