Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic pool implementation and process separation per worker id #10

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
45 changes: 36 additions & 9 deletions client/app/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import Data.Int (Int32)
import Data.List (isPrefixOf, lookup, partition, stripPrefix)
import Data.Monoid (First (..))
import Message
( Msg (..),
( Id (..),
Msg (..),
Request (..),
Response (..),
recvMsg,
Expand All @@ -33,26 +34,52 @@ import System.Environment (getArgs, getEnv, getEnvironment)
import System.Exit (exitFailure)
import System.IO (hFlush, hPutStrLn, stderr, stdout)

data WorkerConfig = WorkerConfig
{ workerConfigSocket :: String,
workerConfigId :: Maybe Id,
workerConfigClose :: Bool
}
deriving (Show)

splitArgs :: [String] -> ([String], [String])
splitArgs = partition ("--worker-" `isPrefixOf`)

getWorkerConfig :: [String] -> Maybe WorkerConfig
getWorkerConfig args = do
socket <- getFirst $ foldMap (First . stripPrefix "--worker-socket=") args
let mid = getFirst $ foldMap (First . stripPrefix "--worker-id=") args
willClose = any ("--worker-close" `isPrefixOf`) args
pure WorkerConfig
{ workerConfigSocket = socket,
workerConfigId = Id <$> mid,
workerConfigClose = willClose
}

main :: IO ()
main = do
args <- getArgs
let (workerArgs, ghcArgs) = splitArgs args
mSocketPath = getFirst $ foldMap (First . stripPrefix "--worker-socket=") workerArgs
case mSocketPath of
mConf = getWorkerConfig workerArgs
hPutStrLn stderr (show mConf)
hPutStrLn stderr (show args)
hFlush stderr
case mConf of
Nothing -> do
hPutStrLn stderr "ghc-persistent-worker-client: Please set GHC_PERSISTENT_WORKER_SOCKET env variable with the socket file path."
hPutStrLn stderr "ghc-persistent-worker-client: Please pass --worker-socket=(socket file path)."
exitFailure
Just sockPath -> do
Just conf -> do
let sockPath = workerConfigSocket conf
mid = workerConfigId conf
willClose = workerConfigClose conf
env <- getEnvironment
process sockPath env ghcArgs
process sockPath mid willClose env ghcArgs

process :: FilePath -> [(String, String)] -> [String] -> IO ()
process socketPath env args = runClient socketPath $ \s -> do
process :: FilePath -> Maybe Id -> Bool -> [(String, String)] -> [String] -> IO ()
process socketPath mid willClose env args = runClient socketPath $ \s -> do
let req = Request
{ requestEnv = env,
{ requestWorkerId = mid,
requestWorkerClose = willClose,
requestEnv = env,
requestArgs = args
}
let msg = wrapMsg req
Expand Down
15 changes: 7 additions & 8 deletions comm/src/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Message
wrapMsg,
unwrapMsg,
--
Id (..),
Request (..),
Response (..),
) where
Expand Down Expand Up @@ -76,8 +77,13 @@ wrapMsg x =
unwrapMsg :: (Binary a) => Msg -> a
unwrapMsg (Msg _n bs) = decode (L.fromStrict bs)

newtype Id = Id String
deriving (Show, Eq, Binary)

data Request = Request
{ requestEnv :: [(String, String)],
{ requestWorkerId :: Maybe Id,
requestWorkerClose :: Bool,
requestEnv :: [(String, String)],
requestArgs :: [String]
}
deriving (Show, Generic)
Expand All @@ -92,10 +98,3 @@ data Response = Response
deriving (Show, Generic)

instance Binary Response

{-
newtype ConsoleOutput = ConsoleOutput
{ unConsoleOutput :: [String]
}
deriving Binary
-}
2 changes: 2 additions & 0 deletions plugin/src/GHCPersistentWorkerPlugin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ workerMain flags = do
liftIO $ do
mapM_ (\_ -> hPutStrLn stderr "=================================") [1..5]
time <- getCurrentTime
hPutStrLn stderr $ "worker: " ++ (show n)
hPutStrLn stderr (show time)
mapM_ (\_ -> hPutStrLn stderr "=================================") [1..5]
--
Expand All @@ -74,6 +75,7 @@ workerMain flags = do
liftIO $ do
mapM_ (\_ -> hPutStrLn stderr "|||||||||||||||||||||||||||||||||") [1..5]
time <- getCurrentTime
hPutStrLn stderr $ "worker: " ++ (show n)
hPutStrLn stderr (show time)
mapM_ (\_ -> hPutStrLn stderr "|||||||||||||||||||||||||||||||||") [1..5]
--
Expand Down
94 changes: 94 additions & 0 deletions server/app/Pool.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}

module Pool
( HandleSet (..),
Pool (..),
dumpStatus,
assignJob,
finishJob,
removeWorker,
) where

import Control.Concurrent.STM (STM, TVar, atomically, readTVar, retry, writeTVar)
import qualified Data.Foldable as F
import Data.IntMap (IntMap)
import qualified Data.IntMap as IM
import qualified Data.List as List
import Message (Id)
import System.IO (Handle, hFlush, hPrint, hPutStrLn, stdout)
import System.Process (ProcessHandle, terminateProcess)

data HandleSet = HandleSet
{ handleProcess :: ProcessHandle,
handleArgIn :: Handle,
handleMsgOut :: Handle
}

data Pool = Pool
{ poolLimit :: Int,
poolNext :: Int,
poolStatus :: IntMap (Bool, Maybe Id),
poolHandles :: [(Int, HandleSet)]
}

dumpStatus :: TVar Pool -> IO ()
dumpStatus ref = do
pool <- atomically (readTVar ref)
hPutStrLn stdout $ "poolLimit = " ++ show (poolLimit pool)
mapM_ (hPrint stdout) $ IM.toAscList (poolStatus pool)
hFlush stdout

getAssignableWorker :: IntMap (Bool, Maybe Id) -> Maybe Id -> Maybe (Int, (Bool, Maybe Id))
getAssignableWorker workers mid' = List.find (isAssignable . snd) . IM.toAscList $ workers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, Foldable.find @IntMap searches in ascending key order 🙂

where
isAssignable (b, mid)
| not b = case (mid, mid') of
(Nothing, _) -> True
(Just _, Nothing) -> True
(Just id'', Just id') -> id' == id''
| otherwise = False

assignJob ::
TVar Pool ->
Maybe Id ->
-- | Right assigned, Left new id that will be used for new spawned worker process.
STM (Either Int (Int, HandleSet))
assignJob ref mid' = do
pool <- readTVar ref
let workers = poolStatus pool
let m = getAssignableWorker workers mid'
case m of
Nothing -> do
let nRunningJobs = length $ filter (\(b, _) -> b) $ F.toList workers
if (nRunningJobs >= poolLimit pool)
then retry
else pure (Left nRunningJobs)
Just (i, _) -> do
let upd (_, Nothing) = Just (True, mid')
upd (_, Just id'') = Just (True, Just id'')
!workers' = IM.update upd i workers
Just hset = List.lookup i (poolHandles pool)
writeTVar ref (pool {poolStatus = workers'})
pure $ Right (i, hset)

finishJob :: TVar Pool -> Int -> STM ()
finishJob ref i = do
pool <- readTVar ref
let workers = poolStatus pool
!workers' = IM.update (\(_, m) -> Just (False, m)) i workers
writeTVar ref (pool {poolStatus = workers'})

removeWorker :: TVar Pool -> Id -> IO ()
removeWorker ref id' = do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe mask this function? not sure if these processes get cleaned up reliably on interrupt

dismissedHandles <-
atomically $ do
pool <- readTVar ref
let workers = poolStatus pool
(dismissed, remained) = IM.partition (\(_, m) -> m == Just id') workers
let ks = IM.keys dismissed
(dismissedHandles, remainedHandles) = List.partition ((`elem` ks) . fst) $ poolHandles pool
writeTVar ref (pool {poolStatus = remained, poolHandles = remainedHandles})
pure (fmap (handleProcess . snd) dismissedHandles)

mapM_ terminateProcess dismissedHandles
-- atomically $ removeWorker ref id'
Loading