Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
fix: srtpWriteFuture would block in read if read after close
Browse files Browse the repository at this point in the history
  • Loading branch information
cnderrauber committed May 21, 2021
1 parent 12072f0 commit 74cad3c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 1 deletion.
6 changes: 5 additions & 1 deletion peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,11 +1515,15 @@ func (pc *PeerConnection) undeclaredMediaProcessor() {
return
}

_, ssrc, err := srtcpSession.AcceptStream()
s, ssrc, err := srtcpSession.AcceptStream()
if err != nil {
pc.log.Warnf("Failed to accept RTCP %v", err)
return
}
time.AfterFunc(5*time.Minute, func() {
s.Close()
})

pc.log.Warnf("Incoming unhandled RTCP ssrc(%d), OnTrack will not be fired", ssrc)
}
}()
Expand Down
2 changes: 2 additions & 0 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,11 @@ func (r *RTPReceiver) Stop() error {
}
if fec := r.tracks[i].fecTrack; fec != nil {
errs = append(errs, closefunc(fec)...)
r.api.interceptor.UnbindRemoteStream(&fec.streamInfo)
}
if rtx := r.tracks[i].rtxTrack; rtx != nil {
errs = append(errs, closefunc(rtx)...)
r.api.interceptor.UnbindRemoteStream(&rtx.streamInfo)
}

err = util.FlattenErrs(errs)
Expand Down
16 changes: 16 additions & 0 deletions srtp_writer_future.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package webrtc

import (
"io"
"sync"
"sync/atomic"
"time"

Expand All @@ -17,6 +18,8 @@ type srtpWriterFuture struct {
rtpSender *RTPSender
rtcpReadStream atomic.Value // *srtp.ReadStreamSRTCP
rtpWriteStream atomic.Value // *srtp.WriteStreamSRTP
mu sync.Mutex
closed bool
}

func (s *srtpWriterFuture) init(returnWhenNoSRTP bool) error {
Expand All @@ -36,6 +39,12 @@ func (s *srtpWriterFuture) init(returnWhenNoSRTP bool) error {
}
}

s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return io.ErrClosedPipe
}

srtcpSession, err := s.rtpSender.transport.getSRTCPSession()
if err != nil {
return err
Expand All @@ -62,6 +71,13 @@ func (s *srtpWriterFuture) init(returnWhenNoSRTP bool) error {
}

func (s *srtpWriterFuture) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return nil
}

s.closed = true
if value := s.rtcpReadStream.Load(); value != nil {
return value.(*srtp.ReadStreamSRTCP).Close()
}
Expand Down

0 comments on commit 74cad3c

Please sign in to comment.