Skip to content

Commit

Permalink
commit messages:
Browse files Browse the repository at this point in the history
- `/app/demo/dispatch`处理`on_update`事件回调
- HTTP Notify增加新事件回调`/on_server_start`
- HTTP API `/api/stat/lal_info` 中增加`server_id`字段
- 增加`gen_tag.sh`,用于打tag
  • Loading branch information
q191201771 committed Nov 21, 2020
1 parent 08287e7 commit 34bd5cf
Show file tree
Hide file tree
Showing 29 changed files with 483 additions and 229 deletions.
30 changes: 30 additions & 0 deletions app/demo/dispatch/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef ([email protected])

package main

// 服务启动前,设置好一些配置
type Config struct {
// 本服务HTTP监听端口,用于接收各lal节点的HTTP Notify
ListenAddr string

// 配置向本服务汇报的节点信息
ServerID2Server map[string]Server

// 级联拉流时,携带该URL参数,使得我们可以区分是级联拉流还是用户拉流
PullSecretParam string

// 检测lal节点update报活的超时时间
ServerTimeoutSec int
}

// lal节点静态配置信息
type Server struct {
RTMPAddr string // 可用于级联拉流的RTMP地址
APIAddr string // HTTP API接口地址
}
103 changes: 89 additions & 14 deletions app/demo/dispatch/datamanager/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,118 @@ package datamanager

import (
"sync"
"time"

"github.com/q191201771/naza/pkg/nazalog"
)

type DataManagerMemory struct {
mutex sync.Mutex
pubStream2ServerID map[string]string
serverTimeoutSec int
mutex sync.Mutex
serverID2pubStreams map[string]map[string]struct{}
serverID2AliveTS map[string]int64
}

func NewDataManagerMemory() *DataManagerMemory {
return &DataManagerMemory{
pubStream2ServerID: make(map[string]string),
func NewDataManagerMemory(serverTimeoutSec int) *DataManagerMemory {
d := &DataManagerMemory{
serverTimeoutSec: serverTimeoutSec,
serverID2pubStreams: make(map[string]map[string]struct{}),
serverID2AliveTS: make(map[string]int64),
}

// TODO chef: release goroutine
go func() {
var count int
for {
time.Sleep(1 * time.Second)
count++
now := time.Now().Unix()

d.mutex.Lock()
// 清除长时间没有update报活的节点
for serverID, ts := range d.serverID2AliveTS {
if now > ts && now-ts > int64(d.serverTimeoutSec)*1000 {
nazalog.Warnf("server timeout. serverID=%s", serverID)
delete(d.serverID2pubStreams, serverID)
}
}

// 定时打印数据日志
if count%60 == 0 {
nazalog.Infof("data info. %+v", d.serverID2pubStreams)
}

d.mutex.Unlock()
}
}()

return d
}

func (d *DataManagerMemory) AddPub(streamName, serverID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
nazalog.Infof("add pub. streamName=%s, serverID=%s", streamName, serverID)
d.pubStream2ServerID[streamName] = serverID
pss, _ := d.serverID2pubStreams[serverID]
if pss == nil {
pss = make(map[string]struct{})
}
pss[streamName] = struct{}{}
}

func (d *DataManagerMemory) DelPub(streamName, serverID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
// 清除用户推流对应的节点信息
cacheServerID, exist := d.pubStream2ServerID[streamName]
if !exist || serverID != cacheServerID {
nazalog.Errorf("del pub but server id dismatch. streamName=%s, serverID=%s, cache id=%s", streamName, serverID, cacheServerID)
actualServerID, _ := d.queryPub(streamName)
if actualServerID != serverID {
return
}
delete(d.pubStream2ServerID, streamName)
delete(d.serverID2pubStreams[serverID], streamName)
}

func (d *DataManagerMemory) QueryPub(streamName string) (serverID string, exist bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
serverID, exist = d.pubStream2ServerID[streamName]
return
return d.queryPub(streamName)
}

func (d *DataManagerMemory) UpdatePub(serverID string, streamNameList []string) {
// 3. server超时,去掉所有上面所有的pub

d.mutex.Lock()
defer d.mutex.Unlock()

d.markAlive(serverID)

// 更新serverID对应的stream列表
pss := make(map[string]struct{})
for _, s := range streamNameList {
pss[s] = struct{}{}
}
cpss := d.serverID2pubStreams[serverID]
d.serverID2pubStreams[serverID] = pss

// only for log
for s := range pss {
if _, exist := cpss[s]; !exist {
nazalog.Warnf("update pub, add. serverID=%s, streamName=%s", serverID, s)
}
}
for s := range cpss {
if _, exist := pss[s]; !exist {
nazalog.Warnf("update pub, del. serverID=%s, streamName=%s", serverID, s)
}
}
}

func (d *DataManagerMemory) queryPub(streamName string) (string, bool) {
for serverID, pss := range d.serverID2pubStreams {
if _, exist := pss[streamName]; exist {
return serverID, true
}
}
return "", false
}

func (d *DataManagerMemory) markAlive(serverID string) {
d.serverID2AliveTS[serverID] = time.Now().Unix()
}
12 changes: 10 additions & 2 deletions app/demo/dispatch/datamanager/data_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@

package datamanager

// 本demo的数据存储在内存中(只实现了DataManagerMemory),所以存在单点风险(指的是dispatch永久性发生故障,短暂故障或重启是ok的),
// 生产环境可以把数据存储在redis、mysql等数据库中(实现DataManager interface即可)。

type DataManger interface {
AddPub(streamName, serverID string)
DelPub(streamName, serverID string)
QueryPub(streamName string) (serverID string, exist bool)

// 1. 全量校正。比如自身服务重启了,lal节点重启了,或其他原因Add、Del消息丢失了
// 2. 心跳保活
UpdatePub(serverID string, streamNameList []string)
}

type DataManagerType int
Expand All @@ -20,10 +27,11 @@ const (
DMTMemory DataManagerType = iota
)

func NewDataManager(t DataManagerType) DataManger {
// @param serverTimeoutSec 超过该时间间隔没有Update,则清空对应节点的所有信息
func NewDataManager(t DataManagerType, serverTimeoutSec int) DataManger {
switch t {
case DMTMemory:
return NewDataManagerMemory()
return NewDataManagerMemory(serverTimeoutSec)
default:
panic("invalid data manager type")
}
Expand Down
Loading

0 comments on commit 34bd5cf

Please sign in to comment.