From 4c5b27215524d71490412081834ca53645184ec5 Mon Sep 17 00:00:00 2001 From: lucian Date: Mon, 4 Sep 2023 00:48:27 +0900 Subject: [PATCH] Add readRTX function for TrackRemote Add readRTX function for TrackRemote Add example/read-rtx --- errors.go | 9 +- examples/read-rtx/README.md | 77 ++++++++ examples/read-rtx/jsfiddle/demo.css | 8 + examples/read-rtx/jsfiddle/demo.details | 8 + examples/read-rtx/jsfiddle/demo.html | 23 +++ examples/read-rtx/jsfiddle/demo.js | 58 ++++++ examples/read-rtx/main.go | 248 ++++++++++++++++++++++++ rtpreceiver.go | 27 ++- track_remote.go | 43 +++- 9 files changed, 493 insertions(+), 8 deletions(-) create mode 100644 examples/read-rtx/README.md create mode 100644 examples/read-rtx/jsfiddle/demo.css create mode 100644 examples/read-rtx/jsfiddle/demo.details create mode 100644 examples/read-rtx/jsfiddle/demo.html create mode 100644 examples/read-rtx/jsfiddle/demo.js create mode 100644 examples/read-rtx/main.go diff --git a/errors.go b/errors.go index 009e91dd681..9bd08b41c28 100644 --- a/errors.go +++ b/errors.go @@ -203,10 +203,11 @@ var ( errPeerConnWriteRTCPOpenWriteStream = errors.New("WriteRTCP failed to open WriteStream") errPeerConnTranscieverMidNil = errors.New("cannot find transceiver with mid") - errRTPReceiverDTLSTransportNil = errors.New("DTLSTransport must not be nil") - errRTPReceiverReceiveAlreadyCalled = errors.New("Receive has already been called") - errRTPReceiverWithSSRCTrackStreamNotFound = errors.New("unable to find stream for Track with SSRC") - errRTPReceiverForRIDTrackStreamNotFound = errors.New("no trackStreams found for RID") + errRTPReceiverDTLSTransportNil = errors.New("DTLSTransport must not be nil") + errRTPReceiverReceiveAlreadyCalled = errors.New("Receive has already been called") + errRTPReceiverWithSSRCTrackStreamNotFound = errors.New("unable to find stream for Track with SSRC") + errRTPReceiverWithRtxSSRCTrackStreamNotFound = errors.New("unable to find stream for Track with RtxSSRC") + errRTPReceiverForRIDTrackStreamNotFound = errors.New("no trackStreams found for RID") errRTPSenderTrackNil = errors.New("Track must not be nil") errRTPSenderDTLSTransportNil = errors.New("DTLSTransport must not be nil") diff --git a/examples/read-rtx/README.md b/examples/read-rtx/README.md new file mode 100644 index 00000000000..d86e3ed5a48 --- /dev/null +++ b/examples/read-rtx/README.md @@ -0,0 +1,77 @@ +# read-rtx +read-rtx is a simple application that shows how to record your webcam/microphone using Pion WebRTC and read packets from streams of rtp and rtx + +## Instructions +### Download read-rtx +``` +export GO111MODULE=on +go get github.com/pion/webrtc/v3/examples/read-rtx +``` + +### Open read-rtx example page +[jsfiddle.net](https://jsfiddle.net/s179hacu/) you should see your Webcam, two text-areas and two buttons: `Copy browser SDP to clipboard`, `Start Session`. + +### Run read-rtx, with your browsers SessionDescription as stdin +In the jsfiddle the top textarea is your browser's Session Description. Press `Copy browser SDP to clipboard` or copy the base64 string manually. +We will use this value in the next step. + +#### Linux/macOS +Run `echo $BROWSER_SDP | read-rtx` + +or + +Run `cd examples/read-rtx` and then +`echo $BROWSER_SDP | go run .` + +#### Windows +1. Paste the SessionDescription into a file. +1. Run `read-rtx < my_file` + +### Input read-rtx's SessionDescription into your browser +Copy the text that `read-rtx` just emitted and copy into second text area + +### Hit 'Start Session' +You will see output like below: +```bash +Connection State has changed connected +Ctrl+C the remote client to stop the demo +Got Audio track hasRTX: false +Got Video track hasRTX: true +Got RTX padding packets. rtx sn: 24254 +Got RTX padding packets. rtx sn: 24255 +Send Nack sequence:17721 +Got RTX Packet. osn: 17721 , rtx sn: 24256 +Send Nack sequence:17791 +Got RTX Packet. osn: 17791 , rtx sn: 24257 +Send Nack sequence:17857 +Got RTX Packet. osn: 17857 , rtx sn: 24258 +Send Nack sequence:17929 +Got RTX Packet. osn: 17929 , rtx sn: 24259 +Send Nack sequence:17999 +Got RTX Packet. osn: 17999 , rtx sn: 24260 +Send Nack sequence:18063 +Got RTX Packet. osn: 18063 , rtx sn: 24261 +Send Nack sequence:18123 +Got RTX Packet. osn: 18123 , rtx sn: 24262 +Got RTX padding packets. rtx sn: 24263 +Got RTX Packet. osn: 18185 , rtx sn: 24264 +Got RTX padding packets. rtx sn: 24265 +Got RTX Packet. osn: 18186 , rtx sn: 24266 +Got RTX padding packets. rtx sn: 24267 +Got RTX Packet. osn: 18184 , rtx sn: 24268 +Got RTX Packet. osn: 18183 , rtx sn: 24269 +Got RTX Packet. osn: 18182 , rtx sn: 24270 +Got RTX Packet. osn: 18181 , rtx sn: 24271 +Got RTX Packet. osn: 18180 , rtx sn: 24272 +Got RTX Packet. osn: 18179 , rtx sn: 24273 +Got RTX Packet. osn: 18178 , rtx sn: 24274 +Send Nack sequence:18190 +Got RTX Packet. osn: 18190 , rtx sn: 24275 +Send Nack sequence:18303 +Got RTX Packet. osn: 18303 , rtx sn: 24276 +Send Nack sequence:18434 +Got RTX Packet. osn: 18434 , rtx sn: 24277 +Send Nack sequence:18608 +Got RTX Packet. osn: 18608 , rtx sn: 24278 + +``` \ No newline at end of file diff --git a/examples/read-rtx/jsfiddle/demo.css b/examples/read-rtx/jsfiddle/demo.css new file mode 100644 index 00000000000..78566e91f58 --- /dev/null +++ b/examples/read-rtx/jsfiddle/demo.css @@ -0,0 +1,8 @@ +/* + SPDX-FileCopyrightText: 2023 The Pion community + SPDX-License-Identifier: MIT +*/ +textarea { + width: 500px; + min-height: 75px; +} \ No newline at end of file diff --git a/examples/read-rtx/jsfiddle/demo.details b/examples/read-rtx/jsfiddle/demo.details new file mode 100644 index 00000000000..2cd0ed76be4 --- /dev/null +++ b/examples/read-rtx/jsfiddle/demo.details @@ -0,0 +1,8 @@ +--- +# SPDX-FileCopyrightText: 2023 The Pion community +# SPDX-License-Identifier: MIT + +name: read-rtx +description: Example of using Pion WebRTC to read rtx packet +authors: + - San9H0 diff --git a/examples/read-rtx/jsfiddle/demo.html b/examples/read-rtx/jsfiddle/demo.html new file mode 100644 index 00000000000..0b5de198fc5 --- /dev/null +++ b/examples/read-rtx/jsfiddle/demo.html @@ -0,0 +1,23 @@ + +Browser base64 Session Description
+
+ +
+
+ +Golang base64 Session Description
+
+
+ +
+ +Video
+
+ +Logs
+
diff --git a/examples/read-rtx/jsfiddle/demo.js b/examples/read-rtx/jsfiddle/demo.js new file mode 100644 index 00000000000..c4568d0cd26 --- /dev/null +++ b/examples/read-rtx/jsfiddle/demo.js @@ -0,0 +1,58 @@ +/* eslint-env browser */ + +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +const pc = new RTCPeerConnection({ + iceServers: [ + { + urls: 'stun:stun.l.google.com:19302' + } + ] +}) +const log = msg => { + document.getElementById('logs').innerHTML += msg + '
' +} + +navigator.mediaDevices.getUserMedia({ video: true, audio: true }) + .then(stream => { + document.getElementById('video1').srcObject = stream + stream.getTracks().forEach(track => pc.addTrack(track, stream)) + + pc.createOffer().then(d => pc.setLocalDescription(d)).catch(log) + }).catch(log) + +pc.oniceconnectionstatechange = e => log(pc.iceConnectionState) +pc.onicecandidate = event => { + if (event.candidate === null) { + document.getElementById('localSessionDescription').value = btoa(JSON.stringify(pc.localDescription)) + } +} + +window.startSession = () => { + const sd = document.getElementById('remoteSessionDescription').value + if (sd === '') { + return alert('Session Description must not be empty') + } + + try { + pc.setRemoteDescription(JSON.parse(atob(sd))) + } catch (e) { + alert(e) + } +} + +window.copySDP = () => { + const browserSDP = document.getElementById('localSessionDescription') + + browserSDP.focus() + browserSDP.select() + + try { + const successful = document.execCommand('copy') + const msg = successful ? 'successful' : 'unsuccessful' + log('Copying SDP was ' + msg) + } catch (err) { + log('Unable to copy SDP ' + err) + } +} diff --git a/examples/read-rtx/main.go b/examples/read-rtx/main.go new file mode 100644 index 00000000000..dc4f4b31fd9 --- /dev/null +++ b/examples/read-rtx/main.go @@ -0,0 +1,248 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +//go:build !js +// +build !js + +// read-rtx is a simple application that shows how to record your webcam/microphone using Pion WebRTC and read rtx +package main + +import ( + "fmt" + "github.com/pion/rtcp" + "io" + "os" + "strings" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/pkg/intervalpli" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/examples/internal/signal" +) + +const ( + nackInterval = time.Second + lostPacket = 0 +) + +// readRTPAndSendNack read rtp and send rtcp nack sometimes +func readRTPAndSendNack(pc *webrtc.PeerConnection, track *webrtc.TrackRemote) { + nackTime := time.Now() + for { + rtpPacket, _, err := track.ReadRTP() + if err != nil { + if err == io.EOF { + panic(err) + } + continue + } + if track.Kind() == webrtc.RTPCodecTypeVideo { + // Assume that packet is lost and nack is sent + if checkNackInterval(&nackTime) { + nack := makeNack(rtpPacket.SSRC, rtpPacket.SequenceNumber) + if err := pc.WriteRTCP(nack); err != nil { + panic(err) + } + fmt.Printf("Send Nack sequence:%d\n", rtpPacket.SequenceNumber) + continue + } + } + + // you should use jitter + } +} + +func readRTX(track *webrtc.TrackRemote) { + if !track.HasRTX() { + return + } + for { + osn, rtxPacket, _, err := track.ReadRTX() + if err != nil { + if err == io.EOF { + return + } + continue + } + + // some stats if you want + + if len(rtxPacket.Payload) == 0 { + // padding probes + fmt.Println("Got RTX padding packets. rtx sn:", rtxPacket.SequenceNumber) + continue + } + + fmt.Println("Got RTX Packet. osn:", osn, ", rtx sn:", rtxPacket.SequenceNumber) + + // you should use jitter + } +} + +func checkNackInterval(lastNackTime *time.Time) bool { + if time.Since(*lastNackTime) <= nackInterval { + return false + } + + *lastNackTime = time.Now() + return true +} + +func makeNack(ssrc uint32, sequenceNumber uint16) []rtcp.Packet { + return []rtcp.Packet{&rtcp.TransportLayerNack{ + MediaSSRC: ssrc, + Nacks: []rtcp.NackPair{{ + PacketID: sequenceNumber, + LostPackets: rtcp.PacketBitmap(lostPacket), + }}, + }} +} + +// nolint:gocognit +func main() { + // Everything below is the Pion WebRTC API! Thanks for using it ❤️. + + // Create a MediaEngine object to configure the supported codec + m := &webrtc.MediaEngine{} + + // Setup the codecs you want to use. + // We'll use a VP8, video/rtx and Opus but you can also define your own + if err := m.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, + PayloadType: 96, + }, webrtc.RTPCodecTypeVideo); err != nil { + panic(err) + } + if err := m.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "video/rtx", ClockRate: 90000, Channels: 0, SDPFmtpLine: "apt=96", RTCPFeedback: nil}, + PayloadType: 97, + }, webrtc.RTPCodecTypeVideo); err != nil { + panic(err) + } + if err := m.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, + PayloadType: 111, + }, webrtc.RTPCodecTypeAudio); err != nil { + panic(err) + } + + // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. + // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` + // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry + // for each PeerConnection. + i := &interceptor.Registry{} + + // Register a intervalpli factory + // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender. + // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates + // A real world application should process incoming RTCP packets from viewers and forward them to senders + intervalPliFactory, err := intervalpli.NewReceiverInterceptor() + if err != nil { + panic(err) + } + i.Add(intervalPliFactory) + + // Use the default set of Interceptors + if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil { + panic(err) + } + + // Use Disable SRTP Replay Protection. this is option For only RTX Test. It is usually false + settingEngine := webrtc.SettingEngine{} + settingEngine.DisableSRTPReplayProtection(true) + // Create the API object with the MediaEngine + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(settingEngine)) + + // Prepare the configuration + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + } + + // Create a new RTCPeerConnection + peerConnection, err := api.NewPeerConnection(config) + if err != nil { + panic(err) + } + + // Allow us to receive 1 audio track, and 1 video track + if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { + panic(err) + } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { + panic(err) + } + + // Set a handler for when a new remote track starts + // this handler call readRTPAndSendNack and readRTX functions. + // readRTPAndSendNack read packets from rtp stream and send rtcp nack sometimes + // readRTX read packets from rtx stream and print rtx information + peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + codec := track.Codec() + if strings.EqualFold(codec.MimeType, webrtc.MimeTypeOpus) { + fmt.Println("Got Audio track hasRTX:", track.HasRTX()) + } else if strings.EqualFold(codec.MimeType, webrtc.MimeTypeVP8) { + fmt.Println("Got Video track hasRTX:", track.HasRTX()) + } + go readRTPAndSendNack(peerConnection, track) + go readRTX(track) + }) + + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { + fmt.Printf("Connection State has changed %s \n", connectionState.String()) + + if connectionState == webrtc.ICEConnectionStateConnected { + fmt.Println("Ctrl+C the remote client to stop the demo") + } else if connectionState == webrtc.ICEConnectionStateFailed { + fmt.Println("End demo") + + // Gracefully shutdown the peer connection + if closeErr := peerConnection.Close(); closeErr != nil { + panic(closeErr) + } + + os.Exit(0) + } + }) + + // Wait for the offer to be pasted + offer := webrtc.SessionDescription{} + signal.Decode(signal.MustReadStdin(), &offer) + + // Set the remote SessionDescription + err = peerConnection.SetRemoteDescription(offer) + if err != nil { + panic(err) + } + + // Create answer + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + panic(err) + } + + // Create channel that is blocked until ICE Gathering is complete + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + + // Sets the LocalDescription, and starts our UDP listeners + err = peerConnection.SetLocalDescription(answer) + if err != nil { + panic(err) + } + + // Block until ICE Gathering is complete, disabling trickle ICE + // we do this because we only can exchange one signaling message + // in a production application you should exchange ICE Candidates via OnICECandidate + <-gatherComplete + + // Output the answer in base64 so we can paste it in browser + fmt.Println(signal.Encode(*peerConnection.LocalDescription())) + + // Block forever + select {} +} diff --git a/rtpreceiver.go b/rtpreceiver.go index 656fcc56411..a84e419a919 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -15,6 +15,7 @@ import ( "github.com/pion/interceptor" "github.com/pion/rtcp" "github.com/pion/srtp/v3" + "github.com/pion/transport/v3/packetio" "github.com/pion/webrtc/v3/internal/util" ) @@ -31,8 +32,9 @@ type trackStreams struct { rtcpReadStream *srtp.ReadStreamSRTCP rtcpInterceptor interceptor.RTCPReader - repairReadStream *srtp.ReadStreamSRTP - repairInterceptor interceptor.RTPReader + repairReadStream *srtp.ReadStreamSRTP + repairInterceptor interceptor.RTPReader + repairStreamBuffer io.ReadWriteCloser repairRtcpReadStream *srtp.ReadStreamSRTCP repairRtcpInterceptor interceptor.RTCPReader @@ -145,6 +147,7 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) { track: newTrackRemote( r.kind, parameters.Encodings[i].SSRC, + parameters.Encodings[i].RTX.SSRC, parameters.Encodings[i].RID, r, ), @@ -403,12 +406,19 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep track.repairRtcpReadStream = rtcpReadStream track.repairRtcpInterceptor = rtcpInterceptor + repairBuffer := packetio.NewBuffer() + repairBuffer.SetLimitSize(1000 * 1000) + track.repairStreamBuffer = repairBuffer + go func() { + defer track.repairStreamBuffer.Close() b := make([]byte, r.api.settingEngine.getReceiveMTU()) for { - if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil { + i, _, readErr := track.repairInterceptor.Read(b, nil) + if readErr != nil { return } + _, _ = track.repairStreamBuffer.Write(b[:i]) } }() return nil @@ -446,3 +456,14 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote } return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC()) } + +func (r *RTPReceiver) readRTX(b []byte, reader *TrackRemote) (int, interceptor.Attributes, error) { + <-r.received + if t := r.streamsForTrack(reader); t != nil { + if t.repairStreamBuffer != nil { + n, err := t.repairStreamBuffer.Read(b) + return n, nil, err + } + } + return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithRtxSSRCTrackStreamNotFound, reader.RtxSSRC()) +} diff --git a/track_remote.go b/track_remote.go index 150b91bc9e1..6346b6545a5 100644 --- a/track_remote.go +++ b/track_remote.go @@ -7,6 +7,7 @@ package webrtc import ( + "encoding/binary" "sync" "time" @@ -24,6 +25,7 @@ type TrackRemote struct { payloadType PayloadType kind RTPCodecType ssrc SSRC + rtxSsrc SSRC codec RTPCodecParameters params RTPParameters rid string @@ -33,10 +35,11 @@ type TrackRemote struct { peekedAttributes interceptor.Attributes } -func newTrackRemote(kind RTPCodecType, ssrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { +func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { return &TrackRemote{ kind: kind, ssrc: ssrc, + rtxSsrc: rtxSsrc, rid: rid, receiver: receiver, } @@ -197,3 +200,41 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error func (t *TrackRemote) SetReadDeadline(deadline time.Time) error { return t.receiver.setRTPReadDeadline(deadline, t) } + +// RtxSSRC returns the RTX SSRC for a track, or 0 if track does not have a separate RTX stream +func (t *TrackRemote) RtxSSRC() SSRC { + t.mu.RLock() + defer t.mu.RUnlock() + return t.rtxSsrc +} + +// HasRTX returns true if the track has a separate RTX stream +func (t *TrackRemote) HasRTX() bool { + t.mu.RLock() + defer t.mu.RUnlock() + return t.rtxSsrc != 0 +} + +// ReadRTX returns a packet from a track's RTX stream, along with the original sequence number (OSN). +// RTX probe packets (with zero RTP payload size) are not retransmissions from the original stream and are returned with 0 as the OSN. +func (t *TrackRemote) ReadRTX() (uint16, *rtp.Packet, interceptor.Attributes, error) { + b := make([]byte, t.receiver.api.settingEngine.getReceiveMTU()) + i, _, err := t.receiver.readRTX(b, t) + if err != nil { + return 0, nil, nil, err + } + + b = b[:i] + r := &rtp.Packet{} + if err := r.Unmarshal(b); err != nil { + return 0, nil, nil, err + } + + originalSequenceNumber := uint16(0) + if len(r.Payload) >= 2 { + originalSequenceNumber = binary.BigEndian.Uint16(r.Payload[:2]) + r.Payload = r.Payload[2:] + } + + return originalSequenceNumber, r, nil, nil +}