Skip to content

Commit

Permalink
- [refactor] 转换mpegts使用remux.Rtmp2MpegtsRemuxer不再依赖hls.Muxer - [opt] …
Browse files Browse the repository at this point in the history
…lalserver: relay push增加超时检查,增加带宽统计 - [refactor] 所有interface类型以i字母开头
  • Loading branch information
q191201771 committed Mar 27, 2022
1 parent c2fa468 commit 8824038
Show file tree
Hide file tree
Showing 23 changed files with 157 additions and 154 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# LAL

[![Platform](https://img.shields.io/badge/platform-linux%20%7C%20macos%20%7C%20windows-green.svg)](https://github.com/q191201771/lal)
[![Release](https://img.shields.io/github/tag/q191201771/lal.svg?label=release)](https://github.com/q191201771/lal/releases)
[![CI](https://github.com/q191201771/lal/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/q191201771/lal/actions/workflows/ci.yml)
[![codecov](https://codecov.io/gh/q191201771/lal/branch/master/graph/badge.svg?token=YxPgzXAAmV)](https://codecov.io/gh/q191201771/lal)
[![goreportcard](https://goreportcard.com/badge/github.com/q191201771/lal)](https://goreportcard.com/report/github.com/q191201771/lal)
![wechat](https://img.shields.io/:微信-q191201771-blue.svg)
![qqgroup](https://img.shields.io/:QQ群-1090510973-blue.svg)
Expand Down Expand Up @@ -86,9 +86,11 @@ Play multi protocol stream from lalserver via ffplay:

```shell
$ffplay rtmp://127.0.0.1/live/test110
$ffplay rtsp://127.0.0.1:5544/live/test110
$ffplay http://127.0.0.1:8080/live/test110.flv
$ffplay http://127.0.0.1:8080/hls/test110/playlist.m3u8
$ffplay http://127.0.0.1:8080/hls/test110/record.m3u8
$ffplay http://127.0.0.1:8080/hls/test110.m3u8
$ffplay http://127.0.0.1:8080/live/test110.ts
```

Expand Down
2 changes: 1 addition & 1 deletion app/demo/pullrtmp2hls/pullrtmp2hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {
}
streamName := ctx.LastItemOfPath

hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil)
hlsMuexer := hls.NewMuxer(streamName, &hlsMuxerConfig, nil)
hlsMuexer.Start()

rtmp2Mpegts := remux.NewRtmp2MpegtsRemuxer(hlsMuexer)
Expand Down
29 changes: 29 additions & 0 deletions pkg/base/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,36 @@ import (
// 情况1: 协议正常走完回调OnAdd,在自身server的RunLoop结束后,回调OnDel
// 情况2: 在group中pull阻塞结束后,手动回调OnDel
// 情况3: 在logic中sub RunLoop结束后,手动回调OnDel

// TODO(chef): 整理所有Server类型Session的生命周期管理
// -
// - rtmp没有独立的Pub、Sub Session结构体类型,而是直接使用ServerSession
// - write失败,需要反应到loop来
// - rtsp是否也应该上层使用Command作为代理,避免生命周期管理混乱
//
// server.pub: rtmp(), rtsp
// server.sub: rtmp(), rtsp, flv, ts, 还有一个比较特殊的hls
//
// client.push: rtmp, rtsp
// client.pull: rtmp, rtsp, flv
//
// other: rtmp.ClientSession, rtmp.ServerSession
// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSessionS
// base.HttpSubSession

// ISessionUrlContext 实际测试
//
// | | 实际url | Url() | AppName, StreamName, RawQuery |
// | - | - | - | - |
// | rtmp pub推流 | rtmp://127.0.0.1:1935/live/test110 | 同实际url | live, test110, |
// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b, c/d/test110, p1=1&p2=2 |
// | rtsp pub推流 | rtsp://localhost:5544/live/test110 | 同实际url | live, test110, |
// | rtsp pub推流 | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | httpflv sub拉流 | http://127.0.0.1:8080/live/test110.flv | 同实际url | live, test110, |
// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | rtmp sub拉流 | 同rtmp pub | . | . |
// | rtsp sub拉流 | 同rtsp pub | . | . |
// | httpts sub拉流 | 同httpflv sub,只是末尾的.flv换成.ts,不再赘述 | . | . |

// IsUseClosedConnectionError 当connection处于这些情况时,就不需要再Close了
// TODO(chef): 临时放这
Expand Down
49 changes: 19 additions & 30 deletions pkg/hls/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import (
"github.com/q191201771/lal/pkg/base"
)

type MuxerObserver interface {
type IMuxerObserver interface {
// OnFragmentOpen
//
// 内部决定开启新的fragment切片,将该事件通知给上层
//
OnFragmentOpen()
}

Expand All @@ -43,7 +47,7 @@ const (

// Muxer
//
// 输入mpegts流,转出hls(m3u8+ts)至文件中
// 输入mpegts流,输出hls(m3u8+ts)至文件中
//
type Muxer struct {
UniqueKey string
Expand All @@ -56,8 +60,7 @@ type Muxer struct {
recordPlayListFilenameBak string // const after init

config *MuxerConfig
enable bool
observer MuxerObserver
observer IMuxerObserver

fragment Fragment
videoCc uint8
Expand Down Expand Up @@ -88,10 +91,9 @@ type fragmentInfo struct {

// NewMuxer
//
// @param enable 如果false,说明hls功能没开,也即不写文件,但是MuxerObserver依然会回调
// @param observer 可以为nil,如果不为nil,TS流将回调给上层
//
func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer MuxerObserver) *Muxer {
func NewMuxer(streamName string, config *MuxerConfig, observer IMuxerObserver) *Muxer {
uk := base.GenUkHlsMuxer()
op := PathStrategy.GetMuxerOutPath(config.OutPath, streamName)
playlistFilename := PathStrategy.GetLiveM3u8FileName(op, streamName)
Expand All @@ -106,7 +108,6 @@ func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer Muxe
playlistFilenameBak: playlistFilenameBak,
recordPlayListFilename: recordPlaylistFilename,
recordPlayListFilenameBak: recordPlaylistFilenameBak,
enable: enable,
config: config,
observer: observer,
}
Expand All @@ -131,7 +132,7 @@ func (m *Muxer) Dispose() {

// OnPatPmt OnTsPackets
//
// 实现 remux.Rtmp2MpegtsRemuxerObserver,方便直接将 remux.Rtmp2MpegtsRemuxer 的数据喂入 hls.Muxer
// 实现 remux.IRtmp2MpegtsRemuxerObserver,方便直接将 remux.Rtmp2MpegtsRemuxer 的数据喂入 hls.Muxer
//
func (m *Muxer) OnPatPmt(b []byte) {
m.FeedPatPmt(b)
Expand Down Expand Up @@ -273,14 +274,15 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error {

filename := PathStrategy.GetTsFileName(m.streamName, id, int(Clock.Now().UnixNano()/1e6))
filenameWithPath := PathStrategy.GetTsFileNameWithPath(m.outPath, filename)
if m.enable {
if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}
if err := m.fragment.WriteFile(m.patpmt); err != nil {
return err
}

if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}

if err := m.fragment.WriteFile(m.patpmt); err != nil {
return err
}

m.opened = true

frag := m.getCurrFrag()
Expand All @@ -307,10 +309,8 @@ func (m *Muxer) closeFragment(isLast bool) error {
return nil
}

if m.enable {
if err := m.fragment.CloseFile(); err != nil {
return err
}
if err := m.fragment.CloseFile(); err != nil {
return err
}

m.opened = false
Expand Down Expand Up @@ -339,10 +339,6 @@ func (m *Muxer) closeFragment(isLast bool) error {
}

func (m *Muxer) writeRecordPlaylist() {
if !m.enable {
return
}

// 找出整个直播流从开始到结束最大的分片时长
currFrag := m.getClosedFrag()
if currFrag.duration > m.recordMaxFragDuration {
Expand Down Expand Up @@ -392,10 +388,6 @@ func (m *Muxer) writeRecordPlaylist() {
}

func (m *Muxer) writePlaylist(isLast bool) {
if !m.enable {
return
}

// 找出时长最长的fragment
maxFrag := float64(m.config.FragmentDurationMs) / 1000
m.iterateFragsInPlaylist(func(frag *fragmentInfo) {
Expand Down Expand Up @@ -430,9 +422,6 @@ func (m *Muxer) writePlaylist(isLast bool) {
}

func (m *Muxer) ensureDir() {
if !m.enable {
return
}
//err := fslCtx.RemoveAll(m.outPath)
//Log.Assert(nil, err)
err := fslCtx.MkdirAll(m.outPath, 0777)
Expand Down
72 changes: 21 additions & 51 deletions pkg/innertest/iface_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,9 @@ import (
"github.com/q191201771/lal/pkg/rtsp"
)

// TODO(chef): refactor 有的interface以I开头,有的不是

// TODO(chef): 整理所有Server类型Session的生命周期管理
// -
// - rtmp没有独立的Pub、Sub Session结构体类型,而是直接使用ServerSession
// - write失败,需要反应到loop来
// - rtsp是否也应该上层使用Command作为代理,避免生命周期管理混乱
//
// server.pub: rtmp(), rtsp
// server.sub: rtmp(), rtsp, flv, ts, 还有一个比较特殊的hls
//
// client.push: rtmp, rtsp
// client.pull: rtmp, rtsp, flv
//
// other: rtmp.ClientSession, rtmp.ServerSession
// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSessionS
// base.HttpSubSession

// ISessionUrlContext 实际测试
//
// | | 实际url | Url() | AppName, StreamName, RawQuery |
// | - | - | - | - |
// | rtmp pub推流 | rtmp://127.0.0.1:1935/live/test110 | 同实际url | live, test110, |
// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b, c/d/test110, p1=1&p2=2 |
// | rtsp pub推流 | rtsp://localhost:5544/live/test110 | 同实际url | live, test110, |
// | rtsp pub推流 | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | httpflv sub拉流 | http://127.0.0.1:8080/live/test110.flv | 同实际url | live, test110, |
// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | rtmp sub拉流 | 同rtmp pub | . | . |
// | rtsp sub拉流 | 同rtsp pub | . | . |
// | httpts sub拉流 | 同httpflv sub,只是末尾的.flv换成.ts,不再赘述 | . | . |

//
// TODO(chef):
// 规范检查
// 1. 所有interface以I开头

// IClientSession: 所有Client Session都满足
var (
Expand Down Expand Up @@ -167,33 +137,33 @@ var (
// ---------------------------------------------------------------------------------------------------------------------

var _ logic.ILalServer = &logic.ServerManager{}
var _ rtmp.ServerObserver = &logic.ServerManager{}
var _ logic.HttpServerHandlerObserver = &logic.ServerManager{}
var _ rtsp.ServerObserver = &logic.ServerManager{}
var _ rtmp.IServerObserver = &logic.ServerManager{}
var _ logic.IHttpServerHandlerObserver = &logic.ServerManager{}
var _ rtsp.IServerObserver = &logic.ServerManager{}
var _ logic.IGroupCreator = &logic.ServerManager{}
var _ logic.GroupObserver = &logic.ServerManager{}
var _ logic.IGroupObserver = &logic.ServerManager{}

var _ logic.INotifyHandler = &logic.HttpNotify{}
var _ logic.IGroupManager = &logic.SimpleGroupManager{}
var _ logic.IGroupManager = &logic.ComplexGroupManager{}

var _ rtmp.PubSessionObserver = &logic.Group{} //
var _ rtsp.PullSessionObserver = &logic.Group{}
var _ rtsp.PullSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ rtsp.PubSessionObserver = &logic.Group{}
var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ hls.MuxerObserver = &logic.Group{}
var _ rtsp.BaseInSessionObserver = &logic.Group{} //
var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ remux.Rtmp2MpegtsRemuxerObserver = &hls.Muxer{}

var _ rtmp.ServerSessionObserver = &rtmp.Server{}
var _ rtmp.IPubSessionObserver = &logic.Group{} //
var _ rtsp.IPullSessionObserver = &logic.Group{}
var _ rtsp.IPullSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ rtsp.IPubSessionObserver = &logic.Group{}
var _ rtsp.IPubSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ hls.IMuxerObserver = &logic.Group{}
var _ rtsp.IBaseInSessionObserver = &logic.Group{} //
var _ rtsp.IBaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ remux.IRtmp2MpegtsRemuxerObserver = &hls.Muxer{}

var _ rtmp.IServerSessionObserver = &rtmp.Server{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientComplex{}

var _ rtsp.ServerCommandSessionObserver = &rtsp.Server{}
var _ rtsp.ClientCommandSessionObserver = &rtsp.PushSession{}
var _ rtsp.ClientCommandSessionObserver = &rtsp.PullSession{}
var _ rtsp.IServerCommandSessionObserver = &rtsp.Server{}
var _ rtsp.IClientCommandSessionObserver = &rtsp.PushSession{}
var _ rtsp.IClientCommandSessionObserver = &rtsp.PullSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.PushSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.PullSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.PubSession{}
Expand Down
24 changes: 18 additions & 6 deletions pkg/logic/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/q191201771/lal/pkg/sdp"
)

type GroupObserver interface {
type IGroupObserver interface {
CleanupHlsIfNeeded(appName string, streamName string, path string)
}

Expand All @@ -33,7 +33,7 @@ type Group struct {
appName string // const after init
streamName string // const after init TODO chef: 和stat里的字段重复,可以删除掉
config *Config
observer GroupObserver
observer IGroupObserver

exitChan chan struct{}

Expand Down Expand Up @@ -79,7 +79,7 @@ type Group struct {
stat base.StatGroup
}

func NewGroup(appName string, streamName string, config *Config, observer GroupObserver) *Group {
func NewGroup(appName string, streamName string, config *Config, observer IGroupObserver) *Group {
uk := base.GenUkGroup()

g := &Group{
Expand Down Expand Up @@ -304,7 +304,6 @@ func (group *Group) HasOutSession() bool {

// disposeInactiveSessions 关闭不活跃的session
//
// TODO(chef): [fix] Push是否需要检查
// TODO chef: [refactor] 梳理和naza.Connection超时重复部分
//
func (group *Group) disposeInactiveSessions() {
Expand Down Expand Up @@ -350,12 +349,19 @@ func (group *Group) disposeInactiveSessions() {
session.Dispose()
}
}
for _, item := range group.url2PushProxy {
session := item.pushSession
if item.isPushing || session != nil {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
}
}
}
}

// updateAllSessionStat 更新所有session的状态
//
// TODO(chef): [fix] Push是否需要更新
//
func (group *Group) updateAllSessionStat() {
if group.rtmpPubSession != nil {
group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec)
Expand All @@ -378,6 +384,12 @@ func (group *Group) updateAllSessionStat() {
for session := range group.rtspSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for _, item := range group.url2PushProxy {
session := item.pushSession
if item.isPushing || session != nil {
session.UpdateStat(calcSessionStatIntervalSec)
}
}
}

func (group *Group) hasPubSession() bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/logic/group__record_flv.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (group *Group) stopRecordFlvIfNeeded() {
if !group.config.RecordConfig.EnableFlv {
return
}

if group.recordFlv != nil {
_ = group.recordFlv.Dispose()
group.recordFlv = nil
Expand Down
9 changes: 4 additions & 5 deletions pkg/logic/group__record_hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@ func (group *Group) IsHlsMuxerAlive() bool {
// startHlsIfNeeded 必要时启动hls
//
func (group *Group) startHlsIfNeeded() {
// TODO(chef): [refactor] ts依赖hls
if !group.config.HlsConfig.Enable {
if !group.config.HlsConfig.Enable && !group.config.HlsConfig.EnableHttps {
return
}

enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps
group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer = hls.NewMuxer(group.streamName, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}

func (group *Group) stopHlsIfNeeded() {
if !group.config.HlsConfig.Enable {
if !group.config.HlsConfig.Enable && !group.config.HlsConfig.EnableHttps {
return
}

if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath())
Expand Down
Loading

0 comments on commit 8824038

Please sign in to comment.