From fedaa55c42537da9b90c253833e05749d3c6af50 Mon Sep 17 00:00:00 2001 From: garmr Date: Fri, 1 Sep 2023 13:35:46 -0700 Subject: [PATCH] Added consumer-tests --- clientcore/consumer.go | 949 ++++++++++++++++++------------------ clientcore/consumer_test.go | 604 +++++++++++++++++++++++ go.mod | 4 +- go.sum | 6 + 4 files changed, 1100 insertions(+), 463 deletions(-) create mode 100644 clientcore/consumer_test.go diff --git a/clientcore/consumer.go b/clientcore/consumer.go index 4a97eb0..5f52509 100644 --- a/clientcore/consumer.go +++ b/clientcore/consumer.go @@ -17,524 +17,549 @@ import ( "sync" "time" + "github.com/pion/webrtc/v3" + "github.com/getlantern/broflake/common" "github.com/getlantern/broflake/otel" - "github.com/pion/webrtc/v3" ) func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { return NewWorkerFSM(wg, []FSMstate{ - FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { - // State 0 - // (no input data) - common.Debugf("Consumer state 0, constructing RTCPeerConnection...") - - // We're resetting this slot, so send a nil path assertion IPC message - com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{}} - - STUNSrvs, err := options.STUNBatch(options.STUNBatchSize) - if err != nil { - common.Debugf("Error creating STUN batch: %v", err) - return 0, []interface{}{} - } + newFSMstate0(options), + newFSMstate1(options), + newFSMstate2(options), + newFSMstate3(options), + newFSMstate4(options), + newFSMstate5(options), + }) +} - common.Debugf("Created STUN batch (%v/%v servers)", len(STUNSrvs), options.STUNBatchSize) +func newFSMstate0(options *WebRTCOptions) FSMstate { + return FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { + // State 0 + // (no input data) + common.Debugf("Consumer state 0, constructing RTCPeerConnection...") - config := webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{ - { - URLs: STUNSrvs, - }, - }, - } + // We're resetting this slot, so send a nil path assertion IPC message + com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{}} - // Construct the RTCPeerConnection - peerConnection, err := webrtc.NewPeerConnection(config) - if err != nil { - common.Debugf("Error creating RTCPeerConnection: %v", err) - return 0, []interface{}{} - } + STUNSrvs, err := options.STUNBatch(options.STUNBatchSize) + if err != nil { + common.Debugf("Error creating STUN batch: %v", err) + return 0, []interface{}{} + } - // Consumers are the offerers, so we must create a datachannel - // The following configuration creates a UDP-like unreliable channel - dataChannelConfig := webrtc.DataChannelInit{Ordered: new(bool), MaxRetransmits: new(uint16)} - d, err := peerConnection.CreateDataChannel("data", &dataChannelConfig) - if err != nil { - common.Debugf("Error creating WebRTC datachannel: %v", err) - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - } + common.Debugf("Created STUN batch (%v/%v servers)", len(STUNSrvs), options.STUNBatchSize) - // We want to make sure we capture the connection establishment event whenever it happens, - // but we also want to avoid control flow spaghetti (it would very hard to reason about - // client operation if we sometimes jump forward to future states based on async events - // firing outside of the state machine). Solution: Pass forward this buffered channel such - // that we can explicitly check for connection establishment in state 4. In theory, it's - // possible that magical ICE mysteries could cause the connection to open as early as the end - // of state 2. In practice, the differences here should be on the order of nanoseconds. But - // we should monitor the logs to see if connections open too long before we check for them. - connectionEstablished := make(chan *webrtc.DataChannel, 1) - - d.OnOpen(func() { - common.Debugf("A datachannel has opened!") - connectionEstablished <- d - }) - - // connectionClosed (and the OnClose handler below) is implemented for Firefox, the only - // browser which doesn't implement WebRTC's onconnectionstatechange event. We listen for both - // onclose and onconnectionstatechange under the assumption that non-Firefox browsers can - // benefit from faster connection failure detection by listening for the `failed` event. - connectionClosed := make(chan struct{}, 1) - d.OnClose(func() { - common.Debugf("A datachannel has closed!") - connectionClosed <- struct{}{} - }) - - // Ditto, but for connection state changes - connectionChange := make(chan webrtc.PeerConnectionState, 16) - peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { - common.Debugf("Peer connection state change: %v", s.String()) - connectionChange <- s - }) - - // TODO: right now we listen for ICE connection state changes only to log messages about - // client behavior. In the future, by passing a channel forward in the same manner as above, - // we could probably use the ICE connection state change event to determine the precise - // moment of NAT traversal failure (instead of just waiting on a timer). - peerConnection.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) { - common.Debugf("ICE connection state change: %v", s.String()) - }) + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: STUNSrvs, + }, + }, + } - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - }), - FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { - // State 1 - // input[0]: *webrtc.PeerConnection - // input[1]: chan *webrtc.DataChannel - // input[2]: chan webrtc.PeerConnectionState - // input[3]: chan struct{} - peerConnection := input[0].(*webrtc.PeerConnection) - connectionEstablished := input[1].(chan *webrtc.DataChannel) - connectionChange := input[2].(chan webrtc.PeerConnectionState) - connectionClosed := input[3].(chan struct{}) - common.Debugf("Consumer state 1...") - - // Listen for genesis messages - req, err := http.NewRequestWithContext( - ctx, - "GET", - options.DiscoverySrv+options.Endpoint, - nil, - ) - if err != nil { - common.Debugf("Error constructing request") - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + // Construct the RTCPeerConnection + peerConnection, err := webrtc.NewPeerConnection(config) + if err != nil { + common.Debugf("Error creating RTCPeerConnection: %v", err) + return 0, []interface{}{} + } + + // Consumers are the offerers, so we must create a datachannel + // The following configuration creates a UDP-like unreliable channel + dataChannelConfig := webrtc.DataChannelInit{Ordered: new(bool), MaxRetransmits: new(uint16)} + d, err := peerConnection.CreateDataChannel("data", &dataChannelConfig) + if err != nil { + common.Debugf("Error creating WebRTC datachannel: %v", err) + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } + + // We want to make sure we capture the connection establishment event whenever it happens, + // but we also want to avoid control flow spaghetti (it would very hard to reason about + // client operation if we sometimes jump forward to future states based on async events + // firing outside of the state machine). Solution: Pass forward this buffered channel such + // that we can explicitly check for connection establishment in state 4. In theory, it's + // possible that magical ICE mysteries could cause the connection to open as early as the end + // of state 2. In practice, the differences here should be on the order of nanoseconds. But + // we should monitor the logs to see if connections open too long before we check for them. + connectionEstablished := make(chan *webrtc.DataChannel, 1) + + d.OnOpen(func() { + common.Debugf("A datachannel has opened!") + connectionEstablished <- d + }) + + // connectionClosed (and the OnClose handler below) is implemented for Firefox, the only + // browser which doesn't implement WebRTC's onconnectionstatechange event. We listen for both + // onclose and onconnectionstatechange under the assumption that non-Firefox browsers can + // benefit from faster connection failure detection by listening for the `failed` event. + connectionClosed := make(chan struct{}, 1) + d.OnClose(func() { + common.Debugf("A datachannel has closed!") + connectionClosed <- struct{}{} + }) + + // Ditto, but for connection state changes + connectionChange := make(chan webrtc.PeerConnectionState, 16) + peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { + common.Debugf("Peer connection state change: %v", s.String()) + connectionChange <- s + }) + + // TODO: right now we listen for ICE connection state changes only to log messages about + // client behavior. In the future, by passing a channel forward in the same manner as above, + // we could probably use the ICE connection state change event to determine the precise + // moment of NAT traversal failure (instead of just waiting on a timer). + peerConnection.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) { + common.Debugf("ICE connection state change: %v", s.String()) + }) + + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + }) +} - req.Header.Add(common.VersionHeader, common.Version) +func newFSMstate1(options *WebRTCOptions) FSMstate { + return FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { + // State 1 + // input[0]: *webrtc.PeerConnection + // input[1]: chan *webrtc.DataChannel + // input[2]: chan webrtc.PeerConnectionState + // input[3]: chan struct{} + peerConnection := input[0].(*webrtc.PeerConnection) + connectionEstablished := input[1].(chan *webrtc.DataChannel) + connectionChange := input[2].(chan webrtc.PeerConnectionState) + connectionClosed := input[3].(chan struct{}) + common.Debugf("Consumer state 1...") + + // Listen for genesis messages + req, err := http.NewRequestWithContext( + ctx, + "GET", + options.DiscoverySrv+options.Endpoint, + nil, + ) + if err != nil { + common.Debugf("Error constructing request") + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - res, err := options.HttpClient.Do(req) - if err != nil { - common.Debugf("Couldn't subscribe to genesis stream at %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } - defer res.Body.Close() + req.Header.Add(common.VersionHeader, common.Version) - // Handle bad protocol version - if res.StatusCode == 418 { - common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + res, err := options.HttpClient.Do(req) + if err != nil { + common.Debugf("Couldn't subscribe to genesis stream at %v: %v", options.DiscoverySrv+options.Endpoint, err) + <-time.After(options.ErrorBackoff) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } + defer res.Body.Close() - // We make a long-lived HTTP request to Freddie. Freddie streams newline-terminated genesis - // messages as they become available. We wait until we hear one genesis message, then continue - // listening for a tunable amount of time ("patience") to see if we might hear a few more - // messages to select from. When either our patience expires or our HTTP request times out, we - // pick a random message from the set we've collected and make an offer for it. - scanner := bufio.NewScanner(res.Body) - genesisMsg := make(chan struct{}) - reqTimeout := make(chan struct{}) - patienceExpired := make(<-chan time.Time) - doneListening := make(chan struct{}, 1) - genesisCandidates := []string{} - - listenLoop: - for { - go func() { - isReqOpen := scanner.Scan() - - if len(doneListening) > 0 { - return - } + // Handle bad protocol version + if res.StatusCode == 418 { + common.Debugf("Received 'bad protocol version' response") + <-time.After(options.ErrorBackoff) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } + + // We make a long-lived HTTP request to Freddie. Freddie streams newline-terminated genesis + // messages as they become available. We wait until we hear one genesis message, then continue + // listening for a tunable amount of time ("patience") to see if we might hear a few more + // messages to select from. When either our patience expires or our HTTP request times out, we + // pick a random message from the set we've collected and make an offer for it. + scanner := bufio.NewScanner(res.Body) + genesisMsg := make(chan struct{}) + reqTimeout := make(chan struct{}) + patienceExpired := make(<-chan time.Time) + doneListening := make(chan struct{}, 1) + genesisCandidates := []string{} + + listenLoop: + for { + go func() { + isReqOpen := scanner.Scan() + + if len(doneListening) > 0 { + return + } - if isReqOpen { - genesisMsg <- struct{}{} - return - } + if isReqOpen { + genesisMsg <- struct{}{} + return + } - reqTimeout <- struct{}{} - }() + reqTimeout <- struct{}{} + }() - select { - case <-genesisMsg: - rawMsg := scanner.Bytes() - if err := scanner.Err(); err != nil { - // TODO: what does this error mean? Should we be returning to state 1? - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + select { + case <-genesisMsg: + rawMsg := scanner.Bytes() + if err := scanner.Err(); err != nil { + // TODO: what does this error mean? Should we be returning to state 1? + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - rt, _, err := common.DecodeSignalMsg(rawMsg) - if err != nil { - common.Debugf("Error decoding signal message: %v (msg: %v)", err, string(rawMsg)) - <-time.After(options.ErrorBackoff) - // Take the error in stride, continue listening to our existing HTTP request stream - continue - } + rt, _, err := common.DecodeSignalMsg(rawMsg) + if err != nil { + common.Debugf("Error decoding signal message: %v (msg: %v)", err, string(rawMsg)) + <-time.After(options.ErrorBackoff) + // Take the error in stride, continue listening to our existing HTTP request stream + continue + } - // TODO: post-MVP, evaluate the genesis message for suitability! - genesisCandidates = append(genesisCandidates, rt) - if len(genesisCandidates) == 1 { - patienceExpired = time.After(options.Patience) - } - case <-reqTimeout: - break listenLoop - case <-patienceExpired: - break listenLoop + // TODO: post-MVP, evaluate the genesis message for suitability! + genesisCandidates = append(genesisCandidates, rt) + if len(genesisCandidates) == 1 { + patienceExpired = time.After(options.Patience) } + case <-reqTimeout: + break listenLoop + case <-patienceExpired: + break listenLoop } + } - doneListening <- struct{}{} - - // Endgame case 1: we never heard any suitable genesis messages, so just restart this state - if len(genesisCandidates) == 0 { - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + doneListening <- struct{}{} - // Endgame case 2: create an offer SDP, pick a random genesis candidate, and shoot our shot - sdp, err := peerConnection.CreateOffer(nil) - if err != nil { - // An error creating the offer is troubling, so let's start fresh by resetting the state - common.Debugf("Error creating offer SDP: %v", err) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + // Endgame case 1: we never heard any suitable genesis messages, so just restart this state + if len(genesisCandidates) == 0 { + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - idx := rand.Intn(len(genesisCandidates)) - replyTo := genesisCandidates[idx] - - common.Debugf( - "Sending offer for genesis message %v/%v (patience: %v)", - idx+1, - len(genesisCandidates), - options.Patience, - ) - - return 2, []interface{}{ - peerConnection, - replyTo, - sdp, - connectionEstablished, - connectionChange, - connectionClosed, - } - }), - FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { - // State 2 - // input[0]: *webrtc.PeerConnection - // input[1]: string (reply-to UUID) - // input[2]: webrtc.SessionDescription (offer) - // input[3]: chan *webrtc.DataChannel - // input[4]: chan webrtc.PeerConnectionState - // input[5]: chan struct{} - peerConnection := input[0].(*webrtc.PeerConnection) - replyTo := input[1].(string) - sdp := input[2].(webrtc.SessionDescription) - connectionEstablished := input[3].(chan *webrtc.DataChannel) - connectionChange := input[4].(chan webrtc.PeerConnectionState) - connectionClosed := input[5].(chan struct{}) - common.Debugf("Consumer state 2...") - - offerJSON, err := json.Marshal(common.OfferMsg{SDP: sdp, Tag: options.Tag}) - if err != nil { - common.Debugf("Error marshaling JSON: %v", err) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + // Endgame case 2: create an offer SDP, pick a random genesis candidate, and shoot our shot + sdp, err := peerConnection.CreateOffer(nil) + if err != nil { + // An error creating the offer is troubling, so let's start fresh by resetting the state + common.Debugf("Error creating offer SDP: %v", err) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } + + idx := rand.Intn(len(genesisCandidates)) + replyTo := genesisCandidates[idx] + + common.Debugf( + "Sending offer for genesis message %v/%v (patience: %v)", + idx+1, + len(genesisCandidates), + options.Patience, + ) + + return 2, []interface{}{ + peerConnection, + replyTo, + sdp, + connectionEstablished, + connectionChange, + connectionClosed, + } + }) +} - // Signal the offer - form := url.Values{ - "data": {string(offerJSON)}, - "send-to": {replyTo}, - "type": {strconv.Itoa(int(common.SignalMsgOffer))}, - } +func newFSMstate2(options *WebRTCOptions) FSMstate { + return FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { + // State 2 + // input[0]: *webrtc.PeerConnection + // input[1]: string (reply-to UUID) + // input[2]: webrtc.SessionDescription (offer) + // input[3]: chan *webrtc.DataChannel + // input[4]: chan webrtc.PeerConnectionState + // input[5]: chan struct{} + peerConnection := input[0].(*webrtc.PeerConnection) + replyTo := input[1].(string) + sdp := input[2].(webrtc.SessionDescription) + connectionEstablished := input[3].(chan *webrtc.DataChannel) + connectionChange := input[4].(chan webrtc.PeerConnectionState) + connectionClosed := input[5].(chan struct{}) + common.Debugf("Consumer state 2...") + + offerJSON, err := json.Marshal(common.OfferMsg{SDP: sdp, Tag: options.Tag}) + if err != nil { + common.Debugf("Error marshaling JSON: %v", err) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } + + // Signal the offer + form := url.Values{ + "data": {string(offerJSON)}, + "send-to": {replyTo}, + "type": {strconv.Itoa(int(common.SignalMsgOffer))}, + } + + req, err := http.NewRequestWithContext( + ctx, + "POST", + options.DiscoverySrv+options.Endpoint, + strings.NewReader(form.Encode()), + ) + if err != nil { + common.Debugf("Error constructing request") + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - req, err := http.NewRequestWithContext( - ctx, - "POST", - options.DiscoverySrv+options.Endpoint, - strings.NewReader(form.Encode()), - ) - if err != nil { - common.Debugf("Error constructing request") - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add(common.VersionHeader, common.Version) - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - req.Header.Add(common.VersionHeader, common.Version) + res, err := options.HttpClient.Do(req) + if err != nil { + common.Debugf("Couldn't signal offer SDP to %v: %v", options.DiscoverySrv+options.Endpoint, err) + <-time.After(options.ErrorBackoff) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } + defer res.Body.Close() - res, err := options.HttpClient.Do(req) - if err != nil { - common.Debugf("Couldn't signal offer SDP to %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } - defer res.Body.Close() - - switch res.StatusCode { - case 418: - common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - case 404: - // We didn't win the connection - common.Debugf("Too late for genesis message %v!", replyTo) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + switch res.StatusCode { + case 418: + common.Debugf("Received 'bad protocol version' response") + <-time.After(options.ErrorBackoff) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + case 404: + // We didn't win the connection + common.Debugf("Too late for genesis message %v!", replyTo) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - // The HTTP request is complete - answerBytes, err := io.ReadAll(res.Body) - if err != nil { - common.Debugf("Error reading body: %v\n", err) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + // The HTTP request is complete + answerBytes, err := io.ReadAll(res.Body) + if err != nil { + common.Debugf("Error reading body: %v\n", err) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - // TODO: Freddie sends back a 0-length body when nobody replied to our message. Is that the - // smartest way to handle this case systemwide? - if len(answerBytes) == 0 { - common.Debugf("No response for our offer SDP!") - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + // TODO: Freddie sends back a 0-length body when nobody replied to our message. Is that the + // smartest way to handle this case systemwide? + if len(answerBytes) == 0 { + common.Debugf("No response for our offer SDP!") + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - // Looks like we got some kind of response. Should be an answer SDP in a SignalMsg - replyTo, answer, err := common.DecodeSignalMsg(answerBytes) - if err != nil { - common.Debugf("Error decoding signal message: %v (msg: %v)", err, string(answerBytes)) - return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + // Looks like we got some kind of response. Should be an answer SDP in a SignalMsg + replyTo, answer, err := common.DecodeSignalMsg(answerBytes) + if err != nil { + common.Debugf("Error decoding signal message: %v (msg: %v)", err, string(answerBytes)) + return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } - // TODO: here we assume valid answer SDP, but we need to handle the invalid case too + // TODO: here we assume valid answer SDP, but we need to handle the invalid case too - // Create a channel that's blocked until ICE gathering is complete - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + // Create a channel that's blocked until ICE gathering is complete + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) - candidates := []webrtc.ICECandidate{} - peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { - // Interestingly, the null candidate is a nil pointer so we cause a nil ptr dereference - // if we try to append it to the list... so let's just not include it? - if c != nil { - candidates = append(candidates, *c) - } - }) - - // This kicks off ICE candidate gathering - err = peerConnection.SetLocalDescription(sdp) - if err != nil { - common.Debugf("Error setting local description: %v", err) - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} + candidates := []webrtc.ICECandidate{} + peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { + // Interestingly, the null candidate is a nil pointer so we cause a nil ptr dereference + // if we try to append it to the list... so let's just not include it? + if c != nil { + candidates = append(candidates, *c) } + }) - // Assign the answer to our connection - err = peerConnection.SetRemoteDescription(answer.(webrtc.SessionDescription)) - if err != nil { - common.Debugf("Error setting remote description: %v", err) - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - } + // This kicks off ICE candidate gathering + err = peerConnection.SetLocalDescription(sdp) + if err != nil { + common.Debugf("Error setting local description: %v", err) + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } - select { - case <-gatherComplete: - common.Debug("Ice gathering complete!") - case <-time.After(options.ICEFailTimeout): - common.Debugf("Failed to gather ICE candidates!") - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - } + // Assign the answer to our connection + err = peerConnection.SetRemoteDescription(answer.(webrtc.SessionDescription)) + if err != nil { + common.Debugf("Error setting remote description: %v", err) + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } - return 3, []interface{}{peerConnection, replyTo, candidates, connectionEstablished, connectionChange, connectionClosed} - }), - FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { - // State 3 - // input[0]: *webrtc.PeerConnection - // input[1]: string (replyTo) - // input[2]: []webrtc.ICECandidates - // input[3]: chan *webrtc.DataChannel - // input[4]: chan webrtc.PeerConnectionState - // input[5]: chan struct{} - peerConnection := input[0].(*webrtc.PeerConnection) - replyTo := input[1].(string) - candidates := input[2].([]webrtc.ICECandidate) - connectionEstablished := input[3].(chan *webrtc.DataChannel) - connectionChange := input[4].(chan webrtc.PeerConnectionState) - connectionClosed := input[5].(chan struct{}) - common.Debugf("Consumer state 3...") - - candidatesJSON, err := json.Marshal(candidates) - if err != nil { - common.Debugf("Error marshaling JSON: %v", err) - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - } + select { + case <-gatherComplete: + common.Debug("Ice gathering complete!") + case <-time.After(options.ICEFailTimeout): + common.Debugf("Failed to gather ICE candidates!") + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } - // Signal our ICE candidates - form := url.Values{ - "data": {string(candidatesJSON)}, - "send-to": {replyTo}, - "type": {strconv.Itoa(int(common.SignalMsgICE))}, - } + return 3, []interface{}{peerConnection, replyTo, candidates, connectionEstablished, connectionChange, connectionClosed} + }) +} - req, err := http.NewRequestWithContext( - ctx, - "POST", - options.DiscoverySrv+options.Endpoint, - strings.NewReader(form.Encode()), - ) - if err != nil { - common.Debugf("Error constructing request") - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - } +func newFSMstate3(options *WebRTCOptions) FSMstate { + return FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { + // State 3 + // input[0]: *webrtc.PeerConnection + // input[1]: string (replyTo) + // input[2]: []webrtc.ICECandidates + // input[3]: chan *webrtc.DataChannel + // input[4]: chan webrtc.PeerConnectionState + // input[5]: chan struct{} + peerConnection := input[0].(*webrtc.PeerConnection) + replyTo := input[1].(string) + candidates := input[2].([]webrtc.ICECandidate) + connectionEstablished := input[3].(chan *webrtc.DataChannel) + connectionChange := input[4].(chan webrtc.PeerConnectionState) + connectionClosed := input[5].(chan struct{}) + common.Debugf("Consumer state 3...") + + candidatesJSON, err := json.Marshal(candidates) + if err != nil { + common.Debugf("Error marshaling JSON: %v", err) + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } + + // Signal our ICE candidates + form := url.Values{ + "data": {string(candidatesJSON)}, + "send-to": {replyTo}, + "type": {strconv.Itoa(int(common.SignalMsgICE))}, + } + + req, err := http.NewRequestWithContext( + ctx, + "POST", + options.DiscoverySrv+options.Endpoint, + strings.NewReader(form.Encode()), + ) + if err != nil { + common.Debugf("Error constructing request") + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - req.Header.Add(common.VersionHeader, common.Version) + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add(common.VersionHeader, common.Version) - res, err := options.HttpClient.Do(req) - if err != nil { - common.Debugf("Couldn't signal ICE candidates to %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - } - defer res.Body.Close() - - switch res.StatusCode { - case 418: - common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - case 404: - common.Debugf("Signaling partner hung up, aborting!") - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - case 200: - // Signaling is complete, so we can short circuit instead of awaiting the response body - return 4, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} - } + res, err := options.HttpClient.Do(req) + if err != nil { + common.Debugf("Couldn't signal ICE candidates to %v: %v", options.DiscoverySrv+options.Endpoint, err) + <-time.After(options.ErrorBackoff) + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } + defer res.Body.Close() - // This code path should never be reachable + switch res.StatusCode { + case 418: + common.Debugf("Received 'bad protocol version' response") + <-time.After(options.ErrorBackoff) // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} - }), - FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { - // State 4 - // input[0]: *webrtc.PeerConnection - // input[1]: chan *webrtc.DataChannel - // input[2]: chan webrtc.PeerConnectionState - // input[3]: chan struct{} - peerConnection := input[0].(*webrtc.PeerConnection) - connectionEstablished := input[1].(chan *webrtc.DataChannel) - connectionChange := input[2].(chan webrtc.PeerConnectionState) - connectionClosed := input[3].(chan struct{}) - common.Debugf("Consumer state 4, signaling complete!") - - // XXX: Let's select some STUN servers to perform NAT behavior discovery for the purpose of - // sending interesting traces revealing the outcome of our NAT traversal attempt - STUNSrvs, err := options.STUNBatch(options.STUNBatchSize) - if err != nil { - common.Debugf("Error creating STUN batch: %v", err) - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - } + case 404: + common.Debugf("Signaling partner hung up, aborting!") + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + case 200: + // Signaling is complete, so we can short circuit instead of awaiting the response body + return 4, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} + } + + // This code path should never be reachable + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + }) +} +func newFSMstate4(options *WebRTCOptions) FSMstate { + return FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { + // State 4 + // input[0]: *webrtc.PeerConnection + // input[1]: chan *webrtc.DataChannel + // input[2]: chan webrtc.PeerConnectionState + // input[3]: chan struct{} + peerConnection := input[0].(*webrtc.PeerConnection) + connectionEstablished := input[1].(chan *webrtc.DataChannel) + connectionChange := input[2].(chan webrtc.PeerConnectionState) + connectionClosed := input[3].(chan struct{}) + common.Debugf("Consumer state 4, signaling complete!") + + // XXX: Let's select some STUN servers to perform NAT behavior discovery for the purpose of + // sending interesting traces revealing the outcome of our NAT traversal attempt + STUNSrvs, err := options.STUNBatch(options.STUNBatchSize) + if err != nil { + common.Debugf("Error creating STUN batch: %v", err) + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } + + select { + case d := <-connectionEstablished: + common.Debugf("A WebRTC connection has been established!") + go otel.CollectAndSendNATBehaviorTelemetry(STUNSrvs, "nat_success") + return 5, []interface{}{peerConnection, d, connectionChange, connectionClosed} + case <-time.After(options.NATFailTimeout): + common.Debugf("NAT failure, aborting!") + go otel.CollectAndSendNATBehaviorTelemetry(STUNSrvs, "nat_failure") + // Borked! + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} + } + }) +} + +func newFSMstate5(options *WebRTCOptions) FSMstate { + return FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { + // State 5 + // input[0]: *webrtc.PeerConnection + // input[1]: *webrtc.DataChannel + // input[2]: chan webrtc.PeerConnectionState + // input[3]: chan struct{} + peerConnection := input[0].(*webrtc.PeerConnection) + d := input[1].(*webrtc.DataChannel) + connectionChange := input[2].(chan webrtc.PeerConnectionState) + connectionClosed := input[3].(chan struct{}) + + // Send a path assertion IPC message representing the connectivity now provided by this slot + // TODO: post-MVP we shouldn't be hardcoding (*, 1) here... + allowAll := []common.Endpoint{{Host: "*", Distance: 1}} + com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{Allow: allowAll}} + + // Inbound from datachannel: + d.OnMessage(func(msg webrtc.DataChannelMessage) { select { - case d := <-connectionEstablished: - common.Debugf("A WebRTC connection has been established!") - go otel.CollectAndSendNATBehaviorTelemetry(STUNSrvs, "nat_success") - return 5, []interface{}{peerConnection, d, connectionChange, connectionClosed} - case <-time.After(options.NATFailTimeout): - common.Debugf("NAT failure, aborting!") - go otel.CollectAndSendNATBehaviorTelemetry(STUNSrvs, "nat_failure") - // Borked! - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} + case com.tx <- IPCMsg{IpcType: ChunkIPC, Data: msg.Data}: + // Do nothing, msg sent + default: + // Drop the chunk if we can't keep up with the data rate } - }), - FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { - // State 5 - // input[0]: *webrtc.PeerConnection - // input[1]: *webrtc.DataChannel - // input[2]: chan webrtc.PeerConnectionState - // input[3]: chan struct{} - peerConnection := input[0].(*webrtc.PeerConnection) - d := input[1].(*webrtc.DataChannel) - connectionChange := input[2].(chan webrtc.PeerConnectionState) - connectionClosed := input[3].(chan struct{}) - - // Send a path assertion IPC message representing the connectivity now provided by this slot - // TODO: post-MVP we shouldn't be hardcoding (*, 1) here... - allowAll := []common.Endpoint{{Host: "*", Distance: 1}} - com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{Allow: allowAll}} - - // Inbound from datachannel: - d.OnMessage(func(msg webrtc.DataChannelMessage) { - select { - case com.tx <- IPCMsg{IpcType: ChunkIPC, Data: msg.Data}: - // Do nothing, msg sent - default: - // Drop the chunk if we can't keep up with the data rate + }) + + proxyloop: + for { + select { + // Handle connection failure + case s := <-connectionChange: + if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected { + common.Debugf("Connection failure, resetting!") + break proxyloop } - }) - - proxyloop: - for { - select { - // Handle connection failure - case s := <-connectionChange: - if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected { - common.Debugf("Connection failure, resetting!") + // Handle connection failure for Firefox + case <-connectionClosed: + common.Debugf("Firefox connection failure, resetting!") + break proxyloop + // Handle messages from the router + case msg := <-com.rx: + switch msg.IpcType { + case ChunkIPC: + if err := d.Send(msg.Data.([]byte)); err != nil { + common.Debugf("Error sending to datachannel, resetting!") break proxyloop } - // Handle connection failure for Firefox - case _ = <-connectionClosed: - common.Debugf("Firefox connection failure, resetting!") - break proxyloop - // Handle messages from the router - case msg := <-com.rx: - switch msg.IpcType { - case ChunkIPC: - if err := d.Send(msg.Data.([]byte)); err != nil { - common.Debugf("Error sending to datachannel, resetting!") - break proxyloop - } - } - // Since we're putting this state into an infinite loop, explicitly handle cancellation - case <-ctx.Done(): - break proxyloop } + // Since we're putting this state into an infinite loop, explicitly handle cancellation + case <-ctx.Done(): + break proxyloop } + } - peerConnection.Close() // TODO: there's an err we should handle here - return 0, []interface{}{} - }), + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} }) } diff --git a/clientcore/consumer_test.go b/clientcore/consumer_test.go new file mode 100644 index 0000000..a4e177a --- /dev/null +++ b/clientcore/consumer_test.go @@ -0,0 +1,604 @@ +package clientcore + +import ( + "context" + "encoding/json" + "net/http" + "strconv" + "sync" + "testing" + "time" + + "github.com/h2non/gock" + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/assert" + + "github.com/getlantern/broflake/common" +) + +// ************************************************************* +// +// FSMstate0 Tests +// +// ************************************************************* +func TestCreatePeerConnection(t *testing.T) { + com := &ipcChan{ + tx: make(chan IPCMsg, 1), + rx: make(chan IPCMsg, 1), + } + opts, stunReqC := getTestOpts() + state, values := newFSMstate0(opts)(context.Background(), com, nil) + + assert.Len(t, stunReqC, 1, "FSMstate0: did not request STUN servers") + + // check that a nil PathAssertion IPCMsg was sent over com.tx + var nilPA bool + select { + case msg := <-com.tx: + nilPA = (msg.IpcType == PathAssertionIPC && msg.Data.(common.PathAssertion).Nil()) + default: + } + + assert.True(t, nilPA, "FSMstate0: did not send nil PathAssertion") + validateState(t, state, 1, "FSMstate0: failed to create peer connection") + + // validate offerer was set up correctly by creating a mock answerer and signaling it + pcOff, ok := values[0].(*webrtc.PeerConnection) + if !ok { + assert.IsType(t, &webrtc.PeerConnection{}, values[0], "FSMstate0: expected returnedValues[0] to be a *webrtc.PeerConnection") + } + + pcOffererConnChangeC, ok := values[2].(chan webrtc.PeerConnectionState) + if !ok { + var cType chan webrtc.PeerConnectionState + assert.IsType(t, cType, values[2], "FSMstate0: expected returnedValues[2] to be a chan webrtc.PeerConnectionState") + } + + if !ok { + assert.FailNow(t, "FSMstate0: did not return expected values") + } + defer pcOff.Close() + + // create mock answerer and try to send signal between offerer and answerer + pcAnswerer, err := newPeerConn() + assertFailIfErr(t, err, "FSMstate0: failed to create mock answerer") + defer pcAnswerer.pc.Close() + + err = signalSDP(pcOff, pcAnswerer.pc) + assertFailIfErr(t, err, "FSMstate0: peerConnection failed to signal mock answerer. Ensure a data channel was reated") + + err = waitUntilConnected(pcOffererConnChangeC, pcAnswerer.connChangeC, 5*time.Second) + assertFailIfErr(t, err, "FSMstate0: peerConnection failed to connect to mock answerer. Ensure connection state is being sent over connectionChange chan") +} + +// func TestState0FailToGetSTUNBatch(t *testing.T) {} + +// ************************************************************* +// +// FSMstate1 Tests +// +// ************************************************************* +func TestCreateOffer(t *testing.T) { + var ( + signalMsg = common.SignalMsg{ + ReplyTo: "someone", + Type: common.SignalMsgGenesis, + Payload: "{}", + } + + connEstablished chan *webrtc.DataChannel + connChanged chan webrtc.PeerConnectionState + connClosed chan struct{} + ) + + pcOfferer, err := newOfferer() + assertFailIfErr(t, err, "FSMstate1: failed to create mock offerer") + defer pcOfferer.pc.Close() + + opts, _ := getTestOpts() + + // intercept request for genesis message + defer gock.Off() + gock.New(opts.DiscoverySrv). + Get(opts.Endpoint). + Reply(200). + JSON(signalMsg) + gock.InterceptClient(opts.HttpClient) + + values := []interface{}{pcOfferer.pc, connEstablished, connChanged, connClosed} + state, values := newFSMstate1(opts)(context.Background(), nil, values) + validateState(t, state, 2, "FSMstate1: failed to create offer") + + assert.Equal(t, signalMsg.ReplyTo, values[1], "FSMstate1: returnedValues[1] does not match expected 'replyTo'") + assert.IsType(t, webrtc.SessionDescription{}, values[2], "FSMstate1: expected returnedValues[2] to be an offer webrtc.SessionDescription") +} + +// func TestState1GenesisMsgReqTimeout(t *testing.T) {} + +// func TestState1GenesisMsgReqBadProtocalResponse(t *testing.T) {} + +// func TestState1NoGenesisReceived(t *testing.T) {} + +// ************************************************************* +// +// FSMstate2 Tests +// +// ************************************************************* +func TestGatherICECandidates(t *testing.T) { + var ( + replyTo = "someone" + connEstablished chan *webrtc.DataChannel + connChanged chan webrtc.PeerConnectionState + connClosed chan struct{} + ) + + pcAnswerer, err := newPeerConn() + assertFailIfErr(t, err, "FSMstate2: failed to create mock answerer") + defer pcAnswerer.pc.Close() + + opts, _ := getTestOpts() + + // intercept offer request + defer gock.Off() + gock.New(opts.DiscoverySrv). + Post(opts.Endpoint). + MatchType("url") + + opts.HttpClient.Transport = &mockSignalMsgTransport{ + pcAns: pcAnswerer.pc, + ICEFailTimeout: opts.ICEFailTimeout, + } + + pcOfferer, err := newOfferer() + assertFailIfErr(t, err, "FSMstate2: failed to create mock offerer") + defer pcOfferer.pc.Close() + + sdp, err := pcOfferer.pc.CreateOffer(nil) + assert.NoError(t, err) + + values := []interface{}{pcOfferer.pc, replyTo, sdp, connEstablished, connChanged, connClosed} + state, _ := newFSMstate2(opts)(context.Background(), nil, values) + validateState(t, state, 3, "FSMstate2: failed to connect to answerer") +} + +// func TestRequestAnswerBadProtocalResponse(t *testing.T) {} + +// TODO: @nelson, this may need to be renamed as I don't know if this is the right description for what's happening +// func TestRequestAnswerGenesisMsgExpired(t *testing.T) {} + +// func TestRequestAnswerNoAnswerReceived(t *testing.T) {} + +// func TestRequestAnswerIceCandidateTimeout(t *testing.T) {} + +// ************************************************************* +// +// FSMstate3 Tests +// +// ************************************************************* +func TestSignalICECandidates(t *testing.T) { + var ( + replyTo = "someone" + candidates = []webrtc.ICECandidate{ + {Foundation: "foundation1", Priority: 1}, + {Foundation: "foundation2", Priority: 2}, + {Foundation: "foundation3", Priority: 3}, + } + + connEstablished chan *webrtc.DataChannel + connChanged chan webrtc.PeerConnectionState + connClosed chan struct{} + ) + + opts, _ := getTestOpts() + + // intercept ICE candidate request + defer gock.Off() + gock.New(opts.DiscoverySrv). + Post(opts.Endpoint). + MatchType("url") + + opts.HttpClient.Transport = &mockSignalMsgTransport{} + + pcOfferer, err := newOfferer() + assertFailIfErr(t, err, "FSMstate3: failed to create mock offerer") + defer pcOfferer.pc.Close() + + values := []interface{}{pcOfferer.pc, replyTo, candidates, connEstablished, connChanged, connClosed} + state, _ := newFSMstate3(opts)(context.Background(), nil, values) + validateState(t, state, 4, "FSMstate3: failed to send ICE candidates") +} + +// func TestSendICECandidatesBadProtocalResponse(t *testing.T) {} + +// func TestSendICECandidatesSignalerHungUp(t *testing.T) {} + +// ************************************************************* +// +// FSMstate4 Tests +// +// ************************************************************* +func TestConnectionEstablished(t *testing.T) { + var ( + pc = &webrtc.PeerConnection{} + dcC = make(chan *webrtc.DataChannel, 1) + + connChanged chan webrtc.PeerConnectionState + connClosed chan struct{} + ) + + dcC <- &webrtc.DataChannel{} + opts, stunReqC := getTestOpts() + + values := []interface{}{pc, dcC, connChanged, connClosed} + state, _ := newFSMstate4(opts)(context.Background(), nil, values) + + assert.Len(t, stunReqC, 1, "FSMstate4: did not request STUN servers") + assert.Len(t, dcC, 0, "FSMstate4: did not recieve dataChannel") + + validateState(t, state, 5, "FSMstate4: failed to verify connection established") +} + +// func TestState4FailToGetSTUNBatch(t *testing.T) {} + +// func TestConnectionNATTimeout(t *testing.T) {} + +// ************************************************************* +// +// FSMstate5 Tests +// +// ************************************************************* +func TestDataProxying(t *testing.T) { + var ( + connClosed chan struct{} + com = &ipcChan{ + tx: make(chan IPCMsg, 3), + rx: make(chan IPCMsg), + } + stateC = make(chan int) + dcOpen = make(chan bool) + + // test message and response + konamiCode = "uuddlrlrba" // hack the system ;) + inputRes = "cheat activated" + ) + + // create mock offerer and answerer + pcOfferer, err := newOfferer() + assertFailIfErr(t, err, "FSMstate5: failed to create mock offerer") + defer pcOfferer.pc.Close() + + pcAnswerer, err := newPeerConn() + assertFailIfErr(t, err, "FSMstate5: failed to create mock answerer") + defer pcAnswerer.pc.Close() + + // set up data channel on answerer to validate the correct message is received + // and send back a response + pcAnswerer.pc.OnDataChannel(func(dc *webrtc.DataChannel) { + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + // validate data is received and is correct msg + if assert.Equal(t, string(msg.Data), konamiCode, "FSMstate5: did not proxy correct msg") { + dc.Send([]byte(inputRes)) + } + }) + dc.OnOpen(func() { + dcOpen <- true + }) + }) + + if err = signalSDP(pcOfferer.pc, pcAnswerer.pc); err != nil { + assert.FailNow(t, "FSMstate5: peerConnection failed to signal mock answerer") + } + + // wait for data channel to open + select { + case <-dcOpen: + case <-time.After(5 * time.Second): + assert.FailNow(t, "FSMstate5: data channel did not open") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + if ctx.Err() == nil { + cancel() + time.Sleep(250 * time.Millisecond) + } + }() + + opts, _ := getTestOpts() + go func() { + values := []interface{}{pcOfferer.pc, pcOfferer.dc, pcOfferer.connChangeC, connClosed} + state, _ := newFSMstate5(opts)(ctx, com, values) + stateC <- state + }() + + // wait for a non-nil PathAssertion IPCMsg + select { + case msg := <-com.tx: + if msg.IpcType != PathAssertionIPC || msg.Data.(common.PathAssertion).Nil() { + assert.Failf(t, "FSMstate5: expected non-nil PathAssertion IPCMsg", "got %+v", msg) + } + case <-time.After(250 * time.Millisecond): + assert.Fail(t, "FSMstate5: did not send PathAssertion IPCMsg") + } + + // send ChunkIPC IPCMsg with our test msg + // this will be sent over com.rx to a consumer in FSMstate5 which will proxy it to the answerer + // over the peerConnection data channel. The answerer will then respond with inputRes which the + // consumer will forward directly over com.tx + msg := IPCMsg{ + IpcType: ChunkIPC, + Data: []byte(konamiCode), + } + select { + case com.rx <- msg: + // validate response is received on com.tx + select { + case msg := <-com.tx: + assert.Equal(t, string(msg.Data.([]byte)), inputRes, "FSMstate5: did not proxy correct response") + case <-time.After(2 * time.Second): + assert.Fail(t, "FSMstate5: did not send response msg over com.tx") + } + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "FSMstate5: did not read msg from com.rx") + } + + // signal that we're done and the connection should be closed + cancel() + select { + case state := <-stateC: + validateState(t, state, 0, "FSMstate5: failed to reset FSM") + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "FSMstate5: did not cancel connection and reset FSM") + } +} + +// func TestFailedConnection(t *testing.T) {} + +// func TestConnectionClosed(t *testing.T) {} + +// ************************************************************* +// +// helper funcs +// +// ************************************************************* +// validateState validates gotState equals wantState +// otherwise fails test immediately with failMsg +func validateState(t *testing.T, gotState, wantState int, failMsg string) { + if !assert.Equal(t, wantState, gotState, "should be in state %d", wantState) { + assert.FailNow(t, failMsg) + } +} + +type peerConn struct { + pc *webrtc.PeerConnection + connChangeC chan webrtc.PeerConnectionState + dc *webrtc.DataChannel +} + +func newOfferer() (*peerConn, error) { + conn, err := newPeerConn() + if err != nil { + return nil, err + } + + conn.dc, err = conn.pc.CreateDataChannel("data_chan", nil) + return conn, err +} + +func newPeerConn() (*peerConn, error) { + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return nil, err + } + + conn := peerConn{ + pc: pc, + connChangeC: make(chan webrtc.PeerConnectionState), + } + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + conn.connChangeC <- state + }) + return &conn, nil +} + +// signalSDP creates an offer and answer and signals them between the two peerConnections. +// Returns nil if successful, otherwise returns error. +func signalSDP(pcOff, pcAns *webrtc.PeerConnection) error { + offer, err := pcOff.CreateOffer(nil) + if err != nil { + return err + } + + offererGatheringComplete := webrtc.GatheringCompletePromise(pcOff) + if err = pcOff.SetLocalDescription(offer); err != nil { + return err + } + + <-offererGatheringComplete + if err = pcAns.SetRemoteDescription(*pcOff.LocalDescription()); err != nil { + return err + } + + answer, err := pcAns.CreateAnswer(nil) + if err != nil { + return err + } + + answererGatheringComplete := webrtc.GatheringCompletePromise(pcAns) + if err = pcAns.SetLocalDescription(answer); err != nil { + return err + } + + <-answererGatheringComplete + return pcOff.SetRemoteDescription(*pcAns.LocalDescription()) +} + +// waitUntilConnected waits until both offerer and answerer are connected +// Returns nil if successful, otherwise returns error. +func waitUntilConnected(pcO, pcA chan webrtc.PeerConnectionState, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var wg sync.WaitGroup + wait := func(stateC chan webrtc.PeerConnectionState) { + defer wg.Done() + for { + select { + case state := <-stateC: + if state == webrtc.PeerConnectionStateConnected { + return + } + case <-ctx.Done(): + return + } + } + } + + wg.Add(2) + go wait(pcO) + go wait(pcA) + + wg.Wait() + return ctx.Err() +} + +// getTestOpts returns the default WebRTCOptions with a mock STUNBatch function +// and a buffered channel to signal when the mock STUNBatch function is called. The +// discovery server and endpoint are set to a dummy value to prevent any +// requests from being sent. +func getTestOpts() (*WebRTCOptions, chan struct{}) { + opts := NewDefaultWebRTCOptions() + opts.DiscoverySrv = "http://server.com" + opts.Endpoint = "/" + + c := make(chan struct{}, 1) + opts.STUNBatch = func(size uint32) (batch []string, err error) { + c <- struct{}{} + return []string{}, nil + } + opts.STUNBatchSize = 1 + return opts, c +} + +func assertFailIfErr(t *testing.T, err error, failMsg string) { + if !assert.NoError(t, err) { + assert.FailNow(t, failMsg, err) + } +} + +// mockSignalMsgTransport is a mock http.RoundTripper that intercepts SignalMsg requests. +// It then validates neither the data, send-to, or type fields are empty and sends the +// appropriate response based on the type field. +type mockSignalMsgTransport struct { + mutex sync.Mutex + pcAns *webrtc.PeerConnection + ICEFailTimeout time.Duration +} + +func (m *mockSignalMsgTransport) RoundTrip(req *http.Request) (*http.Response, error) { + m.mutex.Lock() + mock, err := gock.MatchMock(req) + if err != nil { + m.mutex.Unlock() + return nil, err + } + + if err = req.ParseForm(); err != nil { + m.mutex.Unlock() + return nil, err + } + + var ( + data string + sendTo string + msgType string + httpRes *http.Response + + mockRes = mock.Response() + Responder = gock.Responder + ) + + // validate required fields (data, send-to, type) are present in request + data = req.Form.Get("data") + sendTo = req.Form.Get("send-to") + msgType = req.Form.Get("type") + m.mutex.Unlock() + + if data == "" { + mockRes.Status(418).BodyString("missing data field in request") + return Responder(req, mockRes, httpRes) + } + + if sendTo == "" { + mockRes.Status(418).BodyString("missing send-to field in request") + return Responder(req, mockRes, httpRes) + } + + mt, err := strconv.Atoi(msgType) + if err != nil { + mockRes.Status(418).BodyString(err.Error()) + return Responder(req, mockRes, httpRes) + } + + switch common.SignalMsgType(mt) { + case common.SignalMsgOffer: + // parse offer from data field and create answer to send back + var offerJSON common.OfferMsg + if err = json.Unmarshal([]byte(data), &offerJSON); err != nil { + mockRes.Status(418).BodyString(err.Error()) + return Responder(req, mockRes, httpRes) + } + + pcAns := m.pcAns + if err = pcAns.SetRemoteDescription(offerJSON.SDP); err != nil { + mockRes.Status(418).BodyString(err.Error()) + return Responder(req, mockRes, httpRes) + } + + answer, err := pcAns.CreateAnswer(nil) + if err != nil { + mockRes.Status(418).BodyString(err.Error()) + return Responder(req, mockRes, httpRes) + } + + answererGatheringComplete := webrtc.GatheringCompletePromise(pcAns) + if err = pcAns.SetLocalDescription(answer); err != nil { + mockRes.Status(418).BodyString(err.Error()) + return Responder(req, mockRes, httpRes) + } + + select { + case <-answererGatheringComplete: + case <-time.After(m.ICEFailTimeout): + mockRes.Status(418).BodyString("answerer failed to gather ICE candidates") + return Responder(req, mockRes, httpRes) + } + + answerJSON, err := json.Marshal(pcAns.LocalDescription()) + if err != nil { + mockRes.Status(418).BodyString(err.Error()) + return Responder(req, mockRes, httpRes) + } + + signalMsg := common.SignalMsg{ + ReplyTo: sendTo, + Type: common.SignalMsgAnswer, + Payload: string(answerJSON), + } + + mockRes.JSON(signalMsg) + case common.SignalMsgICE: + // validate ICE candidates sent in data field + var candidates []webrtc.ICECandidate + if err = json.Unmarshal([]byte(data), &candidates); err != nil { + mockRes.Status(418).BodyString("failed to unmarshal ICE candidates") + return Responder(req, mockRes, httpRes) + } + } + + return Responder(req, mockRes.Status(200), httpRes) +} + +// CancelRequest is a no-op function +func (m *mockSignalMsgTransport) CancelRequest(req *http.Request) {} diff --git a/go.mod b/go.mod index 199cc34..34683f9 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,10 @@ require ( github.com/enobufs/go-nats v0.0.1 github.com/getlantern/telemetry v0.0.0-20230523155019-be7c1d8cd8cb github.com/google/uuid v1.3.0 + github.com/h2non/gock v1.2.0 github.com/pion/webrtc/v3 v3.2.6 github.com/quic-go/quic-go v0.34.0 + github.com/stretchr/testify v1.8.3 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/metric v1.16.0 @@ -30,6 +32,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect + github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/klauspost/compress v1.10.3 // indirect github.com/onsi/ginkgo/v2 v2.2.0 // indirect github.com/pion/datachannel v1.5.5 // indirect @@ -53,7 +56,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/quic-go/qtls-go1-19 v0.3.2 // indirect github.com/quic-go/qtls-go1-20 v0.2.2 // indirect - github.com/stretchr/testify v1.8.3 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 // indirect diff --git a/go.sum b/go.sum index 11d9d7e..aa1f833 100644 --- a/go.sum +++ b/go.sum @@ -167,6 +167,10 @@ github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= +github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE= +github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk= +github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= +github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -192,6 +196,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= +github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= github.com/noahlevenson/go-nats v0.0.0-20230720174341-49df1f749775 h1:CVBqDCqhtrS2etCKGuwruUkwg3f/axVpa2Il5IQQtEs= github.com/noahlevenson/go-nats v0.0.0-20230720174341-49df1f749775/go.mod h1:dXVvPZcJIwdWDH5ZXQ8oVA7dz+Ecu9TPF+6biaMl1dY= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=