Skip to content

Commit

Permalink
[WIP] other transports
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 14, 2024
1 parent c9cfd04 commit c86fb86
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 72 deletions.
51 changes: 14 additions & 37 deletions face/multicast-udp-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package face
import (
"errors"
"net"
"runtime"
"strconv"

"github.com/named-data/YaNFD/core"
Expand Down Expand Up @@ -71,7 +70,9 @@ func MakeMulticastUDPTransport(localURI *defn.URI) (*MulticastUDPTransport, erro
if err != nil {
return nil, errors.New("Unable to create send connection to group address: " + err.Error())
}

t.sendConn = sendConn.(*net.UDPConn)
t.running.Store(true)

// Create receive connection
t.recvConn, err = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr)
Expand All @@ -80,8 +81,6 @@ func MakeMulticastUDPTransport(localURI *defn.URI) (*MulticastUDPTransport, erro
localIf.Name + ": " + err.Error())
}

t.changeState(defn.Up)

return t, nil
}

Expand Down Expand Up @@ -134,27 +133,17 @@ func (t *MulticastUDPTransport) sendFrame(frame []byte) {
}

func (t *MulticastUDPTransport) runReceive() {
if lockThreadsToCores {
runtime.LockOSThread()
}

recvBuf := make([]byte, defn.MaxNDNPacketSize)
for {
readSize, remoteAddr, err := t.recvConn.ReadFromUDP(recvBuf)
if err != nil {
if err.Error() == "EOF" {
core.LogDebug(t, "EOF - Face DOWN")
t.changeState(defn.Down)
break
} else {
core.LogWarn(t, "Unable to read from socket (", err, ") - DROP")
t.recvConn.Close()
localIf, err := InterfaceByIP(net.ParseIP(t.localURI.PathHost()))
if err != nil || localIf == nil {
core.LogError(t, "Unable to get interface for local URI ", t.localURI, ": ", err)
}
t.recvConn, _ = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr)
// Re-create the socket
localIf, err := InterfaceByIP(net.ParseIP(t.localURI.PathHost()))
if err != nil || localIf == nil {
core.LogError(t, "Unable to get interface for local URI ", t.localURI, ": ", err)
}
t.recvConn, _ = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr)

}

core.LogTrace(t, "Receive of size ", readSize, " from ", remoteAddr)
Expand All @@ -173,23 +162,11 @@ func (t *MulticastUDPTransport) runReceive() {
}
}

func (t *MulticastUDPTransport) changeState(new defn.State) {
if t.state == new {
return
}

core.LogInfo(t, "state: ", t.state, " -> ", new)
t.state = new

if t.state != defn.Up {
core.LogInfo(t, "Closing UDP socket")
t.hasQuit <- true
t.sendConn.Close()
t.recvConn.Close()

// Stop link service
t.linkService.tellTransportQuit()

FaceTable.Remove(t.faceID)
func (t *MulticastUDPTransport) Close() {
if t.running.Swap(false) {
if t.sendConn != nil && t.recvConn != nil {
t.sendConn.Close()
t.recvConn.Close()
}
}
}
10 changes: 5 additions & 5 deletions face/web-socket-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ func (l *WebSocketListener) handler(w http.ResponseWriter, r *http.Request) {
return
}

t := NewWebSocketTransport(l.localURI, c)
linkService := MakeNDNLPLinkService(t, MakeNDNLPLinkServiceOptions())
newTransport := NewWebSocketTransport(l.localURI, c)
core.LogInfo(l, "Accepting new WebSocket face ", newTransport.RemoteURI())

core.LogInfo(l, "Accepting new WebSocket face ", t.RemoteURI())
FaceTable.Add(linkService)
go linkService.Run(nil)
options := MakeNDNLPLinkServiceOptions()
options.IsFragmentationEnabled = false // reliable stream
MakeNDNLPLinkService(newTransport, options).Run(nil)
}

// Close closes the WebSocketListener.
Expand Down
37 changes: 7 additions & 30 deletions face/web-socket-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package face

import (
"net"
"runtime"
"strconv"

"github.com/gorilla/websocket"
Expand All @@ -29,6 +28,7 @@ var _ transport = &WebSocketTransport{}
func NewWebSocketTransport(localURI *defn.URI, c *websocket.Conn) (t *WebSocketTransport) {
remoteURI := defn.MakeWebSocketClientFaceURI(c.RemoteAddr())
t = &WebSocketTransport{c: c}
t.running.Store(true)

scope := defn.NonLocal
ip := net.ParseIP(remoteURI.PathHost())
Expand All @@ -37,7 +37,6 @@ func NewWebSocketTransport(localURI *defn.URI, c *websocket.Conn) (t *WebSocketT
}

t.makeTransportBase(remoteURI, localURI, PersistencyOnDemand, scope, defn.PointToPoint, defn.MaxNDNPacketSize)
t.changeState(defn.Up)
return t
}

Expand Down Expand Up @@ -66,25 +65,20 @@ func (t *WebSocketTransport) sendFrame(frame []byte) {
e := t.c.WriteMessage(websocket.BinaryMessage, frame)
if e != nil {
core.LogWarn(t, "Unable to send on socket - DROP and Face DOWN")
t.changeState(defn.Down)
t.Close()
return
}

t.nOutBytes += uint64(len(frame))
}

func (t *WebSocketTransport) runReceive() {
core.LogTrace(t, "Starting receive thread")

if lockThreadsToCores {
runtime.LockOSThread()
}

for {
mt, message, e := t.c.ReadMessage()
if e != nil {
core.LogWarn(t, "Unable to read from socket (", e, ") - DROP and Face DOWN")
t.changeState(defn.Down)
break
t.Close()
return
}

if mt != websocket.BinaryMessage {
Expand All @@ -100,27 +94,10 @@ func (t *WebSocketTransport) runReceive() {
continue
}

// Send up to link service
t.linkService.handleIncomingFrame(message)
}
}

func (t *WebSocketTransport) changeState(new defn.State) {
if t.state == new {
return
}

core.LogInfo(t, "state: ", t.state, " -> ", new)
t.state = new

if t.state != defn.Up {
core.LogInfo(t, "Closing Unix stream socket")
t.hasQuit <- true
t.c.Close()

// Stop link service
t.linkService.tellTransportQuit()

FaceTable.Remove(t.faceID)
}
func (t *WebSocketTransport) Close() {
t.c.Close()
}

0 comments on commit c86fb86

Please sign in to comment.