Skip to content

Commit

Permalink
3.12.0
Browse files Browse the repository at this point in the history
  • Loading branch information
TenderIronh committed Oct 28, 2023
1 parent 52dfe5c commit 7e57237
Show file tree
Hide file tree
Showing 18 changed files with 476 additions and 318 deletions.
45 changes: 0 additions & 45 deletions core/bandwidthLimit.go

This file was deleted.

6 changes: 6 additions & 0 deletions core/errorcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ var (
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrReadDB = errors.New("read db error")
ErrNoUpdate = errors.New("there are currently no updates available")
ErrPeerOffline = errors.New("peer offline")
ErrNetwork = errors.New("network error")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected")
ErrConnectRelayNode = errors.New("connect relay node error")
ErrConnectPublicV4 = errors.New("connect public ipv4 error")
ErrMsgChannelNotFound = errors.New("message channel not found")
ErrRelayTunnelNotFound = errors.New("relay tunnel not found")
ErrSymmetricLimit = errors.New("symmetric limit")
)
16 changes: 7 additions & 9 deletions core/handlepush.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType {
case MsgPushConnectReq: // TODO: handle a msg move to a new function
case MsgPushConnectReq:
err = handleConnectReq(pn, subType, msg)
case MsgPushRsp:
rsp := PushRsp{}
Expand Down Expand Up @@ -86,7 +86,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
gConf.setNode(req.NewName)
gConf.setShareBandwidth(req.Bandwidth)
// TODO: hot reload
os.Exit(0)
case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp")
Expand All @@ -112,10 +111,12 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Println(LvINFO, "retry peerNode ", req.Node)
gConf.retryApp(req.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
pn.msgMapMtx.Unlock()
ch <- pushMsg{data: msg, ts: time.Now()}
i, ok := pn.msgMap.Load(pushHead.From)
if !ok {
return ErrMsgChannelNotFound
}
ch := i.(chan msgCtx)
ch <- msgCtx{data: msg, ts: time.Now()}
}
return err
}
Expand Down Expand Up @@ -145,9 +146,6 @@ func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
return nil
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
}

