From 27024dfb032ab9015b2779c26a7a70d8239eb859 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 14 Aug 2024 18:20:42 +0800 Subject: [PATCH] fix: solve uncorrect outdated msg get. (#2513) * refactor: refactor workflows contents. * add tool workflows. * update field. * fix: remove chat error. * Fix err. * fix error. * remove cn comment. * update workflows files. * update infra config. * move workflows. * feat: update bot. * fix: solve uncorrect outdated msg get. * update get docIDs logic. * update * update skip logic. * fix * update. --- .github/workflows/help-comment-issue.yml | 2 +- internal/rpc/conversation/conversaion.go | 4 +-- internal/rpc/msg/clear.go | 27 +++++++------- internal/tools/cron_task.go | 1 + pkg/common/storage/controller/msg.go | 36 ++++++++++++++++--- pkg/common/storage/database/mgo/msg.go | 46 +++++++++++++++++++++++- pkg/common/storage/database/msg.go | 7 ++-- 7 files changed, 99 insertions(+), 24 deletions(-) diff --git a/.github/workflows/help-comment-issue.yml b/.github/workflows/help-comment-issue.yml index c4e72ffc67..b1cc621828 100644 --- a/.github/workflows/help-comment-issue.yml +++ b/.github/workflows/help-comment-issue.yml @@ -29,7 +29,7 @@ jobs: uses: peter-evans/create-or-update-comment@v4 with: issue-number: ${{ github.event.issue.number }} - token: ${{ secrets.BOT_GITHUB_TOKEN }} + token: ${{ secrets.BOT_TOKEN }} body: | This issue is available for anyone to work on. **Make sure to reference this issue in your pull request.** :sparkles: Thank you for your contribution! :sparkles: [Join slack 🤖](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q) to connect and communicate with our developers. diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 4cf20f919c..117ed8ca71 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -634,11 +634,11 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) if err != nil { - log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) + // log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue } - log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) + // log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) if len(conversationIDs) == 0 { continue } diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 6be551eada..4ffa1f43ee 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -30,8 +30,14 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. msgNum int start = time.Now() ) + clearMsg := func(ctx context.Context) (bool, error) { - msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100) + docIDs, err := m.MsgDatabase.GetDocIDs(ctx) + if err != nil { + return false, err + } + + msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000) if err != nil { return false, err } @@ -55,19 +61,14 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return true, nil } - for { - keep, err := clearMsg(ctx) - if err != nil { - log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) - return nil, err - } - if !keep { - log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) - break - } - - log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) + _, err = clearMsg(ctx) + if err != nil { + log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) + return nil, err } + + log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) + return &msg.ClearMsgResp{}, nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index b1d59800ce..afbaf34b43 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -81,6 +81,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) + if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) return diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 49268e0493..caa491a741 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -18,11 +18,12 @@ import ( "context" "encoding/json" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "strings" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -97,8 +98,10 @@ type CommonMsgDatabase interface { ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) // clear msg - GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) + GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) + + GetDocIDs(ctx context.Context) ([]string, error) } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { @@ -912,8 +915,25 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) } -func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { - return db.msgDocDatabase.GetBeforeMsg(ctx, ts, limit) +func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) { + var msgs []*model.MsgDocModel + for i := 0; i < len(docIDs); i += 1000 { + end := i + 1000 + if end > len(docIDs) { + end = len(docIDs) + } + + res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:end], limit) + if err != nil { + return nil, err + } + msgs = append(msgs, res...) + + if len(msgs) >= limit { + return msgs[:limit], nil + } + } + return msgs, nil } func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { @@ -936,8 +956,10 @@ func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, d return index, err } if len(index) == notNull { + log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) } else { + log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) } } @@ -955,3 +977,7 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin } return db.seqConversation.SetMinSeq(ctx, conversationID, seq) } + +func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) { + return db.msgDocDatabase.GetDocIDs(ctx) +} diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index 7dc308a7c4..7b45cdf51e 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -8,6 +8,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/utils/datautil" + "golang.org/x/exp/rand" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" @@ -1226,10 +1227,53 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string } } -func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { +func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) { + limit := 5000 + var skip int + var docIDs []string + var offset int + + count, err := m.coll.CountDocuments(ctx, bson.M{}) + if err != nil { + return nil, err + } + + if count < int64(limit) { + skip = 0 + } else { + rand.Seed(uint64(time.Now().UnixMilli())) + skip = rand.Intn(int(count / int64(limit))) + offset = skip * limit + } + log.ZDebug(ctx, "offset", "skip", skip, "offset", offset) + res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ + { + "$project": bson.M{ + "doc_id": 1, + }, + }, + { + "$skip": offset, + }, + { + "$limit": limit, + }, + }) + + for _, doc := range res { + docIDs = append(docIDs, doc.DocID) + } + + return docIDs, errs.Wrap(err) +} + +func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) { return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ { "$match": bson.M{ + "doc_id": bson.M{ + "$in": docIDs, + }, "msgs.msg.send_time": bson.M{ "$lt": ts, }, diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index 84f3a9e3e2..23a99f5b96 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -16,10 +16,11 @@ package database import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/msg" "go.mongodb.org/mongo-driver/mongo" - "time" ) type Msg interface { @@ -44,5 +45,7 @@ type Msg interface { DeleteDoc(ctx context.Context, docID string) error DeleteMsgByIndex(ctx context.Context, docID string, index []int) error - GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) + GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) + + GetDocIDs(ctx context.Context) ([]string, error) }