Skip to content

Commit

Permalink
DiceDB#238: PFADD/PFCOUNT implementation (DiceDB#422)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucifercr07 authored Sep 3, 2024
1 parent 78402ea commit ed7ae47
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 15 deletions.
40 changes: 29 additions & 11 deletions core/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ var (
jsonclearCmdMeta = DiceCmdMeta{
Name: "JSON.CLEAR",
Info: `JSON.CLEAR key [path]
Returns an integer reply specifying the number ofmatching JSON arrays and
Returns an integer reply specifying the number ofmatching JSON arrays and
objects cleared +number of matching JSON numerical values zeroed.
Error reply: If the number of arguments is incorrect the key doesn't exist.`,
Eval: evalJSONCLEAR,
Expand Down Expand Up @@ -665,22 +665,22 @@ var (
bitposCmdMeta = DiceCmdMeta{
Name: "BITPOS",
Info: `BITPOS returns the position of the first bit set to 1 or 0 in a string
The position is returned, thinking of the string as an array of bits from left to right,
where the first byte's most significant bit is at position 0, the second byte's most significant
The position is returned, thinking of the string as an array of bits from left to right,
where the first byte's most significant bit is at position 0, the second byte's most significant
bit is at position 8, and so forth.
By default, all the bytes contained in the string are examined. It is possible to look for bits only in a
specified interval passing the additional arguments start and end (it is possible to just pass start,
By default, all the bytes contained in the string are examined. It is possible to look for bits only in a
specified interval passing the additional arguments start and end (it is possible to just pass start,
the operation will assume that the end is the last byte of the string).
By default, the range is interpreted as a range of bytes and not a range of bits, so start=0 and end=2 means
By default, the range is interpreted as a range of bytes and not a range of bits, so start=0 and end=2 means
to look at the first three bytes.
You can use the optional BIT modifier to specify that the range should be interpreted as a range of bits. So
You can use the optional BIT modifier to specify that the range should be interpreted as a range of bits. So
start=0 and end=2 means to look at the first three bits.
Note that bit positions are returned always as absolute values starting from bit zero even when start and end
Note that bit positions are returned always as absolute values starting from bit zero even when start and end
are used to specify a range.
The start and end can contain negative values in order to index bytes starting from the end of the string,
where -1 is the last byte, -2 is the penultimate, and so forth. When BIT is specified, -1 is the last bit, -2
The start and end can contain negative values in order to index bytes starting from the end of the string,
where -1 is the last byte, -2 is the penultimate, and so forth. When BIT is specified, -1 is the last bit, -2
is the penultimate, and so forth.
Returns
Returns
RESP encoded integer indicating the position of the first bit set to 1 or 0 according to the request.
RESP encoded integer if we look for clear bits and the string only contains bits set to 1, the function returns
the first bit not part of the string on the right.
Expand Down Expand Up @@ -746,6 +746,22 @@ var (
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
pfAddCmdMeta = DiceCmdMeta{
Name: "PFADD",
Info: `PFADD key [element [element ...]]
Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.`,
Eval: evalPFADD,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
pfCountCmdMeta = DiceCmdMeta{
Name: "PFCOUNT",
Info: `PFCOUNT key [key ...]
Returns the approximated cardinality of the set(s) observed by the HyperLogLog key(s).`,
Eval: evalPFCOUNT,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
)

func init() {
Expand Down Expand Up @@ -832,4 +848,6 @@ func init() {
diceCmds["SCARD"] = scardCmdMeta
diceCmds["SDIFF"] = sdiffCmdMeta
diceCmds["SINTER"] = sinterCmdMeta
diceCmds["PFADD"] = pfAddCmdMeta
diceCmds["PFCOUNT"] = pfCountCmdMeta
}
1 change: 1 addition & 0 deletions core/diceerrors/diceerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
NoKeyErr = "no such key"
ErrDefault = "-ERR %s"
WrongTypeErr = "-WRONGTYPE Operation against a key holding the wrong kind of value"
InvalidHllErr = "-INVALIDOBJ Corrupted HLL object detected"
WorkerNotFoundErr = "worker with ID %s not found"
)

Expand Down
69 changes: 65 additions & 4 deletions core/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"syscall"
"time"

"github.com/cockroachdb/swiss"
"github.com/dicedb/dice/core/auth"
"github.com/dicedb/dice/core/comm"

"github.com/axiomhq/hyperloglog"
"github.com/bytedance/sonic"
"github.com/charmbracelet/log"
"github.com/cockroachdb/swiss"
"github.com/dicedb/dice/config"
"github.com/dicedb/dice/core/auth"
"github.com/dicedb/dice/core/bit"
"github.com/dicedb/dice/core/comm"
"github.com/dicedb/dice/core/diceerrors"
"github.com/dicedb/dice/internal/constants"
"github.com/dicedb/dice/server/utils"
Expand Down Expand Up @@ -2945,3 +2945,64 @@ func evalSINTER(args []string, store *Store) []byte {
})
return Encode(members, false)
}

// PFADD Adds all the element arguments to the HyperLogLog data structure stored at the variable
// name specified as first argument.
//
// Returns:
// If the approximated cardinality estimated by the HyperLogLog changed after executing the command,
// returns 1, otherwise 0 is returned.
func evalPFADD(args []string, store *Store) []byte {
if len(args) < 1 {
return diceerrors.NewErrArity("PFADD")
}

key := args[0]
obj := store.Get(key)

// If key doesn't exist prior initial cardinality changes hence return 1
if obj == nil {
hll := hyperloglog.New()
for _, arg := range args[1:] {
hll.Insert([]byte(arg))
}

obj = store.NewObj(hll, -1, ObjTypeString, ObjEncodingRaw)

store.Put(key, obj)
return Encode(1, false)
}

existingHll := obj.Value.(*hyperloglog.Sketch)
initialCardinality := existingHll.Estimate()
for _, arg := range args[1:] {
existingHll.Insert([]byte(arg))
}

if newCardinality := existingHll.Estimate(); initialCardinality != newCardinality {
return Encode(1, false)
}

return Encode(0, false)
}

func evalPFCOUNT(args []string, store *Store) []byte {
if len(args) < 1 {
return diceerrors.NewErrArity("PFCOUNT")
}

var unionHll = hyperloglog.New()

for _, arg := range args {
obj := store.Get(arg)
if obj != nil {
currKeyHll := obj.Value.(*hyperloglog.Sketch)
err := unionHll.Merge(currKeyHll)
if err != nil {
return diceerrors.NewErrWithMessage(diceerrors.InvalidHllErr)
}
}
}

return Encode(unionHll.Estimate(), false)
}
45 changes: 45 additions & 0 deletions core/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/axiomhq/hyperloglog"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -59,6 +60,8 @@ func TestEval(t *testing.T) {
testEvalDbsize(t, store)
testEvalGETSET(t, store)
testEvalHSET(t, store)
testEvalPFADD(t, store)
testEvalPFCOUNT(t, store)
}

func testEvalPING(t *testing.T, store *Store) {
Expand Down Expand Up @@ -930,6 +933,48 @@ func testEvalGETSET(t *testing.T, store *Store) {
runEvalTests(t, tests, evalGETSET, store)
}

func testEvalPFADD(t *testing.T, store *Store) {
tests := map[string]evalTestCase{
"nil value": {input: nil, output: []byte("-ERR wrong number of arguments for 'pfadd' command\r\n")},
"empty array": {input: []string{}, output: []byte("-ERR wrong number of arguments for 'pfadd' command\r\n")},
"one value": {input: []string{"KEY"}, output: []byte(":1\r\n")},
"key val pair": {input: []string{"KEY", "VAL"}, output: []byte(":1\r\n")},
"key multiple values": {input: []string{"KEY", "VAL", "VAL1", "VAL2"}, output: []byte(":1\r\n")},
}

runEvalTests(t, tests, evalPFADD, store)
}

func testEvalPFCOUNT(t *testing.T, store *Store) {
tests := map[string]evalTestCase{
"PFCOUNT with empty arg": {
input: []string{},
output: []byte("-ERR wrong number of arguments for 'pfcount' command\r\n"),
},
"PFCOUNT key not exists": {
input: []string{"HELLO"},
output: Encode(0, false),
},
"PFCOUNT key exists": {
setup: func() {
key := "EXISTING_KEY"
value := hyperloglog.New()
value.Insert([]byte("VALUE"))
obj := &Obj{
Value: value,
LastAccessedAt: uint32(time.Now().Unix()),
}
store.store.Put(key, obj)
store.keypool.Put(key, &key)
},
input: []string{"EXISTING_KEY"},
output: Encode(1, false),
},
}

runEvalTests(t, tests, evalPFCOUNT, store)
}

func runEvalTests(t *testing.T, tests map[string]evalTestCase, evalFunc func([]string, *Store) []byte, store *Store) {
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/charmbracelet/lipgloss v0.10.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/google/go-cmp v0.5.9 // indirect
Expand All @@ -28,6 +29,7 @@ require (
)

require (
github.com/axiomhq/hyperloglog v0.2.0
github.com/bytedance/sonic v1.12.1
github.com/charmbracelet/log v0.4.0
github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo=
github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo=
github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo=
github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand All @@ -26,6 +28,8 @@ github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964/go.mod h1:yBRu/c
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dicedb/go-dice v0.0.0-20240820180649-d97f15fca831 h1:Cqyj9WCtoobN6++bFbDSe27q94SPwJD9Z0wmu+SDRuk=
Expand Down
58 changes: 58 additions & 0 deletions tests/hyperloglog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package tests

// All commands related to Hyperloglog are part of this test class
// PFADD, PFCOUNT, PFMERGE, PFDEBUG, PFSELFTEST etc.
import (
"gotest.tools/v3/assert"
"testing"
)

func TestHyperLogLogCommands(t *testing.T) {
conn := getLocalConnection()
defer conn.Close()

testCases := []struct {
name string
commands []string
expected []interface{}
}{
{
name: "PFADD with one key-value pair",
commands: []string{"PFADD hll0 v1", "PFCOUNT hll0"},
expected: []interface{}{int64(1), int64(1)},
},
{
name: "PFADD with multiple key-value pairs",
commands: []string{"PFADD hll a b c d e f g", "PFCOUNT hll"},
expected: []interface{}{int64(1), int64(7)},
},
{
name: "PFADD with duplicate key-value pairs",
commands: []string{"PFADD hll1 foo bar zap", "PFADD hll1 zap zap zap", "PFADD hll1 foo bar", "PFCOUNT hll1"},
expected: []interface{}{int64(1), int64(0), int64(0), int64(3)},
},
{
name: "PFADD with multiple keys",
commands: []string{
"PFADD hll2 foo bar zap", "PFADD hll2 zap zap zap", "PFCOUNT hll2",
"PFADD some-other-hll 1 2 3", "PFCOUNT hll2 some-other-hll"},
expected: []interface{}{int64(1), int64(0), int64(3), int64(1), int64(6)},
},
{
name: "PFADD with non-existing key",
commands: []string{
"PFADD hll3 foo bar zap", "PFADD hll3 zap zap zap", "PFCOUNT hll3",
"PFCOUNT hll3 non-exist-hll", "PFADD some-new-hll abc", "PFCOUNT hll3 non-exist-hll some-new-hll"},
expected: []interface{}{int64(1), int64(0), int64(3), int64(3), int64(1), int64(4)},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
for i, cmd := range tc.commands {
result := fireCommand(conn, cmd)
assert.DeepEqual(t, tc.expected[i], result)
}
})
}
}

0 comments on commit ed7ae47

Please sign in to comment.