Skip to content

Commit

Permalink
Fire OnTrack before first RTP (non-simulcast)
Browse files Browse the repository at this point in the history
Prior to this, we would wait for a single RTP packet to
figure out the codec which is not to spec.
  • Loading branch information
edaniels committed Oct 15, 2024
1 parent 8fbf821 commit 4e3bbe5
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 71 deletions.
16 changes: 1 addition & 15 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,21 +1245,7 @@ func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPRece
return
}

go func(track *TrackRemote) {
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
n, _, err := track.peek(b)
if err != nil {
pc.log.Warnf("Could not determine PayloadType for SSRC %d (%s)", track.SSRC(), err)
return
}

if err = track.checkAndUpdateTrack(b[:n]); err != nil {
pc.log.Warnf("Failed to set codec settings for track SSRC %d (%s)", track.SSRC(), err)
return
}

pc.onTrack(track, receiver)
}(t)
pc.onTrack(t, receiver)
}
}

Expand Down
66 changes: 23 additions & 43 deletions peerconnection_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/pion/ice/v4"
"github.com/pion/rtp"
"github.com/pion/transport/v3/test"
"github.com/pion/transport/v3/vnet"
"github.com/pion/webrtc/v4/internal/util"
Expand Down Expand Up @@ -1000,9 +999,11 @@ func TestICERestart_Error_Handling(t *testing.T) {
}

type trackRecords struct {
mu sync.Mutex
trackIDs map[string]struct{}
receivedTrackIDs map[string]struct{}
mu sync.Mutex
trackIDs map[string]struct{}
receivedTrackIDs map[string]struct{}
onAllTracksReceived chan struct{}
onAllTracksReceivedOnce sync.Once
}

func (r *trackRecords) newTrack() (*TrackLocalStaticRTP, error) {
Expand All @@ -1019,6 +1020,11 @@ func (r *trackRecords) handleTrack(t *TrackRemote, _ *RTPReceiver) {
if _, exist := r.trackIDs[tID]; exist {
r.receivedTrackIDs[tID] = struct{}{}
}
if len(r.receivedTrackIDs) == len(r.trackIDs) {
r.onAllTracksReceivedOnce.Do(func() {
close(r.onAllTracksReceived)
})
}
}

func (r *trackRecords) remains() int {
Expand All @@ -1031,32 +1037,16 @@ func (r *trackRecords) remains() int {
func TestPeerConnection_MassiveTracks(t *testing.T) {
var (
tRecs = &trackRecords{
trackIDs: make(map[string]struct{}),
receivedTrackIDs: make(map[string]struct{}),
trackIDs: make(map[string]struct{}),
receivedTrackIDs: make(map[string]struct{}),
onAllTracksReceived: make(chan struct{}),
}
tracks = []*TrackLocalStaticRTP{}
trackCount = 256
pingInterval = 1 * time.Second
noiseInterval = 100 * time.Microsecond
timeoutDuration = 20 * time.Second
rawPkt = []byte{
0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64,
0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e,
}
samplePkt = &rtp.Packet{
Header: rtp.Header{
Marker: true,
Extension: false,
ExtensionProfile: 1,
Version: 2,
SequenceNumber: 27023,
Timestamp: 3653407706,
CSRC: []uint32{},
},
Payload: rawPkt[20:],
}
connected = make(chan struct{})
stopped = make(chan struct{})
connected = make(chan struct{})
stopped = make(chan struct{})
)
offerPC, answerPC, err := newPair()
assert.NoError(t, err)
Expand All @@ -1066,7 +1056,6 @@ func TestPeerConnection_MassiveTracks(t *testing.T) {
assert.NoError(t, err)
_, err = offerPC.AddTrack(track)
assert.NoError(t, err)
tracks = append(tracks, track)
}
answerPC.OnTrack(tRecs.handleTrack)
offerPC.OnICEConnectionStateChange(func(s ICEConnectionState) {
Expand All @@ -1088,26 +1077,17 @@ func TestPeerConnection_MassiveTracks(t *testing.T) {
}
}()
assert.NoError(t, signalPair(offerPC, answerPC))
// Send a RTP packets to each track to trigger track event after connected.
<-connected
time.Sleep(1 * time.Second)
for _, track := range tracks {
assert.NoError(t, track.WriteRTP(samplePkt))
}
// Ping trackRecords to see if any track event not received yet.
tooLong := time.After(timeoutDuration)
for {
remains := tRecs.remains()
if remains == 0 {
break
}
t.Log("remain tracks", remains)
time.Sleep(pingInterval)
select {
case <-tooLong:
t.Error("unable to receive all track events in time")
default:
}
pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()
select {
case <-tRecs.onAllTracksReceived:
case <-tooLong:
t.Error("unable to receive all track events in time")
case <-pingTicker.C:
t.Log("remain tracks", tRecs.remains())
}
close(stopped)
closePairNow(t, offerPC, answerPC)
Expand Down
49 changes: 36 additions & 13 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/pion/interceptor"
mock_interceptor "github.com/pion/interceptor/pkg/mock"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
Expand Down Expand Up @@ -1046,11 +1048,34 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) {

m := &MediaEngine{}
assert.NoError(t, m.RegisterDefaultCodecs())
ir := &interceptor.Registry{}

trackReadDone := make(chan struct{})
ir.Add(&mock_interceptor.Factory{
NewInterceptorFn: func(_ string) (interceptor.Interceptor, error) {
return &mock_interceptor.Interceptor{
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
count := int64(0)
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
if a == nil {
a = interceptor.Attributes{}
}
if atomic.AddInt64(&count, 1) > 5 {
// confirm read before sending any more packets for probing
<-trackReadDone
}
return reader.Read(b, a)
})
},
}, nil
},
})

assert.NoError(t, ConfigureSimulcastExtensionHeaders(m))

pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
}), WithMediaEngine(m)).newPair(Configuration{})
}), WithMediaEngine(m), WithInterceptorRegistry(ir)).newPair(Configuration{})
assert.NoError(t, err)

firstTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "firstTrack", "firstTrack")
Expand Down Expand Up @@ -1097,6 +1122,7 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) {
time.Sleep(20 * time.Millisecond)
}

// establish undeclared SSRC (half number of probes)
for ; sequenceNumber <= 5; sequenceNumber++ {
sendRTPPacket()
}
Expand All @@ -1108,15 +1134,12 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) {

assert.NoError(t, signalPair(pcOffer, pcAnswer))

trackRemote := func() *TrackRemote {
for {
select {
case t := <-trackRemoteChan:
return t
default:
sendRTPPacket()
}
}
trackRemote := <-trackRemoteChan

go func() {
_, _, err = trackRemote.Read(make([]byte, 1500))
assert.NoError(t, err)
close(trackReadDone)
}()

func() {
Expand All @@ -1130,9 +1153,7 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) {
}
}()

_, _, err = trackRemote.Read(make([]byte, 1500))
assert.NoError(t, err)

<-trackReadDone
closePairNow(t, pcOffer, pcAnswer)
})
}
Expand Down Expand Up @@ -1759,6 +1780,8 @@ func TestPeerConnection_Zero_PayloadType(t *testing.T) {
trackFired := make(chan struct{})

pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
_, _, err = track.Read(make([]byte, 1500))
assert.NoError(t, err)
require.Equal(t, track.Codec().MimeType, MimeTypePCMU)
close(trackFired)
})
Expand Down
4 changes: 4 additions & 0 deletions track_local_static_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func Test_TrackLocalStatic_PayloadType(t *testing.T) {

onTrackFired, onTrackFiredFunc := context.WithCancel(context.Background())
offerer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
_, _, err = track.Read(make([]byte, 1500))
assert.NoError(t, err)
assert.Equal(t, track.PayloadType(), PayloadType(100))
assert.Equal(t, track.Codec().RTPCodecCapability.MimeType, "video/VP8")

Expand Down Expand Up @@ -284,6 +286,8 @@ func Test_TrackLocalStatic_Padding(t *testing.T) {
onTrackFired, onTrackFiredFunc := context.WithCancel(context.Background())

offerer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
_, _, err = track.Read(make([]byte, 1500))
assert.NoError(t, err)
assert.Equal(t, track.PayloadType(), PayloadType(100))
assert.Equal(t, track.Codec().RTPCodecCapability.MimeType, "video/VP8")

Expand Down

0 comments on commit 4e3bbe5

Please sign in to comment.