From 067396d75771df542d3188db90188cfef5657443 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Fri, 15 Sep 2023 00:19:11 -0400 Subject: [PATCH] Add Simulcast RTCP Test Resolves #1803 --- peerconnection_media_test.go | 135 +++++++++++++++++++++++++---------- 1 file changed, 98 insertions(+), 37 deletions(-) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index ead8c44df44..6d92d2357c2 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -15,6 +15,7 @@ import ( "io" "strings" "sync" + "sync/atomic" "testing" "time" @@ -1212,56 +1213,53 @@ func TestPeerConnection_Simulcast(t *testing.T) { defer report() rids := []string{"a", "b", "c"} - var ridMapLock sync.RWMutex - ridMap := map[string]int{} - assertRidCorrect := func(t *testing.T) { - ridMapLock.Lock() - defer ridMapLock.Unlock() - - for _, rid := range rids { - assert.Equal(t, ridMap[rid], 1) - } - assert.Equal(t, len(ridMap), 3) - } - - ridsFullfilled := func() bool { - ridMapLock.Lock() - defer ridMapLock.Unlock() - - ridCount := len(ridMap) - return ridCount == 3 - } + t.Run("E2E", func(t *testing.T) { + pcOffer, pcAnswer, err := newPair() + assert.NoError(t, err) - onTrackHandler := func(trackRemote *TrackRemote, _ *RTPReceiver) { - ridMapLock.Lock() - defer ridMapLock.Unlock() - ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1 - } + vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0])) + assert.NoError(t, err) - t.Run("RTP Extension Based", func(t *testing.T) { - pcOffer, pcAnswer, err := newPair() + vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1])) assert.NoError(t, err) - vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("a")) + vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2])) assert.NoError(t, err) sender, err := pcOffer.AddTrack(vp8WriterA) assert.NoError(t, err) assert.NotNil(t, sender) - vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("b")) - assert.NoError(t, err) - err = sender.AddEncoding(vp8WriterB) - assert.NoError(t, err) + assert.NoError(t, sender.AddEncoding(vp8WriterB)) + assert.NoError(t, sender.AddEncoding(vp8WriterC)) - vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("c")) - assert.NoError(t, err) - err = sender.AddEncoding(vp8WriterC) - assert.NoError(t, err) + var ridMapLock sync.RWMutex + ridMap := map[string]int{} + + assertRidCorrect := func(t *testing.T) { + ridMapLock.Lock() + defer ridMapLock.Unlock() - ridMap = map[string]int{} - pcAnswer.OnTrack(onTrackHandler) + for _, rid := range rids { + assert.Equal(t, ridMap[rid], 1) + } + assert.Equal(t, len(ridMap), 3) + } + + ridsFullfilled := func() bool { + ridMapLock.Lock() + defer ridMapLock.Unlock() + + ridCount := len(ridMap) + return ridCount == 3 + } + + pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) { + ridMapLock.Lock() + defer ridMapLock.Unlock() + ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1 + }) parameters := sender.GetParameters() assert.Equal(t, "a", parameters.Encodings[0].RID) @@ -1304,6 +1302,69 @@ func TestPeerConnection_Simulcast(t *testing.T) { assertRidCorrect(t) closePairNow(t, pcOffer, pcAnswer) }) + + t.Run("RTCP", func(t *testing.T) { + pcOffer, pcAnswer, err := newPair() + assert.NoError(t, err) + + vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0])) + assert.NoError(t, err) + + vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1])) + assert.NoError(t, err) + + vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2])) + assert.NoError(t, err) + + sender, err := pcOffer.AddTrack(vp8WriterA) + assert.NoError(t, err) + assert.NotNil(t, sender) + + assert.NoError(t, sender.AddEncoding(vp8WriterB)) + assert.NoError(t, sender.AddEncoding(vp8WriterC)) + + rtcpCounter := uint64(0) + pcAnswer.OnTrack(func(trackRemote *TrackRemote, receiver *RTPReceiver) { + _, _, err = receiver.ReadSimulcastRTCP(trackRemote.RID()) + assert.NoError(t, err) + atomic.AddUint64(&rtcpCounter, 1) + }) + + var midID, ridID uint8 + for _, extension := range sender.GetParameters().HeaderExtensions { + switch extension.URI { + case sdp.SDESMidURI: + midID = uint8(extension.ID) + case sdp.SDESRTPStreamIDURI: + ridID = uint8(extension.ID) + } + } + assert.NotZero(t, midID) + assert.NotZero(t, ridID) + + assert.NoError(t, signalPair(pcOffer, pcAnswer)) + + for sequenceNumber := uint16(0); atomic.LoadUint64(&rtcpCounter) < 3; sequenceNumber++ { + time.Sleep(20 * time.Millisecond) + + for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} { + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SequenceNumber: sequenceNumber, + PayloadType: 96, + }, + Payload: []byte{0x00}, + } + assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0"))) + assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID()))) + + assert.NoError(t, track.WriteRTP(pkt)) + } + } + + closePairNow(t, pcOffer, pcAnswer) + }) } // Everytime we receieve a new SSRC we probe it and try to determine the proper way to handle it.