Client-side support for server-sent events (SSE)

This commit is contained in:
Ole Krüger 2020-07-01 22:17:34 +01:00 committed by Julian Ospald
parent 7ef9730f77
commit 069503e941
No known key found for this signature in database
GPG Key ID: 3786C5262ECB4A3F
9 changed files with 553 additions and 2 deletions

View File

@ -41,6 +41,7 @@ library
Servant.Client.Core.Request
Servant.Client.Core.Response
Servant.Client.Core.RunClient
Servant.Client.Core.ServerSentEvents
other-modules:
Servant.Client.Core.Internal
@ -50,7 +51,8 @@ library
--
-- note: mtl lower bound is so low because of GHC-7.8
build-depends:
base >= 4.9 && < 4.16
attoparsec >= 0.13.2.2 && < 0.15
, base >= 4.9 && < 4.16
, bytestring >= 0.10.8.1 && < 0.12
, constraints >= 0.2 && < 0.14
, containers >= 0.5.7.1 && < 0.7
@ -94,11 +96,15 @@ test-suite spec
other-modules:
Servant.Client.Core.Internal.BaseUrlSpec
Servant.Client.Core.RequestSpec
Servant.Client.Core.ServerSentEventsSpec
-- Dependencies inherited from the library. No need to specify bounds.
build-depends:
base
, base-compat
, bytestring
, transformers
, servant
, servant-client-core
-- Additional dependencies

View File

