Skip to content

Commit

Permalink
Streaming backend
Browse files Browse the repository at this point in the history
  • Loading branch information
expede committed Jul 8, 2021
1 parent 641a4cb commit 840fdb8
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import Fission.URL.Types
import qualified Fission.Web.API.Auth.Types as Auth
import Fission.Web.API.Prelude





import Servant.Types.SourceT as S
import Streamly.Prelude

type StreamingUpdate
= Summary "" -- FIXME
:> Description "" -- FIXME
Expand All @@ -19,4 +26,17 @@ type StreamingUpdate
--
:> Auth.HigherOrder
-- :> Stream 'PATCH 200 NewlineFraming JSON (SourceIO Natural) -- FIXME better type
:> Stream 'PATCH 200 NewlineFraming JSON (SourceIO Natural) -- FIXME better type
-- :> Stream 'PATCH 200 NewlineFraming JSON (SourceIO Natural) -- FIXME better type
:> Stream 'PATCH 200 NewlineFraming JSON (SourceIO UploadStatus) -- FIXME better type


isUploading :: UploadStatus -> Bool
isUploading = \case
Uploading _ -> True
_ -> False


data UploadStatus
= Failed
| Uploading Natural
| Done
1 change: 1 addition & 0 deletions fission-web-api/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ dependencies:

## Data ##
- swagger2
- streamly
- uuid

## Fission
Expand Down
101 changes: 39 additions & 62 deletions fission-web-server/library/Fission/Web/Server/Handler/App/Update.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import Fission.Web.Server.Error as Web.Error

-- 🌐

import qualified RIO.List as List
import qualified RIO.NonEmpty as NonEmpty

import Servant.Types.SourceT
import Servant.Types.SourceT as S
import qualified Streamly.Prelude as Streamly

import Network.IPFS.Client.Streaming.Pin
Expand All @@ -38,10 +39,13 @@ import Fission.Prelude
import Fission.Web.Async
import Fission.Web.Server.IPFS.Cluster.Class

import Fission.URL.Types




import Fission.Web.API.App.Update.Streaming.Types


update :: (MonadLogger m, MonadThrow m, MonadTime m, App.Modifier m) => ServerT API.App.Update m
update url newCID copyDataFlag Authorization {about = Entity userId _} = do
Expand All @@ -52,46 +56,48 @@ update url newCID copyDataFlag Authorization {about = Entity userId _} = do
copyFiles :: Bool
copyFiles = maybe True identity copyDataFlag

instance ToSourceIO a (Streamly.SerialT IO a) where
toSourceIO serialStream =
SourceT \k ->
k $ Effect do
cont <- Streamly.foldr folder Skip serialStream
return $ cont Stop
where
folder :: a -> (StepT IO a -> StepT IO a) -> (StepT IO a -> StepT IO a)
folder x contAcc = \nextCont -> contAcc (Yield x nextCont)

-- CID -> Authorization -> m (SourceT IO Natural)
updateStreaming ::
forall m .
( MonadIO m
, MonadLogger m
, MonadThrow m
, MonadTime m
, MonadBaseControl IO m
, App.Modifier m
, MonadIPFSCluster m PinStatus
, ServerT API.App.StreamingUpdate m ~ (URL -> CID -> Authorization -> m (SourceT IO UploadStatus))
)
=> ServerT API.App.StreamingUpdate m
updateStreaming url newCID Authorization {about = Entity userId _} = do
-- updateStreaming newCID Authorization {about = Entity userId _} = do
-- updateStreaming = do
now <- currentTime

pseudoStreams <- streamCluster $ (Streaming.client $ Proxy @PinComplete) undefined (Just True)
-- pseudoStreams <- streamCluster $ (Streaming.client $ Proxy @PinComplete) newCID (Just True)
now <- currentTime
status <- liftIO . newTVarIO $ Uploading 0 -- FIXME switch to mvar?
pseudoStreams <- streamCluster $ (Streaming.client $ Proxy @PinComplete) newCID (Just True)

let
asyncRefs = fst <$> pseudoStreams
chans = snd <$> pseudoStreams
(asyncRefs, chans) = NonEmpty.unzip pseudoStreams

status <- liftIO . newTVarIO $ Uploading 0
asyncUpdates <- for chans \statusChan -> liftIO $ withAsync (action statusChan status) pure

-- status
-- |> readTVarIO
-- |> liftIO
-- |> Streamly.repeatM
-- |> Streamly.takeWhile isUploading
-- |> Streamly.finally (cancel <$> asyncUpdates)
-- |> Streamly.fromSerial
-- |> toSourceIO

return undefined -- (source [1] :: SourceIO Natural)
let
source :: Streamly.SerialT IO UploadStatus
source = Streamly.repeatM (readTVarIO status)

isUploading :: UploadStatus -> Bool
isUploading = \case
Uploading _ -> True
_ -> False
source
|> Streamly.takeWhile isUploading
|> Streamly.finally (forM asyncRefs cancel)
|> toSourceIO
|> pure

action :: MonadIO m => TChan (Either ClientError PinStatus) -> TVar UploadStatus -> m ()
action channel status =
Expand Down Expand Up @@ -121,41 +127,12 @@ action channel status =

go

data UploadStatus
= Failed
| Uploading Natural
| Done

-- class ToSourceIO chunk a | a -> chunk where

instance MonadIO m => ToSourceIO a (Streamly.SerialT m a) where
-- toSourceIO :: SerialT a -> SourceIO a
toSourceIO serialStream = do
SourceT \k -> do
-- foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
k $ Streamly.foldr folder Skip serialStream
where
folder x acc = \more -> acc $ Yield x more

-- where
-- go ::
-- (forall x . ResourceT m x -> m x)
-- -> SerialT a
-- -> StepT IO a
-- go cont serialStep = undefined


-- instance m ~ IO => ConduitToSourceIO (ResourceT m) where
-- conduitToSourceIO (ConduitT con) =
-- S.SourceT $ \k ->
-- runResourceT $ withRunInIO $ \runRes ->
-- k (go runRes (con Done))
-- where
-- go :: (forall x. ResourceT m x -> m x)
-- -> Pipe i i o () (ResourceT m) ()
-- -> S.StepT IO o
-- go _ (Done ()) = S.Stop
-- go runRes (HaveOutput p o) = S.Yield o (go runRes p)
-- go runRes (NeedInput _ip up) = S.Skip (go runRes (up ()))
-- go runRes (PipeM m) = S.Effect $ runRes $ fmap (go runRes) m
-- go runRes (Leftover p _l) = S.Skip (go runRes p)
toSourceT :: Monad m => Streamly.SerialT m a -> SourceT m a
toSourceT serialStream =
SourceT \k ->
k $ Effect do
cont <- Streamly.foldr folder Skip serialStream
return $ cont Stop
where
folder :: a -> (StepT m a -> StepT m a) -> (StepT m a -> StepT m a)
folder x contAcc = \nextCont -> contAcc (Yield x nextCont)

0 comments on commit 840fdb8

Please sign in to comment.