diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 7dc2ebeea0..f11cfde1af 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -128,6 +128,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error { go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) + go m.historyCH.HandleUserHasReadSeqMessages(m.ctx) err := m.historyCH.redisMessageBatches.Start() if err != nil { return err @@ -157,12 +158,14 @@ func (m *MsgTransfer) Start(index int, config *Config) error { // graceful close kafka client. m.cancel() m.historyCH.redisMessageBatches.Close() + m.historyCH.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() return nil case <-netDone: m.cancel() m.historyCH.redisMessageBatches.Close() + m.historyCH.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() close(netDone) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b0078649cb..84453c8df4 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,8 +18,10 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "strconv" "strings" + "sync" "time" "github.com/IBM/sarama" @@ -40,11 +42,12 @@ import ( ) const ( - size = 500 - mainDataBuffer = 500 - subChanBuffer = 50 - worker = 50 - interval = 100 * time.Millisecond + size = 500 + mainDataBuffer = 500 + subChanBuffer = 50 + worker = 50 + interval = 100 * time.Millisecond + hasReadChanBuffer = 1000 ) type ContextMsg struct { @@ -52,14 +55,23 @@ type ContextMsg struct { ctx context.Context } +// This structure is used for asynchronously writing the sender’s read sequence (seq) regarding a message into MongoDB. +// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10. +type userHasReadSeq struct { + conversationID string + userHasReadMap map[string]int64 +} + type OnlineHistoryRedisConsumerHandler struct { historyConsumerGroup *kafka.MConsumerGroup redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] - msgTransferDatabase controller.MsgTransferDatabase - conversationRpcClient *rpcclient.ConversationRpcClient - groupRpcClient *rpcclient.GroupRpcClient + msgTransferDatabase controller.MsgTransferDatabase + conversationRpcClient *rpcclient.ConversationRpcClient + groupRpcClient *rpcclient.GroupRpcClient + conversationUserHasReadChan chan *userHasReadSeq + wg sync.WaitGroup } func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase, @@ -70,6 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont } var och OnlineHistoryRedisConsumerHandler och.msgTransferDatabase = database + och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer) + och.wg.Add(1) b := batcher.New[sarama.ConsumerMessage]( batcher.WithSize(size), @@ -115,25 +129,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID } func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { - type seqKey struct { - conversationID string - userID string - } - var readSeq map[seqKey]int64 + + var conversationID string + var userSeqMap map[string]int64 for _, msg := range msgs { if msg.message.ContentType != constant.HasReadReceipt { continue } var elem sdkws.NotificationElem if err := json.Unmarshal(msg.message.Content, &elem); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) continue } var tips sdkws.MarkAsReadTips if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) continue } + //The conversation ID for each batch of messages processed by the batcher is the same. + conversationID = tips.ConversationID if len(tips.Seqs) > 0 { for _, seq := range tips.Seqs { if tips.HasReadSeq < seq { @@ -146,26 +160,25 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, if tips.HasReadSeq < 0 { continue } - if readSeq == nil { - readSeq = make(map[seqKey]int64) - } - key := seqKey{ - conversationID: tips.ConversationID, - userID: tips.MarkAsReadUserID, + if userSeqMap == nil { + userSeqMap = make(map[string]int64) } - if readSeq[key] > tips.HasReadSeq { + + if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { continue } - readSeq[key] = tips.HasReadSeq + userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq } - if readSeq == nil { + if userSeqMap == nil { return } - for key, seq := range readSeq { - if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { - log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) - } + if len(conversationID) == 0 { + log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) } + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { + log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap) + } + } func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { @@ -250,12 +263,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if len(storageMessageList) > 0 { msg := storageMessageList[0] - lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) { - log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) + log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) return } log.ZInfo(ctx, "BatchInsertChat2Cache end") + err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap) + if err != nil { + log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) + prommetrics.SeqSetFailedCounter.Inc() + } + och.conversationUserHasReadChan <- &userHasReadSeq{ + conversationID: conversationID, + userHasReadMap: userSeqMap, + } if isNewConversation { switch msg.SessionType { @@ -308,7 +330,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con storageMessageList = append(storageMessageList, msg.message) } if len(storageMessageList) > 0 { - lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil { log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageMessageList) @@ -323,6 +345,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con och.toPushTopic(ctx, key, conversationID, storageList) } } +func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) { + defer och.wg.Done() + + for msg := range och.conversationUserHasReadChan { + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil { + log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap) + } + } + + log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages") +} +func (och *OnlineHistoryRedisConsumerHandler) Close() { + close(och.conversationUserHasReadChan) + och.wg.Wait() +} func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index c6013dbc12..1ecd786aa3 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -24,8 +24,11 @@ type MsgTransferDatabase interface { DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error // BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. - BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) - SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, userHasReadMap map[string]int64, err error) + + SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error + + SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error // to mq MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) @@ -219,18 +222,18 @@ func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conv return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) } -func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { +func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) { lenList := len(msgs) if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { - return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() + return 0, false, nil, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() } if lenList < 1 { - return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() + return 0, false, nil, errs.New("no messages to insert", "minCount", 1).Wrap() } currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) if err != nil { log.ZError(ctx, "storage.seq.Malloc", err) - return 0, false, err + return 0, false, nil, err } isNew = currentMaxSeq == 0 lastMaxSeq := currentMaxSeq @@ -248,25 +251,25 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver } else { prommetrics.MsgInsertRedisSuccessCounter.Inc() } - err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) - if err != nil { - log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) - prommetrics.SeqSetFailedCounter.Inc() - } - return lastMaxSeq, isNew, errs.Wrap(err) + return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err) } -func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { +func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { for userID, seq := range userSeqMap { - if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { + if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { return err } } return nil } -func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) +func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { + for userID, seq := range userSeqMap { + if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil } func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {