Merge pull request #988 from phadej/list-is-pure-stream

Add toStreamGenerator (NonEmpty a) instance
This commit is contained in:
Oleg Grenrus 2018-06-24 22:56:07 +03:00 committed by GitHub
commit ae05ef5312
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 149 additions and 41 deletions

View file

@ -52,6 +52,7 @@ library
, bytestring >= 0.10.4.0 && < 0.11
, containers >= 0.5.5.1 && < 0.6
, text >= 1.2.3.0 && < 1.3
, transformers >= 0.3.0.0 && < 0.6
if !impl(ghc >= 8.0)
build-depends:

View file

@ -21,6 +21,7 @@ import Prelude.Compat
import Control.Concurrent (newMVar, modifyMVar)
import Data.Foldable (toList)
import qualified Data.ByteString.Lazy as BL
import Control.Monad.IO.Class (MonadIO (..))
import Data.List (foldl')
import Data.Proxy (Proxy (Proxy))
import Data.Semigroup ((<>))
@ -33,7 +34,7 @@ import Servant.API ((:<|>) ((:<|>)), (:>),
AuthProtect, BasicAuth,
BasicAuthData,
BuildHeadersTo (..),
BuildFromStream (..),
FromResultStream (..),
ByteStringParser (..),
Capture', CaptureAll,
Description, EmptyAPI,
@ -283,18 +284,18 @@ instance OVERLAPPING_
hoistClientMonad _ _ f ma = f ma
instance OVERLAPPABLE_
( RunClient m, MimeUnrender ct a, ReflectMethod method,
FramingUnrender framing a, BuildFromStream a (f a)
) => HasClient m (Stream method status framing ct (f a)) where
( RunClient m, MonadIO m, MimeUnrender ct a, ReflectMethod method,
FramingUnrender framing a, FromResultStream a b
) => HasClient m (Stream method status framing ct b) where
type Client m (Stream method status framing ct (f a)) = m (f a)
type Client m (Stream method status framing ct b) = m b
clientWithRoute _pm Proxy req = do
sresp <- streamingRequest req
{ requestAccept = fromList [contentType (Proxy :: Proxy ct)]
, requestMethod = reflectMethod (Proxy :: Proxy method)
}
return . buildFromStream $ ResultStream $ \k ->
liftIO $ fromResultStream $ ResultStream $ \k ->
runStreamingResponse sresp $ \gres -> do
let reader = responseBody gres
let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a)

View file

@ -107,12 +107,12 @@ manager' = unsafePerformIO $ C.newManager C.defaultManagerSettings
runClient :: ClientM a -> BaseUrl -> IO (Either ServantError a)
runClient x baseUrl' = runClientM x (mkClientEnv manager' baseUrl')
runResultStream :: ResultStream a
testRunResultStream :: ResultStream a
-> IO ( Maybe (Either String a)
, Maybe (Either String a)
, Maybe (Either String a)
, Maybe (Either String a))
runResultStream (ResultStream k)
testRunResultStream (ResultStream k)
= k $ \act -> (,,,) <$> act <*> act <*> act <*> act
streamSpec :: Spec
@ -122,13 +122,13 @@ streamSpec = beforeAll (CS.startWaiApp server) $ afterAll CS.endWaiApp $ do
Right res <- runClient getGetNL baseUrl
let jra = Just (Right alice)
jrb = Just (Right bob)
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
testRunResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
it "works with Servant.API.StreamGet.Netstring" $ \(_, baseUrl) -> do
Right res <- runClient getGetNS baseUrl
let jra = Just (Right alice)
jrb = Just (Right bob)
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
testRunResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
it "streams in constant memory" $ \(_, baseUrl) -> do
Right (ResultStream res) <- runClient getGetALot baseUrl

View file

@ -840,6 +840,24 @@ instance OVERLAPPABLE_
status = fromInteger $ natVal (Proxy :: Proxy status)
p = Proxy :: Proxy a
-- | TODO: mention the endpoint is streaming, its framing strategy
--
-- Also there are no samples.
instance OVERLAPPABLE_
(MimeRender ct a, KnownNat status
, ReflectMethod method)
=> HasDocs (Stream method status framing ct a) where
docsFor Proxy (endpoint, action) DocOptions{..} =
single endpoint' action'
where endpoint' = endpoint & method .~ method'
action' = action & response.respTypes .~ allMime t
& response.respStatus .~ status
t = Proxy :: Proxy '[ct]
method' = reflectMethod (Proxy :: Proxy method)
status = fromInteger $ natVal (Proxy :: Proxy status)
p = Proxy :: Proxy a
instance OVERLAPPING_
(ToSample a, AllMimeRender (ct ': cts) a, KnownNat status
, ReflectMethod method, AllHeaderSamples ls, GetHeaders (HList ls))

View file

@ -238,6 +238,20 @@ instance (Elem JSON list, HasForeignType lang ftype a, ReflectMethod method)
method = reflectMethod (Proxy :: Proxy method)
methodLC = toLower $ decodeUtf8 method
-- | TODO: doesn't taking framing into account.
instance (ct ~ JSON, HasForeignType lang ftype a, ReflectMethod method)
=> HasForeign lang ftype (Stream method status framing ct a) where
type Foreign ftype (Stream method status framing ct a) = Req ftype
foreignFor lang Proxy Proxy req =
req & reqFuncName . _FunctionName %~ (methodLC :)
& reqMethod .~ method
& reqReturnType .~ Just retType
where
retType = typeFor lang (Proxy :: Proxy ftype) (Proxy :: Proxy a)
method = reflectMethod (Proxy :: Proxy method)
methodLC = toLower $ decodeUtf8 method
instance (KnownSymbol sym, HasForeignType lang ftype (RequiredArgument mods a), HasForeign lang ftype api)
=> HasForeign lang ftype (Header' mods sym a :> api) where
type Foreign ftype (Header' mods sym a :> api) = Foreign ftype api

View file

@ -114,7 +114,7 @@ import Servant.API.ResponseHeaders
ResponseHeader (..), addHeader, getHeadersHList, getResponse,
noHeader)
import Servant.API.Stream
(BoundaryStrategy (..), BuildFromStream (..),
(BoundaryStrategy (..), FromResultStream (..),
ByteStringParser (..), FramingRender (..),
FramingUnrender (..), NetstringFraming, NewlineFraming,
NoFraming, ResultStream (..), Stream, StreamGenerator (..),

View file

@ -7,6 +7,7 @@
module Servant.API.Internal.Test.ComprehensiveAPI where
import Data.Proxy
(Proxy (..))
import Servant.API
type GET = Get '[JSON] NoContent
@ -38,6 +39,7 @@ type ComprehensiveAPIWithoutRaw =
Vault :> GET :<|>
Verb 'POST 204 '[JSON] NoContent :<|>
Verb 'POST 204 '[JSON] Int :<|>
Stream 'GET 200 NetstringFraming JSON [Int] :<|>
WithNamedContext "foo" '[] GET :<|>
CaptureAll "foo" Int :> GET :<|>
Summary "foo" :> GET :<|>

View file

@ -11,13 +11,38 @@
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_HADDOCK not-home #-}
module Servant.API.Stream where
module Servant.API.Stream (
Stream,
StreamGet,
StreamPost,
-- * Sources
--
-- | Both 'StreamGenerator' and 'ResultStream' are equivalent
-- to some *source* in streaming libraries.
StreamGenerator (..),
ToStreamGenerator (..),
ResultStream (..),
FromResultStream (..),
-- * Framing
FramingRender (..),
FramingUnrender (..),
BoundaryStrategy (..),
ByteStringParser (..),
-- ** Strategies
NoFraming,
NewlineFraming,
NetstringFraming,
) where
import Control.Arrow
(first)
import Data.ByteString.Lazy
(ByteString, empty)
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Foldable
(traverse_)
import Data.List.NonEmpty
(NonEmpty (..))
import Data.Monoid
((<>))
import Data.Proxy
@ -30,35 +55,82 @@ import GHC.TypeLits
(Nat)
import Network.HTTP.Types.Method
(StdMethod (..))
import System.IO.Unsafe
(unsafeInterleaveIO)
import Text.Read
(readMaybe)
-- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Stream endpoints always return response code 200 on success. Type synonyms are provided for standard methods.
-- | A Stream endpoint for a given method emits a stream of encoded values at a
-- given Content-Type, delimited by a framing strategy. Stream endpoints always
-- return response code 200 on success. Type synonyms are provided for standard
-- methods.
data Stream (method :: k1) (status :: Nat) (framing :: *) (contentType :: *) (a :: *)
deriving (Typeable, Generic)
type StreamGet = Stream 'GET 200
type StreamPost = Stream 'POST 200
-- | Stream endpoints may be implemented as producing a @StreamGenerator@ -- a function that itself takes two emit functions -- the first to be used on the first value the stream emits, and the second to be used on all subsequent values (to allow interspersed framing strategies such as comma separation).
-- | Stream endpoints may be implemented as producing a @StreamGenerator@ a
-- function that itself takes two emit functions the first to be used on the
-- first value the stream emits, and the second to be used on all subsequent
-- values (to allow interspersed framing strategies such as comma separation).
newtype StreamGenerator a = StreamGenerator { getStreamGenerator :: (a -> IO ()) -> (a -> IO ()) -> IO () }
-- | ToStreamGenerator is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly as endpoints.
class ToStreamGenerator a b | a -> b where
toStreamGenerator :: a -> StreamGenerator b
instance ToStreamGenerator (StreamGenerator a) a
where toStreamGenerator x = x
instance ToStreamGenerator (StreamGenerator a) a where
toStreamGenerator x = x
-- | Clients reading from streaming endpoints can be implemented as producing a @ResultStream@ that captures the setup, takedown, and incremental logic for a read, being an IO continuation that takes a producer of Just either values or errors that terminates with a Nothing.
newtype ResultStream a = ResultStream (forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b)
instance ToStreamGenerator (NonEmpty a) a where
toStreamGenerator (x :| xs) = StreamGenerator $ \f g -> f x >> traverse_ g xs
-- | BuildFromStream is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly on the client side for talking to streaming endpoints.
class BuildFromStream a b where
buildFromStream :: ResultStream a -> b
instance ToStreamGenerator [a] a where
toStreamGenerator [] = StreamGenerator $ \_ _ -> return ()
toStreamGenerator (x : xs) = StreamGenerator $ \f g -> f x >> traverse_ g xs
instance BuildFromStream a (ResultStream a)
where buildFromStream x = x
-- | Clients reading from streaming endpoints can be implemented as producing a
-- @ResultStream@ that captures the setup, takedown, and incremental logic for
-- a read, being an IO continuation that takes a producer of Just either values
-- or errors that terminates with a Nothing.
newtype ResultStream a = ResultStream { runResultStream :: forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b }
-- | FromResultStream is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly on the client side for talking to streaming endpoints.
class FromResultStream a b | b -> a where
fromResultStream :: ResultStream a -> IO b
instance FromResultStream a (ResultStream a) where
fromResultStream = return
-- | Uses 'unsafeInterleaveIO'
instance FromResultStream a [a] where
fromResultStream x = runResultStream x lazyRead
-- | Uses 'unsafeInterleaveIO'
instance FromResultStream a (NonEmpty a) where
fromResultStream x = runResultStream x $ \r -> do
e <- r
case e of
Nothing -> fail "Empty stream"
Just (Left er) -> fail er
Just (Right y) -> do
ys <- lazyRead r
return (y :| ys)
lazyRead :: IO (Maybe (Either String a)) -> IO [a]
lazyRead r = go
where
go = unsafeInterleaveIO loop
loop = do
e <- r
case e of
Nothing -> return []
Just (Left er) -> fail er
Just (Right y) -> do
ys <- go
return (y : ys)
-- | The FramingRender class provides the logic for emitting a framing strategy. The strategy emits a header, followed by boundary-delimited data, and finally a termination character. For many strategies, some of these will just be empty bytestrings.
class FramingRender strategy a where
@ -74,9 +146,9 @@ data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteS
| BoundaryStrategyGeneral (ByteString -> ByteString)
-- | A type of parser that can never fail, and has different parsing strategies (incremental, or EOF) depending if more input can be sent. The incremental parser should return `Nothing` if it would like to be sent a longer ByteString. If it returns a value, it also returns the remainder following that value.
data ByteStringParser a = ByteStringParser {
parseIncremental :: ByteString -> Maybe (a, ByteString),
parseEOF :: ByteString -> (a, ByteString)
data ByteStringParser a = ByteStringParser
{ parseIncremental :: ByteString -> Maybe (a, ByteString)
, parseEOF :: ByteString -> (a, ByteString)
}
-- | The FramingUnrender class provides the logic for parsing a framing strategy. The outer @ByteStringParser@ strips the header from a stream of bytes, and yields a parser that can handle the remainder, stepwise. Each frame may be a ByteString, or a String indicating the error state for that frame. Such states are per-frame, so that protocols that can resume after errors are able to do so. Eventually this returns an empty ByteString to indicate termination.

View file

@ -460,8 +460,8 @@ instance HasLink Raw where
type MkLink Raw a = a
toLink toA _ = toA
instance HasLink (Stream m fr ct a) where
type MkLink (Stream m fr ct a) r = r
instance HasLink (Stream m status fr ct a) where
type MkLink (Stream m status fr ct a) r = r
toLink toA _ = toA
-- AuthProtext instances