From c5e04514f99f95b79b255475a22171c1894edcdd Mon Sep 17 00:00:00 2001 From: Gershom Date: Wed, 18 Oct 2017 18:43:43 -0400 Subject: [PATCH 01/10] initial checkin --- servant-server/servant-server.cabal | 1 + servant-server/src/Servant/Server/Internal.hs | 54 +++++++++++++++++-- servant/servant.cabal | 1 + servant/src/Servant/API.hs | 8 ++- servant/src/Servant/API/Stream.hs | 42 +++++++++++++++ 5 files changed, 101 insertions(+), 5 deletions(-) create mode 100644 servant/src/Servant/API/Stream.hs diff --git a/servant-server/servant-server.cabal b/servant-server/servant-server.cabal index efb8f29b..c4fe7c6e 100644 --- a/servant-server/servant-server.cabal +++ b/servant-server/servant-server.cabal @@ -60,6 +60,7 @@ library , containers >= 0.5 && < 0.6 , exceptions >= 0.8 && < 0.9 , http-api-data >= 0.3 && < 0.4 + , http-media >= 0.4 && < 0.8 , http-types >= 0.8 && < 0.10 , network-uri >= 2.6 && < 2.7 , monad-control >= 1.0.0.4 && < 1.1 diff --git a/servant-server/src/Servant/Server/Internal.hs b/servant-server/src/Servant/Server/Internal.hs index 7dd290db..29d56fb0 100644 --- a/servant-server/src/Servant/Server/Internal.hs +++ b/servant-server/src/Servant/Server/Internal.hs @@ -25,12 +25,15 @@ module Servant.Server.Internal , module Servant.Server.Internal.ServantErr ) where +import Control.Monad (when) import Control.Monad.Trans (liftIO) import Control.Monad.Trans.Resource (runResourceT) import qualified Data.ByteString as B +import qualified Data.ByteString.Builder as BB import qualified Data.ByteString.Char8 as BC8 import qualified Data.ByteString.Lazy as BL -import Data.Maybe (fromMaybe, mapMaybe) +import Data.Maybe (fromMaybe, mapMaybe, + isNothing, maybeToList) import Data.Either (partitionEithers) import Data.String (fromString) import Data.String.Conversions (cs, (<>)) @@ -40,13 +43,15 @@ import Data.Typeable import GHC.TypeLits (KnownNat, KnownSymbol, natVal, symbolVal) import Network.HTTP.Types hiding (Header, ResponseHeaders) +import qualified Network.HTTP.Media as NHM import Network.Socket (SockAddr) import Network.Wai (Application, Request, Response, httpVersion, isSecure, lazyRequestBody, rawQueryString, remoteHost, requestHeaders, requestMethod, - responseLBS, vault) + responseLBS, responseStream, + vault) import Prelude () import Prelude.Compat import Web.HttpApiData (FromHttpApiData, parseHeader, @@ -60,11 +65,16 @@ import Servant.API ((:<|>) (..), (:>), BasicAuth, Capt QueryParam, QueryParams, Raw, RemoteHost, ReqBody, Vault, WithNamedContext, - Description, Summary) + Description, Summary, + Accept(..), + Framing(..), Stream, + StreamGenerator(..), + BoundaryStrategy(..)) import Servant.API.ContentTypes (AcceptHeader (..), AllCTRender (..), AllCTUnrender (..), AllMime, + MimeRender(..), canHandleAcceptH) import Servant.API.ResponseHeaders (GetHeaders, Headers, getHeaders, getResponse) @@ -276,6 +286,42 @@ instance OVERLAPPING_ where method = reflectMethod (Proxy :: Proxy method) status = toEnum . fromInteger $ natVal (Proxy :: Proxy status) + +instance ( MimeRender ctype a, ReflectMethod method, Framing framing ctype + ) => HasServer (Stream method framing ctype a) context where + + type ServerT (Stream method framing ctype a) m = m (StreamGenerator a) + hoistServerWithContext _ _ nt s = nt s + + route Proxy _ action = leafRouter $ \env request respond -> + let accH = fromMaybe ct_wildcard $ lookup hAccept $ requestHeaders request + cmediatype = NHM.matchAccept [contentType (Proxy :: Proxy ctype)] accH + accCheck = when (isNothing cmediatype) $ delayedFail err406 + contentHeader = (hContentType, NHM.renderHeader . maybeToList $ cmediatype) + in runAction (action `addMethodCheck` methodCheck method request + `addAcceptCheck` accCheck + ) env request respond $ \ (StreamGenerator k) -> + Route $ responseStream status200 [contentHeader] $ \write flush -> do + write . BB.lazyByteString . header (Proxy :: Proxy framing) $ (Proxy :: Proxy ctype) + case boundary (Proxy :: Proxy framing) (Proxy :: Proxy ctype) of + BoundaryStrategyBracket f -> + let go x = let bs = mimeRender (Proxy :: Proxy ctype) $ x + (before, after) = f bs + in write ( BB.lazyByteString before + <> BB.lazyByteString bs + <> BB.lazyByteString after) + in k go go + BoundaryStrategyIntersperse sep -> k + (\x -> do + write . BB.lazyByteString . mimeRender (Proxy :: Proxy ctype) $ x + flush) + (\x -> do + write . (BB.lazyByteString sep <>) . BB.lazyByteString . mimeRender (Proxy :: Proxy ctype) $ x + flush) + write . BB.lazyByteString . terminate (Proxy :: Proxy framing) $ (Proxy :: Proxy ctype) + where method = reflectMethod (Proxy :: Proxy method) + + -- | If you use 'Header' in one of the endpoints for your API, -- this automatically requires your server-side handler to be a function -- that takes an argument of the type specified by 'Header'. @@ -318,7 +364,7 @@ instance (KnownSymbol sym, FromHttpApiData a, HasServer api context) <> fromString headerName <> " failed: " <> e } - Right header -> return $ Just header + Right hdr -> return $ Just hdr -- | If you use @'QueryParam' "author" Text@ in one of the endpoints for your API, -- this automatically requires your server-side handler to be a function diff --git a/servant/servant.cabal b/servant/servant.cabal index fbd4d714..86373336 100644 --- a/servant/servant.cabal +++ b/servant/servant.cabal @@ -47,6 +47,7 @@ library Servant.API.IsSecure Servant.API.QueryParam Servant.API.Raw + Servant.API.Stream Servant.API.RemoteHost Servant.API.ReqBody Servant.API.ResponseHeaders diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index 88e4d934..9e4a5a84 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -31,7 +31,10 @@ module Servant.API ( -- * Actual endpoints, distinguished by HTTP method module Servant.API.Verbs, - -- * Authentication + -- * Streaming endpoints, distinguished by HTTP method + module Servant.API.Stream, + +-- * Authentication module Servant.API.BasicAuth, -- * Endpoints description @@ -80,6 +83,9 @@ import Servant.API.IsSecure (IsSecure (..)) import Servant.API.QueryParam (QueryFlag, QueryParam, QueryParams) import Servant.API.Raw (Raw) +import Servant.API.Stream (Stream, StreamGenerator(..), + Framing(..), BoundaryStrategy(..), + NewlineFraming) import Servant.API.RemoteHost (RemoteHost) import Servant.API.ReqBody (ReqBody) import Servant.API.ResponseHeaders (AddHeader, addHeader, noHeader, diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs new file mode 100644 index 00000000..f0092000 --- /dev/null +++ b/servant/src/Servant/API/Stream.hs @@ -0,0 +1,42 @@ +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PolyKinds #-} +{-# OPTIONS_HADDOCK not-home #-} + +module Servant.API.Stream where + +import Data.ByteString.Lazy (ByteString, empty) +import Data.Proxy (Proxy) +import Data.Typeable (Typeable) +import GHC.Generics (Generic) + +-- | A stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. +data Stream (method :: k1) (framing :: *) (contentType :: *) a + deriving (Typeable, Generic) + +-- | Stream endpoints may be implemented as producing a @StreamGenerator@ -- a function that itself takes two emit functions -- the first to be used on the first value the stream emits, and the second to be used on all subsequent values (to allow interspersed framing strategies such as comma separation). +data StreamGenerator a = StreamGenerator ((a -> IO ()) -> (a -> IO ()) -> IO ()) + +-- | The Framing class provides the logic for each framing strategy. The strategy emits a header, followed by boundary-delimited data, and finally a termination character. For many strategies, some of these will just be empty bytestrings. +class Framing strategy a where + header :: Proxy strategy -> Proxy a -> ByteString + boundary :: Proxy strategy -> Proxy a -> BoundaryStrategy + terminate :: Proxy strategy -> Proxy a -> ByteString + +-- | The bracketing strategy generates things to precede and follow the content, as with netstrings. +-- | The intersperse strategy inserts seperators between things, as with newline framing. +data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteString)) + | BoundaryStrategyIntersperse ByteString + + +-- | A simple framing strategy that has no header or termination, and inserts a newline character between each frame. +data NewlineFraming + +instance Framing NewlineFraming a where + header _ _ = empty + boundary _ _ = BoundaryStrategyIntersperse "\n" + terminate _ _ = empty From d4168aa3ae02133cbc517b39ec7014f44fa62d36 Mon Sep 17 00:00:00 2001 From: Gershom Date: Thu, 19 Oct 2017 17:41:49 -0400 Subject: [PATCH 02/10] first round of changes --- servant-server/src/Servant/Server/Internal.hs | 65 ++++++++++++++----- servant/src/Servant/API.hs | 5 +- servant/src/Servant/API/Stream.hs | 60 +++++++++++++---- servant/src/Servant/Utils/Links.hs | 5 ++ 4 files changed, 103 insertions(+), 32 deletions(-) diff --git a/servant-server/src/Servant/Server/Internal.hs b/servant-server/src/Servant/Server/Internal.hs index 29d56fb0..47f15879 100644 --- a/servant-server/src/Servant/Server/Internal.hs +++ b/servant-server/src/Servant/Server/Internal.hs @@ -10,6 +10,7 @@ {-# LANGUAGE PolyKinds #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeOperators #-} @@ -67,8 +68,8 @@ import Servant.API ((:<|>) (..), (:>), BasicAuth, Capt WithNamedContext, Description, Summary, Accept(..), - Framing(..), Stream, - StreamGenerator(..), + FramingRender(..), Stream, + StreamGenerator(..), ToStreamGenerator(..), BoundaryStrategy(..)) import Servant.API.ContentTypes (AcceptHeader (..), AllCTRender (..), @@ -287,40 +288,68 @@ instance OVERLAPPING_ status = toEnum . fromInteger $ natVal (Proxy :: Proxy status) -instance ( MimeRender ctype a, ReflectMethod method, Framing framing ctype - ) => HasServer (Stream method framing ctype a) context where +instance OVERLAPPABLE_ + ( MimeRender ctype a, ReflectMethod method, + FramingRender framing ctype, ToStreamGenerator f a + ) => HasServer (Stream method framing ctype (f a)) context where - type ServerT (Stream method framing ctype a) m = m (StreamGenerator a) + type ServerT (Stream method framing ctype (f a)) m = m (f a) hoistServerWithContext _ _ nt s = nt s - route Proxy _ action = leafRouter $ \env request respond -> + route Proxy _ = streamRouter ([],) method (Proxy :: Proxy framing) (Proxy :: Proxy ctype) + where method = reflectMethod (Proxy :: Proxy method) + +instance OVERLAPPING_ + ( MimeRender ctype a, ReflectMethod method, + FramingRender framing ctype, ToStreamGenerator f a, + GetHeaders (Headers h (f a)) + ) => HasServer (Stream method framing ctype (Headers h (f a))) context where + + type ServerT (Stream method framing ctype (Headers h (f a))) m = m (Headers h (f a)) + hoistServerWithContext _ _ nt s = nt s + + route Proxy _ = streamRouter (\x -> (getHeaders x, getResponse x)) method (Proxy :: Proxy framing) (Proxy :: Proxy ctype) + where method = reflectMethod (Proxy :: Proxy method) + + +streamRouter :: (MimeRender ctype a, FramingRender framing ctype, ToStreamGenerator f a) => + (b -> ([(HeaderName, B.ByteString)], f a)) + -> Method + -> Proxy framing + -> Proxy ctype + -> Delayed env (Handler b) + -> Router env +streamRouter splitHeaders method framingproxy ctypeproxy action = leafRouter $ \env request respond -> let accH = fromMaybe ct_wildcard $ lookup hAccept $ requestHeaders request - cmediatype = NHM.matchAccept [contentType (Proxy :: Proxy ctype)] accH + cmediatype = NHM.matchAccept [contentType ctypeproxy] accH accCheck = when (isNothing cmediatype) $ delayedFail err406 contentHeader = (hContentType, NHM.renderHeader . maybeToList $ cmediatype) in runAction (action `addMethodCheck` methodCheck method request `addAcceptCheck` accCheck - ) env request respond $ \ (StreamGenerator k) -> - Route $ responseStream status200 [contentHeader] $ \write flush -> do - write . BB.lazyByteString . header (Proxy :: Proxy framing) $ (Proxy :: Proxy ctype) - case boundary (Proxy :: Proxy framing) (Proxy :: Proxy ctype) of + ) env request respond $ \ output -> + let (headers, fa) = splitHeaders output + k = getStreamGenerator . toStreamGenerator $ fa in + Route $ responseStream status200 (contentHeader : headers) $ \write flush -> do + write . BB.lazyByteString $ header framingproxy ctypeproxy + case boundary framingproxy ctypeproxy of BoundaryStrategyBracket f -> - let go x = let bs = mimeRender (Proxy :: Proxy ctype) $ x + let go x = let bs = mimeRender ctypeproxy $ x (before, after) = f bs in write ( BB.lazyByteString before <> BB.lazyByteString bs - <> BB.lazyByteString after) + <> BB.lazyByteString after) >> flush in k go go BoundaryStrategyIntersperse sep -> k (\x -> do - write . BB.lazyByteString . mimeRender (Proxy :: Proxy ctype) $ x + write . BB.lazyByteString . mimeRender ctypeproxy $ x flush) (\x -> do - write . (BB.lazyByteString sep <>) . BB.lazyByteString . mimeRender (Proxy :: Proxy ctype) $ x + write . (BB.lazyByteString sep <>) . BB.lazyByteString . mimeRender ctypeproxy $ x flush) - write . BB.lazyByteString . terminate (Proxy :: Proxy framing) $ (Proxy :: Proxy ctype) - where method = reflectMethod (Proxy :: Proxy method) - + BoundaryStrategyGeneral f -> + let go = (>> flush) . write . BB.lazyByteString . f . mimeRender ctypeproxy + in k go go + write . BB.lazyByteString $ terminate framingproxy ctypeproxy -- | If you use 'Header' in one of the endpoints for your API, -- this automatically requires your server-side handler to be a function diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index 9e4a5a84..3a2c768c 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -83,8 +83,9 @@ import Servant.API.IsSecure (IsSecure (..)) import Servant.API.QueryParam (QueryFlag, QueryParam, QueryParams) import Servant.API.Raw (Raw) -import Servant.API.Stream (Stream, StreamGenerator(..), - Framing(..), BoundaryStrategy(..), +import Servant.API.Stream (Stream, StreamGenerator(..), ToStreamGenerator(..), + FramingRender(..), BoundaryStrategy(..), + FramingUnrender(..), NewlineFraming) import Servant.API.RemoteHost (RemoteHost) import Servant.API.ReqBody (ReqBody) diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs index f0092000..e37855ea 100644 --- a/servant/src/Servant/API/Stream.hs +++ b/servant/src/Servant/API/Stream.hs @@ -5,38 +5,74 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PolyKinds #-} +{-# LANGUAGE TupleSections #-} {-# OPTIONS_HADDOCK not-home #-} module Servant.API.Stream where -import Data.ByteString.Lazy (ByteString, empty) -import Data.Proxy (Proxy) -import Data.Typeable (Typeable) -import GHC.Generics (Generic) +import Control.Arrow ((***), first) +import Data.ByteString.Lazy (ByteString, empty) +import qualified Data.ByteString.Lazy.Char8 as LB +import Data.Proxy (Proxy) +import Data.Typeable (Typeable) +import GHC.Generics (Generic) +import Text.Read (readMaybe) -- | A stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. data Stream (method :: k1) (framing :: *) (contentType :: *) a deriving (Typeable, Generic) -- | Stream endpoints may be implemented as producing a @StreamGenerator@ -- a function that itself takes two emit functions -- the first to be used on the first value the stream emits, and the second to be used on all subsequent values (to allow interspersed framing strategies such as comma separation). -data StreamGenerator a = StreamGenerator ((a -> IO ()) -> (a -> IO ()) -> IO ()) +newtype StreamGenerator a = StreamGenerator {getStreamGenerator :: (a -> IO ()) -> (a -> IO ()) -> IO ()} --- | The Framing class provides the logic for each framing strategy. The strategy emits a header, followed by boundary-delimited data, and finally a termination character. For many strategies, some of these will just be empty bytestrings. -class Framing strategy a where +-- | ToStreamGenerator is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly as endpoints. +class ToStreamGenerator f a where + toStreamGenerator :: f a -> StreamGenerator a + +instance ToStreamGenerator StreamGenerator a + where toStreamGenerator x = x + +-- | The FramingRender class provides the logic for emitting a framing strategy. The strategy emits a header, followed by boundary-delimited data, and finally a termination character. For many strategies, some of these will just be empty bytestrings. +class FramingRender strategy a where header :: Proxy strategy -> Proxy a -> ByteString boundary :: Proxy strategy -> Proxy a -> BoundaryStrategy terminate :: Proxy strategy -> Proxy a -> ByteString -- | The bracketing strategy generates things to precede and follow the content, as with netstrings. --- | The intersperse strategy inserts seperators between things, as with newline framing. +-- The intersperse strategy inserts seperators between things, as with newline framing. +-- Finally, the general strategy performs an arbitrary rewrite on the content, to allow escaping rules and such. data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteString)) | BoundaryStrategyIntersperse ByteString + | BoundaryStrategyGeneral (ByteString -> ByteString) +-- | The FramingUnrender class provides the logic for parsing a framing strategy. Given a ByteString, it strips the header, and returns a tuple of the remainder along with a step function that can progressively "uncons" elements from this remainder. The error state is presented per-frame so that protocols that can resume after errors are able to do so. + +class FramingUnrender strategy a where + unrenderFrames :: Proxy strategy -> Proxy a -> ByteString -> (ByteString, ByteString -> (Either String ByteString, ByteString)) -- | A simple framing strategy that has no header or termination, and inserts a newline character between each frame. +-- This assumes that it is used with a Content-Type that encodes without newlines (e.g. JSON). data NewlineFraming -instance Framing NewlineFraming a where - header _ _ = empty - boundary _ _ = BoundaryStrategyIntersperse "\n" - terminate _ _ = empty +instance FramingRender NewlineFraming a where + header _ _ = empty + boundary _ _ = BoundaryStrategyIntersperse "\n" + terminate _ _ = empty + +instance FramingUnrender NewlineFraming a where + unrenderFrames _ _ = (, (Right *** LB.drop 1) . LB.break (== '\n')) + +-- | The netstring framing strategy as defined by djb: +data NetstringFraming + +instance FramingRender NetstringFraming a where + header _ _ = empty + boundary _ _ = BoundaryStrategyBracket $ \b -> (LB.pack . show . LB.length $ b, "") + terminate _ _ = empty + +instance FramingUnrender NetstringFraming a where + unrenderFrames _ _ = (, \b -> let (i,r) = LB.break (==':') b + in case readMaybe (LB.unpack i) of + Just len -> first Right $ LB.splitAt len . LB.drop 1 $ r + Nothing -> (Left ("Bad netstring frame, couldn't parse value as integer value: " ++ LB.unpack i), LB.drop 1 . LB.dropWhile (/= ',') $ r) + ) diff --git a/servant/src/Servant/Utils/Links.hs b/servant/src/Servant/Utils/Links.hs index 0318f96c..5d480a41 100644 --- a/servant/src/Servant/Utils/Links.hs +++ b/servant/src/Servant/Utils/Links.hs @@ -117,6 +117,7 @@ import Servant.API.RemoteHost ( RemoteHost ) import Servant.API.Verbs ( Verb ) import Servant.API.Sub ( type (:>) ) import Servant.API.Raw ( Raw ) +import Servant.API.Stream ( Stream ) import Servant.API.TypeLevel import Servant.API.Experimental.Auth ( AuthProtect ) @@ -306,6 +307,10 @@ instance HasLink Raw where type MkLink Raw = Link toLink _ = id +instance HasLink (Stream m fr ct a) where + type MkLink (Stream m fr ct a) = Link + toLink _ = id + -- AuthProtext instances instance HasLink sub => HasLink (AuthProtect tag :> sub) where type MkLink (AuthProtect tag :> sub) = MkLink sub From 9132a5bb8453bd41ab22ad6866b1b688408d5fd5 Mon Sep 17 00:00:00 2001 From: Gershom Date: Thu, 19 Oct 2017 17:43:01 -0400 Subject: [PATCH 03/10] fix indentation --- servant/src/Servant/API.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index 3a2c768c..f1bee3bd 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -34,7 +34,7 @@ module Servant.API ( -- * Streaming endpoints, distinguished by HTTP method module Servant.API.Stream, --- * Authentication + -- * Authentication module Servant.API.BasicAuth, -- * Endpoints description From 9a2ac6f4dd7a28e56949980dbfddd467e7811712 Mon Sep 17 00:00:00 2001 From: Gershom Date: Fri, 20 Oct 2017 15:09:11 -0400 Subject: [PATCH 04/10] HasClient instance for Stream --- .../Servant/Client/Core/Internal/HasClient.hs | 42 +++++++++++++++++++ servant-server/src/Servant/Server/Internal.hs | 2 +- servant/src/Servant/API.hs | 7 ++-- servant/src/Servant/API/Stream.hs | 26 ++++++++---- 4 files changed, 66 insertions(+), 11 deletions(-) diff --git a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs index 42d61d58..fda931bd 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs @@ -18,6 +18,7 @@ import Prelude () import Prelude.Compat import Data.Foldable (toList) +import qualified Data.ByteString.Lazy as BL import Data.List (foldl') import Data.Proxy (Proxy (Proxy)) import Data.Sequence (fromList) @@ -29,8 +30,10 @@ import Servant.API ((:<|>) ((:<|>)), (:>), AuthProtect, BasicAuth, BasicAuthData, BuildHeadersTo (..), + BuildFromStream (..), Capture, CaptureAll, Description, EmptyAPI, + FramingUnrender (..), Header, Headers (..), HttpVersion, IsSecure, MimeRender (mimeRender), @@ -40,6 +43,7 @@ import Servant.API ((:<|>) ((:<|>)), (:>), QueryParams, Raw, ReflectMethod (..), RemoteHost, ReqBody, + Stream, Summary, ToHttpApiData, Vault, Verb, WithNamedContext, @@ -244,6 +248,44 @@ instance OVERLAPPING_ , getHeadersHList = buildHeadersTo . toList $ responseHeaders response } +instance OVERLAPPABLE_ + ( RunClient m, MimeUnrender ct a, ReflectMethod method, + FramingUnrender framing a, BuildFromStream a (f a) + ) => HasClient m (Stream method framing ct (f a)) where + type Client m (Stream method framing ct (f a)) = m (f a) + clientWithRoute _pm Proxy req = do + response <- runRequest req + { requestAccept = fromList [contentType (Proxy :: Proxy ct)] + , requestMethod = reflectMethod (Proxy :: Proxy method) + } + return $ decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response + +instance OVERLAPPING_ + ( RunClient m, BuildHeadersTo ls, MimeUnrender ct a, ReflectMethod method, + FramingUnrender framing a, BuildFromStream a (f a) + ) => HasClient m (Stream method framing ct (Headers ls (f a))) where + type Client m (Stream method framing ct (Headers ls (f a))) = m (Headers ls (f a)) + clientWithRoute _pm Proxy req = do + response <- runRequest req + { requestAccept = fromList [contentType (Proxy :: Proxy ct)] + , requestMethod = reflectMethod (Proxy :: Proxy method) + } + return Headers { getResponse = decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response + , getHeadersHList = buildHeadersTo . toList $ responseHeaders response + } + +decodeFramed :: forall ctype strategy a b. + (MimeUnrender ctype a, FramingUnrender strategy a, BuildFromStream a b) => + Proxy strategy -> Proxy ctype -> Proxy a -> Response -> b +decodeFramed framingproxy ctypeproxy typeproxy response = + let (body, uncons) = unrenderFrames framingproxy typeproxy (responseBody response) + loop b | BL.null b = [] + | otherwise = case uncons b of + (Right x, r) -> case mimeUnrender ctypeproxy x :: Either String a of + Left err -> Left err : loop r + Right x' -> Right x' : loop r + (Left err, r) -> Left err : loop r + in buildFromStream $ loop body -- | If you use a 'Header' in one of your endpoints in your API, -- the corresponding querying function will automatically take diff --git a/servant-server/src/Servant/Server/Internal.hs b/servant-server/src/Servant/Server/Internal.hs index 47f15879..e7145d70 100644 --- a/servant-server/src/Servant/Server/Internal.hs +++ b/servant-server/src/Servant/Server/Internal.hs @@ -349,7 +349,7 @@ streamRouter splitHeaders method framingproxy ctypeproxy action = leafRouter $ \ BoundaryStrategyGeneral f -> let go = (>> flush) . write . BB.lazyByteString . f . mimeRender ctypeproxy in k go go - write . BB.lazyByteString $ terminate framingproxy ctypeproxy + write . BB.lazyByteString $ trailer framingproxy ctypeproxy -- | If you use 'Header' in one of the endpoints for your API, -- this automatically requires your server-side handler to be a function diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index f1bee3bd..cfb99dc4 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -83,9 +83,10 @@ import Servant.API.IsSecure (IsSecure (..)) import Servant.API.QueryParam (QueryFlag, QueryParam, QueryParams) import Servant.API.Raw (Raw) -import Servant.API.Stream (Stream, StreamGenerator(..), ToStreamGenerator(..), - FramingRender(..), BoundaryStrategy(..), - FramingUnrender(..), +import Servant.API.Stream (Stream, StreamGenerator (..), + ToStreamGenerator (..), BuildFromStream (..), + FramingRender (..), BoundaryStrategy (..), + FramingUnrender (..), NewlineFraming) import Servant.API.RemoteHost (RemoteHost) import Servant.API.ReqBody (ReqBody) diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs index e37855ea..1cb0491c 100644 --- a/servant/src/Servant/API/Stream.hs +++ b/servant/src/Servant/API/Stream.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE FlexibleInstances #-} @@ -17,11 +18,15 @@ import Data.Proxy (Proxy) import Data.Typeable (Typeable) import GHC.Generics (Generic) import Text.Read (readMaybe) +import Network.HTTP.Types.Method (StdMethod (..)) --- | A stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. +-- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Steam endpoints always return response code 200 on success. Type synonyms are provided for standard methods. data Stream (method :: k1) (framing :: *) (contentType :: *) a deriving (Typeable, Generic) +type StreamGet = Stream 'GET +type StreamPost = Stream 'POST + -- | Stream endpoints may be implemented as producing a @StreamGenerator@ -- a function that itself takes two emit functions -- the first to be used on the first value the stream emits, and the second to be used on all subsequent values (to allow interspersed framing strategies such as comma separation). newtype StreamGenerator a = StreamGenerator {getStreamGenerator :: (a -> IO ()) -> (a -> IO ()) -> IO ()} @@ -32,11 +37,18 @@ class ToStreamGenerator f a where instance ToStreamGenerator StreamGenerator a where toStreamGenerator x = x +-- | BuildFromStream is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly on the client side for talking to streaming endpoints. The streams we build from are represented as lazy lists of elements interspersed with possible errors. +class BuildFromStream a b where + buildFromStream :: [Either String a] -> b + +instance BuildFromStream a [Either String a] + where buildFromStream x = x + -- | The FramingRender class provides the logic for emitting a framing strategy. The strategy emits a header, followed by boundary-delimited data, and finally a termination character. For many strategies, some of these will just be empty bytestrings. class FramingRender strategy a where - header :: Proxy strategy -> Proxy a -> ByteString - boundary :: Proxy strategy -> Proxy a -> BoundaryStrategy - terminate :: Proxy strategy -> Proxy a -> ByteString + header :: Proxy strategy -> Proxy a -> ByteString + boundary :: Proxy strategy -> Proxy a -> BoundaryStrategy + trailer :: Proxy strategy -> Proxy a -> ByteString -- | The bracketing strategy generates things to precede and follow the content, as with netstrings. -- The intersperse strategy inserts seperators between things, as with newline framing. @@ -45,7 +57,7 @@ data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteS | BoundaryStrategyIntersperse ByteString | BoundaryStrategyGeneral (ByteString -> ByteString) --- | The FramingUnrender class provides the logic for parsing a framing strategy. Given a ByteString, it strips the header, and returns a tuple of the remainder along with a step function that can progressively "uncons" elements from this remainder. The error state is presented per-frame so that protocols that can resume after errors are able to do so. +-- | The FramingUnrender class provides the logic for parsing a framing strategy. Given a ByteString, it strips the header, and returns a tuple of the remainder along with a step function that can progressively "uncons" elements from this remainder. The error state is presented per-frame so that protocols that can resume after errors are able to do so. Termination of the unrendering is indicated by return of an empty reminder. class FramingUnrender strategy a where unrenderFrames :: Proxy strategy -> Proxy a -> ByteString -> (ByteString, ByteString -> (Either String ByteString, ByteString)) @@ -57,7 +69,7 @@ data NewlineFraming instance FramingRender NewlineFraming a where header _ _ = empty boundary _ _ = BoundaryStrategyIntersperse "\n" - terminate _ _ = empty + trailer _ _ = empty instance FramingUnrender NewlineFraming a where unrenderFrames _ _ = (, (Right *** LB.drop 1) . LB.break (== '\n')) @@ -68,7 +80,7 @@ data NetstringFraming instance FramingRender NetstringFraming a where header _ _ = empty boundary _ _ = BoundaryStrategyBracket $ \b -> (LB.pack . show . LB.length $ b, "") - terminate _ _ = empty + trailer _ _ = empty instance FramingUnrender NetstringFraming a where unrenderFrames _ _ = (, \b -> let (i,r) = LB.break (==':') b From e75a3cc37be56d3013918ef4330938b1cb0b1fda Mon Sep 17 00:00:00 2001 From: Gershom Date: Tue, 24 Oct 2017 17:12:21 -0700 Subject: [PATCH 05/10] streaming client actually streams --- .../src/Servant/Client/Core.hs | 1 + .../Servant/Client/Core/Internal/HasClient.hs | 71 +++++++++++-------- .../Servant/Client/Core/Internal/Request.hs | 4 ++ .../Servant/Client/Core/Internal/RunClient.hs | 2 + .../src/Servant/Client/Internal/HttpClient.hs | 12 ++++ servant/src/Servant/API.hs | 1 + servant/src/Servant/API/Stream.hs | 25 +++++-- 7 files changed, 82 insertions(+), 34 deletions(-) diff --git a/servant-client-core/src/Servant/Client/Core.hs b/servant-client-core/src/Servant/Client/Core.hs index a926c169..73160abf 100644 --- a/servant-client-core/src/Servant/Client/Core.hs +++ b/servant-client-core/src/Servant/Client/Core.hs @@ -43,6 +43,7 @@ module Servant.Client.Core , Response(..) , RunClient(..) , module Servant.Client.Core.Internal.BaseUrl + , StreamingResponse(..) -- * Writing HasClient instances -- | These functions need not be re-exported by backend libraries. diff --git a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs index fda931bd..5d320379 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs @@ -4,6 +4,7 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -17,9 +18,12 @@ module Servant.Client.Core.Internal.HasClient where import Prelude () import Prelude.Compat +import Control.Concurrent (newMVar, modifyMVar) +import Control.Monad (when) import Data.Foldable (toList) import qualified Data.ByteString.Lazy as BL import Data.List (foldl') +import Data.Monoid ((<>)) import Data.Proxy (Proxy (Proxy)) import Data.Sequence (fromList) import Data.String (fromString) @@ -31,6 +35,7 @@ import Servant.API ((:<|>) ((:<|>)), (:>), BasicAuthData, BuildHeadersTo (..), BuildFromStream (..), + ByteStringParser (..), Capture, CaptureAll, Description, EmptyAPI, FramingUnrender (..), @@ -248,44 +253,54 @@ instance OVERLAPPING_ , getHeadersHList = buildHeadersTo . toList $ responseHeaders response } +data ResultStream a = ResultStream ((forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b)) + instance OVERLAPPABLE_ ( RunClient m, MimeUnrender ct a, ReflectMethod method, FramingUnrender framing a, BuildFromStream a (f a) ) => HasClient m (Stream method framing ct (f a)) where - type Client m (Stream method framing ct (f a)) = m (f a) + + type Client m (Stream method framing ct (f a)) = m (ResultStream a) + clientWithRoute _pm Proxy req = do - response <- runRequest req + sresp <- streamingRequest req { requestAccept = fromList [contentType (Proxy :: Proxy ct)] , requestMethod = reflectMethod (Proxy :: Proxy method) } - return $ decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response + return $ ResultStream $ \k -> + runStreamingResponse sresp $ \(status,_headers,_httpversion,reader) -> do + when (H.statusCode status /= 200) $ error "bad status" --fixme + let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a) + loop bs = do + res <- BL.fromStrict <$> reader + if BL.null res + then return $ parseEOF unrender res + else let sofar = (bs <> res) + in case parseIncremental unrender sofar of + Just x -> return x + Nothing -> loop sofar + (frameParser, remainder) <- loop BL.empty + state <- newMVar remainder + let frameLoop bs = do + res <- BL.fromStrict <$> reader + let addIsEmptyInfo (a, r) = (r, (a, BL.null r && BL.null res)) + if BL.null res + then return . addIsEmptyInfo $ parseEOF frameParser res + else let sofar = (bs <> res) + in case parseIncremental frameParser res of + Just x -> return $ addIsEmptyInfo x + Nothing -> frameLoop sofar -instance OVERLAPPING_ - ( RunClient m, BuildHeadersTo ls, MimeUnrender ct a, ReflectMethod method, - FramingUnrender framing a, BuildFromStream a (f a) - ) => HasClient m (Stream method framing ct (Headers ls (f a))) where - type Client m (Stream method framing ct (Headers ls (f a))) = m (Headers ls (f a)) - clientWithRoute _pm Proxy req = do - response <- runRequest req - { requestAccept = fromList [contentType (Proxy :: Proxy ct)] - , requestMethod = reflectMethod (Proxy :: Proxy method) - } - return Headers { getResponse = decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response - , getHeadersHList = buildHeadersTo . toList $ responseHeaders response - } + go = processResult <$> modifyMVar state frameLoop + processResult (Right bs,isDone) = + if BL.null bs && isDone + then Nothing + else Just $ case mimeUnrender (Proxy :: Proxy ct) bs :: Either String a of + Left err -> Left err + Right x -> Right x + processResult (Left err, _) = Just (Left err) + k go -decodeFramed :: forall ctype strategy a b. - (MimeUnrender ctype a, FramingUnrender strategy a, BuildFromStream a b) => - Proxy strategy -> Proxy ctype -> Proxy a -> Response -> b -decodeFramed framingproxy ctypeproxy typeproxy response = - let (body, uncons) = unrenderFrames framingproxy typeproxy (responseBody response) - loop b | BL.null b = [] - | otherwise = case uncons b of - (Right x, r) -> case mimeUnrender ctypeproxy x :: Either String a of - Left err -> Left err : loop r - Right x' -> Right x' : loop r - (Left err, r) -> Left err : loop r - in buildFromStream $ loop body -- | If you use a 'Header' in one of your endpoints in your API, -- the corresponding querying function will automatically take diff --git a/servant-client-core/src/Servant/Client/Core/Internal/Request.hs b/servant-client-core/src/Servant/Client/Core/Internal/Request.hs index 458219b9..b120c7f7 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/Request.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/Request.hs @@ -4,6 +4,7 @@ {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} @@ -15,6 +16,7 @@ import Prelude.Compat import Control.Monad.Catch (Exception) import qualified Data.ByteString.Builder as Builder +import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import Data.Semigroup ((<>)) import qualified Data.Sequence as Seq @@ -70,6 +72,8 @@ data Response = Response , responseHttpVersion :: HttpVersion } deriving (Eq, Show, Generic, Typeable) +data StreamingResponse = StreamingResponse { runStreamingResponse :: forall a. ((Status, Seq.Seq Header, HttpVersion, IO BS.ByteString) -> IO a) -> IO a } + -- A GET request to the top-level path defaultRequest :: Request defaultRequest = Request diff --git a/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs index 564cbb39..88b39a04 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs @@ -19,11 +19,13 @@ import Servant.API (MimeUnrender, contentTypes, mimeUnrender) import Servant.Client.Core.Internal.Request (Request, Response (..), + StreamingResponse (..), ServantError (..)) class (Monad m) => RunClient m where -- | How to make a request. runRequest :: Request -> m Response + streamingRequest :: Request -> m StreamingResponse throwServantError :: ServantError -> m a catchServantError :: m a -> (ServantError -> m a) -> m a diff --git a/servant-client/src/Servant/Client/Internal/HttpClient.hs b/servant-client/src/Servant/Client/Internal/HttpClient.hs index d116b443..94086306 100644 --- a/servant-client/src/Servant/Client/Internal/HttpClient.hs +++ b/servant-client/src/Servant/Client/Internal/HttpClient.hs @@ -88,6 +88,7 @@ instance Alt ClientM where instance RunClient ClientM where runRequest = performRequest + streamingRequest = performStreamingRequest throwServantError = throwError catchServantError = catchError @@ -115,6 +116,17 @@ performRequest req = do throwError $ FailureResponse ourResponse return ourResponse +performStreamingRequest :: Request -> ClientM StreamingResponse +performStreamingRequest req = do + m <- asks manager + burl <- asks baseUrl + let request = requestToClientRequest burl req + + return $ StreamingResponse $ + \k -> Client.withResponse request m $ + \r -> + k (Client.responseStatus r, fromList $ Client.responseHeaders r, Client.responseVersion r, Client.responseBody r) + clientResponseToReponse :: Client.Response BSL.ByteString -> Response clientResponseToReponse r = Response { responseStatusCode = Client.responseStatus r diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index cfb99dc4..a2495dc9 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -85,6 +85,7 @@ import Servant.API.QueryParam (QueryFlag, QueryParam, import Servant.API.Raw (Raw) import Servant.API.Stream (Stream, StreamGenerator (..), ToStreamGenerator (..), BuildFromStream (..), + ByteStringParser (..), FramingRender (..), BoundaryStrategy (..), FramingUnrender (..), NewlineFraming) diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs index 1cb0491c..e676bc6b 100644 --- a/servant/src/Servant/API/Stream.hs +++ b/servant/src/Servant/API/Stream.hs @@ -11,13 +11,11 @@ module Servant.API.Stream where -import Control.Arrow ((***), first) import Data.ByteString.Lazy (ByteString, empty) import qualified Data.ByteString.Lazy.Char8 as LB import Data.Proxy (Proxy) import Data.Typeable (Typeable) import GHC.Generics (Generic) -import Text.Read (readMaybe) import Network.HTTP.Types.Method (StdMethod (..)) -- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Steam endpoints always return response code 200 on success. Type synonyms are provided for standard methods. @@ -57,10 +55,16 @@ data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteS | BoundaryStrategyIntersperse ByteString | BoundaryStrategyGeneral (ByteString -> ByteString) --- | The FramingUnrender class provides the logic for parsing a framing strategy. Given a ByteString, it strips the header, and returns a tuple of the remainder along with a step function that can progressively "uncons" elements from this remainder. The error state is presented per-frame so that protocols that can resume after errors are able to do so. Termination of the unrendering is indicated by return of an empty reminder. +-- | A type of parser that can never fail, and has different parsing strategies (incremental, or EOF) depending if more input can be sent. The incremental parser should return `Nothing` if it would like to be sent a longer ByteString. If it returns a value, it also returns the remainder following that value. +data ByteStringParser a = ByteStringParser { + parseIncremental :: ByteString -> Maybe (a, ByteString), + parseEOF :: ByteString -> (a, ByteString) +} +-- | The FramingUnrender class provides the logic for parsing a framing strategy. The outer @ByteStringParser@ strips the header from a stream of bytes, and yields a parser that can handle the remainder, stepwise. Each frame may be a ByteString, or a String indicating the error state for that frame. Such states are per-frame, so that protocols that can resume after errors are able to do so. Eventually this returns an empty ByteString to indicate termination. class FramingUnrender strategy a where - unrenderFrames :: Proxy strategy -> Proxy a -> ByteString -> (ByteString, ByteString -> (Either String ByteString, ByteString)) + unrenderFrames :: Proxy strategy -> Proxy a -> ByteStringParser (ByteStringParser (Either String ByteString)) + -- | A simple framing strategy that has no header or termination, and inserts a newline character between each frame. -- This assumes that it is used with a Content-Type that encodes without newlines (e.g. JSON). @@ -72,8 +76,14 @@ instance FramingRender NewlineFraming a where trailer _ _ = empty instance FramingUnrender NewlineFraming a where - unrenderFrames _ _ = (, (Right *** LB.drop 1) . LB.break (== '\n')) - + unrenderFrames _ _ = ByteStringParser (Just . (go,)) (go,) + where go = ByteStringParser + (\x -> case LB.break (== '\n') x of + (h,r) -> if not (LB.null r) then Just (Right h, LB.drop 1 r) else Nothing + ) + (\x -> case LB.break (== '\n') x of + (h,r) -> (Right h, LB.drop 1 r) + ) -- | The netstring framing strategy as defined by djb: data NetstringFraming @@ -82,9 +92,12 @@ instance FramingRender NetstringFraming a where boundary _ _ = BoundaryStrategyBracket $ \b -> (LB.pack . show . LB.length $ b, "") trailer _ _ = empty +{- + instance FramingUnrender NetstringFraming a where unrenderFrames _ _ = (, \b -> let (i,r) = LB.break (==':') b in case readMaybe (LB.unpack i) of Just len -> first Right $ LB.splitAt len . LB.drop 1 $ r Nothing -> (Left ("Bad netstring frame, couldn't parse value as integer value: " ++ LB.unpack i), LB.drop 1 . LB.dropWhile (/= ',') $ r) ) +-} \ No newline at end of file From 0c77a2b4b0581a6d91fd3c99229d200ef402602b Mon Sep 17 00:00:00 2001 From: Gershom Date: Tue, 24 Oct 2017 17:26:18 -0700 Subject: [PATCH 06/10] make client endpoint give polymorphic result --- .../src/Servant/Client/Core/Internal/HasClient.hs | 8 +++----- servant/src/Servant/API.hs | 3 ++- servant/src/Servant/API/Stream.hs | 12 ++++++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs index 5d320379..ab3fb688 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs @@ -4,7 +4,6 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE RankNTypes #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -48,6 +47,7 @@ import Servant.API ((:<|>) ((:<|>)), (:>), QueryParams, Raw, ReflectMethod (..), RemoteHost, ReqBody, + ResultStream(..), Stream, Summary, ToHttpApiData, Vault, Verb, @@ -253,21 +253,19 @@ instance OVERLAPPING_ , getHeadersHList = buildHeadersTo . toList $ responseHeaders response } -data ResultStream a = ResultStream ((forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b)) - instance OVERLAPPABLE_ ( RunClient m, MimeUnrender ct a, ReflectMethod method, FramingUnrender framing a, BuildFromStream a (f a) ) => HasClient m (Stream method framing ct (f a)) where - type Client m (Stream method framing ct (f a)) = m (ResultStream a) + type Client m (Stream method framing ct (f a)) = m (f a) clientWithRoute _pm Proxy req = do sresp <- streamingRequest req { requestAccept = fromList [contentType (Proxy :: Proxy ct)] , requestMethod = reflectMethod (Proxy :: Proxy method) } - return $ ResultStream $ \k -> + return . buildFromStream $ ResultStream $ \k -> runStreamingResponse sresp $ \(status,_headers,_httpversion,reader) -> do when (H.statusCode status /= 200) $ error "bad status" --fixme let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a) diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index a2495dc9..bb2a2875 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -84,7 +84,8 @@ import Servant.API.QueryParam (QueryFlag, QueryParam, QueryParams) import Servant.API.Raw (Raw) import Servant.API.Stream (Stream, StreamGenerator (..), - ToStreamGenerator (..), BuildFromStream (..), + ToStreamGenerator (..), + ResultStream(..), BuildFromStream (..), ByteStringParser (..), FramingRender (..), BoundaryStrategy (..), FramingUnrender (..), diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs index e676bc6b..01cd0898 100644 --- a/servant/src/Servant/API/Stream.hs +++ b/servant/src/Servant/API/Stream.hs @@ -6,6 +6,7 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE TupleSections #-} {-# OPTIONS_HADDOCK not-home #-} @@ -35,11 +36,14 @@ class ToStreamGenerator f a where instance ToStreamGenerator StreamGenerator a where toStreamGenerator x = x --- | BuildFromStream is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly on the client side for talking to streaming endpoints. The streams we build from are represented as lazy lists of elements interspersed with possible errors. -class BuildFromStream a b where - buildFromStream :: [Either String a] -> b +-- | Clients reading from streaming endpoints can be implemented as producing a @ResultStream@ that captures the setup, takedown, and incremental logic for a read, being an IO continuation that takes a producer of Just either values or errors that terminates with a Nothing. +data ResultStream a = ResultStream ((forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b)) -instance BuildFromStream a [Either String a] +-- | BuildFromStream is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly on the client side for talking to streaming endpoints. +class BuildFromStream a b where + buildFromStream :: ResultStream a -> b + +instance BuildFromStream a (ResultStream a) where buildFromStream x = x -- | The FramingRender class provides the logic for emitting a framing strategy. The strategy emits a header, followed by boundary-delimited data, and finally a termination character. For many strategies, some of these will just be empty bytestrings. From 38e87397e77b07e9527afa5e75027c6aeb290546 Mon Sep 17 00:00:00 2001 From: Gershom Date: Sat, 4 Nov 2017 00:10:29 -0400 Subject: [PATCH 07/10] add tests, fix to make tests work --- .../Servant/Client/Core/Internal/HasClient.hs | 10 +- servant-client/servant-client.cabal | 1 + servant-client/test/Servant/ClientSpec.hs | 2 +- servant-client/test/Servant/StreamSpec.hs | 113 ++++++++++++++++++ servant/src/Servant/API.hs | 6 +- servant/src/Servant/API/Stream.hs | 26 ++-- 6 files changed, 143 insertions(+), 15 deletions(-) create mode 100644 servant-client/test/Servant/StreamSpec.hs diff --git a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs index ab3fb688..29bafb13 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs @@ -267,7 +267,7 @@ instance OVERLAPPABLE_ } return . buildFromStream $ ResultStream $ \k -> runStreamingResponse sresp $ \(status,_headers,_httpversion,reader) -> do - when (H.statusCode status /= 200) $ error "bad status" --fixme + when (H.statusCode status /= 200) $ error "bad status" -- TODO fixme let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a) loop bs = do res <- BL.fromStrict <$> reader @@ -283,10 +283,12 @@ instance OVERLAPPABLE_ res <- BL.fromStrict <$> reader let addIsEmptyInfo (a, r) = (r, (a, BL.null r && BL.null res)) if BL.null res - then return . addIsEmptyInfo $ parseEOF frameParser res + then if BL.null bs + then return ("", (Right "", True)) + else return . addIsEmptyInfo $ parseEOF frameParser bs else let sofar = (bs <> res) - in case parseIncremental frameParser res of - Just x -> return $ addIsEmptyInfo x + in case parseIncremental frameParser sofar of + Just x -> return $ addIsEmptyInfo x Nothing -> frameLoop sofar go = processResult <$> modifyMVar state frameLoop diff --git a/servant-client/servant-client.cabal b/servant-client/servant-client.cabal index 14ea3ad2..06f770f0 100644 --- a/servant-client/servant-client.cabal +++ b/servant-client/servant-client.cabal @@ -66,6 +66,7 @@ test-suite spec main-is: Spec.hs other-modules: Servant.ClientSpec + Servant.StreamSpec build-depends: base == 4.* , aeson diff --git a/servant-client/test/Servant/ClientSpec.hs b/servant-client/test/Servant/ClientSpec.hs index fda25428..495fa860 100644 --- a/servant-client/test/Servant/ClientSpec.hs +++ b/servant-client/test/Servant/ClientSpec.hs @@ -24,7 +24,7 @@ {-# OPTIONS_GHC -fno-warn-name-shadowing #-} #include "overlapping-compat.h" -module Servant.ClientSpec (spec) where +module Servant.ClientSpec (spec, Person(..), startWaiApp, endWaiApp) where import Prelude () import Prelude.Compat diff --git a/servant-client/test/Servant/StreamSpec.hs b/servant-client/test/Servant/StreamSpec.hs new file mode 100644 index 00000000..df9003ab --- /dev/null +++ b/servant-client/test/Servant/StreamSpec.hs @@ -0,0 +1,113 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE UndecidableInstances #-} +#if __GLASGOW_HASKELL__ >= 800 +{-# OPTIONS_GHC -freduction-depth=100 #-} +#else +{-# OPTIONS_GHC -fcontext-stack=100 #-} +#endif +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# OPTIONS_GHC -fno-warn-name-shadowing #-} + +#include "overlapping-compat.h" +module Servant.StreamSpec (spec) where + +import Prelude () +import Prelude.Compat +import Data.Proxy +import qualified Network.HTTP.Client as C +import System.IO.Unsafe (unsafePerformIO) +import Test.Hspec + +import Servant.API ((:<|>) ((:<|>)), + (:>), + EmptyAPI, JSON, + StreamGet, + NewlineFraming, + NetstringFraming, + ResultStream(..), + StreamGenerator(..)) +import Servant.Client +import Servant.Server +import qualified Servant.ClientSpec as CS +import Servant.ClientSpec (Person(..)) + + +spec :: Spec +spec = describe "Servant.Stream" $ do + streamSpec + +type StreamApi f = + "streamGetNewline" :> StreamGet NewlineFraming JSON (f Person) + :<|> "streamGetNetstring" :> StreamGet NetstringFraming JSON (f Person) + :<|> EmptyAPI + + +capi :: Proxy (StreamApi ResultStream) +capi = Proxy + +sapi :: Proxy (StreamApi StreamGenerator) +sapi = Proxy + + +getGetNL :<|> getGetNS :<|> EmptyClient = client capi + + +getGetNL :: ClientM (ResultStream Person) +getGetNS :: ClientM (ResultStream Person) + +alice :: Person +alice = Person "Alice" 42 + +bob :: Person +bob = Person "Bob" 25 + +server :: Application +server = serve sapi ( + (return (StreamGenerator (\f r -> f alice >> r bob >> r alice)) + :: Handler (StreamGenerator Person)) + :<|> + (return (StreamGenerator (\f r -> f alice >> r bob >> r alice)) + :: Handler (StreamGenerator Person)) + :<|> + emptyServer) + + +{-# NOINLINE manager' #-} +manager' :: C.Manager +manager' = unsafePerformIO $ C.newManager C.defaultManagerSettings + +runClient :: ClientM a -> BaseUrl -> IO (Either ServantError a) +runClient x baseUrl' = runClientM x (ClientEnv manager' baseUrl') + +runResultStream :: ResultStream a -> IO (Maybe (Either String a), Maybe (Either String a), Maybe (Either String a), Maybe (Either String a)) +runResultStream (ResultStream k) = k $ \act -> (,,,) <$> act <*> act <*> act <*> act + +streamSpec :: Spec +streamSpec = beforeAll (CS.startWaiApp server) $ afterAll CS.endWaiApp $ do + + it "Servant.API.StreamGet.Newline" $ \(_, baseUrl) -> do + Right res <- runClient getGetNL baseUrl + let jra = Just (Right alice) + jrb = Just (Right bob) + runResultStream res `shouldReturn` (jra, jrb, jra, Nothing) + + it "Servant.API.StreamGet.Netstring" $ \(_, baseUrl) -> do + Right res <- runClient getGetNS baseUrl + let jra = Just (Right alice) + jrb = Just (Right bob) + runResultStream res `shouldReturn` (jra, jrb, jra, Nothing) diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index bb2a2875..84f3d861 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -83,13 +83,15 @@ import Servant.API.IsSecure (IsSecure (..)) import Servant.API.QueryParam (QueryFlag, QueryParam, QueryParams) import Servant.API.Raw (Raw) -import Servant.API.Stream (Stream, StreamGenerator (..), +import Servant.API.Stream (Stream, StreamGet, StreamPost, + StreamGenerator (..), ToStreamGenerator (..), ResultStream(..), BuildFromStream (..), ByteStringParser (..), FramingRender (..), BoundaryStrategy (..), FramingUnrender (..), - NewlineFraming) + NewlineFraming, + NetstringFraming) import Servant.API.RemoteHost (RemoteHost) import Servant.API.ReqBody (ReqBody) import Servant.API.ResponseHeaders (AddHeader, addHeader, noHeader, diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs index 01cd0898..50828c7c 100644 --- a/servant/src/Servant/API/Stream.hs +++ b/servant/src/Servant/API/Stream.hs @@ -14,9 +14,12 @@ module Servant.API.Stream where import Data.ByteString.Lazy (ByteString, empty) import qualified Data.ByteString.Lazy.Char8 as LB +import Data.Monoid ((<>)) import Data.Proxy (Proxy) import Data.Typeable (Typeable) import GHC.Generics (Generic) +import Text.Read (readMaybe) +import Data.Bifunctor (first) import Network.HTTP.Types.Method (StdMethod (..)) -- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Steam endpoints always return response code 200 on success. Type synonyms are provided for standard methods. @@ -93,15 +96,22 @@ data NetstringFraming instance FramingRender NetstringFraming a where header _ _ = empty - boundary _ _ = BoundaryStrategyBracket $ \b -> (LB.pack . show . LB.length $ b, "") + boundary _ _ = BoundaryStrategyBracket $ \b -> ((<> ":") . LB.pack . show . LB.length $ b, ",") trailer _ _ = empty -{- instance FramingUnrender NetstringFraming a where - unrenderFrames _ _ = (, \b -> let (i,r) = LB.break (==':') b - in case readMaybe (LB.unpack i) of - Just len -> first Right $ LB.splitAt len . LB.drop 1 $ r - Nothing -> (Left ("Bad netstring frame, couldn't parse value as integer value: " ++ LB.unpack i), LB.drop 1 . LB.dropWhile (/= ',') $ r) - ) --} \ No newline at end of file + unrenderFrames _ _ = ByteStringParser (Just . (go,)) (go,) + where go = ByteStringParser + (\b -> let (i,r) = LB.break (==':') b + in case readMaybe (LB.unpack i) of + Just len -> if LB.length r > len + then Just . first Right . fmap (LB.drop 1) $ LB.splitAt len . LB.drop 1 $ r + else Nothing + Nothing -> Just (Left ("Bad netstring frame, couldn't parse value as integer value: " ++ LB.unpack i), LB.drop 1 . LB.dropWhile (/= ',') $ r)) + (\b -> let (i,r) = LB.break (==':') b + in case readMaybe (LB.unpack i) of + Just len -> if LB.length r > len + then first Right . fmap (LB.drop 1) $ LB.splitAt len . LB.drop 1 $ r + else (Right $ LB.take len r, LB.empty) + Nothing -> (Left ("Bad netstring frame, couldn't parse value as integer value: " ++ LB.unpack i), LB.drop 1 . LB.dropWhile (/= ',') $ r)) From b704d3c067be44c80b091f79133b2db9108f7ed5 Mon Sep 17 00:00:00 2001 From: Gershom Date: Sat, 4 Nov 2017 00:20:50 -0400 Subject: [PATCH 08/10] fixup cabal from bad merge --- servant-server/servant-server.cabal | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/servant-server/servant-server.cabal b/servant-server/servant-server.cabal index 3dd3200b..d381b2a7 100644 --- a/servant-server/servant-server.cabal +++ b/servant-server/servant-server.cabal @@ -64,7 +64,8 @@ library , containers >= 0.5 && < 0.6 , exceptions >= 0.8 && < 0.9 , http-api-data >= 0.3 && < 0.4 - , http-types >= 0.8 && < 0.10 + , http-media >= 0.4 && < 0.8 + , http-types >= 0.8 && < 0.11 , network-uri >= 2.6 && < 2.7 , monad-control >= 1.0.0.4 && < 1.1 , mtl >= 2 && < 2.3 From 90292e1f62152fcfe9489b10b40d01d39a897399 Mon Sep 17 00:00:00 2001 From: Gershom Date: Mon, 6 Nov 2017 11:37:00 -0500 Subject: [PATCH 09/10] move statuscheck earlier on streaming response to give good error --- .../Servant/Client/Core/Internal/HasClient.hs | 1 - .../src/Servant/Client/Internal/HttpClient.hs | 16 ++++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs index 29bafb13..797f1f77 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs @@ -267,7 +267,6 @@ instance OVERLAPPABLE_ } return . buildFromStream $ ResultStream $ \k -> runStreamingResponse sresp $ \(status,_headers,_httpversion,reader) -> do - when (H.statusCode status /= 200) $ error "bad status" -- TODO fixme let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a) loop bs = do res <- BL.fromStrict <$> reader diff --git a/servant-client/src/Servant/Client/Internal/HttpClient.hs b/servant-client/src/Servant/Client/Internal/HttpClient.hs index 94086306..a0b8fcb1 100644 --- a/servant-client/src/Servant/Client/Internal/HttpClient.hs +++ b/servant-client/src/Servant/Client/Internal/HttpClient.hs @@ -111,7 +111,7 @@ performRequest req = do Right response -> do let status = Client.responseStatus response status_code = statusCode status - ourResponse = clientResponseToReponse response + ourResponse = clientResponseToResponse response unless (status_code >= 200 && status_code < 300) $ throwError $ FailureResponse ourResponse return ourResponse @@ -121,14 +121,18 @@ performStreamingRequest req = do m <- asks manager burl <- asks baseUrl let request = requestToClientRequest burl req - return $ StreamingResponse $ \k -> Client.withResponse request m $ - \r -> - k (Client.responseStatus r, fromList $ Client.responseHeaders r, Client.responseVersion r, Client.responseBody r) + \r -> do + let status = Client.responseStatus r + status_code = statusCode status + unless (status_code >= 200 && status_code < 300) $ do + b <- BSL.fromChunks <$> Client.brConsume (Client.responseBody r) + throw $ FailureResponse $ Response status b (fromList $ Client.responseHeaders r) (Client.responseVersion r) + k (status, fromList $ Client.responseHeaders r, Client.responseVersion r, Client.responseBody r) -clientResponseToReponse :: Client.Response BSL.ByteString -> Response -clientResponseToReponse r = Response +clientResponseToResponse :: Client.Response BSL.ByteString -> Response +clientResponseToResponse r = Response { responseStatusCode = Client.responseStatus r , responseBody = Client.responseBody r , responseHeaders = fromList $ Client.responseHeaders r From db13077ccb93223e606ace6f7a67ac9389375a6a Mon Sep 17 00:00:00 2001 From: Gershom Date: Mon, 6 Nov 2017 11:55:27 -0500 Subject: [PATCH 10/10] wall clean and compat --- .../src/Servant/Client/Core/Internal/HasClient.hs | 3 +-- servant/src/Servant/API/Stream.hs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs index 797f1f77..ef5bdce4 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs @@ -18,7 +18,6 @@ import Prelude () import Prelude.Compat import Control.Concurrent (newMVar, modifyMVar) -import Control.Monad (when) import Data.Foldable (toList) import qualified Data.ByteString.Lazy as BL import Data.List (foldl') @@ -266,7 +265,7 @@ instance OVERLAPPABLE_ , requestMethod = reflectMethod (Proxy :: Proxy method) } return . buildFromStream $ ResultStream $ \k -> - runStreamingResponse sresp $ \(status,_headers,_httpversion,reader) -> do + runStreamingResponse sresp $ \(_status,_headers,_httpversion,reader) -> do let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a) loop bs = do res <- BL.fromStrict <$> reader diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs index 50828c7c..073e0ce1 100644 --- a/servant/src/Servant/API/Stream.hs +++ b/servant/src/Servant/API/Stream.hs @@ -19,7 +19,7 @@ import Data.Proxy (Proxy) import Data.Typeable (Typeable) import GHC.Generics (Generic) import Text.Read (readMaybe) -import Data.Bifunctor (first) +import Control.Arrow (first) import Network.HTTP.Types.Method (StdMethod (..)) -- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Steam endpoints always return response code 200 on success. Type synonyms are provided for standard methods.