Skip to content

Commit

Permalink
face: fix websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 14, 2024
1 parent c86fb86 commit c91acbc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 34 deletions.
30 changes: 12 additions & 18 deletions face/web-socket-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ type WebSocketListenerConfig struct {
TLSKey string
}

// URL returns server URL.
// WebSocketListener listens for incoming WebSockets connections.
type WebSocketListener struct {
server http.Server
upgrader websocket.Upgrader
localURI *defn.URI
}

func (cfg WebSocketListenerConfig) URL() *url.URL {
addr := net.JoinHostPort(cfg.Bind, strconv.FormatUint(uint64(cfg.Port), 10))
u := &url.URL{
Expand All @@ -55,7 +61,6 @@ func (cfg WebSocketListenerConfig) String() string {
return b.String()
}

// NewWebSocketListener constructs a WebSocketListener.
func NewWebSocketListener(cfg WebSocketListenerConfig) (*WebSocketListener, error) {
localURI := cfg.URL()
ret := &WebSocketListener{
Expand All @@ -80,31 +85,21 @@ func NewWebSocketListener(cfg WebSocketListenerConfig) (*WebSocketListener, erro
return ret, nil
}

// WebSocketListener listens for incoming WebSockets connections.
type WebSocketListener struct {
server http.Server
upgrader websocket.Upgrader
localURI *defn.URI
}

var _ Listener = &WebSocketListener{}

func (l *WebSocketListener) String() string {
return "WebSocketListener, " + l.localURI.String()
}

// Run starts the WebSocket listener.
func (l *WebSocketListener) Run() {
l.server.Handler = http.HandlerFunc(l.handler)

var e error
var err error
if l.server.TLSConfig == nil {
e = l.server.ListenAndServe()
err = l.server.ListenAndServe()
} else {
e = l.server.ListenAndServeTLS("", "")
err = l.server.ListenAndServeTLS("", "")
}
if !errors.Is(e, http.ErrServerClosed) {
core.LogFatal(l, "Unable to start listener: ", e)
if !errors.Is(err, http.ErrServerClosed) {
core.LogFatal(l, "Unable to start listener: ", err)
}
}

Expand All @@ -122,7 +117,6 @@ func (l *WebSocketListener) handler(w http.ResponseWriter, r *http.Request) {
MakeNDNLPLinkService(newTransport, options).Run(nil)
}

// Close closes the WebSocketListener.
func (l *WebSocketListener) Close() {
core.LogInfo(l, "Stopping listener")
l.server.Shutdown(context.TODO())
Expand Down
36 changes: 20 additions & 16 deletions face/web-socket-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
package face

import (
"fmt"
"net"
"strconv"

"github.com/gorilla/websocket"
"github.com/named-data/YaNFD/core"
Expand All @@ -22,46 +22,44 @@ type WebSocketTransport struct {
c *websocket.Conn
}

var _ transport = &WebSocketTransport{}

// NewWebSocketTransport creates a Unix stream transport.
func NewWebSocketTransport(localURI *defn.URI, c *websocket.Conn) (t *WebSocketTransport) {
remoteURI := defn.MakeWebSocketClientFaceURI(c.RemoteAddr())
t = &WebSocketTransport{c: c}
t.running.Store(true)

scope := defn.NonLocal
ip := net.ParseIP(remoteURI.PathHost())
if ip != nil && ip.IsLoopback() {
scope = defn.Local
}

t = &WebSocketTransport{c: c}
t.makeTransportBase(remoteURI, localURI, PersistencyOnDemand, scope, defn.PointToPoint, defn.MaxNDNPacketSize)
t.running.Store(true)

return t
}

func (t *WebSocketTransport) String() string {
return "WebSocketTransport, FaceID=" + strconv.FormatUint(t.faceID, 10) +
", RemoteURI=" + t.remoteURI.String() + ", LocalURI=" + t.localURI.String()
return fmt.Sprintf("WebSocketTransport, FaceID=%d, RemoteURI=%s, LocalURI=%s", t.faceID, t.remoteURI, t.localURI)
}

// SetPersistency changes the persistency of the face.
func (t *WebSocketTransport) SetPersistency(persistency Persistency) bool {
return persistency == PersistencyOnDemand
}

// GetSendQueueSize returns the current size of the send queue.
func (t *WebSocketTransport) GetSendQueueSize() uint64 {
return 0
}

func (t *WebSocketTransport) sendFrame(frame []byte) {
if !t.running.Load() {
return
}

if len(frame) > t.MTU() {
core.LogWarn(t, "Attempted to send frame larger than MTU - DROP")
return
}

core.LogDebug(t, "Sending frame of size ", len(frame))
e := t.c.WriteMessage(websocket.BinaryMessage, frame)
if e != nil {
core.LogWarn(t, "Unable to send on socket - DROP and Face DOWN")
Expand All @@ -73,11 +71,18 @@ func (t *WebSocketTransport) sendFrame(frame []byte) {
}

func (t *WebSocketTransport) runReceive() {
defer t.Close()

for {
mt, message, e := t.c.ReadMessage()
if e != nil {
core.LogWarn(t, "Unable to read from socket (", e, ") - DROP and Face DOWN")
t.Close()
if websocket.IsCloseError(e) {
// gracefully closed
} else if websocket.IsUnexpectedCloseError(e) {
core.LogInfo(t, "WebSocket closed unexpectedly (", e, ") - DROP and Face DOWN")
} else {
core.LogWarn(t, "Unable to read from WebSocket (", e, ") - DROP and Face DOWN")
}
return
}

Expand All @@ -86,18 +91,17 @@ func (t *WebSocketTransport) runReceive() {
continue
}

core.LogTrace(t, "Receive of size ", len(message))
t.nInBytes += uint64(len(message))

if len(message) > defn.MaxNDNPacketSize {
core.LogWarn(t, "Received too much data without valid TLV block - DROP")
continue
}

t.nInBytes += uint64(len(message))
t.linkService.handleIncomingFrame(message)
}
}

func (t *WebSocketTransport) Close() {
t.running.Store(false)
t.c.Close()
}

0 comments on commit c91acbc

Please sign in to comment.