@ -85,7 +85,7 @@ import Servant.API.Generic
(GenericMode(..), ToServant, ToServantApi
, GenericServant, toServant, fromServant)
import Servant.API.ContentTypes
(contentTypes, AllMime (allMime), AllMimeUnrender (allMimeUnrender))
(contentTypes, AllMime (allMime), AllMimeUnrender (allMimeUnrender), EventStream)
import Servant.API.Status
(statusFromNat)
import Servant.API.TypeLevel (FragmentUnique, AtLeastOneFragment)
@ -94,6 +94,10 @@ import Servant.API.Modifiers
import Servant.API.TypeErrors
import Servant.API.UVerb
(HasStatus, HasStatuses (Statuses, statuses), UVerb, Union, Unique, inject, statusOf, foldMapUnion, matchUnion)
import Servant.API.ServerSentEvents
(EventKind (JsonEvent, RawEvent), ServerSentEvents')
import Servant.API.Stream
(NoFraming)
import Servant.Client.Core.Auth
import Servant.Client.Core.BasicAuth
@ -101,6 +105,7 @@ import Servant.Client.Core.ClientError
import Servant.Client.Core.Request
import Servant.Client.Core.Response
import Servant.Client.Core.RunClient
import Servant.Client.Core.ServerSentEvents
-- * Accessing APIs as a Client
@ -462,6 +467,63 @@ instance {-# OVERLAPPING #-}
, requestMethod = reflectMethod (Proxy :: Proxy method)
}
type SseClientDelegate method status =
Stream method status NoFraming EventStream
instance
( RunClient m
, HasClient m (SseClientDelegate method status (EventMessageStreamT IO))
)
=> HasClient m (ServerSentEvents' method status 'RawEvent EventMessage) where
type Client m (ServerSentEvents' method status 'RawEvent EventMessage) =
Client m (SseClientDelegate method status (EventMessageStreamT IO))
hoistClientMonad p _ =
hoistClientMonad
p
(Proxy :: Proxy (SseClientDelegate method status (EventMessageStreamT IO)))
clientWithRoute p _ =
clientWithRoute
p
(Proxy :: Proxy (SseClientDelegate method status (EventMessageStreamT IO)))
instance
( RunClient m
, HasClient m (SseClientDelegate method status (EventStreamT IO))
)
=> HasClient m (ServerSentEvents' method status 'RawEvent (Event a)) where
type Client m (ServerSentEvents' method status 'RawEvent (Event a)) =
Client m (SseClientDelegate method status (EventStreamT IO))
hoistClientMonad p _ =
hoistClientMonad
p
(Proxy :: Proxy (SseClientDelegate method status (EventStreamT IO)))
clientWithRoute p _ =
clientWithRoute
p
(Proxy :: Proxy (SseClientDelegate method status (EventStreamT IO)))
instance
( RunClient m
, HasClient m (SseClientDelegate method status (JsonEventStreamT IO a))
)
=> HasClient m (ServerSentEvents' method status 'JsonEvent a) where
type Client m (ServerSentEvents' method status 'JsonEvent a) =
Client m (SseClientDelegate method status (JsonEventStreamT IO a))
hoistClientMonad p _ =
hoistClientMonad
p
(Proxy :: Proxy (SseClientDelegate method status (JsonEventStreamT IO a)))
clientWithRoute p _ =
clientWithRoute
p
(Proxy :: Proxy (SseClientDelegate method status (JsonEventStreamT IO a)))
-- | If you use a 'Header' in one of your endpoints in your API,
-- the corresponding querying function will automatically take
-- an additional argument of the type specified by your 'Header',

View File

@ -0,0 +1,306 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Server-sent events
--
-- See <https://www.w3.org/TR/2009/WD-eventsource-20090421/> for more details
-- on server-sent events (SSE).
--
module Servant.Client.Core.ServerSentEvents (
EventMessage (..),
EventIgnoreReason (..),
Event (..),
EventStreamT (..),
JsonEventStreamT (..),
EventMessageStreamT (..)
) where
import Control.Applicative
(Alternative ((<|>)))
import Control.Monad.IO.Class
(MonadIO)
import qualified Data.Aeson as Aeson
import qualified Data.Attoparsec.ByteString as Attoparsec
import qualified Data.ByteString as ByteString
import qualified Data.ByteString.Char8 as ByteString.Char8
import qualified Data.ByteString.Lazy as ByteString.Lazy
import Data.Char
(chr)
import Data.Coerce
(coerce)
import Data.Foldable
(traverse_)
import Data.Functor
(void)
import qualified Data.Text as Text
import Data.Text.Encoding
(encodeUtf8)
import GHC.Generics
(Generic)
import Numeric.Natural
(Natural)
import Servant.API.ContentTypes
(EventStreamChunk (..))
import Servant.API.Stream
(FromSourceIO (..))
import Servant.Types.SourceT
(SourceT, StepT (..), foreachYieldStep, mapStepT,
transformStepWithAtto)
-- | Line (or frame) of an event stream
newtype EventStreamLine = EventStreamLine
{ unEventStreamLine :: ByteString.ByteString }
deriving Show
-- | Consume chunks to produce event stream lines.
eventLinesFromRawChunks
:: Monad m
=> StepT m ByteString.ByteString
-> StepT m EventStreamLine
eventLinesFromRawChunks =
transformStepWithAtto eventLine
-- | Consume event stream chunks to produce event stream lines.
eventLinesFromChunks
:: Monad m
=> StepT m EventStreamChunk
-> StepT m EventStreamLine
eventLinesFromChunks =
-- 'coerce' efficiently unpacks the 'EventStreamChunk'
eventLinesFromRawChunks . fmap (coerce ByteString.Lazy.toStrict)
-- | Apply a 'Attoparsec.Parser' to each line of the event stream individually.
parseEventLines
:: Monad m
=> Attoparsec.Parser a
-> StepT m EventStreamLine
-> StepT m a
parseEventLines parser =
foreachYieldStep $ \(EventStreamLine line) next ->
case Attoparsec.parseOnly parser line of
Left err -> Error err
Right value -> Yield value next
-- | A line of an event stream
eventLine :: Attoparsec.Parser EventStreamLine
eventLine = do
Attoparsec.option () byteOrderMark -- A line may be prefixed with a byte order mark
EventStreamLine <$> untilLineEnd <* lineEnd
-- | Byte order mark (U+FEFF) in UTF-8 representation
byteOrderMark :: Attoparsec.Parser ()
byteOrderMark =
traverse_ Attoparsec.word8
$ ByteString.unpack
$ encodeUtf8
$ Text.singleton
$ chr 0xFEFF
-- | Event stream line ending
lineEnd :: Attoparsec.Parser ()
lineEnd =
(cr >> lf) <|> cr <|> lf <|> Attoparsec.endOfInput
where
cr = void (Attoparsec.word8 0x0D)
lf = void (Attoparsec.word8 0x0A)
-- | Consume all contents until the end of the line.
untilLineEnd :: Attoparsec.Parser ByteString.ByteString
untilLineEnd = Attoparsec.takeWhile (\w8 -> w8 /= 0x0D && w8 /= 0x0A)
-- | Structured variant of an event line of an event stream
data EventMessage
= EventDispatch
-- ^ Dispatch on the accumulated event.
| EventSetName ByteString.ByteString
-- ^ Set the name of the current event.
| EventSetLastId ByteString.ByteString
-- ^ Set the last event identifier.
| EventData ByteString.ByteString
-- ^ Append data to the event's data buffer.
| EventRetry Natural
-- ^ Set the event stream's reconnection time.
| EventIgnore EventIgnoreReason
-- ^ Ignored
deriving (Show, Eq, Ord)
-- | Reason why a event line can be ignored
data EventIgnoreReason
= EventFieldNameUnknown ByteString.ByteString
| EventRetryNonNumeric ByteString.ByteString
| EventComment ByteString.ByteString
deriving (Show, Eq, Ord)
-- | Parse the event stream lines into more structured messages.
eventMessagesFromLines
:: Monad m
=> StepT m EventStreamLine
-> StepT m EventMessage
eventMessagesFromLines =
ensureLastDispatch False . parseEventLines eventMessage
where
-- | Make sure the last event message is a dispatch.
ensureLastDispatch didDispatch step = case step of
Stop ->
if not didDispatch then Yield EventDispatch Stop else Stop
Yield other next ->
Yield other $ ensureLastDispatch (other == EventDispatch) next
Skip next ->
Skip $ ensureLastDispatch didDispatch next
Effect eff ->
Effect $ ensureLastDispatch didDispatch <$> eff
err@Error{} ->
err
-- | Event line parser for an event message.
eventMessage :: Attoparsec.Parser EventMessage
eventMessage =
ignore <|> field <|> dispatch
where
ignore = do
_ <- Attoparsec.word8 0x3A -- ':'
EventIgnore . EventComment <$> Attoparsec.takeByteString
dispatch = do
Attoparsec.endOfInput
pure EventDispatch
field = do
name <- Attoparsec.takeWhile1 (/= 0x3A) -- Up to ':' or the end
value <- Attoparsec.option ByteString.empty $ do
_ <- Attoparsec.word8 0x3A -- ':'
_ <- Attoparsec.option 0x20 $ Attoparsec.word8 0x20 -- Optional ' '
Attoparsec.takeByteString
pure $ case name of
"event" -> EventSetName value
"data" -> EventData value
"id" -> EventSetLastId value
"retry" ->
-- The retry value consist of digits.
if ByteString.all (\w8 -> 0x30 <= w8 && w8 <= 0x39) value then
EventRetry (read (ByteString.Char8.unpack value))
else
EventIgnore (EventRetryNonNumeric value)
_ -> EventIgnore (EventFieldNameUnknown name)
-- | Event sent by the remote
data Event a = Event
{ eventName :: Maybe ByteString.ByteString
, eventData :: a
}
deriving (Show, Eq, Ord, Functor, Generic)
-- | Accumulate event messages to build individual 'Event's.
eventsFromMessages
:: Functor m
=> StepT m EventMessage
-> StepT m (Event ByteString.ByteString)
eventsFromMessages =
initGo
where
initGo = go Nothing ByteString.Lazy.empty
combineData dataBuffer newData =
if ByteString.Lazy.null dataBuffer then
ByteString.Lazy.fromStrict newData
else
ByteString.Lazy.concat
[ dataBuffer
, ByteString.Lazy.singleton 0x0A -- Line feed
, ByteString.Lazy.fromStrict newData
]
go name dataBuffer step = case step of
Stop ->
Stop
Skip next ->
go name dataBuffer next
Effect eff ->
Effect (go name dataBuffer <$> eff)
Error err ->
Error err
Yield message next -> case message of
EventSetName newName ->
go (Just newName) dataBuffer next
EventData newData ->
go name (combineData dataBuffer newData) next
EventDispatch ->
Yield
(Event name (ByteString.Lazy.toStrict dataBuffer))
(initGo next)
_ ->
-- We ignore other message because they don't fit into
-- the 'Event' type. If a user needs more fine grained
-- control, the 'EventMessage' interface is better suited.
go name dataBuffer next
-- | Server-sent event stream (SSE)
--
-- See <https://www.w3.org/TR/2009/WD-eventsource-20090421/> for more details.
--
newtype EventMessageStreamT m = EventMessageStreamT
{ unEventMessageStreamT :: SourceT m EventMessage }
deriving (Show, Semigroup, Monoid)
-- | Server-sent event messages
--
-- 'EventMessage' gives you more control over the communication with the server
-- than 'Event'.
--
instance MonadIO m => FromSourceIO EventStreamChunk (EventMessageStreamT m) where
fromSourceIO =
EventMessageStreamT
. mapStepT (eventMessagesFromLines . eventLinesFromChunks)
. fromSourceIO
-- | Server-sent event stream (SSE)
--
-- See <https://www.w3.org/TR/2009/WD-eventsource-20090421/> for more details.
--
newtype EventStreamT m = EventStreamT
{ unEventStreamT :: SourceT m (Event ByteString.ByteString) }
deriving (Show, Semigroup, Monoid)
-- | Server-sent events
instance MonadIO m => FromSourceIO EventStreamChunk (EventStreamT m) where
fromSourceIO input =
-- 'coerce' is used in place of unpacking and repacking 'EventStreamT'
coerce
(mapStepT eventsFromMessages)
(fromSourceIO input :: EventMessageStreamT m)
-- | Try to parse event data to JSON.
jsonEventsFromEvents
:: (Functor m, Aeson.FromJSON a)
=> StepT m (Event ByteString.ByteString)
-> StepT m (Event a)
jsonEventsFromEvents =
foreachYieldStep $ \(Event name datas) next ->
either
Error
(\value -> Yield (Event name value) next)
(Aeson.eitherDecode (ByteString.Lazy.fromStrict datas))
-- | Server-sent event stream (SSE) for JSON values
newtype JsonEventStreamT m a = JsonEventStreamT
{ unJsonEventStreamT :: SourceT m (Event a) }
deriving (Show, Functor, Semigroup, Monoid)
-- | Server-sent JSON event stream
instance (MonadIO m, Aeson.FromJSON a) => FromSourceIO EventStreamChunk (JsonEventStreamT m a) where
fromSourceIO input =
-- The 'coerce' efficiently unwraps the 'EventStreamT' and wraps the
-- JsonEventStreamT.
coerce
(mapStepT jsonEventsFromEvents)
(fromSourceIO input :: EventStreamT m)

View File

@ -0,0 +1,102 @@
{-# LANGUAGE OverloadedStrings #-}
module Servant.Client.Core.ServerSentEventsSpec (spec) where
import Control.Monad.Trans.Except
(runExceptT)
import qualified Data.ByteString.Lazy as ByteString
import Data.Foldable
(for_)
import Data.Int
(Int64)
import Servant.API.ContentTypes
(EventStreamChunk (..))
import Servant.API.Stream
(FromSourceIO (fromSourceIO))
import Servant.Client.Core.ServerSentEvents
(Event (..), EventIgnoreReason (EventComment),
EventMessage (..), unEventMessageStreamT, unEventStreamT)
import Servant.Types.SourceT
(runSourceT, source)
import Test.Hspec
(Spec, describe, it, shouldBe)
spec :: Spec
spec = describe "Servant.Client.Core.ServerSentEvent" $ do
describe "EventMessageStreamT" $ do
it "processes chunks correctly" $ do
let allMessages = ByteString.intercalate "\n"
[ "retry: 30"
, "data: Hello World"
, "id: 1"
, ""
, "event: my_event"
, "data"
, "id: 2"
, ":Just a comment"
, ""
, "data: Bye"
]
for_ [1, 10, 100] $ \chunkSize -> do
result <-
runExceptT
$ runSourceT
$ unEventMessageStreamT
$ fromSourceIO
$ source
$ map EventStreamChunk
$ chunkify chunkSize allMessages
result `shouldBe` Right
[ EventRetry 30
, EventData "Hello World"
, EventSetLastId "1"
, EventDispatch
, EventSetName "my_event"
, EventData ""
, EventSetLastId "2"
, EventIgnore (EventComment "Just a comment")
, EventDispatch
, EventData "Bye"
, EventDispatch
]
describe "EventStreamT" $ do
it "processes chunks correctly" $ do
let allMessages = ByteString.intercalate "\n"
[ "retry: 30"
, "data: Hello World"
, "id: 1"
, ""
, "event: my_event"
, "data"
, "id: 2"
, ":Just a comment"
, ""
, "data: Bye"
]
for_ [1, 10, 100] $ \chunkSize -> do
result <-
runExceptT
$ runSourceT
$ unEventStreamT
$ fromSourceIO
$ source
$ map EventStreamChunk
$ chunkify chunkSize allMessages
result `shouldBe` Right
[ Event Nothing "Hello World"
, Event (Just "my_event") ""
, Event Nothing "Bye"
]
chunkify :: Int64 -> ByteString.ByteString -> [ByteString.ByteString]
chunkify chunkSize input =
if ByteString.null input then
[]
else
let (h, t) = ByteString.splitAt chunkSize input
in h : chunkify chunkSize t

View File

@ -52,6 +52,7 @@ library
Servant.API.RemoteHost
Servant.API.ReqBody
Servant.API.ResponseHeaders
Servant.API.ServerSentEvents
Servant.API.Status
Servant.API.Stream
Servant.API.Sub

View File

@ -42,6 +42,9 @@ module Servant.API (
-- * Streaming endpoints, distinguished by HTTP method
module Servant.API.Stream,
-- * Server-sent events (SSE)
module Servant.API.ServerSentEvents,
-- * Authentication
module Servant.API.BasicAuth,
@ -121,6 +124,8 @@ import Servant.API.ResponseHeaders
GetHeaders (getHeaders), HList (..), HasResponseHeader,
Headers (..), ResponseHeader (..), addHeader, getHeadersHList,
getResponse, lookupResponseHeader, noHeader)
import Servant.API.ServerSentEvents
(EventKind (..), ServerSentEvents, ServerSentEvents')
import Servant.API.Stream
(FramingRender (..), FramingUnrender (..), FromSourceIO (..),
NetstringFraming, NewlineFraming, NoFraming, SourceIO, Stream,

View File

@ -49,6 +49,7 @@ module Servant.API.ContentTypes
, PlainText
, FormUrlEncoded
, OctetStream
, EventStream
-- * Building your own Content-Type
, Accept(..)
@ -67,6 +68,7 @@ module Servant.API.ContentTypes
, AllMimeUnrender(..)
, eitherDecodeLenient
, canHandleAcceptH
, EventStreamChunk(..)
) where
import Control.Arrow
@ -110,6 +112,7 @@ data JSON deriving Typeable
data PlainText deriving Typeable
data FormUrlEncoded deriving Typeable
data OctetStream deriving Typeable
data EventStream deriving Typeable
-- * Accept class
@ -153,6 +156,10 @@ instance Accept PlainText where
instance Accept OctetStream where
contentType _ = "application" M.// "octet-stream"
-- | @text/event-stream@
instance Accept EventStream where
contentType _ = "text" M.// "event-stream"
newtype AcceptHeader = AcceptHeader BS.ByteString
deriving (Eq, Show, Read, Typeable, Generic)
@ -419,6 +426,12 @@ instance MimeUnrender OctetStream ByteString where
instance MimeUnrender OctetStream BS.ByteString where
mimeUnrender _ = Right . toStrict
-- | Chunk of an event stream
newtype EventStreamChunk = EventStreamChunk
{ unEventStreamChunk :: ByteString }
instance MimeUnrender EventStream EventStreamChunk where
mimeUnrender _ = Right . EventStreamChunk
-- $setup
-- >>> :set -XFlexibleInstances

View File

@ -0,0 +1,40 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE PolyKinds #-}
-- | Server-sent events
--
-- See <https://www.w3.org/TR/2009/WD-eventsource-20090421/>.
--
module Servant.API.ServerSentEvents
( ServerSentEvents'
, ServerSentEvents
, EventKind (..)
)
where
import Data.Typeable
(Typeable)
import GHC.Generics
(Generic)
import GHC.TypeLits
(Nat)
import Network.HTTP.Types
(StdMethod (GET))
-- | Determines the shape of events you may receive (i.e. the @a@ in
-- 'ServerSentEvents\'')
data EventKind
= RawEvent
-- ^ 'EventMessage' or 'Event' 'ByteString'
| JsonEvent
-- ^ Anything that implements 'FromJSON'
-- | Server-sent events (SSE)
--
-- See <https://www.w3.org/TR/2009/WD-eventsource-20090421/>.
--
data ServerSentEvents' (method :: k) (status :: Nat) (kind :: EventKind) (a :: *)
deriving (Typeable, Generic)
type ServerSentEvents = ServerSentEvents' 'GET 200

View File

@ -296,6 +296,22 @@ foreachStep f g = go where
go (Error err) = f err
go (Effect ms) = ms >>= go
-- | Traverse the 'StepT' and call the given function for each 'Yield'.
foreachYieldStep
:: Functor m
=> (a -> StepT m b -> StepT m b)
-> StepT m a
-> StepT m b
foreachYieldStep f =
go
where
go step = case step of
Error msg -> Error msg
Stop -> Stop
Skip next -> Skip (go next)
Yield val next -> f val (go next)
Effect eff -> Effect (go <$> eff)
-------------------------------------------------------------------------------
-- Monadic
-------------------------------------------------------------------------------