Skip to content

Commit

Permalink
WIP cleaning up
Browse files Browse the repository at this point in the history
  • Loading branch information
expede committed Jul 9, 2021
1 parent 3037757 commit 35fbec6
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 60 deletions.
5 changes: 3 additions & 2 deletions fission-core/library/Fission/Web/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ waitAnySuccessCatchCancel asyncRefs = do

-- | Wait for all cluster peers to complete.
waitAll :: MonadIO m => NonEmpty (Async (Either ClientError a)) -> m (NonEmpty (Either ClientError a))
waitAll asyncRefs = liftIO $ forConcurrently asyncRefs \ref ->
normalizeResult <$> waitCatch ref
waitAll asyncRefs =
liftIO $ forConcurrently asyncRefs \ref ->
normalizeResult <$> waitCatch ref

normalizeResult :: Either SomeException (Either ClientError a) -> Either ClientError a
normalizeResult = \case
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import qualified Network.IPFS.Add.Error as IPFS.Pin
import Network.IPFS.CID.Types
import qualified Network.IPFS.Get.Error as IPFS.Stat

import Servant.API
import Servant.Server

import Fission.Prelude hiding (on)
Expand All @@ -17,6 +18,9 @@ import Fission.URL
import Fission.Web.Server.Error.ActionNotAuthorized.Types
import Fission.Web.Server.Models

-- FIXME onlu the bytesrecieved; extract out!
import Fission.Web.API.App.Update.Streaming.Types

type Errors' = OpenUnion
'[ NotFound App
, NotFound AppDomain
Expand All @@ -42,3 +46,10 @@ class Monad m => Modifier m where
-> Bool -- ^ Flag: copy data (default yes)
-> UTCTime -- ^ Now
-> m (Either Errors' AppId)

setCIDStreaming ::
UserId
-> URL
-> CID
-> UTCTime
-> m (SourceIO BytesReceived)
123 changes: 65 additions & 58 deletions fission-web-server/library/Fission/Web/Server/Handler/App/Update.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Fission.Web.Server.Error as Web.Error

import qualified RIO.List as List
import qualified RIO.NonEmpty as NonEmpty
import qualified RIO.Text as Text

import Servant.Types.SourceT as S
import qualified Streamly.Prelude as Streamly
Expand Down Expand Up @@ -71,79 +72,85 @@ updateStreaming ::
=> ServerT API.App.StreamingUpdate m
updateStreaming url newCID Authorization {about = Entity userId _} = do
now <- currentTime
status <- liftIO . newTVarIO $ Uploading 0 -- FIXME switch to mvar?
status <- liftIO $ newTVarIO . Right $ BytesReceived 0
pseudoStreams <- streamCluster $ (Streaming.client $ Proxy @PinComplete) newCID (Just True)

let (asyncRefs, chans) = NonEmpty.unzip pseudoStreams
asyncListeners <- foo chans status

let
source :: Streamly.SerialT IO (Maybe BytesReceived)
source =
Streamly.repeatM do
sleepThread . Seconds $ Milli @Natural 500
readTVarIO status >>= \case
Uploading byteCount -> return . Just $ BytesReceived byteCount
_ -> return $ Nothing

source
|> Streamly.takeWhile isJust
asyncListeners <- fanIn chans status

_ <- liftIO ( pure |> withAsync do
results <- waitAll asyncRefs
when (all isLeft results) $ atomically do
let
result' =
case NonEmpty.head results of
Right PinStatus {progress} ->
case progress of
Nothing -> Right $ BytesReceived 0
Just bytes -> Right $ BytesReceived bytes

Left err
-> Left err

writeTVar status result')

Streamly.repeatM (readTVarIO status)
|> Streamly.delay 0.500
|> Streamly.takeWhile isRight
|> Streamly.finally do
forM_ asyncListeners cancel
forM_ asyncRefs cancel
readTVarIO status
|> Streamly.take 1
|> Streamly.takeWhile isLeft
|> asSerial
|> toSourceIO
|> mapStepT go
|> mapStepT simplify
|> pure

where
go :: Monad m => StepT m (Maybe BytesReceived) -> StepT m BytesReceived
go = \case
S.Yield Nothing more -> go more
S.Yield (Just byteCount) more -> S.Yield byteCount (go more)
simplify :: Monad m => StepT m (Either ClientError BytesReceived) -> StepT m BytesReceived
simplify = \case
S.Yield (Right byteCount) more -> S.Yield byteCount (simplify more)
S.Yield (Left err) _ -> S.Error (show err)

S.Skip more -> go more
S.Effect action -> S.Effect $ fmap go action
S.Error msg -> S.Error msg
S.Stop -> S.Stop
S.Skip more -> simplify more
S.Effect action -> S.Effect $ fmap simplify action
S.Error msg -> S.Error msg
S.Stop -> S.Stop

foo ::
fanIn ::
MonadIO m
=> NonEmpty (TChan (Either ClientError PinStatus))
-> TVar UploadStatus
-> TVar (Either ClientError BytesReceived)
-> m (NonEmpty (Async ()))
foo chans statusVar =
forM chans \statusChan ->
liftIO $ withAsync (atomically $ fanIn statusChan statusVar) pure

fanIn :: TChan (Either ClientError PinStatus) -> TVar UploadStatus -> STM ()
fanIn channel status = go
where
go :: STM ()
go =
readTVar status >>= \case
Done ->
return ()

Failed ->
fanIn chans statusVar =
forM chans \statusChan -> do
liftIO $ withAsync (atomically $ reportBytes statusChan statusVar) pure

asSerial :: Streamly.Serial a -> Streamly.Serial a
asSerial a = a

reportBytes ::
TChan (Either ClientError PinStatus)
-> TVar (Either ClientError BytesReceived)
-> STM ()
reportBytes channel status =
readTVar status >>= \case
Left _ ->
return ()

Right (BytesReceived lastMax) ->
readTChan channel >>= \case
Left _ ->
return ()

Uploading lastMax ->
readTChan channel >>= \case
Left err ->
undefined -- FIXME

Right PinStatus {progress} ->
case progress of
Nothing ->
return ()

Just bytesHere -> do -- FIXME I think it's bytes? Maybe blocks?
when (bytesHere > lastMax) do
writeTVar status $ Uploading bytesHere
Right PinStatus {progress} ->
case progress of
Nothing ->
return ()

go
Just bytesHere -> do -- FIXME I think it's bytes? Maybe blocks?
when (bytesHere > lastMax) do
writeTVar status . Right $ BytesReceived bytesHere

data UploadStatus
= Failed
| Uploading Natural
| Done
reportBytes channel status

0 comments on commit 35fbec6

Please sign in to comment.