streaming client actually streams

This commit is contained in:
Gershom 2017-10-24 17:12:21 -07:00
parent 9a2ac6f4dd
commit e75a3cc37b
7 changed files with 82 additions and 34 deletions

View file

@ -43,6 +43,7 @@ module Servant.Client.Core
, Response(..)
, RunClient(..)
, module Servant.Client.Core.Internal.BaseUrl
, StreamingResponse(..)
-- * Writing HasClient instances
-- | These functions need not be re-exported by backend libraries.

View file

@ -4,6 +4,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
@ -17,9 +18,12 @@ module Servant.Client.Core.Internal.HasClient where
import Prelude ()
import Prelude.Compat
import Control.Concurrent (newMVar, modifyMVar)
import Control.Monad (when)
import Data.Foldable (toList)
import qualified Data.ByteString.Lazy as BL
import Data.List (foldl')
import Data.Monoid ((<>))
import Data.Proxy (Proxy (Proxy))
import Data.Sequence (fromList)
import Data.String (fromString)
@ -31,6 +35,7 @@ import Servant.API ((:<|>) ((:<|>)), (:>),
BasicAuthData,
BuildHeadersTo (..),
BuildFromStream (..),
ByteStringParser (..),
Capture, CaptureAll,
Description, EmptyAPI,
FramingUnrender (..),
@ -248,44 +253,54 @@ instance OVERLAPPING_
, getHeadersHList = buildHeadersTo . toList $ responseHeaders response
}
data ResultStream a = ResultStream ((forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b))
instance OVERLAPPABLE_
( RunClient m, MimeUnrender ct a, ReflectMethod method,
FramingUnrender framing a, BuildFromStream a (f a)
) => HasClient m (Stream method framing ct (f a)) where
type Client m (Stream method framing ct (f a)) = m (f a)
type Client m (Stream method framing ct (f a)) = m (ResultStream a)
clientWithRoute _pm Proxy req = do
response <- runRequest req
sresp <- streamingRequest req
{ requestAccept = fromList [contentType (Proxy :: Proxy ct)]
, requestMethod = reflectMethod (Proxy :: Proxy method)
}
return $ decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response
return $ ResultStream $ \k ->
runStreamingResponse sresp $ \(status,_headers,_httpversion,reader) -> do
when (H.statusCode status /= 200) $ error "bad status" --fixme
let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a)
loop bs = do
res <- BL.fromStrict <$> reader
if BL.null res
then return $ parseEOF unrender res
else let sofar = (bs <> res)
in case parseIncremental unrender sofar of
Just x -> return x
Nothing -> loop sofar
(frameParser, remainder) <- loop BL.empty
state <- newMVar remainder
let frameLoop bs = do
res <- BL.fromStrict <$> reader
let addIsEmptyInfo (a, r) = (r, (a, BL.null r && BL.null res))
if BL.null res
then return . addIsEmptyInfo $ parseEOF frameParser res
else let sofar = (bs <> res)
in case parseIncremental frameParser res of
Just x -> return $ addIsEmptyInfo x
Nothing -> frameLoop sofar
instance OVERLAPPING_
( RunClient m, BuildHeadersTo ls, MimeUnrender ct a, ReflectMethod method,
FramingUnrender framing a, BuildFromStream a (f a)
) => HasClient m (Stream method framing ct (Headers ls (f a))) where
type Client m (Stream method framing ct (Headers ls (f a))) = m (Headers ls (f a))
clientWithRoute _pm Proxy req = do
response <- runRequest req
{ requestAccept = fromList [contentType (Proxy :: Proxy ct)]
, requestMethod = reflectMethod (Proxy :: Proxy method)
}
return Headers { getResponse = decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response
, getHeadersHList = buildHeadersTo . toList $ responseHeaders response
}
go = processResult <$> modifyMVar state frameLoop
processResult (Right bs,isDone) =
if BL.null bs && isDone
then Nothing
else Just $ case mimeUnrender (Proxy :: Proxy ct) bs :: Either String a of
Left err -> Left err
Right x -> Right x
processResult (Left err, _) = Just (Left err)
k go
decodeFramed :: forall ctype strategy a b.
(MimeUnrender ctype a, FramingUnrender strategy a, BuildFromStream a b) =>
Proxy strategy -> Proxy ctype -> Proxy a -> Response -> b
decodeFramed framingproxy ctypeproxy typeproxy response =
let (body, uncons) = unrenderFrames framingproxy typeproxy (responseBody response)
loop b | BL.null b = []
| otherwise = case uncons b of
(Right x, r) -> case mimeUnrender ctypeproxy x :: Either String a of
Left err -> Left err : loop r
Right x' -> Right x' : loop r
(Left err, r) -> Left err : loop r
in buildFromStream $ loop body
-- | If you use a 'Header' in one of your endpoints in your API,
-- the corresponding querying function will automatically take

View file

@ -4,6 +4,7 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
@ -15,6 +16,7 @@ import Prelude.Compat
import Control.Monad.Catch (Exception)
import qualified Data.ByteString.Builder as Builder
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import Data.Semigroup ((<>))
import qualified Data.Sequence as Seq
@ -70,6 +72,8 @@ data Response = Response
, responseHttpVersion :: HttpVersion
} deriving (Eq, Show, Generic, Typeable)
data StreamingResponse = StreamingResponse { runStreamingResponse :: forall a. ((Status, Seq.Seq Header, HttpVersion, IO BS.ByteString) -> IO a) -> IO a }
-- A GET request to the top-level path
defaultRequest :: Request
defaultRequest = Request

