2017-11-04 05:10:29 +01:00
|
|
|
{-# LANGUAGE CPP #-}
|
|
|
|
{-# LANGUAGE ConstraintKinds #-}
|
|
|
|
{-# LANGUAGE DataKinds #-}
|
|
|
|
{-# LANGUAGE DeriveGeneric #-}
|
|
|
|
{-# LANGUAGE FlexibleContexts #-}
|
|
|
|
{-# LANGUAGE FlexibleInstances #-}
|
|
|
|
{-# LANGUAGE FunctionalDependencies #-}
|
|
|
|
{-# LANGUAGE GADTs #-}
|
|
|
|
{-# LANGUAGE MultiParamTypeClasses #-}
|
|
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
|
|
{-# LANGUAGE PolyKinds #-}
|
|
|
|
{-# LANGUAGE RecordWildCards #-}
|
|
|
|
{-# LANGUAGE ScopedTypeVariables #-}
|
|
|
|
{-# LANGUAGE StandaloneDeriving #-}
|
|
|
|
{-# LANGUAGE TypeFamilies #-}
|
|
|
|
{-# LANGUAGE TypeOperators #-}
|
|
|
|
{-# LANGUAGE UndecidableInstances #-}
|
|
|
|
{-# OPTIONS_GHC -freduction-depth=100 #-}
|
|
|
|
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
|
|
|
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
|
|
|
|
|
|
|
|
module Servant.StreamSpec (spec) where
|
|
|
|
|
2018-06-29 21:08:26 +02:00
|
|
|
import Control.Monad
|
2018-06-26 19:11:28 +02:00
|
|
|
(when)
|
|
|
|
import Control.Monad.Codensity
|
|
|
|
(Codensity (..))
|
|
|
|
import Control.Monad.IO.Class
|
|
|
|
(MonadIO (..))
|
|
|
|
import Control.Monad.Trans.Except
|
2018-11-01 18:42:30 +01:00
|
|
|
import qualified Data.ByteString as BS
|
2017-11-04 05:10:29 +01:00
|
|
|
import Data.Proxy
|
2018-11-01 18:42:30 +01:00
|
|
|
import qualified Data.TDigest as TD
|
|
|
|
import qualified Network.HTTP.Client as C
|
2018-06-29 21:08:26 +02:00
|
|
|
import Prelude ()
|
2018-02-27 15:31:41 +01:00
|
|
|
import Prelude.Compat
|
2018-06-29 21:08:26 +02:00
|
|
|
import Servant.API
|
2019-01-31 10:18:22 +01:00
|
|
|
((:<|>) ((:<|>)), (:>), JSON, NetstringFraming, StreamBody,
|
|
|
|
NewlineFraming, NoFraming, OctetStream, SourceIO, StreamGet,
|
|
|
|
)
|
2018-11-01 18:42:30 +01:00
|
|
|
import Servant.Client.Streaming
|
2018-06-29 21:08:26 +02:00
|
|
|
import Servant.ClientSpec
|
|
|
|
(Person (..))
|
2018-11-01 18:42:30 +01:00
|
|
|
import qualified Servant.ClientSpec as CS
|
2017-11-04 05:10:29 +01:00
|
|
|
import Servant.Server
|
2018-11-01 18:42:30 +01:00
|
|
|
import Servant.Test.ComprehensiveAPI
|
2018-06-26 19:11:28 +02:00
|
|
|
import Servant.Types.SourceT
|
|
|
|
import System.Entropy
|
|
|
|
(getEntropy, getHardwareEntropy)
|
|
|
|
import System.IO.Unsafe
|
|
|
|
(unsafePerformIO)
|
|
|
|
import System.Mem
|
|
|
|
(performGC)
|
|
|
|
import Test.Hspec
|
2017-11-04 05:10:29 +01:00
|
|
|
|
2018-03-15 09:46:30 +01:00
|
|
|
#if MIN_VERSION_base(4,10,0)
|
2018-06-29 21:08:26 +02:00
|
|
|
import GHC.Stats
|
2018-06-26 19:11:28 +02:00
|
|
|
(gc, gcdetails_live_bytes, getRTSStats)
|
2018-03-15 09:46:30 +01:00
|
|
|
#else
|
2018-06-29 21:08:26 +02:00
|
|
|
import GHC.Stats
|
|
|
|
(currentBytesUsed, getGCStats)
|
2018-03-15 09:46:30 +01:00
|
|
|
#endif
|
2017-11-04 05:10:29 +01:00
|
|
|
|
2018-11-01 18:42:30 +01:00
|
|
|
-- This declaration simply checks that all instances are in place.
|
|
|
|
-- Note: this is the streaming client.
|
|
|
|
-- Binding to the wildcard pattern keeps the expression type-checked without
-- introducing a named (and otherwise unused) top-level value.
_ = client comprehensiveAPI
|
|
|
|
|
2017-11-04 05:10:29 +01:00
|
|
|
-- | Entry point of this module's test suite: groups 'streamSpec' under the
-- @Servant.Client.Streaming@ heading.
spec :: Spec
spec = describe "Servant.Client.Streaming" streamSpec
|
|
|
|
|
2018-06-26 19:11:28 +02:00
|
|
|
-- | The API exercised by the streaming client tests.
--
-- * @streamGetNewline@: JSON-encoded 'Person' values, newline framed.
-- * @streamGetNetstring@: JSON-encoded 'Person' values, netstring framed.
-- * @streamALot@: unframed octet stream of 'BS.ByteString' chunks.
-- * @streamBody@: takes an unframed octet-stream request body and answers
--   with an unframed octet stream.
type StreamApi =
         "streamGetNewline"
            :> StreamGet NewlineFraming JSON (SourceIO Person)
    :<|> "streamGetNetstring"
            :> StreamGet NetstringFraming JSON (SourceIO Person)
    :<|> "streamALot"
            :> StreamGet NoFraming OctetStream (SourceIO BS.ByteString)
    :<|> "streamBody"
            :> StreamBody NoFraming OctetStream (SourceIO BS.ByteString)
            :> StreamGet NoFraming OctetStream (SourceIO BS.ByteString)
|
2017-11-04 05:10:29 +01:00
|
|
|
|
2018-06-26 19:11:28 +02:00
|
|
|
-- | Value-level handle on 'StreamApi', passed to both 'serve' and 'client'.
api :: Proxy StreamApi
api = Proxy
|
2017-11-04 05:10:29 +01:00
|
|
|
|
2018-11-01 18:42:30 +01:00
|
|
|
-- Client functions derived from 'api'. The pattern on the last line binds
-- them in the same order as the routes are listed in 'StreamApi'.

-- | Client for the @streamGetNewline@ route.
getGetNL :: ClientM (SourceIO Person)

-- | Client for the @streamGetNetstring@ route.
getGetNS :: ClientM (SourceIO Person)

-- | Client for the @streamALot@ route.
getGetALot :: ClientM (SourceIO BS.ByteString)

-- | Client for the @streamBody@ route; the argument is the request body.
getStreamBody :: SourceIO BS.ByteString -> ClientM (SourceIO BS.ByteString)

getGetNL :<|> getGetNS :<|> getGetALot :<|> getStreamBody = client api
|
2017-11-04 05:10:29 +01:00
|
|
|
|
|
|
|
-- | Fixture person served by the test 'server' and expected by the tests.
alice :: Person
alice = Person "Alice" 42
|
|
|
|
|
|
|
|
-- | Second fixture person served by the test 'server'.
bob :: Person
bob = Person "Bob" 25
|
|
|
|
|
|
|
|
-- | WAI application implementing 'StreamApi' for the tests.
--
-- Handlers are listed in route order: both person routes serve the same
-- three-element source, @streamALot@ serves random chunks, and
-- @streamBody@ hands the request body straight back as the response.
server :: Application
server = serve api $
         pure people
    :<|> pure people
    -- 2 ^ (18 + 10) = 256M
    :<|> pure (SourceT ($ randomChunks (powerOfTwo 18)))
    :<|> pure
  where
    people = source [alice, bob, alice]

    -- Size in bytes requested for every generated chunk.
    chunkSize = powerOfTwo 10

    -- Counts down from the given number to 0 inclusive, yielding one chunk
    -- of entropy per step. Hardware entropy is tried first; when it is
    -- unavailable ('Nothing') the regular entropy source is used instead.
    randomChunks remaining
        | remaining < 0 = Stop
        | otherwise     = Effect $ do
            hardware <- getHardwareEntropy chunkSize
            chunk    <- maybe (getEntropy chunkSize) pure hardware
            pure (Yield chunk (randomChunks (remaining - 1)))
|
2017-11-04 05:10:29 +01:00
|
|
|
|
2018-06-26 19:11:28 +02:00
|
|
|
-- | @powerOfTwo n@ is 2 raised to the @n@-th power, computed in 'Int'.
powerOfTwo :: Int -> Int
powerOfTwo n = 2 ^ n
|
2017-11-04 05:10:29 +01:00
|
|
|
|
|
|
|
-- | HTTP connection manager with default settings, used by 'withClient'.
--
-- Created through 'unsafePerformIO' so that it can be a plain top-level
-- value; the NOINLINE pragma keeps GHC from inlining (and thereby
-- duplicating) the creation at use sites.
manager' :: C.Manager
manager' = unsafePerformIO (C.newManager C.defaultManagerSettings)
{-# NOINLINE manager' #-}
|
|
|
|
|
2018-11-01 18:42:30 +01:00
|
|
|
-- | Run a client action against the given base URL using the shared
-- 'manager'', handing the outcome (error or result) to the continuation.
withClient :: ClientM a -> BaseUrl -> (Either ServantError a -> IO r) -> IO r
withClient action url continue =
    withClientM action env continue
  where
    env = mkClientEnv manager' url
|
2017-11-04 05:10:29 +01:00
|
|
|
|
2018-11-01 18:42:30 +01:00
|
|
|
-- | Run a source through 'runSourceT' and unwrap the 'ExceptT' layer,
-- giving either an error message or the list of produced elements.
testRunSourceIO :: SourceIO a -> IO (Either String [a])
testRunSourceIO src = runExceptT (runSourceT src)
|
2017-11-04 05:10:29 +01:00
|
|
|
|
|
|
|
-- | Tests for the streaming client, run against 'server'.
--
-- The WAI application is started once before all tests and shut down after
-- them; each test receives the base URL of the running server.
streamSpec :: Spec
streamSpec = beforeAll (CS.startWaiApp server) $ afterAll CS.endWaiApp $ do
    it "works with Servant.API.StreamGet.Newline" $ \(_, baseUrl) ->
        expectStream getGetNL baseUrl [alice, bob, alice]

    it "works with Servant.API.StreamGet.Netstring" $ \(_, baseUrl) ->
        expectStream getGetNS baseUrl [alice, bob, alice]

    it "works with Servant.API.StreamBody" $ \(_, baseUrl) ->
        expectStream (getStreamBody (source input)) baseUrl output
  where
    -- Request body chunks sent to the @streamBody@ route, and the chunks
    -- expected back: the empty chunk is not expected to come back.
    input, output :: [BS.ByteString]
    input  = ["foo", "", "bar"]
    output = ["foo", "bar"]

    -- Run a streaming client call and check the elements it produces.
    -- A failed request is reported as a normal test failure that shows the
    -- 'ServantError', instead of crashing on a partial @Right@ pattern.
    expectStream
        :: (Eq a, Show a)
        => ClientM (SourceIO a) -> BaseUrl -> [a] -> IO ()
    expectStream call baseUrl expected =
        withClient call baseUrl $ \result -> case result of
            Left err  ->
                expectationFailure ("client request failed: " ++ show err)
            Right res ->
                testRunSourceIO res `shouldReturn` Right expected
|
2019-01-31 10:18:22 +01:00
|
|
|
|
2018-07-04 23:27:57 +02:00
|
|
|
{-
|
2018-02-27 15:31:41 +01:00
|
|
|
it "streams in constant memory" $ \(_, baseUrl) -> do
|
2018-06-26 19:11:28 +02:00
|
|
|
Right rs <- runClient getGetALot baseUrl
|
|
|
|
performGC
|
|
|
|
-- usage0 <- getUsage
|
|
|
|
-- putStrLn $ "Start: " ++ show usage0
|
|
|
|
tdigest <- memoryUsage $ joinCodensitySourceT rs
|
|
|
|
|
|
|
|
-- putStrLn $ "Median: " ++ show (TD.median tdigest)
|
|
|
|
-- putStrLn $ "Mean: " ++ show (TD.mean tdigest)
|
|
|
|
-- putStrLn $ "Stddev: " ++ show (TD.stddev tdigest)
|
|
|
|
|
|
|
|
-- forM_ [0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99] $ \q ->
|
|
|
|
-- putStrLn $ "q" ++ show q ++ ": " ++ show (TD.quantile q tdigest)
|
|
|
|
|
|
|
|
let Just stddev = TD.stddev tdigest
|
|
|
|
|
|
|
|
-- standard deviation of 100k is ok, we generate 256M of data after all.
|
|
|
|
-- On my machine the deviation is 40k-50k
|
|
|
|
stddev `shouldSatisfy` (< 100000)
|
|
|
|
|
|
|
|
memoryUsage :: SourceT IO BS.ByteString -> IO (TD.TDigest 25)
|
|
|
|
memoryUsage src = unSourceT src $ loop mempty (0 :: Int)
|
|
|
|
where
|
|
|
|
loop !acc !_ Stop = return acc
|
|
|
|
loop !_ !_ (Error err) = fail err -- !
|
|
|
|
loop !acc !n (Skip s) = loop acc n s
|
|
|
|
loop !acc !n (Effect ms) = ms >>= loop acc n
|
|
|
|
loop !acc !n (Yield _bs s) = do
|
|
|
|
usage <- liftIO getUsage
|
|
|
|
-- We perform GC in between as we generate garbage.
|
|
|
|
when (n `mod` 1024 == 0) $ liftIO performGC
|
|
|
|
loop (TD.insert usage acc) (n + 1) s
|
|
|
|
|
|
|
|
getUsage :: IO Double
|
|
|
|
getUsage = fromIntegral .
|
2018-03-15 09:46:30 +01:00
|
|
|
#if MIN_VERSION_base(4,10,0)
|
2018-06-26 19:11:28 +02:00
|
|
|
gcdetails_live_bytes . gc <$> getRTSStats
|
2018-03-15 09:46:30 +01:00
|
|
|
#else
|
2018-06-26 19:11:28 +02:00
|
|
|
currentBytesUsed <$> getGCStats
|
2018-03-15 09:46:30 +01:00
|
|
|
#endif
|
|
|
|
memUsed `shouldSatisfy` (< megabytes 22)
|
2018-02-27 15:31:41 +01:00
|
|
|
|
|
|
|
megabytes :: Num a => a -> a
|
2018-03-15 09:46:30 +01:00
|
|
|
megabytes n = n * (1000 ^ (2 :: Int))
|
2018-06-26 19:11:28 +02:00
|
|
|
-}
|