Test that Stream combinator doesn't blow up memory.
This commit is contained in:
parent
624a42ebf0
commit
37482d69d7
4 changed files with 70 additions and 36 deletions
|
@ -82,7 +82,7 @@ library
|
||||||
|
|
||||||
test-suite spec
|
test-suite spec
|
||||||
type: exitcode-stdio-1.0
|
type: exitcode-stdio-1.0
|
||||||
ghc-options: -Wall
|
ghc-options: -Wall -rtsopts -with-rtsopts=-T
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
hs-source-dirs: test
|
hs-source-dirs: test
|
||||||
main-is: Spec.hs
|
main-is: Spec.hs
|
||||||
|
|
|
@ -183,10 +183,18 @@ requestToClientRequest burl r = Client.defaultRequest
|
||||||
where
|
where
|
||||||
hs = toList $ requestAccept r
|
hs = toList $ requestAccept r
|
||||||
|
|
||||||
|
convertBody bd = case bd of
|
||||||
|
RequestBodyLBS body' -> Client.RequestBodyLBS body'
|
||||||
|
RequestBodyBS body' -> Client.RequestBodyBS body'
|
||||||
|
RequestBodyBuilder size body' -> Client.RequestBodyBuilder size body'
|
||||||
|
RequestBodyStream size body' -> Client.RequestBodyStream size body'
|
||||||
|
RequestBodyStreamChunked body' -> Client.RequestBodyStreamChunked body'
|
||||||
|
RequestBodyIO body' -> Client.RequestBodyIO (convertBody <$> body')
|
||||||
|
|
||||||
(body, contentTypeHdr) = case requestBody r of
|
(body, contentTypeHdr) = case requestBody r of
|
||||||
Nothing -> (Client.RequestBodyLBS "", Nothing)
|
Nothing -> (Client.RequestBodyLBS "", Nothing)
|
||||||
Just (RequestBodyLBS body', typ)
|
Just (body', typ)
|
||||||
-> (Client.RequestBodyLBS body', Just (hContentType, renderHeader typ))
|
-> (convertBody body', Just (hContentType, renderHeader typ))
|
||||||
|
|
||||||
isSecure = case baseUrlScheme burl of
|
isSecure = case baseUrlScheme burl of
|
||||||
Http -> False
|
Http -> False
|
||||||
|
|
|
@ -103,6 +103,9 @@ instance FromJSON Person
|
||||||
instance ToForm Person
|
instance ToForm Person
|
||||||
instance FromForm Person
|
instance FromForm Person
|
||||||
|
|
||||||
|
instance Arbitrary Person where
|
||||||
|
arbitrary = Person <$> arbitrary <*> arbitrary
|
||||||
|
|
||||||
alice :: Person
|
alice :: Person
|
||||||
alice = Person "Alice" 42
|
alice = Person "Alice" 42
|
||||||
|
|
||||||
|
|
|
@ -26,25 +26,26 @@
|
||||||
#include "overlapping-compat.h"
|
#include "overlapping-compat.h"
|
||||||
module Servant.StreamSpec (spec) where
|
module Servant.StreamSpec (spec) where
|
||||||
|
|
||||||
import Prelude ()
|
import Control.Monad (replicateM_, void)
|
||||||
import Prelude.Compat
|
import qualified Data.ByteString as BS
|
||||||
import Data.Proxy
|
import Data.Proxy
|
||||||
import qualified Network.HTTP.Client as C
|
import GHC.Stats (currentBytesUsed, getGCStats)
|
||||||
import System.IO.Unsafe (unsafePerformIO)
|
import qualified Network.HTTP.Client as C
|
||||||
|
import Prelude ()
|
||||||
|
import Prelude.Compat
|
||||||
|
import System.IO (IOMode (ReadMode), withFile)
|
||||||
|
import System.IO.Unsafe (unsafePerformIO)
|
||||||
import Test.Hspec
|
import Test.Hspec
|
||||||
|
import Test.QuickCheck
|
||||||
|
|
||||||
import Servant.API ((:<|>) ((:<|>)),
|
import Servant.API ((:<|>) ((:<|>)), (:>), JSON,
|
||||||
(:>),
|
NetstringFraming, NewlineFraming,
|
||||||
EmptyAPI, JSON,
|
OctetStream, ResultStream (..),
|
||||||
StreamGet,
|
StreamGenerator (..), StreamGet)
|
||||||
NewlineFraming,
|
|
||||||
NetstringFraming,
|
|
||||||
ResultStream(..),
|
|
||||||
StreamGenerator(..))
|
|
||||||
import Servant.Client
|
import Servant.Client
|
||||||
|
import Servant.ClientSpec (Person (..))
|
||||||
|
import qualified Servant.ClientSpec as CS
|
||||||
import Servant.Server
|
import Servant.Server
|
||||||
import qualified Servant.ClientSpec as CS
|
|
||||||
import Servant.ClientSpec (Person(..))
|
|
||||||
|
|
||||||
|
|
||||||
spec :: Spec
|
spec :: Spec
|
||||||
|
@ -54,7 +55,7 @@ spec = describe "Servant.Stream" $ do
|
||||||
type StreamApi f =
|
type StreamApi f =
|
||||||
"streamGetNewline" :> StreamGet NewlineFraming JSON (f Person)
|
"streamGetNewline" :> StreamGet NewlineFraming JSON (f Person)
|
||||||
:<|> "streamGetNetstring" :> StreamGet NetstringFraming JSON (f Person)
|
:<|> "streamGetNetstring" :> StreamGet NetstringFraming JSON (f Person)
|
||||||
:<|> EmptyAPI
|
:<|> "streamALot" :> StreamGet NewlineFraming OctetStream (f BS.ByteString)
|
||||||
|
|
||||||
|
|
||||||
capi :: Proxy (StreamApi ResultStream)
|
capi :: Proxy (StreamApi ResultStream)
|
||||||
|
@ -63,12 +64,9 @@ capi = Proxy
|
||||||
sapi :: Proxy (StreamApi StreamGenerator)
|
sapi :: Proxy (StreamApi StreamGenerator)
|
||||||
sapi = Proxy
|
sapi = Proxy
|
||||||
|
|
||||||
|
getGetNL, getGetNS :: ClientM (ResultStream Person)
|
||||||
getGetNL :<|> getGetNS :<|> EmptyClient = client capi
|
getGetALot :: ClientM (ResultStream BS.ByteString)
|
||||||
|
getGetNL :<|> getGetNS :<|> getGetALot = client capi
|
||||||
|
|
||||||
getGetNL :: ClientM (ResultStream Person)
|
|
||||||
getGetNS :: ClientM (ResultStream Person)
|
|
||||||
|
|
||||||
alice :: Person
|
alice :: Person
|
||||||
alice = Person "Alice" 42
|
alice = Person "Alice" 42
|
||||||
|
@ -77,14 +75,24 @@ bob :: Person
|
||||||
bob = Person "Bob" 25
|
bob = Person "Bob" 25
|
||||||
|
|
||||||
server :: Application
|
server :: Application
|
||||||
server = serve sapi (
|
server = serve sapi
|
||||||
(return (StreamGenerator (\f r -> f alice >> r bob >> r alice))
|
$ return (StreamGenerator (\f r -> f alice >> r bob >> r alice))
|
||||||
:: Handler (StreamGenerator Person))
|
:<|> return (StreamGenerator (\f r -> f alice >> r bob >> r alice))
|
||||||
:<|>
|
:<|> return (StreamGenerator lotsGenerator)
|
||||||
(return (StreamGenerator (\f r -> f alice >> r bob >> r alice))
|
where
|
||||||
:: Handler (StreamGenerator Person))
|
lotsGenerator f r = do
|
||||||
:<|>
|
f ""
|
||||||
emptyServer)
|
withFile "/dev/urandom" ReadMode $
|
||||||
|
\handle -> streamFiveMBNTimes handle 1000 r
|
||||||
|
return ()
|
||||||
|
|
||||||
|
streamFiveMBNTimes handle left sink
|
||||||
|
| left <= 0 = return ""
|
||||||
|
| otherwise = do
|
||||||
|
msg <- BS.hGet handle (megabytes 5)
|
||||||
|
sink msg
|
||||||
|
streamFiveMBNTimes handle (left - 1) sink
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
{-# NOINLINE manager' #-}
|
{-# NOINLINE manager' #-}
|
||||||
|
@ -94,20 +102,35 @@ manager' = unsafePerformIO $ C.newManager C.defaultManagerSettings
|
||||||
runClient :: ClientM a -> BaseUrl -> IO (Either ServantError a)
|
runClient :: ClientM a -> BaseUrl -> IO (Either ServantError a)
|
||||||
runClient x baseUrl' = runClientM x (mkClientEnv manager' baseUrl')
|
runClient x baseUrl' = runClientM x (mkClientEnv manager' baseUrl')
|
||||||
|
|
||||||
runResultStream :: ResultStream a -> IO (Maybe (Either String a), Maybe (Either String a), Maybe (Either String a), Maybe (Either String a))
|
runResultStream :: ResultStream a
|
||||||
runResultStream (ResultStream k) = k $ \act -> (,,,) <$> act <*> act <*> act <*> act
|
-> IO ( Maybe (Either String a)
|
||||||
|
, Maybe (Either String a)
|
||||||
|
, Maybe (Either String a)
|
||||||
|
, Maybe (Either String a))
|
||||||
|
runResultStream (ResultStream k)
|
||||||
|
= k $ \act -> (,,,) <$> act <*> act <*> act <*> act
|
||||||
|
|
||||||
streamSpec :: Spec
|
streamSpec :: Spec
|
||||||
streamSpec = beforeAll (CS.startWaiApp server) $ afterAll CS.endWaiApp $ do
|
streamSpec = beforeAll (CS.startWaiApp server) $ afterAll CS.endWaiApp $ do
|
||||||
|
|
||||||
it "Servant.API.StreamGet.Newline" $ \(_, baseUrl) -> do
|
it "works with Servant.API.StreamGet.Newline" $ \(_, baseUrl) -> do
|
||||||
Right res <- runClient getGetNL baseUrl
|
Right res <- runClient getGetNL baseUrl
|
||||||
let jra = Just (Right alice)
|
let jra = Just (Right alice)
|
||||||
jrb = Just (Right bob)
|
jrb = Just (Right bob)
|
||||||
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
|
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
|
||||||
|
|
||||||
it "Servant.API.StreamGet.Netstring" $ \(_, baseUrl) -> do
|
it "works with Servant.API.StreamGet.Netstring" $ \(_, baseUrl) -> do
|
||||||
Right res <- runClient getGetNS baseUrl
|
Right res <- runClient getGetNS baseUrl
|
||||||
let jra = Just (Right alice)
|
let jra = Just (Right alice)
|
||||||
jrb = Just (Right bob)
|
jrb = Just (Right bob)
|
||||||
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
|
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
|
||||||
|
|
||||||
|
it "streams in constant memory" $ \(_, baseUrl) -> do
|
||||||
|
Right (ResultStream res) <- runClient getGetALot baseUrl
|
||||||
|
let consumeNChunks n = replicateM_ n (res void)
|
||||||
|
consumeNChunks 900
|
||||||
|
memUsed <- currentBytesUsed <$> getGCStats
|
||||||
|
memUsed `shouldSatisfy` (< (megabytes 20))
|
||||||
|
|
||||||
|
megabytes :: Num a => a -> a
|
||||||
|
megabytes n = n * (1000 ^ 2)
|
||||||
|
|
Loading…
Reference in a new issue