View file

@ -19,11 +19,13 @@ import Servant.API (MimeUnrender,
contentTypes,
mimeUnrender)
import Servant.Client.Core.Internal.Request (Request, Response (..),
StreamingResponse (..),
ServantError (..))
class (Monad m) => RunClient m where
-- | How to make a request.
runRequest :: Request -> m Response
streamingRequest :: Request -> m StreamingResponse
throwServantError :: ServantError -> m a
catchServantError :: m a -> (ServantError -> m a) -> m a

View file

@ -88,6 +88,7 @@ instance Alt ClientM where
instance RunClient ClientM where
runRequest = performRequest
streamingRequest = performStreamingRequest
throwServantError = throwError
catchServantError = catchError
@ -115,6 +116,17 @@ performRequest req = do
throwError $ FailureResponse ourResponse
return ourResponse
performStreamingRequest :: Request -> ClientM StreamingResponse
performStreamingRequest req = do
m <- asks manager
burl <- asks baseUrl
let request = requestToClientRequest burl req
return $ StreamingResponse $
\k -> Client.withResponse request m $
\r ->
k (Client.responseStatus r, fromList $ Client.responseHeaders r, Client.responseVersion r, Client.responseBody r)
clientResponseToReponse :: Client.Response BSL.ByteString -> Response
clientResponseToReponse r = Response
{ responseStatusCode = Client.responseStatus r

View file

@ -85,6 +85,7 @@ import Servant.API.QueryParam (QueryFlag, QueryParam,
import Servant.API.Raw (Raw)
import Servant.API.Stream (Stream, StreamGenerator (..),
ToStreamGenerator (..), BuildFromStream (..),
ByteStringParser (..),
FramingRender (..), BoundaryStrategy (..),
FramingUnrender (..),
NewlineFraming)

View file

@ -11,13 +11,11 @@
module Servant.API.Stream where
import Control.Arrow ((***), first)
import Data.ByteString.Lazy (ByteString, empty)
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Proxy (Proxy)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Text.Read (readMaybe)
import Network.HTTP.Types.Method (StdMethod (..))
-- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Steam endpoints always return response code 200 on success. Type synonyms are provided for standard methods.
@ -57,10 +55,16 @@ data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteS
| BoundaryStrategyIntersperse ByteString
| BoundaryStrategyGeneral (ByteString -> ByteString)
-- | The FramingUnrender class provides the logic for parsing a framing strategy. Given a ByteString, it strips the header, and returns a tuple of the remainder along with a step function that can progressively "uncons" elements from this remainder. The error state is presented per-frame so that protocols that can resume after errors are able to do so. Termination of the unrendering is indicated by return of an empty reminder.
-- | A type of parser that can never fail, and has different parsing strategies (incremental, or EOF) depending if more input can be sent. The incremental parser should return `Nothing` if it would like to be sent a longer ByteString. If it returns a value, it also returns the remainder following that value.
data ByteStringParser a = ByteStringParser {
parseIncremental :: ByteString -> Maybe (a, ByteString),
parseEOF :: ByteString -> (a, ByteString)
}
-- | The FramingUnrender class provides the logic for parsing a framing strategy. The outer @ByteStringParser@ strips the header from a stream of bytes, and yields a parser that can handle the remainder, stepwise. Each frame may be a ByteString, or a String indicating the error state for that frame. Such states are per-frame, so that protocols that can resume after errors are able to do so. Eventually this returns an empty ByteString to indicate termination.
class FramingUnrender strategy a where
unrenderFrames :: Proxy strategy -> Proxy a -> ByteString -> (ByteString, ByteString -> (Either String ByteString, ByteString))
unrenderFrames :: Proxy strategy -> Proxy a -> ByteStringParser (ByteStringParser (Either String ByteString))
-- | A simple framing strategy that has no header or termination, and inserts a newline character between each frame.
-- This assumes that it is used with a Content-Type that encodes without newlines (e.g. JSON).
@ -72,8 +76,14 @@ instance FramingRender NewlineFraming a where
trailer _ _ = empty
instance FramingUnrender NewlineFraming a where
unrenderFrames _ _ = (, (Right *** LB.drop 1) . LB.break (== '\n'))
unrenderFrames _ _ = ByteStringParser (Just . (go,)) (go,)
where go = ByteStringParser
(\x -> case LB.break (== '\n') x of
(h,r) -> if not (LB.null r) then Just (Right h, LB.drop 1 r) else Nothing
)
(\x -> case LB.break (== '\n') x of
(h,r) -> (Right h, LB.drop 1 r)
)
-- | The netstring framing strategy as defined by djb: <http://cr.yp.to/proto/netstrings.txt>
data NetstringFraming
@ -82,9 +92,12 @@ instance FramingRender NetstringFraming a where
boundary _ _ = BoundaryStrategyBracket $ \b -> (LB.pack . show . LB.length $ b, "")
trailer _ _ = empty
{-
instance FramingUnrender NetstringFraming a where
unrenderFrames _ _ = (, \b -> let (i,r) = LB.break (==':') b
in case readMaybe (LB.unpack i) of
Just len -> first Right $ LB.splitAt len . LB.drop 1 $ r
Nothing -> (Left ("Bad netstring frame, couldn't parse value as integer value: " ++ LB.unpack i), LB.drop 1 . LB.dropWhile (/= ',') $ r)
)
-}