Skip to content

Commit

Permalink
Beginning cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
expede committed Jul 8, 2021
1 parent 840fdb8 commit 3037757
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 93 deletions.
3 changes: 2 additions & 1 deletion fission-web-api/library/Fission/Web/API/App/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import Fission.Web.API.Prelude
import Fission.Web.API.App.Create.Types
import Fission.Web.API.App.Destroy.Types
import Fission.Web.API.App.Index.Types
import Fission.Web.API.App.Update.Streaming.Types
import Fission.Web.API.App.Update.Types

type App = "app" :> API
type API = Index :<|> Create :<|> Update :<|> Destroy
type API = Index :<|> Create :<|> Update :<|> StreamingUpdate :<|> Destroy
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import qualified Network.IPFS.CID.Types as IPFS

import Fission.URL.Types

import Fission.Prelude

import qualified Fission.Web.API.Auth.Types as Auth
import Fission.Web.API.Prelude





import Data.Swagger hiding (URL, url)
import Servant.Types.SourceT as S
import Streamly.Prelude

Expand All @@ -25,18 +28,32 @@ type StreamingUpdate
:> Capture "New CID" IPFS.CID
--
:> Auth.HigherOrder
-- :> Stream 'PATCH 200 NewlineFraming JSON (SourceIO Natural) -- FIXME better type
-- :> Stream 'PATCH 200 NewlineFraming JSON (SourceIO Natural) -- FIXME better type
:> Stream 'PATCH 200 NewlineFraming JSON (SourceIO UploadStatus) -- FIXME better type


isUploading :: UploadStatus -> Bool
isUploading = \case
Uploading _ -> True
_ -> False


data UploadStatus
= Failed
| Uploading Natural
| Done
:> Stream 'PATCH 200 NewlineFraming JSON (SourceIO BytesReceived)


newtype BytesReceived = BytesReceived { byteCount :: Natural } -- FIXME is it bytes?
deriving (Show, Eq)

instance ToJSON BytesReceived where
toJSON BytesReceived {..} = object [ "bytes" .= byteCount ]

-- ToSchema Fission.Web.API.App.Update.Streaming.Types.BytesReceived

instance ToSchema BytesReceived where -- VERY MUCH FIXME
declareNamedSchema _ = do
urls' <- declareSchemaRef $ Proxy @[URL]
insertedAt' <- declareSchemaRef $ Proxy @UTCTime
modifiedAt' <- declareSchemaRef $ Proxy @UTCTime

