diff --git a/face/multicast-udp-transport.go b/face/multicast-udp-transport.go index fdab6969..a57b347d 100644 --- a/face/multicast-udp-transport.go +++ b/face/multicast-udp-transport.go @@ -10,7 +10,6 @@ package face import ( "errors" "net" - "runtime" "strconv" "github.com/named-data/YaNFD/core" @@ -71,7 +70,9 @@ func MakeMulticastUDPTransport(localURI *defn.URI) (*MulticastUDPTransport, erro if err != nil { return nil, errors.New("Unable to create send connection to group address: " + err.Error()) } + t.sendConn = sendConn.(*net.UDPConn) + t.running.Store(true) // Create receive connection t.recvConn, err = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr) @@ -80,8 +81,6 @@ func MakeMulticastUDPTransport(localURI *defn.URI) (*MulticastUDPTransport, erro localIf.Name + ": " + err.Error()) } - t.changeState(defn.Up) - return t, nil } @@ -134,27 +133,17 @@ func (t *MulticastUDPTransport) sendFrame(frame []byte) { } func (t *MulticastUDPTransport) runReceive() { - if lockThreadsToCores { - runtime.LockOSThread() - } - recvBuf := make([]byte, defn.MaxNDNPacketSize) for { readSize, remoteAddr, err := t.recvConn.ReadFromUDP(recvBuf) if err != nil { - if err.Error() == "EOF" { - core.LogDebug(t, "EOF - Face DOWN") - t.changeState(defn.Down) - break - } else { - core.LogWarn(t, "Unable to read from socket (", err, ") - DROP") - t.recvConn.Close() - localIf, err := InterfaceByIP(net.ParseIP(t.localURI.PathHost())) - if err != nil || localIf == nil { - core.LogError(t, "Unable to get interface for local URI ", t.localURI, ": ", err) - } - t.recvConn, _ = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr) + // Re-create the socket + localIf, err := InterfaceByIP(net.ParseIP(t.localURI.PathHost())) + if err != nil || localIf == nil { + core.LogError(t, "Unable to get interface for local URI ", t.localURI, ": ", err) } + t.recvConn, _ = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr) + } core.LogTrace(t, "Receive of size ", readSize, " from ", remoteAddr) @@ -173,23 +162,11 @@ func (t *MulticastUDPTransport) runReceive() { } } -func (t *MulticastUDPTransport) changeState(new defn.State) { - if t.state == new { - return - } - - core.LogInfo(t, "state: ", t.state, " -> ", new) - t.state = new - - if t.state != defn.Up { - core.LogInfo(t, "Closing UDP socket") - t.hasQuit <- true - t.sendConn.Close() - t.recvConn.Close() - - // Stop link service - t.linkService.tellTransportQuit() - - FaceTable.Remove(t.faceID) +func (t *MulticastUDPTransport) Close() { + if t.running.Swap(false) { + if t.sendConn != nil && t.recvConn != nil { + t.sendConn.Close() + t.recvConn.Close() + } } } diff --git a/face/web-socket-listener.go b/face/web-socket-listener.go index e2b361b0..0a48b427 100644 --- a/face/web-socket-listener.go +++ b/face/web-socket-listener.go @@ -114,12 +114,12 @@ func (l *WebSocketListener) handler(w http.ResponseWriter, r *http.Request) { return } - t := NewWebSocketTransport(l.localURI, c) - linkService := MakeNDNLPLinkService(t, MakeNDNLPLinkServiceOptions()) + newTransport := NewWebSocketTransport(l.localURI, c) + core.LogInfo(l, "Accepting new WebSocket face ", newTransport.RemoteURI()) - core.LogInfo(l, "Accepting new WebSocket face ", t.RemoteURI()) - FaceTable.Add(linkService) - go linkService.Run(nil) + options := MakeNDNLPLinkServiceOptions() + options.IsFragmentationEnabled = false // reliable stream + MakeNDNLPLinkService(newTransport, options).Run(nil) } // Close closes the WebSocketListener. diff --git a/face/web-socket-transport.go b/face/web-socket-transport.go index 4f4a6553..968043c3 100644 --- a/face/web-socket-transport.go +++ b/face/web-socket-transport.go @@ -9,7 +9,6 @@ package face import ( "net" - "runtime" "strconv" "github.com/gorilla/websocket" @@ -29,6 +28,7 @@ var _ transport = &WebSocketTransport{} func NewWebSocketTransport(localURI *defn.URI, c *websocket.Conn) (t *WebSocketTransport) { remoteURI := defn.MakeWebSocketClientFaceURI(c.RemoteAddr()) t = &WebSocketTransport{c: c} + t.running.Store(true) scope := defn.NonLocal ip := net.ParseIP(remoteURI.PathHost()) @@ -37,7 +37,6 @@ func NewWebSocketTransport(localURI *defn.URI, c *websocket.Conn) (t *WebSocketT } t.makeTransportBase(remoteURI, localURI, PersistencyOnDemand, scope, defn.PointToPoint, defn.MaxNDNPacketSize) - t.changeState(defn.Up) return t } @@ -66,25 +65,20 @@ func (t *WebSocketTransport) sendFrame(frame []byte) { e := t.c.WriteMessage(websocket.BinaryMessage, frame) if e != nil { core.LogWarn(t, "Unable to send on socket - DROP and Face DOWN") - t.changeState(defn.Down) + t.Close() + return } t.nOutBytes += uint64(len(frame)) } func (t *WebSocketTransport) runReceive() { - core.LogTrace(t, "Starting receive thread") - - if lockThreadsToCores { - runtime.LockOSThread() - } - for { mt, message, e := t.c.ReadMessage() if e != nil { core.LogWarn(t, "Unable to read from socket (", e, ") - DROP and Face DOWN") - t.changeState(defn.Down) - break + t.Close() + return } if mt != websocket.BinaryMessage { @@ -100,27 +94,10 @@ func (t *WebSocketTransport) runReceive() { continue } - // Send up to link service t.linkService.handleIncomingFrame(message) } } -func (t *WebSocketTransport) changeState(new defn.State) { - if t.state == new { - return - } - - core.LogInfo(t, "state: ", t.state, " -> ", new) - t.state = new - - if t.state != defn.Up { - core.LogInfo(t, "Closing Unix stream socket") - t.hasQuit <- true - t.c.Close() - - // Stop link service - t.linkService.tellTransportQuit() - - FaceTable.Remove(t.faceID) - } +func (t *WebSocketTransport) Close() { + t.c.Close() }