From e9a69731af6be53f04e6a8314443fac170ebbc95 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 24 Oct 2023 18:37:22 +0800 Subject: [PATCH] fix: not support redis cluster. CROSSSLOT Keys in request don't hash to the same slot --- pkg/common/db/cache/black.go | 21 +- pkg/common/db/cache/conversation.go | 282 ++++++++------------ pkg/common/db/cache/friend.go | 79 +++--- pkg/common/db/cache/group.go | 309 +++++++++------------- pkg/common/db/cache/meta_cache.go | 231 ++++++++-------- pkg/common/db/cache/msg.go | 397 ++++++++++++++++------------ pkg/common/db/cache/user.go | 184 +++++++------ 7 files changed, 721 insertions(+), 782 deletions(-) diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index 6da7d5d05b..5a70097ed8 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -52,6 +52,7 @@ func NewBlackCacheRedis( options rockscache.Options, ) BlackCache { rcClient := rockscache.NewClient(rdb, options) + return &BlackCacheRedis{ expireTime: blackExpireTime, rcClient: rcClient, @@ -61,12 +62,7 @@ func NewBlackCacheRedis( } func (b *BlackCacheRedis) NewCache() BlackCache { - return &BlackCacheRedis{ - expireTime: b.expireTime, - rcClient: b.rcClient, - blackDB: b.blackDB, - metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...), - } + return &BlackCacheRedis{expireTime: b.expireTime, rcClient: b.rcClient, blackDB: b.blackDB, metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...)} } func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { @@ -74,19 +70,14 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { } func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { - return getCache( - ctx, - b.rcClient, - b.getBlackIDsKey(userID), - b.expireTime, - func(ctx context.Context) ([]string, error) { - return b.blackDB.FindBlackUserIDs(ctx, userID) - }, - ) + return getCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) { + return b.blackDB.FindBlackUserIDs(ctx, userID) + }) } func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache { cache := b.NewCache() cache.AddKeys(b.getBlackIDsKey(userID)) + return cache } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index d755de645f..9c0bcfae4a 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -73,7 +73,7 @@ type ConversationCache interface { GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache - GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) + //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache GetConversationsByConversationID(ctx context.Context, @@ -83,12 +83,9 @@ type ConversationCache interface { DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache } -func NewConversationRedis( - rdb redis.UniversalClient, - opts rockscache.Options, - db relationtb.ConversationModelInterface, -) ConversationCache { +func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache { rcClient := rockscache.NewClient(rdb, opts) + return &ConversationRedisCache{ rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), @@ -110,6 +107,7 @@ func NewNewConversationRedis( options rockscache.Options, ) ConversationCache { rcClient := rockscache.NewClient(rdb, options) + return &ConversationRedisCache{ rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), @@ -156,24 +154,19 @@ func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conv } func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { - return getCache( - ctx, - c.rcClient, - c.getConversationIDsKey(ownerUserID), - c.expireTime, - func(ctx context.Context) ([]string, error) { - return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) - }, - ) + return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { + return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) + }) } func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, c.getConversationIDsKey(userID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } @@ -181,10 +174,7 @@ func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID strin return conversationIDsHashKey + ownerUserID } -func (c *ConversationRedisCache) GetUserConversationIDsHash( - ctx context.Context, - ownerUserID string, -) (hash uint64, err error) { +func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { return getCache( ctx, c.rcClient, @@ -204,229 +194,180 @@ func (c *ConversationRedisCache) GetUserConversationIDsHash( } func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(ownerUserIDs)) for _, ownerUserID := range ownerUserIDs { keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } -func (c *ConversationRedisCache) GetConversation( - ctx context.Context, - ownerUserID, conversationID string, -) (*relationtb.ConversationModel, error) { - return getCache( - ctx, - c.rcClient, - c.getConversationKey(ownerUserID, conversationID), - c.expireTime, - func(ctx context.Context) (*relationtb.ConversationModel, error) { - return c.conversationDB.Take(ctx, ownerUserID, conversationID) - }, - ) +func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationtb.ConversationModel, error) { + return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationtb.ConversationModel, error) { + return c.conversationDB.Take(ctx, ownerUserID, conversationID) + }) } func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(conversationIDs)) for _, conversationID := range conversationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } -func (c *ConversationRedisCache) getConversationIndex( - convsation *relationtb.ConversationModel, - keys []string, -) (int, error) { +func (c *ConversationRedisCache) getConversationIndex(convsation *relationtb.ConversationModel, keys []string) (int, error) { key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID) for _i, _key := range keys { if _key == key { return _i, nil } } - return 0, errors.New("not found key:" + key + " in keys") -} -func (c *ConversationRedisCache) GetConversations( - ctx context.Context, - ownerUserID string, - conversationIDs []string, -) ([]*relationtb.ConversationModel, error) { - var keys []string - for _, conversarionID := range conversationIDs { - keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) - } - return batchGetCache( - ctx, - c.rcClient, - keys, - c.expireTime, - c.getConversationIndex, - func(ctx context.Context) ([]*relationtb.ConversationModel, error) { - return c.conversationDB.Find(ctx, ownerUserID, conversationIDs) - }, - ) + return 0, errors.New("not found key:" + key + " in keys") } -func (c *ConversationRedisCache) GetUserAllConversations( - ctx context.Context, - ownerUserID string, -) ([]*relationtb.ConversationModel, error) { +func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) { + //var keys []string + //for _, conversarionID := range conversationIDs { + // keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) + //} + //return batchGetCache( + // ctx, + // c.rcClient, + // keys, + // c.expireTime, + // c.getConversationIndex, + // func(ctx context.Context) ([]*relationtb.ConversationModel, error) { + // return c.conversationDB.Find(ctx, ownerUserID, conversationIDs) + // }, + //) + return batchGetCache2(ctx, c.rcClient, c.expireTime, conversationIDs, func(conversationID string) string { + return c.getConversationKey(ownerUserID, conversationID) + }, func(ctx context.Context, conversationID string) (*relationtb.ConversationModel, error) { + return c.conversationDB.Take(ctx, ownerUserID, conversationID) + }) +} + +func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error) { conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) if err != nil { return nil, err } - var keys []string - for _, conversarionID := range conversationIDs { - keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) - } - return batchGetCache( - ctx, - c.rcClient, - keys, - c.expireTime, - c.getConversationIndex, - func(ctx context.Context) ([]*relationtb.ConversationModel, error) { - return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID) - }, - ) -} - -func (c *ConversationRedisCache) GetUserRecvMsgOpt( - ctx context.Context, - ownerUserID, conversationID string, -) (opt int, err error) { - return getCache( - ctx, - c.rcClient, - c.getRecvMsgOptKey(ownerUserID, conversationID), - c.expireTime, - func(ctx context.Context) (opt int, err error) { - return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID) - }, - ) -} - -func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs( - ctx context.Context, - groupID string, -) (userIDs []string, err error) { - return getCache( - ctx, - c.rcClient, - c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), - c.expireTime, - func(ctx context.Context) (userIDs []string, err error) { - return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) - }, - ) + //var keys []string + //for _, conversarionID := range conversationIDs { + // keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) + //} + //return batchGetCache( + // ctx, + // c.rcClient, + // keys, + // c.expireTime, + // c.getConversationIndex, + // func(ctx context.Context) ([]*relationtb.ConversationModel, error) { + // return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID) + // }, + //) + return c.GetConversations(ctx, ownerUserID, conversationIDs) +} + +func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { + return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) { + return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID) + }) +} + +func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { + return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) { + return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) + }) } func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(ownerUserIDs)) for _, ownerUserID := range ownerUserIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache { cache := c.NewCache() cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID)) + return cache } func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache { cache := c.NewCache() cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID)) + return cache } -func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash( - ctx context.Context, - groupID string, -) (hash uint64, err error) { - return getCache( - ctx, - c.rcClient, - c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), - c.expireTime, - func(ctx context.Context) (hash uint64, err error) { - userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) - if err != nil { - return 0, err - } - utils.Sort(userIDs, true) - bi := big.NewInt(0) - bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) - return bi.Uint64(), nil - }, +func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) { + return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) { + userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) + if err != nil { + return 0, err + } + utils.Sort(userIDs, true) + bi := big.NewInt(0) + bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) + return bi.Uint64(), nil + }, ) } func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache { cache := c.NewCache() cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID)) + return cache } -func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex( - conversationID string, - conversationIDs []string, -) (int, error) { +func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) { for _i, _conversationID := range conversationIDs { if _conversationID == conversationID { return _i, nil } } - return 0, errors.New("not found key:" + conversationID + " in keys") -} -func (c *ConversationRedisCache) GetUserAllHasReadSeqs( - ctx context.Context, - ownerUserID string, -) (map[string]int64, error) { - conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) - if err != nil { - return nil, err - } - var keys []string - for _, conversarionID := range conversationIDs { - keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID)) - } - return batchGetCacheMap( - ctx, - c.rcClient, - keys, - conversationIDs, - c.expireTime, - c.getUserAllHasReadSeqsIndex, - func(ctx context.Context) (map[string]int64, error) { - return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID) - }, - ) + return 0, errors.New("not found key:" + conversationID + " in keys") } -func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, - conversationIDs ...string, -) ConversationCache { +//func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { +// conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) +// if err != nil { +// return nil, err +// } +// var keys []string +// for _, conversarionID := range conversationIDs { +// keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID)) +// } +// return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) { +// return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID) +// }) +//} + +func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache { cache := c.NewCache() for _, conversationID := range conversationIDs { cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID)) } + return cache } -func (c *ConversationRedisCache) GetConversationsByConversationID( - ctx context.Context, - conversationIDs []string, -) ([]*relationtb.ConversationModel, error) { +func (c *ConversationRedisCache) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) { panic("implement me") } @@ -435,15 +376,9 @@ func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs } func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { - return getCache( - ctx, - c.rcClient, - c.getConversationNotReceiveMessageUserIDsKey(conversationID), - c.expireTime, - func(ctx context.Context) ([]string, error) { - return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) - }, - ) + return getCache(ctx, c.rcClient, c.getConversationNotReceiveMessageUserIDsKey(conversationID), c.expireTime, func(ctx context.Context) ([]string, error) { + return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) + }) } func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache { @@ -451,5 +386,6 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers for _, conversationID := range conversationIDs { cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID)) } + return cache } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index fd8c1d3c06..b2f929560f 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -53,12 +53,9 @@ type FriendCacheRedis struct { rcClient *rockscache.Client } -func NewFriendCacheRedis( - rdb redis.UniversalClient, - friendDB relationtb.FriendModelInterface, - options rockscache.Options, -) FriendCache { +func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface, options rockscache.Options) FriendCache { rcClient := rockscache.NewClient(rdb, options) + return &FriendCacheRedis{ metaCache: NewMetaCacheRedis(rcClient), friendDB: friendDB, @@ -67,12 +64,12 @@ func NewFriendCacheRedis( } } -func (c *FriendCacheRedis) NewCache() FriendCache { +func (f *FriendCacheRedis) NewCache() FriendCache { return &FriendCacheRedis{ - rcClient: c.rcClient, - metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), - friendDB: c.friendDB, - expireTime: c.expireTime, + rcClient: f.rcClient, + metaCache: NewMetaCacheRedis(f.rcClient, f.metaCache.GetPreDelKeys()...), + friendDB: f.friendDB, + expireTime: f.expireTime, } } @@ -89,32 +86,24 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string } func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { - return getCache( - ctx, - f.rcClient, - f.getFriendIDsKey(ownerUserID), - f.expireTime, - func(ctx context.Context) ([]string, error) { - return f.friendDB.FindFriendUserIDs(ctx, ownerUserID) - }, - ) + return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) { + return f.friendDB.FindFriendUserIDs(ctx, ownerUserID) + }) } -func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache { - new := f.NewCache() - var keys []string - for _, userID := range ownerUserID { +func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) FriendCache { + newGroupCache := f.NewCache() + keys := make([]string, 0, len(ownerUserIDs)) + for _, userID := range ownerUserIDs { keys = append(keys, f.getFriendIDsKey(userID)) } - new.AddKeys(keys...) - return new + newGroupCache.AddKeys(keys...) + + return newGroupCache } // todo. -func (f *FriendCacheRedis) GetTwoWayFriendIDs( - ctx context.Context, - ownerUserID string, -) (twoWayFriendIDs []string, err error) { +func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) { friendIDs, err := f.GetFriendIDs(ctx, ownerUserID) if err != nil { return nil, err @@ -128,32 +117,26 @@ func (f *FriendCacheRedis) GetTwoWayFriendIDs( twoWayFriendIDs = append(twoWayFriendIDs, ownerUserID) } } + return twoWayFriendIDs, nil } func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache { - new := f.NewCache() - new.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID)) - return new + newFriendCache := f.NewCache() + newFriendCache.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID)) + + return newFriendCache } -func (f *FriendCacheRedis) GetFriend( - ctx context.Context, - ownerUserID, friendUserID string, -) (friend *relationtb.FriendModel, err error) { - return getCache( - ctx, - f.rcClient, - f.getFriendKey(ownerUserID, friendUserID), - f.expireTime, - func(ctx context.Context) (*relationtb.FriendModel, error) { - return f.friendDB.Take(ctx, ownerUserID, friendUserID) - }, - ) +func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationtb.FriendModel, err error) { + return getCache(ctx, f.rcClient, f.getFriendKey(ownerUserID, friendUserID), f.expireTime, func(ctx context.Context) (*relationtb.FriendModel, error) { + return f.friendDB.Take(ctx, ownerUserID, friendUserID) + }) } func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache { - new := f.NewCache() - new.AddKeys(f.getFriendKey(ownerUserID, friendUserID)) - return new + newFriendCache := f.NewCache() + newFriendCache.AddKeys(f.getFriendKey(ownerUserID, friendUserID)) + + return newFriendCache } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 7d4c2b0439..d505772eb4 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -65,22 +65,10 @@ type GroupCache interface { GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) DelJoinedGroupID(userID ...string) GroupCache - GetGroupMemberInfo( - ctx context.Context, - groupID, userID string, - ) (groupMember *relationtb.GroupMemberModel, err error) - GetGroupMembersInfo( - ctx context.Context, - groupID string, - userID []string, - ) (groupMembers []*relationtb.GroupMemberModel, err error) + GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationtb.GroupMemberModel, err error) + GetGroupMembersInfo(ctx context.Context, groupID string, userID []string) (groupMembers []*relationtb.GroupMemberModel, err error) GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error) - GetGroupMembersPage( - ctx context.Context, - groupID string, - userID []string, - showNumber, pageNumber int32, - ) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) + GetGroupMembersPage(ctx context.Context, groupID string, userID []string, showNumber, pageNumber int32) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) DelGroupMembersInfo(groupID string, userID ...string) GroupCache @@ -109,6 +97,7 @@ func NewGroupCacheRedis( opts rockscache.Options, ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) + return &GroupCacheRedis{ rcClient: rcClient, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, @@ -169,6 +158,7 @@ func (g *GroupCacheRedis) GetGroupIndex(group *relationtb.GroupModel, keys []str return i, nil } } + return 0, errIndex } @@ -179,117 +169,98 @@ func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationtb.GroupMembe return i, nil } } + return 0, errIndex } // / groupInfo. -func (g *GroupCacheRedis) GetGroupsInfo( - ctx context.Context, - groupIDs []string, -) (groups []*relationtb.GroupModel, err error) { - var keys []string - for _, group := range groupIDs { - keys = append(keys, g.getGroupInfoKey(group)) - } - return batchGetCache( - ctx, - g.rcClient, - keys, - g.expireTime, - g.GetGroupIndex, - func(ctx context.Context) ([]*relationtb.GroupModel, error) { - return g.groupDB.Find(ctx, groupIDs) - }, - ) +func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error) { + //var keys []string + //for _, group := range groupIDs { + // keys = append(keys, g.getGroupInfoKey(group)) + //} + //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupIndex, func(ctx context.Context) ([]*relationtb.GroupModel, error) { + // return g.groupDB.Find(ctx, groupIDs) + //}) + return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string { + return g.getGroupInfoKey(groupID) + }, func(ctx context.Context, groupID string) (*relationtb.GroupModel, error) { + return g.groupDB.Take(ctx, groupID) + }) } func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error) { - return getCache( - ctx, - g.rcClient, - g.getGroupInfoKey(groupID), - g.expireTime, - func(ctx context.Context) (*relationtb.GroupModel, error) { - return g.groupDB.Take(ctx, groupID) - }, - ) + return getCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationtb.GroupModel, error) { + return g.groupDB.Take(ctx, groupID) + }) } func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache { - new := g.NewCache() - var keys []string + newGroupCache := g.NewCache() + keys := make([]string, 0, len(groupIDs)) for _, groupID := range groupIDs { keys = append(keys, g.getGroupInfoKey(groupID)) } - new.AddKeys(keys...) - return new -} - -func (g *GroupCacheRedis) GetJoinedSuperGroupIDs( - ctx context.Context, - userID string, -) (joinedSuperGroupIDs []string, err error) { - return getCache( - ctx, - g.rcClient, - g.getJoinedSuperGroupsIDKey(userID), - g.expireTime, - func(ctx context.Context) ([]string, error) { - userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID) - if err != nil { - return nil, err - } - return userGroup.GroupIDs, nil - }, - ) + newGroupCache.AddKeys(keys...) + + return newGroupCache } -func (g *GroupCacheRedis) GetSuperGroupMemberIDs( - ctx context.Context, - groupIDs ...string, -) (models []*unrelationtb.SuperGroupModel, err error) { - var keys []string - for _, group := range groupIDs { - keys = append(keys, g.getSuperGroupMemberIDsKey(group)) - } - return batchGetCache( - ctx, - g.rcClient, - keys, - g.expireTime, - func(model *unrelationtb.SuperGroupModel, keys []string) (int, error) { - for i, key := range keys { - if g.getSuperGroupMemberIDsKey(model.GroupID) == key { - return i, nil - } - } - return 0, errIndex - }, - func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) { - return g.mongoDB.FindSuperGroup(ctx, groupIDs) - }, +func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) { + return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { + userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID) + if err != nil { + return nil, err + } + return userGroup.GroupIDs, nil + }, ) } +func (g *GroupCacheRedis) GetSuperGroupMemberIDs(ctx context.Context, groupIDs ...string) (models []*unrelationtb.SuperGroupModel, err error) { + //var keys []string + //for _, group := range groupIDs { + // keys = append(keys, g.getSuperGroupMemberIDsKey(group)) + //} + //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, func(model *unrelationtb.SuperGroupModel, keys []string) (int, error) { + // for i, key := range keys { + // if g.getSuperGroupMemberIDsKey(model.GroupID) == key { + // return i, nil + // } + // } + // return 0, errIndex + //}, + // func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) { + // return g.mongoDB.FindSuperGroup(ctx, groupIDs) + // }) + return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string { + return g.getSuperGroupMemberIDsKey(groupID) + }, func(ctx context.Context, groupID string) (*unrelationtb.SuperGroupModel, error) { + return g.mongoDB.TakeSuperGroup(ctx, groupID) + }) +} + // userJoinSuperGroup. func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache { - new := g.NewCache() - var keys []string + newGroupCache := g.NewCache() + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, g.getJoinedSuperGroupsIDKey(userID)) } - new.AddKeys(keys...) - return new + newGroupCache.AddKeys(keys...) + + return newGroupCache } func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache { - new := g.NewCache() - var keys []string + newGroupCache := g.NewCache() + keys := make([]string, 0, len(groupIDs)) for _, groupID := range groupIDs { keys = append(keys, g.getSuperGroupMemberIDsKey(groupID)) } - new.AddKeys(keys...) - return new + newGroupCache.AddKeys(keys...) + + return newGroupCache } // groupMembersHash. @@ -351,10 +322,7 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin //) } -func (g *GroupCacheRedis) GetGroupMemberHashMap( - ctx context.Context, - groupIDs []string, -) (map[string]*relationtb.GroupSimpleUserID, error) { +func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error) { res := make(map[string]*relationtb.GroupSimpleUserID) for _, groupID := range groupIDs { hash, err := g.GetGroupMembersHash(ctx, groupID) @@ -368,26 +336,22 @@ func (g *GroupCacheRedis) GetGroupMemberHashMap( } res[groupID] = &relationtb.GroupSimpleUserID{Hash: hash, MemberNum: uint32(num)} } + return res, nil } func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache { cache := g.NewCache() cache.AddKeys(g.getGroupMembersHashKey(groupID)) + return cache } // groupMemberIDs. func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { - return getCache( - ctx, - g.rcClient, - g.getGroupMemberIDsKey(groupID), - g.expireTime, - func(ctx context.Context) ([]string, error) { - return g.groupMemberDB.FindMemberUserID(ctx, groupID) - }, - ) + return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) { + return g.groupMemberDB.FindMemberUserID(ctx, groupID) + }) } func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) { @@ -399,79 +363,56 @@ func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []str } m[groupID] = userIDs } + return m, nil } func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache { cache := g.NewCache() cache.AddKeys(g.getGroupMemberIDsKey(groupID)) + return cache } func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { - return getCache( - ctx, - g.rcClient, - g.getJoinedGroupsKey(userID), - g.expireTime, - func(ctx context.Context) ([]string, error) { - return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID) - }, - ) + return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { + return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID) + }) } func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, g.getJoinedGroupsKey(userID)) } cache := g.NewCache() cache.AddKeys(keys...) + return cache } -func (g *GroupCacheRedis) GetGroupMemberInfo( - ctx context.Context, - groupID, userID string, -) (groupMember *relationtb.GroupMemberModel, err error) { - return getCache( - ctx, - g.rcClient, - g.getGroupMemberInfoKey(groupID, userID), - g.expireTime, - func(ctx context.Context) (*relationtb.GroupMemberModel, error) { - return g.groupMemberDB.Take(ctx, groupID, userID) - }, - ) +func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationtb.GroupMemberModel, err error) { + return getCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationtb.GroupMemberModel, error) { + return g.groupMemberDB.Take(ctx, groupID, userID) + }) } -func (g *GroupCacheRedis) GetGroupMembersInfo( - ctx context.Context, - groupID string, - userIDs []string, -) ([]*relationtb.GroupMemberModel, error) { - var keys []string - for _, userID := range userIDs { - keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) - } - return batchGetCache( - ctx, - g.rcClient, - keys, - g.expireTime, - g.GetGroupMemberIndex, - func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) { - return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, nil) - }, - ) +func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]*relationtb.GroupMemberModel, error) { + //var keys []string + //for _, userID := range userIDs { + // keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) + //} + //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) { + // return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, nil) + //}) + return batchGetCache2(ctx, g.rcClient, g.expireTime, userIDs, func(userID string) string { + return g.getGroupMemberInfoKey(groupID, userID) + }, func(ctx context.Context, userID string) (*relationtb.GroupMemberModel, error) { + return g.groupMemberDB.Take(ctx, groupID, userID) + }) } -func (g *GroupCacheRedis) GetGroupMembersPage( - ctx context.Context, - groupID string, - userIDs []string, - showNumber, pageNumber int32, -) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) { +func (g *GroupCacheRedis) GetGroupMembersPage(ctx context.Context, groupID string, userIDs []string, showNumber, pageNumber int32) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) { groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) if err != nil { return 0, nil, err @@ -482,72 +423,58 @@ func (g *GroupCacheRedis) GetGroupMembersPage( userIDs = groupMemberIDs } groupMembers, err = g.GetGroupMembersInfo(ctx, groupID, utils.Paginate(userIDs, int(showNumber), int(showNumber))) + return uint32(len(userIDs)), groupMembers, err } -func (g *GroupCacheRedis) GetAllGroupMembersInfo( - ctx context.Context, - groupID string, -) (groupMembers []*relationtb.GroupMemberModel, err error) { +func (g *GroupCacheRedis) GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error) { groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) if err != nil { return nil, err } + return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs) } -func (g *GroupCacheRedis) GetAllGroupMemberInfo( - ctx context.Context, - groupID string, -) ([]*relationtb.GroupMemberModel, error) { +func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*relationtb.GroupMemberModel, error) { groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) if err != nil { return nil, err } - var keys []string - for _, groupMemberID := range groupMemberIDs { - keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID)) - } - return batchGetCache( - ctx, - g.rcClient, - keys, - g.expireTime, - g.GetGroupMemberIndex, - func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) { - return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil) - }, - ) + //var keys []string + //for _, groupMemberID := range groupMemberIDs { + // keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID)) + //} + //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) { + // return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil) + //}) + return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs) } func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) } cache := g.NewCache() cache.AddKeys(keys...) + return cache } func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) { - return getCache( - ctx, - g.rcClient, - g.getGroupMemberNumKey(groupID), - g.expireTime, - func(ctx context.Context) (int64, error) { - return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID) - }, - ) + return getCache(ctx, g.rcClient, g.getGroupMemberNumKey(groupID), g.expireTime, func(ctx context.Context) (int64, error) { + return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID) + }) } func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache { - var keys []string + keys := make([]string, 0, len(groupID)) for _, groupID := range groupID { keys = append(keys, g.getGroupMemberNumKey(groupID)) } cache := g.NewCache() cache.AddKeys(keys...) + return cache } diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index ca742d4a3a..5cff3df7f8 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "time" "github.com/dtm-labs/rockscache" @@ -59,27 +58,40 @@ type metaCacheRedis struct { func (m *metaCacheRedis) ExecDel(ctx context.Context) error { if len(m.keys) > 0 { log.ZDebug(ctx, "delete cache", "keys", m.keys) - retryTimes := 0 - for { - if err := m.rcClient.TagAsDeletedBatch2(ctx, m.keys); err != nil { - if retryTimes >= m.maxRetryTimes { - err = errs.ErrInternalServer.Wrap( - fmt.Sprintf( - "delete cache error: %v, keys: %v, retry times %d, please check redis server", - err, - m.keys, - retryTimes, - ), - ) - log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", m.keys) - return err + for _, key := range m.keys { + for i := 0; i < m.maxRetryTimes; i++ { + if err := m.rcClient.TagAsDeleted(key); err != nil { + log.ZError(ctx, "delete cache failed", err, "key", key) + time.Sleep(m.retryInterval) + continue } - retryTimes++ - } else { break } + + //retryTimes := 0 + //for { + // m.rcClient.TagAsDeleted() + // if err := m.rcClient.TagAsDeletedBatch2(ctx, []string{key}); err != nil { + // if retryTimes >= m.maxRetryTimes { + // err = errs.ErrInternalServer.Wrap( + // fmt.Sprintf( + // "delete cache error: %v, keys: %v, retry times %d, please check redis server", + // err, + // key, + // retryTimes, + // ), + // ) + // log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", key) + // return err + // } + // retryTimes++ + // } else { + // break + // } + //} } } + return nil } @@ -103,16 +115,11 @@ func GetDefaultOpt() rockscache.Options { opts := rockscache.NewDefaultOptions() opts.StrongConsistency = true opts.RandomExpireAdjustment = 0.2 + return opts } -func getCache[T any]( - ctx context.Context, - rcClient *rockscache.Client, - key string, - expire time.Duration, - fn func(ctx context.Context) (T, error), -) (T, error) { +func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { var t T var write bool v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { @@ -125,6 +132,7 @@ func getCache[T any]( return "", utils.Wrap(err, "") } write = true + return string(bs), nil }) if err != nil { @@ -139,95 +147,108 @@ func getCache[T any]( err = json.Unmarshal([]byte(v), &t) if err != nil { log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) + return t, utils.Wrap(err, "") } + return t, nil } -func batchGetCache[T any]( - ctx context.Context, - rcClient *rockscache.Client, - keys []string, - expire time.Duration, - keyIndexFn func(t T, keys []string) (int, error), - fn func(ctx context.Context) ([]T, error), -) ([]T, error) { - batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { - values := make(map[int]string) - tArrays, err := fn(ctx) - if err != nil { - return nil, err - } - for _, v := range tArrays { - index, err := keyIndexFn(v, keys) - if err != nil { - continue - } - bs, err := json.Marshal(v) - if err != nil { - return nil, utils.Wrap(err, "marshal failed") - } - values[index] = string(bs) - } - return values, nil - }) - if err != nil { - return nil, err +//func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) { +// batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { +// values := make(map[int]string) +// tArrays, err := fn(ctx) +// if err != nil { +// return nil, err +// } +// for _, v := range tArrays { +// index, err := keyIndexFn(v, keys) +// if err != nil { +// continue +// } +// bs, err := json.Marshal(v) +// if err != nil { +// return nil, utils.Wrap(err, "marshal failed") +// } +// values[index] = string(bs) +// } +// return values, nil +// }) +// if err != nil { +// return nil, err +// } +// var tArrays []T +// for _, v := range batchMap { +// if v != "" { +// var t T +// err = json.Unmarshal([]byte(v), &t) +// if err != nil { +// return nil, utils.Wrap(err, "unmarshal failed") +// } +// tArrays = append(tArrays, t) +// } +// } +// return tArrays, nil +//} + +func batchGetCache2[T any, K comparable](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, keys []K, keyFn func(key K) string, fns func(ctx context.Context, key K) (T, error)) ([]T, error) { + if len(keys) == 0 { + return nil, nil } - var tArrays []T - for _, v := range batchMap { - if v != "" { - var t T - err = json.Unmarshal([]byte(v), &t) - if err != nil { - return nil, utils.Wrap(err, "unmarshal failed") - } - tArrays = append(tArrays, t) - } - } - return tArrays, nil -} - -func batchGetCacheMap[T any]( - ctx context.Context, - rcClient *rockscache.Client, - keys, originKeys []string, - expire time.Duration, - keyIndexFn func(s string, keys []string) (int, error), - fn func(ctx context.Context) (map[string]T, error), -) (map[string]T, error) { - batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { - tArrays, err := fn(ctx) + res := make([]T, 0, len(keys)) + for _, key := range keys { + val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) { + return fns(ctx, key) + }) if err != nil { return nil, err } - values := make(map[int]string) - for k, v := range tArrays { - index, err := keyIndexFn(k, originKeys) - if err != nil { - continue - } - bs, err := json.Marshal(v) - if err != nil { - return nil, utils.Wrap(err, "marshal failed") - } - values[index] = string(bs) - } - return values, nil - }) - if err != nil { - return nil, err + res = append(res, val) } - tMap := make(map[string]T) - for i, v := range batchMap { - if v != "" { - var t T - err = json.Unmarshal([]byte(v), &t) - if err != nil { - return nil, utils.Wrap(err, "unmarshal failed") - } - tMap[originKeys[i]] = t - } - } - return tMap, nil + + return res, nil } + +//func batchGetCacheMap[T any]( +// ctx context.Context, +// rcClient *rockscache.Client, +// keys, originKeys []string, +// expire time.Duration, +// keyIndexFn func(s string, keys []string) (int, error), +// fn func(ctx context.Context) (map[string]T, error), +//) (map[string]T, error) { +// batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { +// tArrays, err := fn(ctx) +// if err != nil { +// return nil, err +// } +// values := make(map[int]string) +// for k, v := range tArrays { +// index, err := keyIndexFn(k, originKeys) +// if err != nil { +// continue +// } +// bs, err := json.Marshal(v) +// if err != nil { +// return nil, utils.Wrap(err, "marshal failed") +// } +// values[index] = string(bs) +// } +// return values, nil +// }) +// if err != nil { +// return nil, err +// } +// tMap := make(map[string]T) +// for i, v := range batchMap { +// if v != "" { +// var t T +// err = json.Unmarshal([]byte(v), &t) +// if err != nil { +// return nil, utils.Wrap(err, "unmarshal failed") +// } +// tMap[originKeys[i]] = t +// } +// } +// return tMap, nil +//} diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 65b8d63de3..7753d4e965 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -16,13 +16,14 @@ package cache import ( "context" + "errors" + "github.com/dtm-labs/rockscache" + unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "strconv" "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/dtm-labs/rockscache" - "github.com/OpenIMSDK/tools/errs" "github.com/gogo/protobuf/jsonpb" @@ -33,7 +34,6 @@ import ( "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/redis/go-redis/v9" ) @@ -105,11 +105,7 @@ type MsgModel interface { GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error - GetMessagesBySeq( - ctx context.Context, - conversationID string, - seqs []int64, - ) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) @@ -122,12 +118,7 @@ type MsgModel interface { JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error - SetMessageReactionExpire( - ctx context.Context, - clientMsgID string, - sessionType int32, - expiration time.Duration, - ) (bool, error) + SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error @@ -158,50 +149,51 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string return hasReadSeq + userID + ":" + conversationID } -func (c *msgCache) setSeq( - ctx context.Context, - conversationID string, - seq int64, - getkey func(conversationID string) string, -) error { +func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) } -func (c *msgCache) getSeq( - ctx context.Context, - conversationID string, - getkey func(conversationID string) string, -) (int64, error) { +func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64()) } -func (c *msgCache) getSeqs( - ctx context.Context, - items []string, - getkey func(s string) string, -) (m map[string]int64, err error) { - pipe := c.rdb.Pipeline() - for _, v := range items { - if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { - return nil, errs.Wrap(err) - } - } - result, err := pipe.Exec(ctx) - if err != nil && err != redis.Nil { - return nil, errs.Wrap(err) - } +func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { m = make(map[string]int64, len(items)) - for i, v := range result { - seq := v.(*redis.StringCmd) - if seq.Err() != nil && seq.Err() != redis.Nil { - return nil, errs.Wrap(v.Err()) + for i, v := range items { + res, err := c.rdb.Get(ctx, getkey(v)).Result() + if err != nil && err != redis.Nil { + return nil, errs.Wrap(err) } - val := utils.StringToInt64(seq.Val()) + val := utils.StringToInt64(res) if val != 0 { m[items[i]] = val } } + return m, nil + + //pipe := c.rdb.Pipeline() + //for _, v := range items { + // if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { + // return nil, errs.Wrap(err) + // } + //} + //result, err := pipe.Exec(ctx) + //if err != nil && err != redis.Nil { + // return nil, errs.Wrap(err) + //} + //m = make(map[string]int64, len(items)) + //for i, v := range result { + // seq := v.(*redis.StringCmd) + // if seq.Err() != nil && seq.Err() != redis.Nil { + // return nil, errs.Wrap(v.Err()) + // } + // val := utils.StringToInt64(seq.Val()) + // if val != 0 { + // m[items[i]] = val + // } + //} + //return m, nil } func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { @@ -221,15 +213,21 @@ func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq } func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { - pipe := c.rdb.Pipeline() - for k, seq := range seqs { - err := pipe.Set(ctx, getkey(k), seq, 0).Err() - if err != nil { + for conversationID, seq := range seqs { + if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { return errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return err + return nil + //pipe := c.rdb.Pipeline() + //for k, seq := range seqs { + // err := pipe.Set(ctx, getkey(k), seq, 0).Err() + // if err != nil { + // return errs.Wrap(err) + // } + //} + //_, err := pipe.Exec(ctx) + //return err } func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { @@ -252,30 +250,17 @@ func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()) } -func (c *msgCache) GetConversationUserMinSeqs( - ctx context.Context, - conversationID string, - userIDs []string, -) (m map[string]int64, err error) { +func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { return c.getSeqs(ctx, userIDs, func(userID string) string { return c.getConversationUserMinSeqKey(conversationID, userID) }) } -func (c *msgCache) SetConversationUserMinSeq( - ctx context.Context, - conversationID string, - userID string, - minSeq int64, -) error { +func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) } -func (c *msgCache) SetConversationUserMinSeqs( - ctx context.Context, - conversationID string, - seqs map[string]int64, -) (err error) { +func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { return c.setSeqs(ctx, seqs, func(userID string) string { return c.getConversationUserMinSeqKey(conversationID, userID) }) @@ -303,11 +288,7 @@ func (c *msgCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasRea }) } -func (c *msgCache) GetHasReadSeqs( - ctx context.Context, - userID string, - conversationIDs []string, -) (map[string]int64, error) { +func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { return c.getHasReadSeqKey(conversationID, userID) }) @@ -319,6 +300,7 @@ func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversatio func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) } @@ -332,6 +314,7 @@ func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID string, pla for k, v := range m { mm[k] = utils.StringToInt(v) } + return mm, nil } @@ -341,11 +324,13 @@ func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platf for k, v := range m { mm[k] = v } + return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err()) } func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform int, fields []string) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform) + return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err()) } @@ -357,58 +342,86 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string { return messageCache + conversationID + "_*" } -func (c *msgCache) GetMessagesBySeq( - ctx context.Context, - conversationID string, - seqs []int64, -) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - pipe := c.rdb.Pipeline() - for _, v := range seqs { - // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 - key := c.getMessageCacheKey(conversationID, v) - if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { - return nil, nil, err +func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + for _, seq := range seqs { + res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result() + if err != nil { + log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq) + failedSeqs = append(failedSeqs, seq) + continue } - } - result, err := pipe.Exec(ctx) - for i, v := range result { - cmd := v.(*redis.StringCmd) - if cmd.Err() != nil { - failedSeqs = append(failedSeqs, seqs[i]) - } else { - msg := sdkws.MsgData{} - err = msgprocessor.String2Pb(cmd.Val(), &msg) - if err == nil { - if msg.Status != constant.MsgDeleted { - seqMsgs = append(seqMsgs, &msg) - continue - } - } else { - log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) - } - failedSeqs = append(failedSeqs, seqs[i]) + msg := sdkws.MsgData{} + if err = msgprocessor.String2Pb(res, &msg); err != nil { + log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) + failedSeqs = append(failedSeqs, seq) + continue + } + if msg.Status == constant.MsgDeleted { + failedSeqs = append(failedSeqs, seq) + continue } + seqMsgs = append(seqMsgs, &msg) } - return seqMsgs, failedSeqs, err + + return + //pipe := c.rdb.Pipeline() + //for _, v := range seqs { + // // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 + // key := c.getMessageCacheKey(conversationID, v) + // if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { + // return nil, nil, err + // } + //} + //result, err := pipe.Exec(ctx) + //for i, v := range result { + // cmd := v.(*redis.StringCmd) + // if cmd.Err() != nil { + // failedSeqs = append(failedSeqs, seqs[i]) + // } else { + // msg := sdkws.MsgData{} + // err = msgprocessor.String2Pb(cmd.Val(), &msg) + // if err == nil { + // if msg.Status != constant.MsgDeleted { + // seqMsgs = append(seqMsgs, &msg) + // continue + // } + // } else { + // log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) + // } + // failedSeqs = append(failedSeqs, seqs[i]) + // } + //} + //return seqMsgs, failedSeqs, err } func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { - pipe := c.rdb.Pipeline() - var failedMsgs []*sdkws.MsgData for _, msg := range msgs { - key := c.getMessageCacheKey(conversationID, msg.Seq) s, err := msgprocessor.Pb2String(msg) if err != nil { return 0, errs.Wrap(err) } - err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() - if err != nil { - failedMsgs = append(failedMsgs, msg) - log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs) + key := c.getMessageCacheKey(conversationID, msg.Seq) + if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return 0, errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return len(failedMsgs), err + return len(msgs), nil + //pipe := c.rdb.Pipeline() + //var failedMsgs []*sdkws.MsgData + //for _, msg := range msgs { + // key := c.getMessageCacheKey(conversationID, msg.Seq) + // s, err := msgprocessor.Pb2String(msg) + // if err != nil { + // return 0, errs.Wrap(err) + // } + // err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() + // if err != nil { + // failedMsgs = append(failedMsgs, msg) + // log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs) + // } + //} + //_, err := pipe.Exec(ctx) + //return len(failedMsgs), err } func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { @@ -420,27 +433,47 @@ func (c *msgCache) getUserDelList(conversationID, userID string) string { } func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error { - pipe := c.rdb.Pipeline() for _, seq := range seqs { delUserListKey := c.getMessageDelUserListKey(conversationID, seq) userDelListKey := c.getUserDelList(conversationID, userID) - err := pipe.SAdd(ctx, delUserListKey, userID).Err() + err := c.rdb.SAdd(ctx, delUserListKey, userID).Err() if err != nil { return errs.Wrap(err) } - err = pipe.SAdd(ctx, userDelListKey, seq).Err() + err = c.rdb.SAdd(ctx, userDelListKey, seq).Err() if err != nil { return errs.Wrap(err) } - if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } - if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return errs.Wrap(err) + + return nil + //pipe := c.rdb.Pipeline() + //for _, seq := range seqs { + // delUserListKey := c.getMessageDelUserListKey(conversationID, seq) + // userDelListKey := c.getUserDelList(conversationID, userID) + // err := pipe.SAdd(ctx, delUserListKey, userID).Err() + // if err != nil { + // return errs.Wrap(err) + // } + // err = pipe.SAdd(ctx, userDelListKey, seq).Err() + // if err != nil { + // return errs.Wrap(err) + // } + // if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + // return errs.Wrap(err) + // } + // if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + // return errs.Wrap(err) + // } + //} + //_, err := pipe.Exec(ctx) + //return errs.Wrap(err) } func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) { @@ -452,6 +485,7 @@ func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID st for i, v := range result { seqs[i] = utils.StringToInt64(v) } + return seqs, nil } @@ -460,67 +494,102 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result() if err != nil { log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + continue } if len(delUsers) > 0 { - pipe := c.rdb.Pipeline() var failedFlag bool for _, userID := range delUsers { - err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err() + err = c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err() if err != nil { failedFlag = true - log.ZWarn( - ctx, - "DelUserDeleteMsgsList failed", - err, - "conversationID", - conversationID, - "seq", - seq, - "userID", - userID, - ) + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID) } } if !failedFlag { - if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { + if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) } } - if _, err := pipe.Exec(ctx); err != nil { - log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq) - } } } + //for _, seq := range seqs { + // delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result() + // if err != nil { + // log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + // continue + // } + // if len(delUsers) > 0 { + // pipe := c.rdb.Pipeline() + // var failedFlag bool + // for _, userID := range delUsers { + // err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err() + // if err != nil { + // failedFlag = true + // log.ZWarn( + // ctx, + // "DelUserDeleteMsgsList failed", + // err, + // "conversationID", + // conversationID, + // "seq", + // seq, + // "userID", + // userID, + // ) + // } + // } + // if !failedFlag { + // if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { + // log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + // } + // } + // if _, err := pipe.Exec(ctx); err != nil { + // log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq) + // } + // } + //} } func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { - pipe := c.rdb.Pipeline() for _, seq := range seqs { - if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { + if err := c.rdb.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { return errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return errs.Wrap(err) + return nil + //pipe := c.rdb.Pipeline() + //for _, seq := range seqs { + // if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { + // return errs.Wrap(err) + // } + //} + //_, err := pipe.Exec(ctx) + //return errs.Wrap(err) } func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result() - if err == redis.Nil { + if errors.Is(err, redis.Nil) { return nil } if err != nil { return errs.Wrap(err) } - pipe := c.rdb.Pipeline() for _, v := range vals { - if err := pipe.Del(ctx, v).Err(); err != nil { + if err := c.rdb.Del(ctx, v).Err(); err != nil { return errs.Wrap(err) } } - _, err = pipe.Exec(ctx) - return errs.Wrap(err) + return nil + //pipe := c.rdb.Pipeline() + //for _, v := range vals { + // if err := pipe.Del(ctx, v).Err(); err != nil { + // return errs.Wrap(err) + // } + //} + //_, err = pipe.Exec(ctx) + //return errs.Wrap(err) } func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { @@ -528,13 +597,15 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in key := c.getMessageCacheKey(userID, seq) result, err := c.rdb.Get(ctx, key).Result() if err != nil { - if err == redis.Nil { + if errors.Is(err, redis.Nil) { continue } + return errs.Wrap(err) } var msg sdkws.MsgData - if err := jsonpb.UnmarshalString(result, &msg); err != nil { + err = jsonpb.UnmarshalString(result, &msg) + if err != nil { return err } msg.Status = constant.MsgDeleted @@ -546,6 +617,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in return errs.Wrap(err) } } + return nil } @@ -571,20 +643,12 @@ func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32 func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int() + return int32(result), errs.Wrap(err) } -func (c *msgCache) SetFcmToken( - ctx context.Context, - account string, - platformID int, - fcmToken string, - expireTime int64, -) (err error) { - return errs.Wrap( - c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second). - Err(), - ) +func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { + return errs.Wrap(c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) } func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { @@ -597,6 +661,7 @@ func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID i func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() + return int(seq), errs.Wrap(err) } @@ -610,11 +675,13 @@ func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) } func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return errs.Wrap(c.rdb.Del(ctx, key).Err()) } @@ -629,6 +696,7 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in case constant.NotificationChatType: return "EX_NOTIFICATION" + clientMsgID } + return "" } @@ -637,6 +705,7 @@ func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID st if err != nil { return false, utils.Wrap(err, "") } + return n > 0, nil } @@ -649,21 +718,11 @@ func (c *msgCache) SetMessageTypeKeyValue( return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) } -func (c *msgCache) SetMessageReactionExpire( - ctx context.Context, - clientMsgID string, - sessionType int32, - expiration time.Duration, -) (bool, error) { +func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) } -func (c *msgCache) GetMessageTypeKeyValue( - ctx context.Context, - clientMsgID string, - sessionType int32, - typeKey string, -) (string, error) { +func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) } diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index b821b4a524..d1164f2c08 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -17,6 +17,7 @@ package cache import ( "context" "encoding/json" + "errors" "hash/crc32" "strconv" "time" @@ -70,6 +71,7 @@ func NewUserCacheRedis( options rockscache.Options, ) UserCache { rcClient := rockscache.NewClient(rdb, options) + return &UserCacheRedis{ rdb: rdb, metaCache: NewMetaCacheRedis(rcClient), @@ -97,10 +99,6 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string { return userGlobalRecvMsgOptKey + userID } -func (u *UserCacheRedis) getUserStatusHashKey(userID string, Id int32) string { - return userID + "_" + string(Id) + platformID -} - func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) { return getCache( ctx, @@ -114,36 +112,42 @@ func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userIn } func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) { - var keys []string - for _, userID := range userIDs { - keys = append(keys, u.getUserInfoKey(userID)) - } - return batchGetCache( - ctx, - u.rcClient, - keys, - u.expireTime, - func(user *relationtb.UserModel, keys []string) (int, error) { - for i, key := range keys { - if key == u.getUserInfoKey(user.UserID) { - return i, nil - } - } - return 0, errIndex - }, - func(ctx context.Context) ([]*relationtb.UserModel, error) { - return u.userDB.Find(ctx, userIDs) - }, - ) + //var keys []string + //for _, userID := range userIDs { + // keys = append(keys, u.getUserInfoKey(userID)) + //} + //return batchGetCache( + // ctx, + // u.rcClient, + // keys, + // u.expireTime, + // func(user *relationtb.UserModel, keys []string) (int, error) { + // for i, key := range keys { + // if key == u.getUserInfoKey(user.UserID) { + // return i, nil + // } + // } + // return 0, errIndex + // }, + // func(ctx context.Context) ([]*relationtb.UserModel, error) { + // return u.userDB.Find(ctx, userIDs) + // }, + //) + return batchGetCache2(ctx, u.rcClient, u.expireTime, userIDs, func(userID string) string { + return u.getUserInfoKey(userID) + }, func(ctx context.Context, userID string) (*relationtb.UserModel, error) { + return u.userDB.Take(ctx, userID) + }) } func (u *UserCacheRedis) DelUsersInfo(userIDs ...string) UserCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, u.getUserInfoKey(userID)) } cache := u.NewCache() cache.AddKeys(keys...) + return cache } @@ -160,22 +164,19 @@ func (u *UserCacheRedis) GetUserGlobalRecvMsgOpt(ctx context.Context, userID str } func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID)) } cache := u.NewCache() cache.AddKeys(keys...) - return cache -} -func (u *UserCacheRedis) getOnlineStatusKey(userID string) string { - return olineStatusKey + userID + return cache } // GetUserStatus get user status. func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { - var res []*user.OnlineStatus + userStatus := make([]*user.OnlineStatus, 0, len(userIDs)) for _, userID := range userIDs { UserIDNum := crc32.ChecksumIEEE([]byte(userID)) modKey := strconv.Itoa(int(UserIDNum % statusMod)) @@ -183,13 +184,14 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([ key := olineStatusKey + modKey result, err := u.rdb.HGet(ctx, key, userID).Result() if err != nil { - if err == redis.Nil { + if errors.Is(err, redis.Nil) { // key or field does not exist - res = append(res, &user.OnlineStatus{ + userStatus = append(userStatus, &user.OnlineStatus{ UserID: userID, Status: constant.Offline, PlatformIDs: nil, }) + continue } else { return nil, errs.Wrap(err) @@ -201,9 +203,10 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([ } onlineStatus.UserID = userID onlineStatus.Status = constant.Online - res = append(res, &onlineStatus) + userStatus = append(userStatus, &onlineStatus) } - return res, nil + + return userStatus, nil } // SetUserStatus Set the user status and save it in redis. @@ -224,15 +227,16 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu Status: constant.Online, PlatformIDs: []int32{platformID}, } - jsonData, err := json.Marshal(onlineStatus) - if err != nil { - return errs.Wrap(err) + jsonData, err2 := json.Marshal(&onlineStatus) + if err2 != nil { + return errs.Wrap(err2) } - _, err = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result() - if err != nil { - return errs.Wrap(err) + _, err2 = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result() + if err2 != nil { + return errs.Wrap(err2) } u.rdb.Expire(ctx, key, userOlineStatusExpireTime) + return nil } } @@ -240,7 +244,7 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu isNil := false result, err := u.rdb.HGet(ctx, key, userID).Result() if err != nil { - if err == redis.Nil { + if errors.Is(err, redis.Nil) { isNil = true } else { return errs.Wrap(err) @@ -248,51 +252,45 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu } if status == constant.Offline { - if isNil { - log.ZWarn(ctx, "this user not online,maybe trigger order not right", - err, "userStatus", status) - return nil + err = u.refreshStatusOffline(ctx, userID, status, platformID, isNil, err, result, key) + if err != nil { + return err } - var onlineStatus user.OnlineStatus - err = json.Unmarshal([]byte(result), &onlineStatus) + } else { + err = u.refreshStatusOnline(ctx, userID, platformID, isNil, err, result, key) if err != nil { return errs.Wrap(err) } - var newPlatformIDs []int32 - for _, val := range onlineStatus.PlatformIDs { - if val != platformID { - newPlatformIDs = append(newPlatformIDs, val) - } + } + + return nil +} + +func (u *UserCacheRedis) refreshStatusOffline(ctx context.Context, userID string, status, platformID int32, isNil bool, err error, result, key string) error { + if isNil { + log.ZWarn(ctx, "this user not online,maybe trigger order not right", + err, "userStatus", status) + + return nil + } + var onlineStatus user.OnlineStatus + err = json.Unmarshal([]byte(result), &onlineStatus) + if err != nil { + return errs.Wrap(err) + } + var newPlatformIDs []int32 + for _, val := range onlineStatus.PlatformIDs { + if val != platformID { + newPlatformIDs = append(newPlatformIDs, val) } - if newPlatformIDs == nil { - _, err = u.rdb.HDel(ctx, key, userID).Result() - if err != nil { - return errs.Wrap(err) - } - } else { - onlineStatus.PlatformIDs = newPlatformIDs - newjsonData, err := json.Marshal(&onlineStatus) - if err != nil { - return errs.Wrap(err) - } - _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result() - if err != nil { - return errs.Wrap(err) - } + } + if newPlatformIDs == nil { + _, err = u.rdb.HDel(ctx, key, userID).Result() + if err != nil { + return errs.Wrap(err) } } else { - var onlineStatus user.OnlineStatus - if !isNil { - err = json.Unmarshal([]byte(result), &onlineStatus) - if err != nil { - return errs.Wrap(err) - } - onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID)) - } else { - onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID) - } - onlineStatus.Status = constant.Online - onlineStatus.UserID = userID + onlineStatus.PlatformIDs = newPlatformIDs newjsonData, err := json.Marshal(&onlineStatus) if err != nil { return errs.Wrap(err) @@ -301,7 +299,31 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu if err != nil { return errs.Wrap(err) } + } + + return nil +} +func (u *UserCacheRedis) refreshStatusOnline(ctx context.Context, userID string, platformID int32, isNil bool, err error, result, key string) error { + var onlineStatus user.OnlineStatus + if !isNil { + err2 := json.Unmarshal([]byte(result), &onlineStatus) + if err != nil { + return errs.Wrap(err2) + } + onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID)) + } else { + onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID) + } + onlineStatus.Status = constant.Online + onlineStatus.UserID = userID + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) } return nil