mempty
|> type_ ?~ SwaggerObject
|> properties .~
[ ("urls", urls')
, ("insertedAt", insertedAt')
, ("modifiedAt", modifiedAt')
]
|> required .~ ["username", "email"]
|> description ?~ "Properties for a registered application"
|> example ?~ toJSON (BytesReceived 42)
|> NamedSchema (Just "App Index Payload")
|> pure
55 changes: 29 additions & 26 deletions fission-web-server/library/Fission/Web/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,57 @@ import Servant

import Fission.Prelude

import qualified Fission.Web.API.Types as Fission
import qualified Fission.Web.API.Types as Fission

import qualified Fission.Web.Auth.Token.JWT.Resolver as Proof
import qualified Fission.Web.Auth.Token.JWT.Resolver as Proof

import qualified Fission.Web.Server.Auth as Auth
import qualified Fission.Web.Server.Challenge as Challenge
import qualified Fission.Web.Server.Auth as Auth
import qualified Fission.Web.Server.Challenge as Challenge
import Fission.Web.Server.Email
import qualified Fission.Web.Server.Swagger as Web.Swagger
import qualified Fission.Web.Server.User as User
import qualified Fission.Web.Server.Swagger as Web.Swagger
import qualified Fission.Web.Server.User as User
import Fission.Web.Server.WNFS

import qualified Fission.Web.Server.Handler.App as App
import qualified Fission.Web.Server.Handler.DNS as DNS
import qualified Fission.Web.Server.Handler.Heroku as Heroku
import qualified Fission.Web.Server.Handler.IPFS as IPFS
import qualified Fission.Web.Server.Handler.Ping as Ping
import qualified Fission.Web.Server.Handler.User as User
import qualified Fission.Web.Server.Heroku.AddOn as Heroku.AddOn
import qualified Fission.Web.Server.Swagger.Types as Swagger
import qualified Fission.Web.Server.Handler.App as App
import qualified Fission.Web.Server.Handler.DNS as DNS
import qualified Fission.Web.Server.Handler.Heroku as Heroku
import qualified Fission.Web.Server.Handler.IPFS as IPFS
import qualified Fission.Web.Server.Handler.Ping as Ping
import qualified Fission.Web.Server.Handler.User as User
import qualified Fission.Web.Server.Heroku.AddOn as Heroku.AddOn
import qualified Fission.Web.Server.Swagger.Types as Swagger

import Fission.Web.Server.IPFS.DNSLink as DNSLink
import qualified Fission.Web.Server.LoosePin as LoosePin
import Fission.Web.Server.IPFS.DNSLink as DNSLink
import qualified Fission.Web.Server.LoosePin as LoosePin

import qualified Fission.Web.Server.App as App
import qualified Fission.Web.Server.App.Content as App.Content
import qualified Fission.Web.Server.App.Domain as App.Domain
import qualified Fission.Web.Server.App as App
import qualified Fission.Web.Server.App.Content as App.Content
import qualified Fission.Web.Server.App.Domain as App.Domain

import qualified Fission.Web.Server.Handler.Auth.UCAN.Verify as Auth.UCAN.Verify
import qualified Fission.Web.Server.Handler.Auth.UCAN.Verify as Auth.UCAN.Verify

import Fission.Web.Server.Types as Fission
import Fission.Web.Server.Types as Fission

import Fission.Web.Server.Handler
import qualified Fission.Web.Server.Handler.Relay as Relay
import qualified Fission.Web.Server.Handler.Relay as Relay
import Fission.Web.Server.Handler.Relay.Types

import qualified Fission.Web.Server.Host.Types as Web
import Fission.Web.Server.IPFS.Cluster as Cluster
import qualified Fission.Web.Server.Host.Types as Web
import Fission.Web.Server.IPFS.Cluster as Cluster
import Fission.Web.Server.IPFS.Linked
import Fission.Web.Server.MonadDB
import Fission.Web.Server.Reflective
import Fission.Web.Server.Relay.Store.Class

import Fission.Internal.Orphanage.OctetStream ()
import Fission.Internal.Orphanage.PlainText ()
import Fission.Internal.Orphanage.OctetStream ()
import Fission.Internal.Orphanage.PlainText ()

-- | Top level web API type. Handled by 'server'.
type API = Swagger.API :<|> Fission.API :<|> LinkWS
type LinkWS = "user" :> "link" :> RelayWS

app ::
-- ( MonadIPFSCluster m PinStatus -- FIXME abstract out
( App.Domain.Initializer m
, App.Content.Initializer m
, App.CRUD m
Expand Down Expand Up @@ -105,6 +106,7 @@ app handlerNT authChecks appHost = do

-- | Web handlers for the 'API'
server ::
-- ( MonadIPFSCluster m PinStatus -- FIXME abstract out
( App.Domain.Initializer m
, App.Content.Initializer m
, App.CRUD m
Expand Down Expand Up @@ -141,6 +143,7 @@ server appHost
:<|> Relay.relay

bizServer ::
-- ( MonadIPFSCluster m PinStatus -- FIXME abstract out
( App.Domain.Initializer m
, App.Content.Initializer m
, App.CRUD m
Expand Down
9 changes: 8 additions & 1 deletion fission-web-server/library/Fission/Web/Server/Handler/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ import qualified Fission.Web.Server.Handler.App.Destroy as Destroy
import qualified Fission.Web.Server.Handler.App.Index as Index
import qualified Fission.Web.Server.Handler.App.Update as Update



import Fission.Web.Server.IPFS.Cluster.Class
import Network.IPFS.Client.Streaming.Pin

handler ::
( App.Domain.Initializer m
( MonadIPFSCluster m PinStatus -- FIXME abstract out
, App.Domain.Initializer m
, App.CRUD m
, App.Content.Initializer m
, MonadTime m
Expand All @@ -33,4 +39,5 @@ handler ::
handler = Index.index
:<|> Create.create
:<|> Update.update
:<|> Update.updateStreaming
:<|> Destroy.handler
111 changes: 61 additions & 50 deletions fission-web-server/library/Fission/Web/Server/Handler/App/Update.hs
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
module Fission.Web.Server.Handler.App.Update (update) where
module Fission.Web.Server.Handler.App.Update (update, updateStreaming) where

import Servant

import Fission.Prelude

import qualified Fission.Web.API.App.Update.Streaming.Types as API.App
import qualified Fission.Web.API.App.Update.Types as API.App
import qualified Fission.Web.API.App.Update.Streaming.Types as API.App
import qualified Fission.Web.API.App.Update.Types as API.App

import qualified Fission.Web.Server.App as App
import qualified Fission.Web.Server.App as App
import Fission.Web.Server.Authorization.Types
import Fission.Web.Server.Error as Web.Error
import Fission.Web.Server.Error as Web.Error




-- 🌐

import qualified RIO.List as List
import qualified RIO.NonEmpty as NonEmpty
import qualified RIO.List as List
import qualified RIO.NonEmpty as NonEmpty

import Servant.Types.SourceT as S
import qualified Streamly.Prelude as Streamly
import Servant.Types.SourceT as S
import qualified Streamly.Prelude as Streamly

import Network.IPFS.Client.Streaming.Pin

import Network.IPFS.CID.Types
import qualified Network.IPFS.Client as IPFS
import Network.IPFS.Client.Pin as Pin
import Network.IPFS.Client.Streaming.Pin as Pin
import qualified Network.IPFS.Client as IPFS
import Network.IPFS.Client.Pin as Pin
import Network.IPFS.Client.Streaming.Pin as Pin

import Servant.Client
import qualified Servant.Client.Streaming as Streaming
import qualified Servant.Client.Streaming as Streaming

-- ⚛️

Expand All @@ -41,12 +41,16 @@ import Fission.Web.Server.IPFS.Cluster.Class

import Fission.URL.Types

import Fission.Process.Time
import Fission.Time



import Fission.Web.API.App.Update.Streaming.Types


import Fission.Web.Server.Internal.Orphanage.SerialT ()

update :: (MonadLogger m, MonadThrow m, MonadTime m, App.Modifier m) => ServerT API.App.Update m
update url newCID copyDataFlag Authorization {about = Entity userId _} = do
now <- currentTime
Expand All @@ -56,60 +60,72 @@ update url newCID copyDataFlag Authorization {about = Entity userId _} = do
copyFiles :: Bool
copyFiles = maybe True identity copyDataFlag

instance ToSourceIO a (Streamly.SerialT IO a) where
toSourceIO serialStream =
SourceT \k ->
k $ Effect do
cont <- Streamly.foldr folder Skip serialStream
return $ cont Stop
where
folder :: a -> (StepT IO a -> StepT IO a) -> (StepT IO a -> StepT IO a)
folder x contAcc = \nextCont -> contAcc (Yield x nextCont)

-- CID -> Authorization -> m (SourceT IO Natural)
updateStreaming ::
forall m .
( MonadIO m
, MonadLogger m
, MonadThrow m
, MonadTime m
, MonadBaseControl IO m
, App.Modifier m
, MonadIPFSCluster m PinStatus
, ServerT API.App.StreamingUpdate m ~ (URL -> CID -> Authorization -> m (SourceT IO UploadStatus))
)
=> ServerT API.App.StreamingUpdate m
updateStreaming url newCID Authorization {about = Entity userId _} = do
now <- currentTime
status <- liftIO . newTVarIO $ Uploading 0 -- FIXME switch to mvar?
pseudoStreams <- streamCluster $ (Streaming.client $ Proxy @PinComplete) newCID (Just True)

let
(asyncRefs, chans) = NonEmpty.unzip pseudoStreams

asyncUpdates <- for chans \statusChan -> liftIO $ withAsync (action statusChan status) pure
let (asyncRefs, chans) = NonEmpty.unzip pseudoStreams
asyncListeners <- foo chans status

let
source :: Streamly.SerialT IO UploadStatus
source = Streamly.repeatM (readTVarIO status)
source :: Streamly.SerialT IO (Maybe BytesReceived)
source =
Streamly.repeatM do
sleepThread . Seconds $ Milli @Natural 500
readTVarIO status >>= \case
Uploading byteCount -> return . Just $ BytesReceived byteCount
_ -> return $ Nothing

source
|> Streamly.takeWhile isUploading
|> Streamly.finally (forM asyncRefs cancel)
|> Streamly.takeWhile isJust
|> Streamly.finally do
forM_ asyncListeners cancel
forM_ asyncRefs cancel
|> toSourceIO
|> mapStepT go
|> pure

action :: MonadIO m => TChan (Either ClientError PinStatus) -> TVar UploadStatus -> m ()
action channel status =
liftIO $ atomically go
where
go = do
go :: Monad m => StepT m (Maybe BytesReceived) -> StepT m BytesReceived
go = \case
S.Yield Nothing more -> go more
S.Yield (Just byteCount) more -> S.Yield byteCount (go more)

S.Skip more -> go more
S.Effect action -> S.Effect $ fmap go action
S.Error msg -> S.Error msg
S.Stop -> S.Stop

foo ::
MonadIO m
=> NonEmpty (TChan (Either ClientError PinStatus))
-> TVar UploadStatus
-> m (NonEmpty (Async ()))
foo chans statusVar =
forM chans \statusChan ->
liftIO $ withAsync (atomically $ fanIn statusChan statusVar) pure

fanIn :: TChan (Either ClientError PinStatus) -> TVar UploadStatus -> STM ()
fanIn channel status = go
where
go :: STM ()
go =
readTVar status >>= \case
Done ->
return () -- FIXME?
return ()

Failed ->
return () -- FIXME!
return ()

Uploading lastMax ->
readTChan channel >>= \case
Expand All @@ -127,12 +143,7 @@ action channel status =

go

toSourceT :: Monad m => Streamly.SerialT m a -> SourceT m a
toSourceT serialStream =
SourceT \k ->
k $ Effect do
cont <- Streamly.foldr folder Skip serialStream
return $ cont Stop
where
folder :: a -> (StepT m a -> StepT m a) -> (StepT m a -> StepT m a)
folder x contAcc = \nextCont -> contAcc (Yield x nextCont)
data UploadStatus
= Failed
| Uploading Natural
| Done
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Fission.Web.Server.Internal.Orphanage.SerialT () where

import Servant.API
import Streamly.Prelude

import Fission.Prelude
import Fission.Web.Server.Stream

instance ToSourceIO a (SerialT IO a) where
toSourceIO = toSourceT
Loading

0 comments on commit 3037757

Please sign in to comment.