func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
Expand Down
17 changes: 6 additions & 11 deletions core/holepunch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ func handshakeC2C(t *P2PTunnel) (err error) {
}
ra, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil {
time.Sleep(time.Millisecond * 200)
gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read")
ra, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
return err
}
gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
return err
}
t.ra, _ = net.ResolveUDPAddr("udp", ra.String())
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
Expand All @@ -57,6 +52,7 @@ func handshakeC2C(t *P2PTunnel) (err error) {
func handshakeC2S(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeC2S start")
defer gLog.Printf(LvDEBUG, "handshakeC2S end")
startTime := time.Now()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randPorts := r.Perm(65532)
conn, err := net.ListenUDP("udp", t.la)
Expand All @@ -68,7 +64,6 @@ func handshakeC2S(t *P2PTunnel) error {
go func() error {
gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort)
for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
// time.Sleep(SymmetricHandshakeInterval)
dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2))
if err != nil {
Expand Down Expand Up @@ -124,19 +119,19 @@ func handshakeC2S(t *P2PTunnel) error {
} else {
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
}
gLog.Printf(LvINFO, "handshakeC2S ok")
gLog.Printf(LvINFO, "handshakeC2S ok. cost %d ms", time.Since(startTime)/time.Millisecond)
return nil
}

func handshakeS2C(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeS2C start")
defer gLog.Printf(LvDEBUG, "handshakeS2C end")
startTime := time.Now()
gotCh := make(chan *net.UDPAddr, 5)
// sequencely udp send handshake, do not parallel send
gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort)
gotIt := false
for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
// time.Sleep(SymmetricHandshakeInterval)
go func(t *P2PTunnel) error {
conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random?
Expand Down Expand Up @@ -197,7 +192,7 @@ func handshakeS2C(t *P2PTunnel) error {
case la := <-gotCh:
t.la = la
gLog.Println(LvDEBUG, "symmetric handshake ok", la)
gLog.Printf(LvINFO, "handshakeS2C ok")
gLog.Printf(LvINFO, "handshakeS2C ok. cost %dms", time.Since(startTime)/time.Millisecond)
}
return nil
}
7 changes: 7 additions & 0 deletions core/iptree.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ func (iptree *IPTree) Contains(ipStr string) bool {
return iptree.ContainsInt(ip)
}

func IsLocalhost(ipStr string) bool {
if ipStr == "localhost" || ipStr == "127.0.0.1" || ipStr == "::1" {
return true
}
return false
}

func (iptree *IPTree) ContainsInt(ip uint32) bool {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
Expand Down
11 changes: 7 additions & 4 deletions core/iptree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"net"
"testing"
"time"
)

func wrapTestContains(t *testing.T, iptree *IPTree, ip string, result bool) {
Expand Down Expand Up @@ -128,6 +129,7 @@ func BenchmarkBuildipTree20k(t *testing.B) {
t.Logf("clear. ipTree size:%d\n", iptree.Size())
}
func BenchmarkQuery(t *testing.B) {
ts := time.Now()
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
Expand All @@ -145,7 +147,7 @@ func BenchmarkQuery(t *testing.B) {
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)

// insert 10k block ip single
nodeNum := uint32(10000 * 100)
nodeNum := uint32(10000 * 1000)
gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i)
Expand All @@ -156,8 +158,9 @@ func BenchmarkQuery(t *testing.B) {
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5)
}
t.Logf("ipTree size:%d\n", iptree.Size())
t.ResetTimer()
t.Logf("ipTree size:%d cost:%dms\n", iptree.Size(), time.Since(ts)/time.Millisecond)
ts = time.Now()
// t.ResetTimer()
queryNum := 100 * 10000
for i := 0; i < queryNum; i++ {
iptree.ContainsInt(minIP + uint32(i))
Expand All @@ -166,6 +169,6 @@ func BenchmarkQuery(t *testing.B) {
wrapBenchmarkContains(t, iptree, "10.1.5.200", false)
wrapBenchmarkContains(t, iptree, "200.1.1.1", false)
}
t.Logf("query list:%d\n", queryNum*4)
t.Logf("query num:%d cost:%dms\n", queryNum*4, time.Since(ts)/time.Millisecond)

}
4 changes: 2 additions & 2 deletions core/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ func (oConn *overlayConn) run() {
writeBytes := append(tunnelHead.Bytes(), payload...)
if oConn.rtid == 0 {
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes)
gLog.Printf(LvDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes))
gLog.Printf(LvDEBUG, "write overlay data to tid:%d,oid:%d bodylen=%d", oConn.tunnel.id, oConn.id, len(writeBytes))
} else {
// write raley data
all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...)
all = append(all, writeBytes...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all)
gLog.Printf(LvDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes))
gLog.Printf(LvDEBUG, "write relay data to tid:%d,rtid:%d,oid:%d bodylen=%d", oConn.tunnel.id, oConn.rtid, oConn.id, len(writeBytes))
}
}
if oConn.connTCP != nil {
Expand Down
21 changes: 15 additions & 6 deletions core/p2papp.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func (app *p2pApp) listenTCP() error {
gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort)
defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort)
var err error
app.listener, err = net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) // support tcp4 and tcp6
listenAddr := ""
if IsLocalhost(app.config.Whitelist) { // not expose port
listenAddr = "127.0.0.1"
}
app.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", listenAddr, app.config.SrcPort))
if err != nil {
gLog.Printf(LvERROR, "listen error:%s", err)
return err
Expand All @@ -67,8 +71,8 @@ func (app *p2pApp) listenTCP() error {
}
// check white list
if app.config.Whitelist != "" {
remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
if !app.iptree.Contains(remoteIP) {
remoteIP := conn.RemoteAddr().(*net.TCPAddr).IP.String()
if !app.iptree.Contains(remoteIP) && !IsLocalhost(remoteIP) {
conn.Close()
gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP)
continue
Expand Down Expand Up @@ -252,16 +256,21 @@ func (app *p2pApp) close() {
func (app *p2pApp) relayHeartbeatLoop() {
app.wg.Add(1)
defer app.wg.Done()
gLog.Printf(LvDEBUG, "relayHeartbeat to %d start", app.rtid)
defer gLog.Printf(LvDEBUG, "relayHeartbeat to %d end", app.rtid)
gLog.Printf(LvDEBUG, "relayHeartbeat to rtid:%d start", app.rtid)
defer gLog.Printf(LvDEBUG, "relayHeartbeat to rtid%d end", app.rtid)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, app.rtid)
req := RelayHeartbeat{RelayTunnelID: app.tunnel.id,
AppID: app.id}
msg, _ := newMessage(MsgP2P, MsgRelayHeartbeat, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
for app.tunnel.isRuning() && app.running {
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
err := app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
if err != nil {
gLog.Printf(LvERROR, "%d app write relay tunnel heartbeat error %s", app.rtid, err)
return
}
gLog.Printf(LvDEBUG, "%d app write relay tunnel heartbeat ok", app.rtid)
time.Sleep(TunnelHeartbeatTime)
}
}
Loading

0 comments on commit 7e57237

Please sign in to comment.