diff --git a/app/demo/dispatch/config.go b/app/demo/dispatch/config.go new file mode 100644 index 00000000..9263440c --- /dev/null +++ b/app/demo/dispatch/config.go @@ -0,0 +1,30 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +// 服务启动前,设置好一些配置 +type Config struct { + // 本服务HTTP监听端口,用于接收各lal节点的HTTP Notify + ListenAddr string + + // 配置向本服务汇报的节点信息 + ServerID2Server map[string]Server + + // 级联拉流时,携带该URL参数,使得我们可以区分是级联拉流还是用户拉流 + PullSecretParam string + + // 检测lal节点update报活的超时时间 + ServerTimeoutSec int +} + +// lal节点静态配置信息 +type Server struct { + RTMPAddr string // 可用于级联拉流的RTMP地址 + APIAddr string // HTTP API接口地址 +} diff --git a/app/demo/dispatch/datamanager/data.go b/app/demo/dispatch/datamanager/data.go index 93d6bddc..435b6241 100644 --- a/app/demo/dispatch/datamanager/data.go +++ b/app/demo/dispatch/datamanager/data.go @@ -10,43 +10,118 @@ package datamanager import ( "sync" + "time" "github.com/q191201771/naza/pkg/nazalog" ) type DataManagerMemory struct { - mutex sync.Mutex - pubStream2ServerID map[string]string + serverTimeoutSec int + mutex sync.Mutex + serverID2pubStreams map[string]map[string]struct{} + serverID2AliveTS map[string]int64 } -func NewDataManagerMemory() *DataManagerMemory { - return &DataManagerMemory{ - pubStream2ServerID: make(map[string]string), +func NewDataManagerMemory(serverTimeoutSec int) *DataManagerMemory { + d := &DataManagerMemory{ + serverTimeoutSec: serverTimeoutSec, + serverID2pubStreams: make(map[string]map[string]struct{}), + serverID2AliveTS: make(map[string]int64), } + + // TODO chef: release goroutine + go func() { + var count int + for { + time.Sleep(1 * time.Second) + count++ + now := time.Now().Unix() + + d.mutex.Lock() + // 清除长时间没有update报活的节点 + for serverID, ts := range d.serverID2AliveTS { + if now > ts && now-ts > int64(d.serverTimeoutSec)*1000 { + nazalog.Warnf("server timeout. serverID=%s", serverID) + delete(d.serverID2pubStreams, serverID) + } + } + + // 定时打印数据日志 + if count%60 == 0 { + nazalog.Infof("data info. %+v", d.serverID2pubStreams) + } + + d.mutex.Unlock() + } + }() + + return d } func (d *DataManagerMemory) AddPub(streamName, serverID string) { d.mutex.Lock() defer d.mutex.Unlock() - nazalog.Infof("add pub. streamName=%s, serverID=%s", streamName, serverID) - d.pubStream2ServerID[streamName] = serverID + pss, _ := d.serverID2pubStreams[serverID] + if pss == nil { + pss = make(map[string]struct{}) + } + pss[streamName] = struct{}{} } func (d *DataManagerMemory) DelPub(streamName, serverID string) { d.mutex.Lock() defer d.mutex.Unlock() - // 清除用户推流对应的节点信息 - cacheServerID, exist := d.pubStream2ServerID[streamName] - if !exist || serverID != cacheServerID { - nazalog.Errorf("del pub but server id dismatch. streamName=%s, serverID=%s, cache id=%s", streamName, serverID, cacheServerID) + actualServerID, _ := d.queryPub(streamName) + if actualServerID != serverID { return } - delete(d.pubStream2ServerID, streamName) + delete(d.serverID2pubStreams[serverID], streamName) } func (d *DataManagerMemory) QueryPub(streamName string) (serverID string, exist bool) { d.mutex.Lock() defer d.mutex.Unlock() - serverID, exist = d.pubStream2ServerID[streamName] - return + return d.queryPub(streamName) +} + +func (d *DataManagerMemory) UpdatePub(serverID string, streamNameList []string) { + // 3. server超时,去掉所有上面所有的pub + + d.mutex.Lock() + defer d.mutex.Unlock() + + d.markAlive(serverID) + + // 更新serverID对应的stream列表 + pss := make(map[string]struct{}) + for _, s := range streamNameList { + pss[s] = struct{}{} + } + cpss := d.serverID2pubStreams[serverID] + d.serverID2pubStreams[serverID] = pss + + // only for log + for s := range pss { + if _, exist := cpss[s]; !exist { + nazalog.Warnf("update pub, add. serverID=%s, streamName=%s", serverID, s) + } + } + for s := range cpss { + if _, exist := pss[s]; !exist { + nazalog.Warnf("update pub, del. serverID=%s, streamName=%s", serverID, s) + } + } +} + +func (d *DataManagerMemory) queryPub(streamName string) (string, bool) { + for serverID, pss := range d.serverID2pubStreams { + if _, exist := pss[streamName]; exist { + return serverID, true + } + } + return "", false +} + +func (d *DataManagerMemory) markAlive(serverID string) { + d.serverID2AliveTS[serverID] = time.Now().Unix() } diff --git a/app/demo/dispatch/datamanager/data_interface.go b/app/demo/dispatch/datamanager/data_interface.go index f0a7f9b4..a3a05915 100644 --- a/app/demo/dispatch/datamanager/data_interface.go +++ b/app/demo/dispatch/datamanager/data_interface.go @@ -8,10 +8,17 @@ package datamanager +// 本demo的数据存储在内存中(只实现了DataManagerMemory),所以存在单点风险(指的是dispatch永久性发生故障,短暂故障或重启是ok的), +// 生产环境可以把数据存储在redis、mysql等数据库中(实现DataManager interface即可)。 + type DataManger interface { AddPub(streamName, serverID string) DelPub(streamName, serverID string) QueryPub(streamName string) (serverID string, exist bool) + + // 1. 全量校正。比如自身服务重启了,lal节点重启了,或其他原因Add、Del消息丢失了 + // 2. 心跳保活 + UpdatePub(serverID string, streamNameList []string) } type DataManagerType int @@ -20,10 +27,11 @@ const ( DMTMemory DataManagerType = iota ) -func NewDataManager(t DataManagerType) DataManger { +// @param serverTimeoutSec 超过该时间间隔没有Update,则清空对应节点的所有信息 +func NewDataManager(t DataManagerType, serverTimeoutSec int) DataManger { switch t { case DMTMemory: - return NewDataManagerMemory() + return NewDataManagerMemory(serverTimeoutSec) default: panic("invalid data manager type") } diff --git a/app/demo/dispatch/dispatch.go b/app/demo/dispatch/dispatch.go index 08557147..c72a0108 100644 --- a/app/demo/dispatch/dispatch.go +++ b/app/demo/dispatch/dispatch.go @@ -22,41 +22,32 @@ import ( "github.com/q191201771/naza/pkg/unique" ) +// // 结合lalserver的HTTP Notify事件通知,以及HTTP API接口, -// 简单演示如何试验一个简单的调度服务, +// 简单演示如何实现一个简单的调度服务, // 使得多个lalserver节点可以组成一个集群, // 集群内的所有节点功能都是相同的, // 你可以将流推送至任意一个节点,并从任意一个节点拉流, // 同一路流,推流和拉流可以在不同的节点。 // -// 本demo的数据存储在内存中,所以存在单点风险, -// 生产环境可以把数据存储在redis、mysql等数据库中, -// 多个调度节点从数据库中读写数据。 - -type Server struct { - rtmpAddr string - apiAddr string -} -// config -var ( - listenAddr = ":10101" - serverID2Server = map[string]Server{ +var config = Config{ + ListenAddr: ":10101", + ServerID2Server: map[string]Server{ "1": { - rtmpAddr: "127.0.0.1:19350", - apiAddr: "127.0.0.1:8083", + RTMPAddr: "127.0.0.1:19350", + APIAddr: "127.0.0.1:8083", }, "2": { - rtmpAddr: "127.0.0.1:19550", - apiAddr: "127.0.0.1:8283", + RTMPAddr: "127.0.0.1:19550", + APIAddr: "127.0.0.1:8283", }, - } - pullSecretParam = "lal_cluster_inner_pull=1" -) + }, + PullSecretParam: "lal_cluster_inner_pull=1", + ServerTimeoutSec: 30, +} -var ( - dataManager datamanager.DataManger -) +var dataManager datamanager.DataManger func OnPubStartHandler(w http.ResponseWriter, r *http.Request) { id := unique.GenUniqueKey("ReqID") @@ -70,24 +61,31 @@ func OnPubStartHandler(w http.ResponseWriter, r *http.Request) { // 演示如何踢掉session,服务于鉴权失败等场景 //if info.URLParam == "" { - if info.SessionID == "RTMPPUBSUB1" { - reqServer, exist := serverID2Server[info.ServerID] - if !exist { - nazalog.Errorf("[%s] req server id invalid.", id) - return - } - url := fmt.Sprintf("http://%s/api/ctrl/kick_out_session", reqServer.apiAddr) - var b base.APICtrlKickOutSession - b.StreamName = info.StreamName - b.SessionID = info.SessionID - - nazalog.Infof("[%s] ctrl kick out session. send to %s with %+v", id, reqServer.apiAddr, b) - if _, err := nazahttp.PostJson(url, b, nil); err != nil { - nazalog.Errorf("[%s] post json error. err=%+v", id, err) - } + //if info.SessionID == "RTMPPUBSUB1" { + // reqServer, exist := config.ServerID2Server[info.ServerID] + // if !exist { + // nazalog.Errorf("[%s] req server id invalid.", id) + // return + // } + // url := fmt.Sprintf("http://%s/api/ctrl/kick_out_session", reqServer.APIAddr) + // var b base.APICtrlKickOutSession + // b.StreamName = info.StreamName + // b.SessionID = info.SessionID + // + // nazalog.Infof("[%s] ctrl kick out session. send to %s with %+v", id, reqServer.APIAddr, b) + // if _, err := nazahttp.PostJson(url, b, nil); err != nil { + // nazalog.Errorf("[%s] post json error. err=%+v", id, err) + // } + // return + //} + + if _, exist := config.ServerID2Server[info.ServerID]; !exist { + nazalog.Errorf("server id has not config. serverID=%s", info.ServerID) + return } - dataManager.AddPub(info.StreamName, info.SessionID) + nazalog.Infof("add pub. streamName=%s, serverID=%s", info.StreamName, info.ServerID) + dataManager.AddPub(info.StreamName, info.ServerID) } func OnPubStopHandler(w http.ResponseWriter, r *http.Request) { @@ -100,6 +98,12 @@ func OnPubStopHandler(w http.ResponseWriter, r *http.Request) { } nazalog.Infof("[%s] on_pub_stop. info=%+v", id, info) + if _, exist := config.ServerID2Server[info.ServerID]; !exist { + nazalog.Errorf("server id has not config. serverID=%s", info.ServerID) + return + } + + nazalog.Infof("del pub. streamName=%s, serverID=%s", info.StreamName, info.ServerID) dataManager.DelPub(info.StreamName, info.ServerID) } @@ -115,18 +119,18 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) { // sub拉流时,判断是否需要触发pull级联拉流 // 1. 是内部级联拉流,不需要触发 - if strings.Contains(info.URLParam, pullSecretParam) { + if strings.Contains(info.URLParam, config.PullSecretParam) { nazalog.Infof("[%s] sub is pull by other node, ignore.", id) return } - // 2. 已经存在输入流,不需要触发 + // 2. 汇报的节点已经存在输入流,不需要触发 if info.HasInSession { nazalog.Infof("[%s] in not empty, ignore.", id) return } - // 3. 非法节点,本服务没有配置这个节点 - reqServer, exist := serverID2Server[info.ServerID] + // 3. 非法节点,本服务没有配置汇报的节点 + reqServer, exist := config.ServerID2Server[info.ServerID] if !exist { nazalog.Errorf("[%s] req server id invalid.", id) return @@ -139,23 +143,19 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) { return } - // TODO chef: 5. 这里的容错是否会出现?是否可以去掉? - pubServer, exist := serverID2Server[pubServerID] - if !exist { - nazalog.Errorf("[%s] pub server id invalid. serverID=%s", id, pubServerID) - return - } + pubServer, exist := config.ServerID2Server[pubServerID] + nazalog.Assert(true, exist) - // 向pub所在节点,发送pull级联拉流的命令 - url := fmt.Sprintf("http://%s/api/ctrl/start_pull", reqServer.apiAddr) + // 向汇报节点,发送pull级联拉流的命令,其中包含pub所在节点信息 + url := fmt.Sprintf("http://%s/api/ctrl/start_pull", reqServer.APIAddr) var b base.APICtrlStartPullReq b.Protocol = base.ProtocolRTMP - b.Addr = pubServer.rtmpAddr + b.Addr = pubServer.RTMPAddr b.AppName = info.AppName b.StreamName = info.StreamName - b.URLParam = pullSecretParam + b.URLParam = config.PullSecretParam - nazalog.Infof("[%s] ctrl pull. send to %s with %+v", id, reqServer.apiAddr, b) + nazalog.Infof("[%s] ctrl pull. send to %s with %+v", id, reqServer.APIAddr, b) if _, err := nazahttp.PostJson(url, b, nil); err != nil { nazalog.Errorf("[%s] post json error. err=%+v", id, err) } @@ -170,9 +170,6 @@ func OnSubStopHandler(w http.ResponseWriter, r *http.Request) { return } nazalog.Infof("[%s] on_sub_stop. info=%+v", id, info) - - // 没什么好做的 - // 目前lalserver在sub为空时,内部会主动关闭pull } func OnUpdateHandler(w http.ResponseWriter, r *http.Request) { @@ -185,9 +182,14 @@ func OnUpdateHandler(w http.ResponseWriter, r *http.Request) { } nazalog.Infof("[%s] on_update. info=%+v", id, info) - // TODO chef: - // 1. 更新pubStream2ServerID,去掉过期的,增加不存在的 - // 2. 没有pub但是有sub的,触发ctrl pull + var streamNameList []string + for _, g := range info.Groups { + // pub exist + if g.StatPub.SessionID != "" { + streamNameList = append(streamNameList, g.StreamName) + } + } + dataManager.UpdatePub(info.ServerID, streamNameList) } func logHandler(w http.ResponseWriter, r *http.Request) { @@ -196,9 +198,9 @@ func logHandler(w http.ResponseWriter, r *http.Request) { } func main() { - dataManager = datamanager.NewDataManager(datamanager.DMTMemory) + dataManager = datamanager.NewDataManager(datamanager.DMTMemory, config.ServerTimeoutSec) - l, err := net.Listen("tcp", listenAddr) + l, err := net.Listen("tcp", config.ListenAddr) nazalog.Assert(nil, err) m := http.NewServeMux() @@ -208,6 +210,7 @@ func main() { m.HandleFunc("/on_sub_stop", OnSubStopHandler) m.HandleFunc("/on_update", OnUpdateHandler) m.HandleFunc("/on_rtmp_connect", logHandler) + m.HandleFunc("/on_server_start", logHandler) srv := http.Server{ Handler: m, diff --git a/build.sh b/build.sh index f1b95dd2..23cdf994 100755 --- a/build.sh +++ b/build.sh @@ -18,11 +18,6 @@ GitStatus=`git status -s` BuildTime=`date +'%Y.%m.%d.%H%M%S'` BuildGoVersion=`go version` -# 如果读取到git信息,最新tag是v开头,则修改代码 pkg/base/version.go 中的版本信息 -if [[ ${GitTag} == v* ]]; then - gsed -i "/^var LALVersion/cvar LALVersion = \"${GitTag}\"" pkg/base/version.go -fi - LDFlags=" \ -X 'github.com/q191201771/naza/pkg/bininfo.GitTag=${GitTag}' \ -X 'github.com/q191201771/naza/pkg/bininfo.GitCommitLog=${GitCommitLog}' \ diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index 0258410c..541d362c 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -46,6 +46,7 @@ "http_notify": { "enable": true, "update_interval_sec": 5, + "on_server_start": "http://127.0.0.1:10101/on_server_start", "on_update": "http://127.0.0.1:10101/on_update", "on_pub_start": "http://127.0.0.1:10101/on_pub_start", "on_pub_stop": "http://127.0.0.1:10101/on_pub_stop", diff --git a/conf/lalserver.conf.json.brief b/conf/lalserver.conf.json.brief index 02aa13cc..8535e8a2 100644 --- a/conf/lalserver.conf.json.brief +++ b/conf/lalserver.conf.json.brief @@ -45,9 +45,10 @@ }, "server_id": "1", // 当前lalserver唯一ID。多个lalserver HTTP Notify同一个地址时,可通过该ID区分 "http_notify": { - "enable": true, // 是否开启HTTP Notify事件回调 - "update_interval_sec": 5, // update事件回调间隔,单位毫秒 - "on_update": "http://127.0.0.1:10101/on_update", // 各事件HTTP Notify事件回调地址 + "enable": true, // 是否开启HTTP Notify事件回调 + "update_interval_sec": 5, // update事件回调间隔,单位毫秒 + "on_server_start": "http://127.0.0.1:10101/on_server_start", // 各事件HTTP Notify事件回调地址 + "on_update": "http://127.0.0.1:10101/on_update", "on_pub_start": "http://127.0.0.1:10101/on_pub_start", "on_pub_stop": "http://127.0.0.1:10101/on_pub_stop", "on_sub_start": "http://127.0.0.1:10101/on_sub_start", diff --git a/conf/node2.conf.json b/conf/node2.conf.json index ea331a00..6084da53 100644 --- a/conf/node2.conf.json +++ b/conf/node2.conf.json @@ -34,6 +34,7 @@ "http_notify": { "enable": true, "update_interval_sec": 5, + "on_server_start": "http://127.0.0.1:10101/on_server_start", "on_update": "http://127.0.0.1:10101/on_update", "on_pub_start": "http://127.0.0.1:10101/on_pub_start", "on_pub_stop": "http://127.0.0.1:10101/on_pub_stop", diff --git a/gen_release.sh b/gen_release.sh index b0ad17d5..26ebab53 100755 --- a/gen_release.sh +++ b/gen_release.sh @@ -20,9 +20,9 @@ echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/VERSION.txt echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/VERSION.txt echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/VERSION.txt -cp conf/lalserver.conf.json conf/edge.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/conf -cp conf/lalserver.conf.json conf/edge.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/conf -cp conf/lalserver.conf.json conf/edge.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/conf +cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/conf +cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/conf +cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/conf GitTag=`git tag --sort=version:refname | tail -n 1` GitCommitLog=`git log --pretty=oneline -n 1` diff --git a/gen_tag.sh b/gen_tag.sh new file mode 100755 index 00000000..252f7d53 --- /dev/null +++ b/gen_tag.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +# 根据CHANGELOG.md中的最新版本号,决定是否更新version.go和以及打git tag +# +# 步骤: +# 1. 提交所有代码 +# 2. 修改CHANGELOG.md +# 3. 执行gen_tag.sh + +#set -x + +# CHANGELOG.md中的版本号 +NewVersion=`cat CHANGELOG.md| grep '#### v' | head -n 1 | awk '{print $2}'` +echo 'newest version in CHANGELOG.md: ' $NewVersion + +# git tag中的版本号 +GitTag=`git tag --sort=version:refname | tail -n 1` +echo "newest version in git tag: " $GitTag + +# 源码中的版本号 +FileVersion=`cat pkg/base/version.go | grep 'const LALVersion' | awk -F\" '{print $2}'` +echo "newest version in version.go: " $FileVersion + +# CHANGELOG.md和源码中的不一致,更新源码,并提交修改 +if [ "$NewVersion" == "$FileVersion" ];then + echo 'same tag, noop.' +else + echo 'update version.go' + #gsed -i "/^var LALVersion/cvar LALVersion = \"${NewVersion}\"" pkg/base/version.go + #git add pkg/base/version.go + #git commit -m '${NewVersion} -> version.go' + #git push +fi + +# CHANGELOG.md和git tag不一致,打新的tag +if [ "$NewVersion" == "$FileVersion" ];then + echo 'same tag, noop.' +else + echo 'add tag.' + #git tag ${NewVersion} + #git push --tags +fi + diff --git a/pkg/base/avpacket.go b/pkg/base/avpacket.go index 6ca59816..9a20a91f 100644 --- a/pkg/base/avpacket.go +++ b/pkg/base/avpacket.go @@ -10,7 +10,7 @@ package base type AVPacketPT int -var ( +const ( AVPacketPTAVC AVPacketPT = RTPPacketTypeAVCOrHEVC AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC AVPacketPTHEVC AVPacketPT = 98 diff --git a/pkg/base/http_api_t.go b/pkg/base/http_api_t.go index 0536adb1..5d51406e 100644 --- a/pkg/base/http_api_t.go +++ b/pkg/base/http_api_t.go @@ -10,7 +10,7 @@ package base // 文档见: https://pengrl.com/p/20100/ -const HTTPAPIVersion = "v0.1.1" +const HTTPAPIVersion = "v0.1.2" const ( ErrorCodeSucc = 0 @@ -28,15 +28,18 @@ type HTTPResponseBasic struct { Desp string `json:"desp"` } +type LALInfo struct { + ServerID string `json:"server_id"` + BinInfo string `json:"bin_info"` + LalVersion string `json:"lal_version"` + APIVersion string `json:"api_version"` + NotifyVersion string `json:"notify_version"` + StartTime string `json:"start_time"` +} + type APIStatLALInfo struct { HTTPResponseBasic - Data struct { - BinInfo string `json:"bin_info"` - LalVersion string `json:"lal_version"` - APIVersion string `json:"api_version"` - NotifyVersion string `json:"notify_version"` - StartTime string `json:"start_time"` - } `json:"data"` + Data LALInfo `json:"data"` } type APIStatAllGroup struct { diff --git a/pkg/base/http_notify_t.go b/pkg/base/http_notify_t.go index 042051d8..466f9d74 100644 --- a/pkg/base/http_notify_t.go +++ b/pkg/base/http_notify_t.go @@ -10,7 +10,7 @@ package base // 文档见: https://pengrl.com/p/20101/ -const HTTPNotifyVersion = "v0.0.3" +const HTTPNotifyVersion = "v0.0.4" type SessionEventCommonInfo struct { ServerID string `json:"server_id"` diff --git a/pkg/base/session.go b/pkg/base/session.go new file mode 100644 index 00000000..94e8256a --- /dev/null +++ b/pkg/base/session.go @@ -0,0 +1,37 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +// +// | . | rtmp pub | rtsp pub | rtmp sub | flv sub | ts sub | rtsp sub | rtmp push | rtmp pull | flv pull | +// | - | - | - | - | - | - | - | - | - | - | +// | file | server_session.go | server_pub_session.go | server_session.go | server_sub_session.go | server_sub_session.go | server_sub_session.go | client_push_session.go | client_pull_session.go | client_pull_session.go | +// | struct | ServerSession | PubSession | ServerSession | SubSession | SubSession | SubSession | PushSession | PullSession | PullSession | +// +// +// +// | . | all | rtmppub | rtsppub | rtmpsub | flvsub | tssub | rtspsub | rtmppush | rtmppull | flvpull | +// | - | - | - | - | - | - | - | - | - | - | - | +// | UniqueKey | √ | √ | √ | √ | √ | √ | √ | √ | √ | √ | +// | StreamName | x | √ | √ | √ | √ | √ | √ | √ | √ | x | +// | RunLoop() | x | √ | x | √ | √ | √ | x | x | x | x | +// | Dispose() | x | √ | √ | √ | √ | √ | x | √ | √ | √ | +// | GetStat() | x | √ | √ | √ | √ | √ | x | x | √ | x | +// | UpdateStat() | x | √ | √ | √ | √ | √ | x | x | √ | x | +// | IsAlive() | x | √ | √ | √ | √ | √ | x | x | √ | x | +// | SingleConn | x | √ | x | √ | √ | √ | √ | √ | √ | √ | +// | RemoteAddr() | x | √ | x | √ | √ | x | x | x | x | x | +// +// Dispose由外部调用,表示主动关闭正常的session +// 外部调用Dispose后,不应继续使用该session +// Dispose后,RunLoop结束阻塞 +// +// 对端关闭,或session内部关闭也会导致RunLoop结束阻塞 +// +// RunLoop结束阻塞后,可通知上层,告知session生命周期结束 diff --git a/pkg/base/version.go b/pkg/base/version.go index 93a83d9b..0161cadf 100644 --- a/pkg/base/version.go +++ b/pkg/base/version.go @@ -15,8 +15,8 @@ import "strings" // 另外,我们也在本文件提供另外一些信息 // 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中 -// 版本,该变量由build脚本修改维护 -var LALVersion = "v0.16.0" +// 版本,该变量由外部脚本修改维护 +const LALVersion = "v0.16.0" var ( LALLibraryName = "lal" diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index cee25f0c..58010453 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -25,13 +25,13 @@ import ( var tsHTTPResponseHeader []byte type SubSession struct { - UniqueKey string - - StartTick int64 + UniqueKey string StreamName string - AppName string - URI string - Headers map[string]string + + StartTick int64 + AppName string + URI string + Headers map[string]string IsFresh bool diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 6bf98ed6..6a16b015 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -19,7 +19,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -//var ErrMissKeyField = errors.New("missing key field in config file") +const ConfigVersion = "0.0.1" type Config struct { RTMPConfig RTMPConfig `json:"rtmp"` @@ -82,6 +82,7 @@ type HTTPAPIConfig struct { type HTTPNotifyConfig struct { Enable bool `json:"enable"` UpdateIntervalSec int `json:"update_interval_sec"` + OnServerStart string `json:"on_server_start"` OnUpdate string `json:"on_update"` OnPubStart string `json:"on_pub_start"` OnPubStop string `json:"on_pub_stop"` diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 40420869..358cebf4 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -38,9 +38,6 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -// TODO chef: -// group的函数比较多,考虑调整一下函数排列位置 - type Group struct { UniqueKey string // const after init appName string // const after init @@ -67,7 +64,7 @@ type Group struct { url2PushProxy map[string]*pushProxy // hlsMuxer *hls.Muxer - // + // rtmp pub/pull使用 gopCache *GOPCache httpflvGopCache *GOPCache // rtsp pub使用 @@ -477,7 +474,7 @@ func (group *Group) OnRTPPacket(pkt rtprtcp.RTPPacket) { defer group.mutex.Unlock() for s := range group.rtspSubSessionSet { - s.WriteRawRTPPacket(pkt.Raw) + s.WriteRTPPacket(pkt) } } @@ -809,7 +806,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { for session := range group.rtmpSubSessionSet { // ## 3.1. 如果是新的 sub session,发送已缓存的信息 if session.IsFresh { - // TODO 头信息和full gop也可以在SubSession刚加入时发送 + // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 if group.gopCache.Metadata != nil { _ = session.AsyncWrite(group.gopCache.Metadata) } diff --git a/pkg/logic/http_api.go b/pkg/logic/http_api.go index e630f890..ab6cf2f6 100644 --- a/pkg/logic/http_api.go +++ b/pkg/logic/http_api.go @@ -107,6 +107,7 @@ func (h *HTTPAPIServer) statLALInfoHandler(w http.ResponseWriter, req *http.Requ v.Data.APIVersion = base.HTTPAPIVersion v.Data.NotifyVersion = base.HTTPNotifyVersion v.Data.StartTime = serverStartTime + v.Data.ServerID = config.ServerID feedback(v, w) } diff --git a/pkg/logic/http_notify.go b/pkg/logic/http_notify.go index 0c3ae05e..2c1d5723 100644 --- a/pkg/logic/http_notify.go +++ b/pkg/logic/http_notify.go @@ -12,6 +12,8 @@ import ( "net/http" "time" + "github.com/q191201771/naza/pkg/bininfo" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/nazahttp" "github.com/q191201771/naza/pkg/nazalog" @@ -34,6 +36,17 @@ type HTTPNotify struct { var httpNotify *HTTPNotify +func (h *HTTPNotify) OnServerStart() { + var info base.LALInfo + info.BinInfo = bininfo.StringifySingleLine() + info.LalVersion = base.LALVersion + info.APIVersion = base.HTTPAPIVersion + info.NotifyVersion = base.HTTPNotifyVersion + info.StartTime = serverStartTime + info.ServerID = config.ServerID + h.asyncPost(config.HTTPNotifyConfig.OnServerStart, info) +} + func (h *HTTPNotify) OnUpdate(info base.UpdateInfo) { h.asyncPost(config.HTTPNotifyConfig.OnUpdate, info) } diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 62e6a923..9139cb60 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -67,6 +67,8 @@ func NewServerManager() *ServerManager { } func (sm *ServerManager) RunLoop() { + httpNotify.OnServerStart() + if sm.rtmpServer != nil { if err := sm.rtmpServer.Listen(); err != nil { nazalog.Error(err) @@ -372,6 +374,9 @@ func (sm *ServerManager) OnNewHTTPTSSubSession(session *httpts.SubSession) bool defer sm.mutex.Unlock() group := sm.getOrCreateGroup(session.AppName, session.StreamName) group.AddHTTPTSSubSession(session) + + // TODO chef: 部分session没有Notify + return true } diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index 13cee4fe..7da8b7d0 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -73,7 +73,6 @@ func (s *PullSession) GetStat() base.StatSession { return s.core.GetStat() } -// TODO chef: 默认每5秒调用一次 func (s *PullSession) UpdateStat(interval uint32) { s.core.UpdateStat(interval) } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 09b4eb99..c7e41a49 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -191,7 +191,6 @@ func (s *ClientSession) GetStat() base.StatSession { return s.stat } -// TODO chef: 默认每5秒调用一次 func (s *ClientSession) UpdateStat(interval uint32) { currStat := s.conn.GetStat() var diffStat connection.Stat diff --git a/pkg/rtsp/rtsp.go b/pkg/rtsp/rtsp.go index 0b0ae550..0596b831 100644 --- a/pkg/rtsp/rtsp.go +++ b/pkg/rtsp/rtsp.go @@ -56,7 +56,6 @@ func init() { // ffmpeg -re -stream_loop -1 -i /Volumes/Data/tmp/wontcry.flv -acodec copy -vcodec copy -f rtsp rtsp://localhost:5544/live/test110 // read http request. method=OPTIONS, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:1 User-Agent:Lavf57.83.100], body= - server.go:95 -// // read http request. method=ANNOUNCE, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:2 Content-Length:490 Content-Type:application/sdp User-Agent:Lavf57.83.100], body=v=0 // o=- 0 0 IN IP4 127.0.0.1 // s=No Name @@ -73,13 +72,9 @@ func init() { // a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=121056E500 // a=control:streamid=1 // - server.go:95 -// // read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=0, headers=map[CSeq:3 Transport:RTP/AVP/UDP;unicast;client_port=32182-32183;mode=record User-Agent:Lavf57.83.100], body= - server.go:95 -// // read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=1, headers=map[CSeq:4 Session:191201771 Transport:RTP/AVP/UDP;unicast;client_port=32184-32185;mode=record User-Agent:Lavf57.83.100], body= - server.go:95 -// // read http request. method=RECORD, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:5 Range:npt=0.000- Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95 -// // read http request. method=TEARDOWN, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:6 Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95 // read udp packet failed. err=read udp [::]:8002: use of closed network connection - server_pub_session.go:199 diff --git a/pkg/rtsp/server.go b/pkg/rtsp/server.go index 9f780a2d..00ddc50c 100644 --- a/pkg/rtsp/server.go +++ b/pkg/rtsp/server.go @@ -123,14 +123,14 @@ Loop: break Loop } - sdpCtx, err := sdp.ParseSDP(body) + sdpLogicCtx, err := sdp.ParseSDP2LogicContext(body) if err != nil { nazalog.Errorf("parse sdp failed. err=%v", err) break Loop } pubSession = NewPubSession(presentation) - pubSession.InitWithSDP(body, sdpCtx) + pubSession.InitWithSDP(body, sdpLogicCtx) s.m.Lock() s.presentation2PubSession[presentation] = pubSession @@ -185,8 +185,10 @@ Loop: break Loop } } else if subSession != nil { - subSession.SetRTPConn(rtpConn) - subSession.SetRTCPConn(rtcpConn) + if err = subSession.Setup(uri, rtpConn, rtcpConn); err != nil { + nazalog.Errorf("SETUP failed. err=%+v", err) + break Loop + } } else { nazalog.Error("SETUP while session not exist.") break Loop @@ -233,7 +235,8 @@ Loop: pubSession, ok := s.presentation2PubSession[presentation] s.m.Unlock() if ok { - resp := PackResponseDescribe(headers[HeaderFieldCSeq], string(pubSession.rawSDP)) + rawSDP, _ := pubSession.GetSDP() + resp := PackResponseDescribe(headers[HeaderFieldCSeq], string(rawSDP)) _, _ = conn.Write([]byte(resp)) } else { nazalog.Errorf("rtsp sub but pub not exist. presentation=%s", presentation) diff --git a/pkg/rtsp/server_pub_session.go b/pkg/rtsp/server_pub_session.go index 0f8f9ba8..dcb24e57 100644 --- a/pkg/rtsp/server_pub_session.go +++ b/pkg/rtsp/server_pub_session.go @@ -26,7 +26,9 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -// PubSession会同时向上层回调rtp packet,以及rtp合并后的av packet +// PubSession会向上层回调两种格式的数据: +// 1. 原始的rtp packet +// 2. rtp合并后的av packet type PubSessionObserver interface { OnRTPPacket(pkt rtprtcp.RTPPacket) @@ -39,21 +41,18 @@ type PubSessionObserver interface { } type PubSession struct { - UniqueKey string - StreamName string // presentation + UniqueKey string + StreamName string // presentation + observer PubSessionObserver avPacketQueue *AVPacketQueue - audioUnpacker *rtprtcp.RTPUnpacker - videoUnpacker *rtprtcp.RTPUnpacker - audioRRProducer *rtprtcp.RRProducer - videoRRProducer *rtprtcp.RRProducer - audioSsrc uint32 - videoSsrc uint32 - audioPayloadType base.AVPacketPT - videoPayloadType base.AVPacketPT - audioAControl string - videoAControl string + audioUnpacker *rtprtcp.RTPUnpacker + videoUnpacker *rtprtcp.RTPUnpacker + audioRRProducer *rtprtcp.RRProducer + videoRRProducer *rtprtcp.RRProducer + audioSsrc uint32 + videoSsrc uint32 audioRTPConn *nazanet.UDPConnection videoRTPConn *nazanet.UDPConnection @@ -65,13 +64,9 @@ type PubSession struct { staleStat *connection.Stat stat base.StatPub - vps []byte // 如果是H265的话 - sps []byte - pps []byte - asc []byte - - m sync.Mutex - rawSDP []byte + m sync.Mutex + rawSDP []byte // const after set + sdpLogicCtx sdp.LogicContext // const after set } func NewPubSession(streamName string) *PubSession { @@ -94,79 +89,31 @@ func NewPubSession(streamName string) *PubSession { func (p *PubSession) SetObserver(observer PubSessionObserver) { p.observer = observer - p.observer.OnAVConfig(p.asc, p.vps, p.sps, p.pps) + p.observer.OnAVConfig(p.sdpLogicCtx.ASC, p.sdpLogicCtx.VPS, p.sdpLogicCtx.SPS, p.sdpLogicCtx.PPS) } -func (p *PubSession) InitWithSDP(rawSDP []byte, sdpCtx sdp.SDPContext) { +func (p *PubSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) { p.m.Lock() p.rawSDP = rawSDP + p.sdpLogicCtx = sdpLogicCtx p.m.Unlock() - var err error - - var audioClockRate int - var videoClockRate int - - for i, item := range sdpCtx.ARTPMapList { - switch item.PayloadType { - case base.RTPPacketTypeAVCOrHEVC: - videoClockRate = item.ClockRate - if item.EncodingName == "H265" { - p.videoPayloadType = base.AVPacketPTHEVC - } else { - p.videoPayloadType = base.AVPacketPTAVC - } - if i < len(sdpCtx.AControlList) { - p.videoAControl = sdpCtx.AControlList[i].Value - } - case base.RTPPacketTypeAAC: - audioClockRate = item.ClockRate - p.audioPayloadType = base.AVPacketPTAAC - if i < len(sdpCtx.AControlList) { - p.audioAControl = sdpCtx.AControlList[i].Value - } - default: - nazalog.Errorf("unknown payloadType. type=%d", item.PayloadType) - } - } - - for _, item := range sdpCtx.AFmtPBaseList { - switch item.Format { - case base.RTPPacketTypeAVCOrHEVC: - if p.videoPayloadType == base.AVPacketPTHEVC { - p.vps, p.sps, p.pps, err = sdp.ParseVPSSPSPPS(item) - } else { - p.sps, p.pps, err = sdp.ParseSPSPPS(item) - } - if err != nil { - nazalog.Errorf("parse sps pps from sdp failed.") - } - case base.RTPPacketTypeAAC: - p.asc, err = sdp.ParseASC(item) - if err != nil { - nazalog.Errorf("parse asc from sdp failed.") - } - default: - nazalog.Errorf("unknown format. fmt=%d", item.Format) - } - } - - p.audioUnpacker = rtprtcp.NewRTPUnpacker(p.audioPayloadType, audioClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked) - p.videoUnpacker = rtprtcp.NewRTPUnpacker(p.videoPayloadType, videoClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked) + p.audioUnpacker = rtprtcp.NewRTPUnpacker(p.sdpLogicCtx.AudioPayloadType, p.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked) + p.videoUnpacker = rtprtcp.NewRTPUnpacker(p.sdpLogicCtx.VideoPayloadType, p.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked) - p.audioRRProducer = rtprtcp.NewRRProducer(audioClockRate) - p.videoRRProducer = rtprtcp.NewRRProducer(videoClockRate) + p.audioRRProducer = rtprtcp.NewRRProducer(p.sdpLogicCtx.AudioClockRate) + p.videoRRProducer = rtprtcp.NewRRProducer(p.sdpLogicCtx.VideoClockRate) - if p.audioPayloadType != 0 && p.videoPayloadType != 0 { + if p.sdpLogicCtx.AudioPayloadType != 0 && p.sdpLogicCtx.VideoPayloadType != 0 { p.avPacketQueue = NewAVPacketQueue(p.onAVPacket) } } func (p *PubSession) Setup(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error { - if strings.HasSuffix(uri, p.audioAControl) { + if strings.HasSuffix(uri, p.sdpLogicCtx.AudioAControl) { p.audioRTPConn = rtpConn p.audioRTCPConn = rtcpConn - } else if strings.HasSuffix(uri, p.videoAControl) { + } else if strings.HasSuffix(uri, p.sdpLogicCtx.VideoAControl) { p.videoRTPConn = rtpConn p.videoRTCPConn = rtcpConn } else { @@ -228,10 +175,10 @@ func (p *PubSession) IsAlive(interval uint32) (ret bool) { return ret } -func (p *PubSession) GetSDP() []byte { +func (p *PubSession) GetSDP() ([]byte, sdp.LogicContext) { p.m.Lock() defer p.m.Unlock() - return p.rawSDP + return p.rawSDP, p.sdpLogicCtx } // callback by UDPConnection diff --git a/pkg/rtsp/server_sub_session.go b/pkg/rtsp/server_sub_session.go index eead5619..cd7707e8 100644 --- a/pkg/rtsp/server_sub_session.go +++ b/pkg/rtsp/server_sub_session.go @@ -11,24 +11,32 @@ package rtsp import ( "encoding/hex" "net" + "strings" "time" + "github.com/q191201771/lal/pkg/rtprtcp" + "github.com/q191201771/lal/pkg/sdp" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazanet" ) -// to be continued -// 注意,音频和视频是不同的UDP连接 -// pub和sub挂载转发时,需要对应上 +// TODO chef: 主动发送SR type SubSession struct { - UniqueKey string - StreamName string + UniqueKey string // const after ctor + StreamName string // const after ctor + + rawSDP []byte // const after set + sdpLogicCtx sdp.LogicContext // const after set - rtpConn *nazanet.UDPConnection - rtcpConn *nazanet.UDPConnection - stat base.StatPub + audioRTPConn *nazanet.UDPConnection + videoRTPConn *nazanet.UDPConnection + audioRTCPConn *nazanet.UDPConnection + videoRTCPConn *nazanet.UDPConnection + + stat base.StatPub } func NewSubSession(streamName string) *SubSession { @@ -47,14 +55,26 @@ func NewSubSession(streamName string) *SubSession { return ss } -func (s *SubSession) SetRTPConn(conn *nazanet.UDPConnection) { - s.rtpConn = conn - go s.rtpConn.RunLoop(s.onReadUDPPacket) +func (s *SubSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) { + s.rawSDP = rawSDP + s.sdpLogicCtx = sdpLogicCtx } -func (s *SubSession) SetRTCPConn(conn *nazanet.UDPConnection) { - s.rtcpConn = conn - go s.rtcpConn.RunLoop(s.onReadUDPPacket) +func (s *SubSession) Setup(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error { + if strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) { + s.audioRTPConn = rtpConn + s.audioRTCPConn = rtcpConn + } else if strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) { + s.videoRTPConn = rtpConn + s.videoRTCPConn = rtcpConn + } else { + return ErrRTSP + } + + go rtpConn.RunLoop(s.onReadUDPPacket) + go rtcpConn.RunLoop(s.onReadUDPPacket) + + return nil } func (s *SubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bool { @@ -62,8 +82,13 @@ func (s *SubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo return true } -func (s *SubSession) WriteRawRTPPacket(b []byte) { - if err := s.rtpConn.Write(b); err != nil { - nazalog.Errorf("err=%+v", err) - } +//to be continued +//conn可能还不存在,这里涉及到pub和sub是否需要等到setup再回调给上层的问题 +func (s *SubSession) WriteRTPPacket(packet rtprtcp.RTPPacket) { + //switch packet.Header.PacketType { + //case base.RTPPacketTypeAVCOrHEVC: + // s.videoRTPConn.Write(packet.Raw) + //case base.RTPPacketTypeAAC: + // s.audioRTPConn.Write(packet.Raw) + //} } diff --git a/pkg/sdp/sdp.go b/pkg/sdp/sdp.go index 107d57d7..288425fe 100644 --- a/pkg/sdp/sdp.go +++ b/pkg/sdp/sdp.go @@ -19,7 +19,24 @@ import ( var ErrSDP = errors.New("lal.sdp: fxxk") -type SDPContext struct { +const ( + ARTPMapEncodingName = "H265" +) + +type LogicContext struct { + AudioClockRate int + VideoClockRate int + AudioPayloadType base.AVPacketPT + VideoPayloadType base.AVPacketPT + AudioAControl string + VideoAControl string + ASC []byte + VPS []byte + SPS []byte + PPS []byte +} + +type RawContext struct { ARTPMapList []ARTPMap AFmtPBaseList []AFmtPBase AControlList []AControl @@ -41,9 +58,64 @@ type AControl struct { Value string } +func ParseSDP2LogicContext(b []byte) (LogicContext, error) { + var ret LogicContext + + c, err := ParseSDP2RawContext(b) + if err != nil { + return ret, err + } + + for i, item := range c.ARTPMapList { + switch item.PayloadType { + case base.RTPPacketTypeAVCOrHEVC: + ret.VideoClockRate = item.ClockRate + if item.EncodingName == ARTPMapEncodingName { + ret.VideoPayloadType = base.AVPacketPTHEVC + } else { + ret.VideoPayloadType = base.AVPacketPTAVC + } + if i < len(c.AControlList) { + ret.VideoAControl = c.AControlList[i].Value + } + case base.RTPPacketTypeAAC: + ret.AudioClockRate = item.ClockRate + ret.AudioPayloadType = base.AVPacketPTAAC + if i < len(c.AControlList) { + ret.AudioAControl = c.AControlList[i].Value + } + default: + return ret, ErrSDP + } + } + + for _, item := range c.AFmtPBaseList { + switch item.Format { + case base.RTPPacketTypeAVCOrHEVC: + if ret.VideoPayloadType == base.AVPacketPTHEVC { + ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(item) + } else { + ret.SPS, ret.PPS, err = ParseSPSPPS(item) + } + if err != nil { + return ret, err + } + case base.RTPPacketTypeAAC: + ret.ASC, err = ParseASC(item) + if err != nil { + return ret, err + } + default: + return ret, ErrSDP + } + } + + return ret, nil +} + // 例子见单元测试 -func ParseSDP(b []byte) (SDPContext, error) { - var sdpCtx SDPContext +func ParseSDP2RawContext(b []byte) (RawContext, error) { + var sdpCtx RawContext s := string(b) lines := strings.Split(s, "\r\n") diff --git a/pkg/sdp/sdp_test.go b/pkg/sdp/sdp_test.go index 62c8e8c6..241f7e8f 100644 --- a/pkg/sdp/sdp_test.go +++ b/pkg/sdp/sdp_test.go @@ -44,8 +44,8 @@ var goldenPPS = []byte{ 0x68, 0xEB, 0xEC, 0xB2, 0x2C, } -func TestParseSDP(t *testing.T) { - sdpCtx, err := sdp.ParseSDP([]byte(goldenSDP)) +func TestParseSDP2RawContext(t *testing.T) { + sdpCtx, err := sdp.ParseSDP2RawContext([]byte(goldenSDP)) assert.Equal(t, nil, err) nazalog.Debugf("sdp=%+v", sdpCtx) }