diff --git a/core/commands.go b/core/commands.go index 93c560b2c..8a69f2d56 100644 --- a/core/commands.go +++ b/core/commands.go @@ -107,7 +107,7 @@ var ( jsonclearCmdMeta = DiceCmdMeta{ Name: "JSON.CLEAR", Info: `JSON.CLEAR key [path] - Returns an integer reply specifying the number ofmatching JSON arrays and + Returns an integer reply specifying the number ofmatching JSON arrays and objects cleared +number of matching JSON numerical values zeroed. Error reply: If the number of arguments is incorrect the key doesn't exist.`, Eval: evalJSONCLEAR, @@ -665,22 +665,22 @@ var ( bitposCmdMeta = DiceCmdMeta{ Name: "BITPOS", Info: `BITPOS returns the position of the first bit set to 1 or 0 in a string - The position is returned, thinking of the string as an array of bits from left to right, - where the first byte's most significant bit is at position 0, the second byte's most significant + The position is returned, thinking of the string as an array of bits from left to right, + where the first byte's most significant bit is at position 0, the second byte's most significant bit is at position 8, and so forth. - By default, all the bytes contained in the string are examined. It is possible to look for bits only in a - specified interval passing the additional arguments start and end (it is possible to just pass start, + By default, all the bytes contained in the string are examined. It is possible to look for bits only in a + specified interval passing the additional arguments start and end (it is possible to just pass start, the operation will assume that the end is the last byte of the string). - By default, the range is interpreted as a range of bytes and not a range of bits, so start=0 and end=2 means + By default, the range is interpreted as a range of bytes and not a range of bits, so start=0 and end=2 means to look at the first three bytes. - You can use the optional BIT modifier to specify that the range should be interpreted as a range of bits. So + You can use the optional BIT modifier to specify that the range should be interpreted as a range of bits. So start=0 and end=2 means to look at the first three bits. - Note that bit positions are returned always as absolute values starting from bit zero even when start and end + Note that bit positions are returned always as absolute values starting from bit zero even when start and end are used to specify a range. - The start and end can contain negative values in order to index bytes starting from the end of the string, - where -1 is the last byte, -2 is the penultimate, and so forth. When BIT is specified, -1 is the last bit, -2 + The start and end can contain negative values in order to index bytes starting from the end of the string, + where -1 is the last byte, -2 is the penultimate, and so forth. When BIT is specified, -1 is the last bit, -2 is the penultimate, and so forth. - Returns + Returns RESP encoded integer indicating the position of the first bit set to 1 or 0 according to the request. RESP encoded integer if we look for clear bits and the string only contains bits set to 1, the function returns the first bit not part of the string on the right. @@ -746,6 +746,22 @@ var ( Arity: -2, KeySpecs: KeySpecs{BeginIndex: 1}, } + pfAddCmdMeta = DiceCmdMeta{ + Name: "PFADD", + Info: `PFADD key [element [element ...]] + Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.`, + Eval: evalPFADD, + Arity: -2, + KeySpecs: KeySpecs{BeginIndex: 1}, + } + pfCountCmdMeta = DiceCmdMeta{ + Name: "PFCOUNT", + Info: `PFCOUNT key [key ...] + Returns the approximated cardinality of the set(s) observed by the HyperLogLog key(s).`, + Eval: evalPFCOUNT, + Arity: -2, + KeySpecs: KeySpecs{BeginIndex: 1}, + } ) func init() { @@ -832,4 +848,6 @@ func init() { diceCmds["SCARD"] = scardCmdMeta diceCmds["SDIFF"] = sdiffCmdMeta diceCmds["SINTER"] = sinterCmdMeta + diceCmds["PFADD"] = pfAddCmdMeta + diceCmds["PFCOUNT"] = pfCountCmdMeta } diff --git a/core/diceerrors/diceerrors.go b/core/diceerrors/diceerrors.go index 6d51cea7e..3798cd211 100644 --- a/core/diceerrors/diceerrors.go +++ b/core/diceerrors/diceerrors.go @@ -17,6 +17,7 @@ const ( NoKeyErr = "no such key" ErrDefault = "-ERR %s" WrongTypeErr = "-WRONGTYPE Operation against a key holding the wrong kind of value" + InvalidHllErr = "-INVALIDOBJ Corrupted HLL object detected" WorkerNotFoundErr = "worker with ID %s not found" ) diff --git a/core/eval.go b/core/eval.go index 37f8a1dc5..deb146f17 100644 --- a/core/eval.go +++ b/core/eval.go @@ -11,14 +11,14 @@ import ( "syscall" "time" - "github.com/cockroachdb/swiss" - "github.com/dicedb/dice/core/auth" - "github.com/dicedb/dice/core/comm" - + "github.com/axiomhq/hyperloglog" "github.com/bytedance/sonic" "github.com/charmbracelet/log" + "github.com/cockroachdb/swiss" "github.com/dicedb/dice/config" + "github.com/dicedb/dice/core/auth" "github.com/dicedb/dice/core/bit" + "github.com/dicedb/dice/core/comm" "github.com/dicedb/dice/core/diceerrors" "github.com/dicedb/dice/internal/constants" "github.com/dicedb/dice/server/utils" @@ -2945,3 +2945,64 @@ func evalSINTER(args []string, store *Store) []byte { }) return Encode(members, false) } + +// PFADD Adds all the element arguments to the HyperLogLog data structure stored at the variable +// name specified as first argument. +// +// Returns: +// If the approximated cardinality estimated by the HyperLogLog changed after executing the command, +// returns 1, otherwise 0 is returned. +func evalPFADD(args []string, store *Store) []byte { + if len(args) < 1 { + return diceerrors.NewErrArity("PFADD") + } + + key := args[0] + obj := store.Get(key) + + // If key doesn't exist prior initial cardinality changes hence return 1 + if obj == nil { + hll := hyperloglog.New() + for _, arg := range args[1:] { + hll.Insert([]byte(arg)) + } + + obj = store.NewObj(hll, -1, ObjTypeString, ObjEncodingRaw) + + store.Put(key, obj) + return Encode(1, false) + } + + existingHll := obj.Value.(*hyperloglog.Sketch) + initialCardinality := existingHll.Estimate() + for _, arg := range args[1:] { + existingHll.Insert([]byte(arg)) + } + + if newCardinality := existingHll.Estimate(); initialCardinality != newCardinality { + return Encode(1, false) + } + + return Encode(0, false) +} + +func evalPFCOUNT(args []string, store *Store) []byte { + if len(args) < 1 { + return diceerrors.NewErrArity("PFCOUNT") + } + + var unionHll = hyperloglog.New() + + for _, arg := range args { + obj := store.Get(arg) + if obj != nil { + currKeyHll := obj.Value.(*hyperloglog.Sketch) + err := unionHll.Merge(currKeyHll) + if err != nil { + return diceerrors.NewErrWithMessage(diceerrors.InvalidHllErr) + } + } + } + + return Encode(unionHll.Estimate(), false) +} diff --git a/core/eval_test.go b/core/eval_test.go index 5d37519a5..7630f2ce2 100644 --- a/core/eval_test.go +++ b/core/eval_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "github.com/axiomhq/hyperloglog" "strconv" "strings" "testing" @@ -59,6 +60,8 @@ func TestEval(t *testing.T) { testEvalDbsize(t, store) testEvalGETSET(t, store) testEvalHSET(t, store) + testEvalPFADD(t, store) + testEvalPFCOUNT(t, store) } func testEvalPING(t *testing.T, store *Store) { @@ -930,6 +933,48 @@ func testEvalGETSET(t *testing.T, store *Store) { runEvalTests(t, tests, evalGETSET, store) } +func testEvalPFADD(t *testing.T, store *Store) { + tests := map[string]evalTestCase{ + "nil value": {input: nil, output: []byte("-ERR wrong number of arguments for 'pfadd' command\r\n")}, + "empty array": {input: []string{}, output: []byte("-ERR wrong number of arguments for 'pfadd' command\r\n")}, + "one value": {input: []string{"KEY"}, output: []byte(":1\r\n")}, + "key val pair": {input: []string{"KEY", "VAL"}, output: []byte(":1\r\n")}, + "key multiple values": {input: []string{"KEY", "VAL", "VAL1", "VAL2"}, output: []byte(":1\r\n")}, + } + + runEvalTests(t, tests, evalPFADD, store) +} + +func testEvalPFCOUNT(t *testing.T, store *Store) { + tests := map[string]evalTestCase{ + "PFCOUNT with empty arg": { + input: []string{}, + output: []byte("-ERR wrong number of arguments for 'pfcount' command\r\n"), + }, + "PFCOUNT key not exists": { + input: []string{"HELLO"}, + output: Encode(0, false), + }, + "PFCOUNT key exists": { + setup: func() { + key := "EXISTING_KEY" + value := hyperloglog.New() + value.Insert([]byte("VALUE")) + obj := &Obj{ + Value: value, + LastAccessedAt: uint32(time.Now().Unix()), + } + store.store.Put(key, obj) + store.keypool.Put(key, &key) + }, + input: []string{"EXISTING_KEY"}, + output: Encode(1, false), + }, + } + + runEvalTests(t, tests, evalPFCOUNT, store) +} + func runEvalTests(t *testing.T, tests map[string]evalTestCase, evalFunc func([]string, *Store) []byte, store *Store) { for name, tc := range tests { t.Run(name, func(t *testing.T) { diff --git a/go.mod b/go.mod index 3f6c559f7..2eaf099df 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/charmbracelet/lipgloss v0.10.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect + github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/google/go-cmp v0.5.9 // indirect @@ -28,6 +29,7 @@ require ( ) require ( + github.com/axiomhq/hyperloglog v0.2.0 github.com/bytedance/sonic v1.12.1 github.com/charmbracelet/log v0.4.0 github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964 diff --git a/go.sum b/go.sum index d558c06a4..9b04a1438 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo= github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo= +github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo= +github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -26,6 +28,8 @@ github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964/go.mod h1:yBRu/c github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dicedb/go-dice v0.0.0-20240820180649-d97f15fca831 h1:Cqyj9WCtoobN6++bFbDSe27q94SPwJD9Z0wmu+SDRuk= diff --git a/tests/hyperloglog_test.go b/tests/hyperloglog_test.go new file mode 100644 index 000000000..6dfbb9f1c --- /dev/null +++ b/tests/hyperloglog_test.go @@ -0,0 +1,58 @@ +package tests + +// All commands related to Hyperloglog are part of this test class +// PFADD, PFCOUNT, PFMERGE, PFDEBUG, PFSELFTEST etc. +import ( + "gotest.tools/v3/assert" + "testing" +) + +func TestHyperLogLogCommands(t *testing.T) { + conn := getLocalConnection() + defer conn.Close() + + testCases := []struct { + name string + commands []string + expected []interface{} + }{ + { + name: "PFADD with one key-value pair", + commands: []string{"PFADD hll0 v1", "PFCOUNT hll0"}, + expected: []interface{}{int64(1), int64(1)}, + }, + { + name: "PFADD with multiple key-value pairs", + commands: []string{"PFADD hll a b c d e f g", "PFCOUNT hll"}, + expected: []interface{}{int64(1), int64(7)}, + }, + { + name: "PFADD with duplicate key-value pairs", + commands: []string{"PFADD hll1 foo bar zap", "PFADD hll1 zap zap zap", "PFADD hll1 foo bar", "PFCOUNT hll1"}, + expected: []interface{}{int64(1), int64(0), int64(0), int64(3)}, + }, + { + name: "PFADD with multiple keys", + commands: []string{ + "PFADD hll2 foo bar zap", "PFADD hll2 zap zap zap", "PFCOUNT hll2", + "PFADD some-other-hll 1 2 3", "PFCOUNT hll2 some-other-hll"}, + expected: []interface{}{int64(1), int64(0), int64(3), int64(1), int64(6)}, + }, + { + name: "PFADD with non-existing key", + commands: []string{ + "PFADD hll3 foo bar zap", "PFADD hll3 zap zap zap", "PFCOUNT hll3", + "PFCOUNT hll3 non-exist-hll", "PFADD some-new-hll abc", "PFCOUNT hll3 non-exist-hll some-new-hll"}, + expected: []interface{}{int64(1), int64(0), int64(3), int64(3), int64(1), int64(4)}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for i, cmd := range tc.commands { + result := fireCommand(conn, cmd) + assert.DeepEqual(t, tc.expected[i], result) + } + }) + } +}