Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client id gen #1157

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions internal/comm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/dicedb/dice/internal/auth"
"github.com/dicedb/dice/internal/cmd"
"github.com/dicedb/dice/internal/id"
)

type CmdWatchResponse struct {
Expand All @@ -27,7 +28,7 @@ type Client struct {
Cqueue cmd.RedisCmds
IsTxn bool
Session *auth.Session
ClientIdentifierID uint32
ClientIdentifierID uint64
}

func (c *Client) Write(b []byte) (int, error) {
Expand Down Expand Up @@ -62,12 +63,12 @@ func NewClient(fd int) *Client {
}
}

func NewHTTPQwatchClient(qwatchResponseChan chan QwatchResponse, clientIdentifierID uint32) *Client {
func NewHTTPQwatchClient(qwatchResponseChan chan QwatchResponse) *Client {
cmds := make([]*cmd.DiceDBCmd, 0)
return &Client{
Cqueue: cmd.RedisCmds{Cmds: cmds},
Session: auth.NewSession(),
ClientIdentifierID: clientIdentifierID,
ClientIdentifierID: id.NextUint64(),
HTTPQwatchResponseChan: qwatchResponseChan,
}
}
4 changes: 2 additions & 2 deletions internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -1983,7 +1983,7 @@ func EvalQWATCH(args []string, httpOp bool, client *comm.Client, store *dstore.S
Query: query,
CacheChan: cacheChannel,
QwatchClientChan: client.HTTPQwatchResponseChan,
ClientIdentifierID: client.ClientIdentifierID,
ClientIdentifierID: uint32(client.ClientIdentifierID),
}
} else {
watchSubscription = querymanager.QuerySubscription{
Expand Down Expand Up @@ -2028,7 +2028,7 @@ func EvalQUNWATCH(args []string, httpOp bool, client *comm.Client) []byte {
Subscribe: false,
Query: query,
QwatchClientChan: client.HTTPQwatchResponseChan,
ClientIdentifierID: client.ClientIdentifierID,
ClientIdentifierID: uint32(client.ClientIdentifierID),
}
} else {
querymanager.QuerySubscriptionChan <- querymanager.QuerySubscription{
Expand Down
9 changes: 9 additions & 0 deletions internal/id/monotonous_int.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package id

import "sync/atomic"

var id uint64 = 0

func NextUint64() uint64 {
return atomic.AddUint64(&id, 1)
}
19 changes: 3 additions & 16 deletions internal/server/httpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"context"
"encoding/json"
"fmt"
"hash/crc32"
"log/slog"
"net/http"
"strings"
Expand Down Expand Up @@ -209,9 +208,8 @@
writer.Header().Set("Connection", "keep-alive")
writer.WriteHeader(http.StatusOK)
// We're a generating a unique client id, to keep track in core of requests from registered clients
clientIdentifierID := generateUniqueInt32(request)
qwatchQuery := diceDBCmd.Args[0]
qwatchClient := comm.NewHTTPQwatchClient(s.qwatchResponseChan, clientIdentifierID)
qwatchClient := comm.NewHTTPQwatchClient(s.qwatchResponseChan)
// Prepare the store operation
storeOp := &ops.StoreOp{
Cmd: diceDBCmd,
Expand All @@ -221,7 +219,7 @@
HTTPOp: true,
}

s.logger.Info("Registered client for watching query", slog.Any("clientID", clientIdentifierID),
s.logger.Info("Registered client for watching query", slog.Any("clientID", qwatchClient.ClientIdentifierID),
slog.Any("query", qwatchQuery))
s.shardManager.GetShard(0).ReqChan <- storeOp

Expand All @@ -235,7 +233,7 @@
select {
case resp := <-s.qwatchResponseChan:
// Since we're reusing
if resp.ClientIdentifierID == clientIdentifierID {
if resp.ClientIdentifierID == uint32(qwatchClient.ClientIdentifierID) {

Check failure on line 236 in internal/server/httpServer.go

View workflow job for this annotation

GitHub Actions / lint

truncateCmp: truncation in comparison 64->32 bit; cast the other operand to uint64 instead (gocritic)
s.writeQWatchResponse(writer, resp)
}
case <-s.shutdownChan:
Expand Down Expand Up @@ -456,17 +454,6 @@
return utils.HTTPResponse{Data: responseValue}
}

func generateUniqueInt32(r *http.Request) uint32 {
var sb strings.Builder
sb.WriteString(r.RemoteAddr)
sb.WriteString(r.UserAgent())
sb.WriteString(r.Method)
sb.WriteString(r.URL.Path)

// Hash the string using CRC32 and cast it to an int32
return crc32.ChecksumIEEE([]byte(sb.String()))
}

func replaceNilInInterface(data interface{}) interface{} {
switch v := data.(type) {
case string:
Expand Down
Loading