Skip to content

Commit

Permalink
feat: update Handler logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
mo3et committed Sep 10, 2024
1 parent 01b8a87 commit 15e5836
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 21 deletions.
115 changes: 115 additions & 0 deletions internal/push/offlinepush_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package push

import (
"context"

"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/protocol/constant"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/jsonutil"
"google.golang.org/protobuf/proto"
)

type OfflinePushConsumerHandler struct {
OfflinePushConsumerGroup *kafka.MConsumerGroup
offlinePusher offlinepush.OfflinePusher
}

func NewOfflinePushConsumerHandler(config *Config) (*OfflinePushConsumerHandler, error) {
var offlinePushConsumerHandler OfflinePushConsumerHandler
var err error
offlinePushConsumerHandler.OfflinePushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToOfflineGroupID,
[]string{config.KafkaConfig.ToOfflinePushTopic}, true)
if err != nil {
return nil, err
}
return &offlinePushConsumerHandler, nil
}

func (*OfflinePushConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*OfflinePushConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (o *OfflinePushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := o.OfflinePushConsumerGroup.GetContextFromMsg(msg)
o.handleMsg2OfflinePush(ctx, msg.Value)
sess.MarkMessage(msg, "")
}
return nil
}

func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) {
offlinePushMsg := pbpush.PushMsgReq{}
if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil {
log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg))
return
}
err := o.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String())
}

}

func (c *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) {
type AtTextElem struct {
Text string `json:"text,omitempty"`
AtUserList []string `json:"atUserList,omitempty"`
IsAtSelf bool `json:"isAtSelf"`
}

opts = &options.Opts{Signal: &options.Signal{}}
if msg.OfflinePushInfo != nil {
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
opts.Ex = msg.OfflinePushInfo.Ex
}

if msg.OfflinePushInfo != nil {
title = msg.OfflinePushInfo.Title
content = msg.OfflinePushInfo.Desc
}
if title == "" {
switch msg.ContentType {
case constant.Text:
fallthrough
case constant.Picture:
fallthrough
case constant.Voice:
fallthrough
case constant.Video:
fallthrough
case constant.File:
title = constant.ContentType2PushContent[int64(msg.ContentType)]
case constant.AtText:
ac := AtTextElem{}
_ = jsonutil.JsonStringToStruct(string(msg.Content), &ac)
case constant.SignalingNotification:
title = constant.ContentType2PushContent[constant.SignalMsg]
default:
title = constant.ContentType2PushContent[constant.Common]
}
}
if content == "" {
content = title
}
return
}

func (c *OfflinePushConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
title, content, opts, err := c.getOfflinePushInfos(msg)
if err != nil {
return err
}
err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
if err != nil {
prommetrics.MsgOfflinePushFailedCounter.Inc()
return err
}
return nil
}
13 changes: 13 additions & 0 deletions internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type pushServer struct {
disCov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher
pushCh *ConsumerHandler
offlinePushCh *OfflinePushConsumerHandler
}

type Config struct {
Expand Down Expand Up @@ -55,18 +56,30 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}

database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig)

consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client)
if err != nil {
return err
}

offlinePushConsumer, err := NewOfflinePushConsumerHandler(config)
if err != nil {
return err
}

pbpush.RegisterPushMsgServiceServer(server, &pushServer{
database: database,
disCov: client,
offlinePusher: offlinePusher,
pushCh: consumer,
offlinePushCh: offlinePushConsumer,
})

go consumer.pushConsumerGroup.RegisterHandleAndConsumer(ctx, consumer)

go offlinePushConsumer.OfflinePushConsumerGroup.RegisterHandleAndConsumer(ctx, offlinePushConsumer)

return nil
}
22 changes: 2 additions & 20 deletions internal/push/push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin
var consumerHandler ConsumerHandler
var err error
consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID,
[]string{config.KafkaConfig.ToPushTopic, config.KafkaConfig.ToOfflinePushTopic}, true)
[]string{config.KafkaConfig.ToPushTopic}, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -103,32 +103,14 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
}
}

func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) {
offlinePushMsg := pbpush.PushMsgReq{}
if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil {
log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg))
return
}
err := c.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String())
}

}

func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
switch msg.Topic {
case c.config.KafkaConfig.ToPushTopic:
c.handleMs2PsChat(ctx, msg.Value)
case c.config.KafkaConfig.ToOfflinePushTopic:
c.handleMsg2OfflinePush(ctx, msg.Value)
}
c.handleMs2PsChat(ctx, msg.Value)
sess.MarkMessage(msg, "")
}
return nil
Expand Down
1 change: 0 additions & 1 deletion pkg/common/cmd/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func NewPushRpcCmd() *PushRpcCmd {
ret.configMap = map[string]any{
OpenIMPushCfgFileName: &pushConfig.RpcConfig,
RedisConfigFileName: &pushConfig.RedisConfig,
MongodbConfigFileName: &pushConfig.MongodbConfig,
KafkaConfigFileName: &pushConfig.KafkaConfig,
ShareFileName: &pushConfig.Share,
NotificationFileName: &pushConfig.NotificationConfig,
Expand Down

0 comments on commit 15e5836

Please sign in to comment.