From e75a3cc37be56d3013918ef4330938b1cb0b1fda Mon Sep 17 00:00:00 2001 From: Gershom Date: Tue, 24 Oct 2017 17:12:21 -0700 Subject: [PATCH] streaming client actually streams --- .../src/Servant/Client/Core.hs | 1 + .../Servant/Client/Core/Internal/HasClient.hs | 71 +++++++++++-------- .../Servant/Client/Core/Internal/Request.hs | 4 ++ .../Servant/Client/Core/Internal/RunClient.hs | 2 + .../src/Servant/Client/Internal/HttpClient.hs | 12 ++++ servant/src/Servant/API.hs | 1 + servant/src/Servant/API/Stream.hs | 25 +++++-- 7 files changed, 82 insertions(+), 34 deletions(-) diff --git a/servant-client-core/src/Servant/Client/Core.hs b/servant-client-core/src/Servant/Client/Core.hs index a926c169..73160abf 100644 --- a/servant-client-core/src/Servant/Client/Core.hs +++ b/servant-client-core/src/Servant/Client/Core.hs @@ -43,6 +43,7 @@ module Servant.Client.Core , Response(..) , RunClient(..) , module Servant.Client.Core.Internal.BaseUrl + , StreamingResponse(..) -- * Writing HasClient instances -- | These functions need not be re-exported by backend libraries. diff --git a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs index fda931bd..5d320379 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs @@ -4,6 +4,7 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -17,9 +18,12 @@ module Servant.Client.Core.Internal.HasClient where import Prelude () import Prelude.Compat +import Control.Concurrent (newMVar, modifyMVar) +import Control.Monad (when) import Data.Foldable (toList) import qualified Data.ByteString.Lazy as BL import Data.List (foldl') +import Data.Monoid ((<>)) import Data.Proxy (Proxy (Proxy)) import Data.Sequence (fromList) import Data.String (fromString) @@ -31,6 +35,7 @@ import Servant.API ((:<|>) ((:<|>)), (:>), BasicAuthData, BuildHeadersTo (..), BuildFromStream (..), + ByteStringParser (..), Capture, CaptureAll, Description, EmptyAPI, FramingUnrender (..), @@ -248,44 +253,54 @@ instance OVERLAPPING_ , getHeadersHList = buildHeadersTo . toList $ responseHeaders response } +data ResultStream a = ResultStream ((forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b)) + instance OVERLAPPABLE_ ( RunClient m, MimeUnrender ct a, ReflectMethod method, FramingUnrender framing a, BuildFromStream a (f a) ) => HasClient m (Stream method framing ct (f a)) where - type Client m (Stream method framing ct (f a)) = m (f a) + + type Client m (Stream method framing ct (f a)) = m (ResultStream a) + clientWithRoute _pm Proxy req = do - response <- runRequest req + sresp <- streamingRequest req { requestAccept = fromList [contentType (Proxy :: Proxy ct)] , requestMethod = reflectMethod (Proxy :: Proxy method) } - return $ decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response + return $ ResultStream $ \k -> + runStreamingResponse sresp $ \(status,_headers,_httpversion,reader) -> do + when (H.statusCode status /= 200) $ error "bad status" --fixme + let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a) + loop bs = do + res <- BL.fromStrict <$> reader + if BL.null res + then return $ parseEOF unrender res + else let sofar = (bs <> res) + in case parseIncremental unrender sofar of + Just x -> return x + Nothing -> loop sofar + (frameParser, remainder) <- loop BL.empty + state <- newMVar remainder + let frameLoop bs = do + res <- BL.fromStrict <$> reader + let addIsEmptyInfo (a, r) = (r, (a, BL.null r && BL.null res)) + if BL.null res + then return . addIsEmptyInfo $ parseEOF frameParser res + else let sofar = (bs <> res) + in case parseIncremental frameParser res of + Just x -> return $ addIsEmptyInfo x + Nothing -> frameLoop sofar -instance OVERLAPPING_ - ( RunClient m, BuildHeadersTo ls, MimeUnrender ct a, ReflectMethod method, - FramingUnrender framing a, BuildFromStream a (f a) - ) => HasClient m (Stream method framing ct (Headers ls (f a))) where - type Client m (Stream method framing ct (Headers ls (f a))) = m (Headers ls (f a)) - clientWithRoute _pm Proxy req = do - response <- runRequest req - { requestAccept = fromList [contentType (Proxy :: Proxy ct)] - , requestMethod = reflectMethod (Proxy :: Proxy method) - } - return Headers { getResponse = decodeFramed (Proxy :: Proxy framing) (Proxy :: Proxy ct) (Proxy :: Proxy a) response - , getHeadersHList = buildHeadersTo . toList $ responseHeaders response - } + go = processResult <$> modifyMVar state frameLoop + processResult (Right bs,isDone) = + if BL.null bs && isDone + then Nothing + else Just $ case mimeUnrender (Proxy :: Proxy ct) bs :: Either String a of + Left err -> Left err + Right x -> Right x + processResult (Left err, _) = Just (Left err) + k go -decodeFramed :: forall ctype strategy a b. - (MimeUnrender ctype a, FramingUnrender strategy a, BuildFromStream a b) => - Proxy strategy -> Proxy ctype -> Proxy a -> Response -> b -decodeFramed framingproxy ctypeproxy typeproxy response = - let (body, uncons) = unrenderFrames framingproxy typeproxy (responseBody response) - loop b | BL.null b = [] - | otherwise = case uncons b of - (Right x, r) -> case mimeUnrender ctypeproxy x :: Either String a of - Left err -> Left err : loop r - Right x' -> Right x' : loop r - (Left err, r) -> Left err : loop r - in buildFromStream $ loop body -- | If you use a 'Header' in one of your endpoints in your API, -- the corresponding querying function will automatically take diff --git a/servant-client-core/src/Servant/Client/Core/Internal/Request.hs b/servant-client-core/src/Servant/Client/Core/Internal/Request.hs index 458219b9..b120c7f7 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/Request.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/Request.hs @@ -4,6 +4,7 @@ {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} @@ -15,6 +16,7 @@ import Prelude.Compat import Control.Monad.Catch (Exception) import qualified Data.ByteString.Builder as Builder +import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import Data.Semigroup ((<>)) import qualified Data.Sequence as Seq @@ -70,6 +72,8 @@ data Response = Response , responseHttpVersion :: HttpVersion } deriving (Eq, Show, Generic, Typeable) +data StreamingResponse = StreamingResponse { runStreamingResponse :: forall a. ((Status, Seq.Seq Header, HttpVersion, IO BS.ByteString) -> IO a) -> IO a } + -- A GET request to the top-level path defaultRequest :: Request defaultRequest = Request diff --git a/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs b/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs index 564cbb39..88b39a04 100644 --- a/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs +++ b/servant-client-core/src/Servant/Client/Core/Internal/RunClient.hs @@ -19,11 +19,13 @@ import Servant.API (MimeUnrender, contentTypes, mimeUnrender) import Servant.Client.Core.Internal.Request (Request, Response (..), + StreamingResponse (..), ServantError (..)) class (Monad m) => RunClient m where -- | How to make a request. runRequest :: Request -> m Response + streamingRequest :: Request -> m StreamingResponse throwServantError :: ServantError -> m a catchServantError :: m a -> (ServantError -> m a) -> m a diff --git a/servant-client/src/Servant/Client/Internal/HttpClient.hs b/servant-client/src/Servant/Client/Internal/HttpClient.hs index d116b443..94086306 100644 --- a/servant-client/src/Servant/Client/Internal/HttpClient.hs +++ b/servant-client/src/Servant/Client/Internal/HttpClient.hs @@ -88,6 +88,7 @@ instance Alt ClientM where instance RunClient ClientM where runRequest = performRequest + streamingRequest = performStreamingRequest throwServantError = throwError catchServantError = catchError @@ -115,6 +116,17 @@ performRequest req = do throwError $ FailureResponse ourResponse return ourResponse +performStreamingRequest :: Request -> ClientM StreamingResponse +performStreamingRequest req = do + m <- asks manager + burl <- asks baseUrl + let request = requestToClientRequest burl req + + return $ StreamingResponse $ + \k -> Client.withResponse request m $ + \r -> + k (Client.responseStatus r, fromList $ Client.responseHeaders r, Client.responseVersion r, Client.responseBody r) + clientResponseToReponse :: Client.Response BSL.ByteString -> Response clientResponseToReponse r = Response { responseStatusCode = Client.responseStatus r diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index cfb99dc4..a2495dc9 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -85,6 +85,7 @@ import Servant.API.QueryParam (QueryFlag, QueryParam, import Servant.API.Raw (Raw) import Servant.API.Stream (Stream, StreamGenerator (..), ToStreamGenerator (..), BuildFromStream (..), + ByteStringParser (..), FramingRender (..), BoundaryStrategy (..), FramingUnrender (..), NewlineFraming) diff --git a/servant/src/Servant/API/Stream.hs b/servant/src/Servant/API/Stream.hs index 1cb0491c..e676bc6b 100644 --- a/servant/src/Servant/API/Stream.hs +++ b/servant/src/Servant/API/Stream.hs @@ -11,13 +11,11 @@ module Servant.API.Stream where -import Control.Arrow ((***), first) import Data.ByteString.Lazy (ByteString, empty) import qualified Data.ByteString.Lazy.Char8 as LB import Data.Proxy (Proxy) import Data.Typeable (Typeable) import GHC.Generics (Generic) -import Text.Read (readMaybe) import Network.HTTP.Types.Method (StdMethod (..)) -- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Steam endpoints always return response code 200 on success. Type synonyms are provided for standard methods. @@ -57,10 +55,16 @@ data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteS | BoundaryStrategyIntersperse ByteString | BoundaryStrategyGeneral (ByteString -> ByteString) --- | The FramingUnrender class provides the logic for parsing a framing strategy. Given a ByteString, it strips the header, and returns a tuple of the remainder along with a step function that can progressively "uncons" elements from this remainder. The error state is presented per-frame so that protocols that can resume after errors are able to do so. Termination of the unrendering is indicated by return of an empty reminder. +-- | A type of parser that can never fail, and has different parsing strategies (incremental, or EOF) depending if more input can be sent. The incremental parser should return `Nothing` if it would like to be sent a longer ByteString. If it returns a value, it also returns the remainder following that value. +data ByteStringParser a = ByteStringParser { + parseIncremental :: ByteString -> Maybe (a, ByteString), + parseEOF :: ByteString -> (a, ByteString) +} +-- | The FramingUnrender class provides the logic for parsing a framing strategy. The outer @ByteStringParser@ strips the header from a stream of bytes, and yields a parser that can handle the remainder, stepwise. Each frame may be a ByteString, or a String indicating the error state for that frame. Such states are per-frame, so that protocols that can resume after errors are able to do so. Eventually this returns an empty ByteString to indicate termination. class FramingUnrender strategy a where - unrenderFrames :: Proxy strategy -> Proxy a -> ByteString -> (ByteString, ByteString -> (Either String ByteString, ByteString)) + unrenderFrames :: Proxy strategy -> Proxy a -> ByteStringParser (ByteStringParser (Either String ByteString)) + -- | A simple framing strategy that has no header or termination, and inserts a newline character between each frame. -- This assumes that it is used with a Content-Type that encodes without newlines (e.g. JSON). @@ -72,8 +76,14 @@ instance FramingRender NewlineFraming a where trailer _ _ = empty instance FramingUnrender NewlineFraming a where - unrenderFrames _ _ = (, (Right *** LB.drop 1) . LB.break (== '\n')) - + unrenderFrames _ _ = ByteStringParser (Just . (go,)) (go,) + where go = ByteStringParser + (\x -> case LB.break (== '\n') x of + (h,r) -> if not (LB.null r) then Just (Right h, LB.drop 1 r) else Nothing + ) + (\x -> case LB.break (== '\n') x of + (h,r) -> (Right h, LB.drop 1 r) + ) -- | The netstring framing strategy as defined by djb: data NetstringFraming @@ -82,9 +92,12 @@ instance FramingRender NetstringFraming a where boundary _ _ = BoundaryStrategyBracket $ \b -> (LB.pack . show . LB.length $ b, "") trailer _ _ = empty +{- + instance FramingUnrender NetstringFraming a where unrenderFrames _ _ = (, \b -> let (i,r) = LB.break (==':') b in case readMaybe (LB.unpack i) of Just len -> first Right $ LB.splitAt len . LB.drop 1 $ r Nothing -> (Left ("Bad netstring frame, couldn't parse value as integer value: " ++ LB.unpack i), LB.drop 1 . LB.dropWhile (/= ',') $ r) ) +-} \ No newline at end of file