Skip to content

Commit

Permalink
Support concurrent clients in kvstore
Browse files Browse the repository at this point in the history
This allows us to have better parity with the Java kvstore benchmark,
specifically the gson one, which has a semaphore limiting concurrency of
clients.
  • Loading branch information
FinleyMcIlwaine committed Oct 16, 2024
1 parent a0b2d70 commit 6ee8107
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 26 deletions.
60 changes: 37 additions & 23 deletions kvstore/KVStore/Client.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module KVStore.Client (runKeyValueClient) where

import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.IORef
import Data.Maybe
import System.Timeout
import Text.Printf

Expand Down Expand Up @@ -60,11 +62,11 @@ statsTotal stats = sum [
, statsNumDelete stats
]

incNumCreate, incNumUpdate, incNumRetrieve, incNumDelete :: Stats -> Stats
incNumCreate stats = stats{statsNumCreate = statsNumCreate stats + 1}
incNumUpdate stats = stats{statsNumUpdate = statsNumUpdate stats + 1}
incNumRetrieve stats = stats{statsNumRetrieve = statsNumRetrieve stats + 1}
incNumDelete stats = stats{statsNumDelete = statsNumDelete stats + 1}
incNumCreate, incNumUpdate, incNumRetrieve, incNumDelete :: Stats -> (Stats, ())
incNumCreate stats = (stats{statsNumCreate = statsNumCreate stats + 1}, ())
incNumUpdate stats = (stats{statsNumUpdate = statsNumUpdate stats + 1}, ())
incNumRetrieve stats = (stats{statsNumRetrieve = statsNumRetrieve stats + 1}, ())
incNumDelete stats = (stats{statsNumDelete = statsNumDelete stats + 1}, ())

showStats :: Cmdline -> Stats -> String
showStats Cmdline{cmdDuration} stats = unlines [
Expand Down Expand Up @@ -94,9 +96,11 @@ client Cmdline{
, cmdSecure
, cmdDisableTcpNoDelay
, cmdPingRateLimit
, cmdSemaphoreLimit
} statsVar = do
knownKeys <- RandomAccessSet.new
random <- RandomGen.new
limiter <- newQSem (fromMaybe 1 cmdSemaphoreLimit)

withConnection params server $ \conn -> do
let kvstore :: KVStore
Expand All @@ -105,24 +109,9 @@ client Cmdline{
| otherwise = Protobuf.client conn

forever $ do
-- Pick a random CRUD action to take
command <- RandomGen.nextInt random 4

if command == 0 then do
doCreate kvstore knownKeys
modifyIORef' statsVar incNumCreate
else do
-- If we don't know about any keys, retry with a new random action
noKnownKeys <- RandomAccessSet.isEmpty knownKeys
unless noKnownKeys $ do
case command of
1 -> do doRetrieve kvstore knownKeys
modifyIORef' statsVar incNumRetrieve
2 -> do doUpdate kvstore knownKeys
modifyIORef' statsVar incNumUpdate
3 -> do doDelete kvstore knownKeys
modifyIORef' statsVar incNumDelete
_ -> error "impossible"
waitQSem limiter
forkIO $
doRandomCRUD random knownKeys kvstore `finally` signalQSem limiter
where
params :: ConnParams
params = def {
Expand Down Expand Up @@ -151,6 +140,31 @@ client Cmdline{
, addressAuthority = Nothing
}

doRandomCRUD ::
RandomGen.RandomGen
-> RandomAccessSet Key
-> KVStore
-> IO ()
doRandomCRUD random knownKeys kvstore = do
-- Pick a random CRUD action to take
command <- RandomGen.nextInt random 4

if command == 0 then do
doCreate kvstore knownKeys
atomicModifyIORef' statsVar incNumCreate
else do
-- If we don't know about any keys, retry with a new random action
noKnownKeys <- RandomAccessSet.isEmpty knownKeys
unless noKnownKeys $ do
case command of
1 -> do doRetrieve kvstore knownKeys
atomicModifyIORef' statsVar incNumRetrieve
2 -> do doUpdate kvstore knownKeys
atomicModifyIORef' statsVar incNumUpdate
3 -> do doDelete kvstore knownKeys
atomicModifyIORef' statsVar incNumDelete
_ -> error "impossible"

{-------------------------------------------------------------------------------
Access the various server features
-------------------------------------------------------------------------------}
Expand Down
8 changes: 8 additions & 0 deletions kvstore/KVStore/Cmdline.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ data Cmdline = Cmdline {
, cmdSecure :: Bool
, cmdDisableTcpNoDelay :: Bool
, cmdPingRateLimit :: Maybe Int
, cmdSemaphoreLimit :: Maybe Int
}

data Mode =
Expand Down Expand Up @@ -77,6 +78,13 @@ parseCmdline =
, Opt.help "Allow at most this many pings per second from the peer"
]
)
<*> (Opt.optional $
Opt.option Opt.auto $ mconcat [
Opt.long "semaphore-limit"
, Opt.metavar "NUM_TOKENS"
, Opt.help "Allow at most this many clients to run concurrently"
]
)

parseMode :: Parser Mode
parseMode = asum [
Expand Down
13 changes: 10 additions & 3 deletions kvstore/KVStore/Util/RandomAccessSet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ isEmpty ras = withRAS ras $ Set.null
getRandomKey :: RandomAccessSet a -> IO a
getRandomKey ras = do
gen <- Random.new
size <- withRAS ras $ Set.size
n <- Random.nextInt gen size
withRAS ras $ Set.elemAt n
withRASIO ras $ \s -> do
n <- Random.nextInt gen (Set.size s)
return $ Set.elemAt n s

add :: Ord a => RandomAccessSet a -> a -> IO Bool
add ras value =
Expand All @@ -67,3 +67,10 @@ modifyRAS ras f = modifyMVar (unwrap ras) $ return . f

modifyRAS_ :: RandomAccessSet a -> (Set a -> Set a) -> IO ()
modifyRAS_ ras f = modifyMVar_ (unwrap ras) $ return . f

{-------------------------------------------------------------------------------
Internal: wrap IO operations
-------------------------------------------------------------------------------}

withRASIO :: RandomAccessSet a -> (Set a -> IO b) -> IO b
withRASIO ras = withMVar (unwrap ras)

0 comments on commit 6ee8107

Please sign in to comment.