From 90a527f4e0f624644141a6b762fa9eebb777c44f Mon Sep 17 00:00:00 2001 From: Connor Clark Date: Fri, 13 May 2016 09:12:37 -0700 Subject: [PATCH] Finish up bindings to most core gRPC functions (#1) * grpc_server_request_call * basic slice functionality * rename function to emphasize side effects * add docs * ByteBuffer function bindings * replace unsafeCoerce with more specific function, add docs, tests. * add newtypes for Tag and Reserved void pointers * manually fix request_registered_call binding * use nocode keyword to fix Ptr () problems * decouple copying Slice from freeing slice * Add time ops * remove nocode decls * Start Op module, fix c2hs preprocessing order * metadata manipulation operations * metadata free function, test * helper functions for constructing ops of each type * bindings for op creation functions * finish up Op creation functions, implement Op destruction, add docs. * tweak documentation * rework Op creation functions to work with an array of ops, for ease of use with grpc_call_start_batch * forgot to change return types * wrap hook lines, fix types to op creation functions * implement part of the payload test * hideous, but working, end to end test * bindings for connectivity state checks, split test into two threads * various cleanup * rename Core to Unsafe for emphasis, clean up tests more * add requested comment * remove slice_unref binding, use sliceFree when converting buffer to bytestring --- cbits/grpc_haskell.c | 293 ++++++++++++++++++ grpc-haskell.cabal | 30 +- include/grpc_haskell.h | 99 ++++++ src/Network/GRPC/Core.chs | 73 ----- src/Network/GRPC/Core/Time.chs | 24 -- src/Network/GRPC/Unsafe.chs | 234 ++++++++++++++ src/Network/GRPC/Unsafe/ByteBuffer.chs | 85 +++++ .../GRPC/{Core => Unsafe}/Constants.hsc | 8 +- src/Network/GRPC/Unsafe/Metadata.chs | 70 +++++ src/Network/GRPC/Unsafe/Op.chs | 102 ++++++ src/Network/GRPC/Unsafe/Slice.chs | 54 ++++ src/Network/GRPC/Unsafe/Time.chs | 48 +++ tests/Properties.hs | 217 ++++++++++++- 13 files changed, 1230 insertions(+), 107 deletions(-) delete mode 100644 src/Network/GRPC/Core.chs delete mode 100644 src/Network/GRPC/Core/Time.chs create mode 100644 src/Network/GRPC/Unsafe.chs create mode 100644 src/Network/GRPC/Unsafe/ByteBuffer.chs rename src/Network/GRPC/{Core => Unsafe}/Constants.hsc (81%) create mode 100644 src/Network/GRPC/Unsafe/Metadata.chs create mode 100644 src/Network/GRPC/Unsafe/Op.chs create mode 100644 src/Network/GRPC/Unsafe/Slice.chs create mode 100644 src/Network/GRPC/Unsafe/Time.chs diff --git a/cbits/grpc_haskell.c b/cbits/grpc_haskell.c index 9461af4..af429d6 100644 --- a/cbits/grpc_haskell.c +++ b/cbits/grpc_haskell.c @@ -1,7 +1,10 @@ #include +#include +#include #include #include #include +#include #include grpc_event *grpc_completion_queue_next_(grpc_completion_queue *cq, @@ -30,3 +33,293 @@ grpc_call *grpc_channel_create_call_(grpc_channel *channel, completion_queue, method, host, *deadline, reserved); } + +size_t gpr_slice_length_(gpr_slice *slice){ + return GPR_SLICE_LENGTH(*slice); +} + +uint8_t *gpr_slice_start_(gpr_slice *slice){ + return GPR_SLICE_START_PTR(*slice); +} + +gpr_slice* gpr_slice_from_copied_string_(const char *source){ + gpr_slice* retval = malloc(sizeof(gpr_slice)); + //note: 'gpr_slice_from_copied_string' handles allocating space for 'source'. + *retval = gpr_slice_from_copied_string(source); + return retval; +} + +void free_slice(gpr_slice *slice){ + gpr_slice_unref(*slice); + free(slice); +} + +grpc_byte_buffer **create_receiving_byte_buffer(){ + grpc_byte_buffer **retval = malloc(sizeof(grpc_byte_buffer*)); + *retval = NULL; + return retval; +} + +void destroy_receiving_byte_buffer(grpc_byte_buffer **bb){ + grpc_byte_buffer_destroy(*bb); + free(bb); +} + +grpc_byte_buffer_reader *byte_buffer_reader_create(grpc_byte_buffer *buffer){ + grpc_byte_buffer_reader *reader = malloc(sizeof(grpc_byte_buffer_reader)); + grpc_byte_buffer_reader_init(reader, buffer); + return reader; +} + +void byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader){ + grpc_byte_buffer_reader_destroy(reader); + free(reader); +} + +gpr_slice *grpc_byte_buffer_reader_readall_(grpc_byte_buffer_reader *reader){ + gpr_slice *retval = malloc(sizeof(gpr_slice)); + *retval = grpc_byte_buffer_reader_readall(reader); + return retval; +} + +void timespec_destroy(gpr_timespec* t){ + free(t); +} + +gpr_timespec* gpr_inf_future_(gpr_clock_type t){ + gpr_timespec *retval = malloc(sizeof(gpr_timespec)); + *retval = gpr_inf_future(t); + return retval; +} + +gpr_timespec* gpr_now_(gpr_clock_type t){ + gpr_timespec *retval = malloc(sizeof(gpr_timespec)); + *retval = gpr_now(t); + return retval; +} + +int32_t gpr_time_to_millis_(gpr_timespec* t){ + return gpr_time_to_millis(*t); +} + +gpr_timespec* seconds_to_deadline(int64_t seconds){ + gpr_timespec *retval = malloc(sizeof(gpr_timespec)); + *retval = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(seconds * 1e3, GPR_TIMESPAN)); + return retval; +} + +gpr_timespec* millis_to_deadline(int64_t millis){ + gpr_timespec *retval = malloc(sizeof(gpr_timespec)); + *retval = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(millis * 1e3, GPR_TIMESPAN)); + return retval; +} + +grpc_metadata_array** metadata_array_create(){ + grpc_metadata_array **retval = malloc(sizeof(grpc_metadata_array*)); + *retval = malloc(sizeof(grpc_metadata_array)); + grpc_metadata_array_init(*retval); + return retval; +} + +void metadata_array_destroy(grpc_metadata_array **arr){ + grpc_metadata_array_destroy(*arr); + free(*arr); + free(arr); +} + +grpc_metadata* metadata_alloc(size_t n){ + grpc_metadata *retval = malloc(sizeof(grpc_metadata)*n); + return retval; +} + +void metadata_free(grpc_metadata* m){ + free(m); +} + +void set_metadata_key_val(char *key, char *val, grpc_metadata *arr, size_t i){ + grpc_metadata *p = arr + i; + p->key = key; + p->value = val; + p->value_length = strlen(val); +} + +const char* get_metadata_key(grpc_metadata *arr, size_t i){ + grpc_metadata *p = arr + i; + return p->key; +} + +const char* get_metadata_val(grpc_metadata *arr, size_t i){ + grpc_metadata *p = arr + i; + return p->value; +} + +grpc_op* op_array_create(size_t n){ + return malloc(n*sizeof(grpc_op)); +} + +void op_array_destroy(grpc_op* op_array, size_t n){ + for(int i = 0; i < n; i++){ + grpc_op* op = op_array + i; + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + if(op->data.send_initial_metadata.count > 0){ + metadata_free(op->data.send_initial_metadata.metadata); + } + break; + case GRPC_OP_SEND_MESSAGE: + grpc_byte_buffer_destroy(op->data.send_message); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + free(op->data.send_status_from_server.trailing_metadata); + free(op->data.send_status_from_server.status_details); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + break; + case GRPC_OP_RECV_MESSAGE: + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + break; + } + } + free(op_array); +} + +void op_send_initial_metadata(grpc_op *op_array, size_t i, + grpc_metadata *arr, size_t n_metadata){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = n_metadata; + op->data.send_initial_metadata.metadata + = malloc(n_metadata*sizeof(grpc_metadata)); + memcpy(op->data.send_initial_metadata.metadata, arr, + n_metadata*sizeof(grpc_metadata)); + op->flags = 0; + op->reserved = NULL; +} + +void op_send_initial_metadata_empty(grpc_op *op_array, size_t i){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; +} + +void op_send_message(grpc_op *op_array, size_t i, + grpc_byte_buffer *payload){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = grpc_byte_buffer_copy(payload); + op->flags = 0; + op->reserved = NULL; +} + +void op_send_close_client(grpc_op *op_array, size_t i){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; +} + +void op_recv_initial_metadata(grpc_op *op_array, size_t i, + grpc_metadata_array** arr){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = *arr; + op->flags = 0; + op->reserved = NULL; +} + +void op_recv_message(grpc_op *op_array, size_t i, + grpc_byte_buffer **payload_recv){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = payload_recv; + op->flags = 0; + op->reserved = NULL; +} + +void op_recv_status_client(grpc_op *op_array, size_t i, + grpc_metadata_array** arr, + grpc_status_code* status, + char **details, size_t* details_capacity){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = *arr; + op->data.recv_status_on_client.status = status; + op->data.recv_status_on_client.status_details = details; + op->data.recv_status_on_client.status_details_capacity = details_capacity; + op->flags = 0; + op->reserved = NULL; +} + +void op_recv_close_server(grpc_op *op_array, size_t i, int *was_cancelled){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = was_cancelled; + op->flags = 0; + op->reserved = NULL; +} + +void op_send_status_server(grpc_op *op_array, size_t i, + size_t metadata_count, grpc_metadata* m, + grpc_status_code status, char *details){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = metadata_count; + op->data.send_status_from_server.trailing_metadata + = malloc(sizeof(grpc_metadata)*metadata_count); + memcpy(op->data.send_status_from_server.trailing_metadata, m, + metadata_count*sizeof(grpc_metadata)); + op->data.send_status_from_server.status = status; + op->data.send_status_from_server.status_details + = malloc(sizeof(char)*(strlen(details) + 1)); + strcpy(op->data.send_status_from_server.status_details, details); + op->flags = 0; + op->reserved = NULL; +} + +void op_send_ok_status_server(grpc_op *op_array, size_t i){ + grpc_op *op = op_array + i; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "OK"; + op->flags = 0; + op->reserved = NULL; +} + +grpc_status_code* create_status_code_ptr(){ + return malloc(sizeof(grpc_status_code)); +} + +void destroy_status_code_ptr(grpc_status_code* p){ + free(p); +} + +grpc_call_details* create_call_details(){ + grpc_call_details* retval = malloc(sizeof(grpc_call_details)); + grpc_call_details_init(retval); + return retval; +} + +void destroy_call_details(grpc_call_details* cd){ + grpc_call_details_destroy(cd); + free(cd); +} + +void grpc_channel_watch_connectivity_state_(grpc_channel *channel, + grpc_connectivity_state + last_observed_state, + gpr_timespec* deadline, + grpc_completion_queue *cq, + void *tag){ + grpc_channel_watch_connectivity_state(channel, last_observed_state, *deadline, + cq, tag); +} diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index 10d0b65..3f4c8a9 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -15,14 +15,20 @@ extra-source-files: cbits, include library build-depends: - base - , clock + base ==4.8.* + , clock ==0.6.* + , bytestring ==0.10.* c-sources: cbits/grpc_haskell.c exposed-modules: - Network.GRPC.Core - Network.GRPC.Core.Constants - Network.GRPC.Core.Time + -- NOTE: the order of these matters to c2hs. + Network.GRPC.Unsafe.Constants + Network.GRPC.Unsafe.Time + Network.GRPC.Unsafe.Slice + Network.GRPC.Unsafe.ByteBuffer + Network.GRPC.Unsafe.Metadata + Network.GRPC.Unsafe.Op + Network.GRPC.Unsafe extra-libraries: grpc includes: @@ -30,6 +36,9 @@ library , grpc/grpc.h , grpc/status.h , grpc/support/time.h + , grpc/impl/codegen/compression_types.h + , grpc/impl/codegen/slice_buffer.h + , grpc/impl/codegen/slice.h build-tools: c2hs default-language: Haskell2010 ghc-options: -Wall -fwarn-incomplete-patterns @@ -38,11 +47,16 @@ library test-suite test build-depends: - base - , QuickCheck + base ==4.8.* , grpc-haskell + , bytestring ==0.10.* + , unix + , time + , async + , tasty >= 0.11 && <0.12 + , tasty-hunit >= 0.9 && <0.10 default-language: Haskell2010 - ghc-options: -Wall -fwarn-incomplete-patterns -O2 -threaded -rtsopts + ghc-options: -Wall -fwarn-incomplete-patterns -g -threaded hs-source-dirs: tests main-is: Properties.hs type: exitcode-stdio-1.0 diff --git a/include/grpc_haskell.h b/include/grpc_haskell.h index 034abad..faadfbb 100644 --- a/include/grpc_haskell.h +++ b/include/grpc_haskell.h @@ -1,4 +1,11 @@ +#ifndef GRPC_HASKELL +#define GRPC_HASKELL + #include +#include +#include +#include +#include grpc_event *grpc_completion_queue_next_(grpc_completion_queue *cq, gpr_timespec *deadline, @@ -14,3 +21,95 @@ grpc_call *grpc_channel_create_call_(grpc_channel *channel, grpc_completion_queue *completion_queue, const char *method, const char *host, gpr_timespec *deadline, void *reserved); + +size_t gpr_slice_length_(gpr_slice *slice); + +uint8_t *gpr_slice_start_(gpr_slice *slice); + +gpr_slice* gpr_slice_from_copied_string_(const char *source); + +void free_slice(gpr_slice *slice); + +grpc_byte_buffer **create_receiving_byte_buffer(); + +void destroy_receiving_byte_buffer(grpc_byte_buffer **bb); + +grpc_byte_buffer_reader *byte_buffer_reader_create(grpc_byte_buffer *buffer); + +void byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader); + +gpr_slice* grpc_byte_buffer_reader_readall_(grpc_byte_buffer_reader *reader); + +void timespec_destroy(gpr_timespec* t); + +gpr_timespec* gpr_inf_future_(gpr_clock_type t); + +gpr_timespec* gpr_now_(gpr_clock_type t); + +int32_t gpr_time_to_millis_(gpr_timespec* t); + +gpr_timespec* seconds_to_deadline(int64_t seconds); + +gpr_timespec* millis_to_deadline(int64_t millis); + +grpc_metadata_array** metadata_array_create(); + +void metadata_array_destroy(grpc_metadata_array **arr); + +grpc_metadata* metadata_alloc(size_t n); + +void metadata_free(grpc_metadata* m); + +void set_metadata_key_val(char *key, char *val, grpc_metadata *arr, size_t i); + +const char* get_metadata_key(grpc_metadata *arr, size_t i); + +const char* get_metadata_val(grpc_metadata *arr, size_t i); + +grpc_op* op_array_create(size_t n); + +void op_array_destroy(grpc_op* op_array, size_t n); + +void op_send_initial_metadata(grpc_op *op_array, size_t i, + grpc_metadata *arr, size_t n_metadata); + +void op_send_initial_metadata_empty(grpc_op *op_array, size_t i); + +void op_send_message(grpc_op *op_array, size_t i, + grpc_byte_buffer *payload); + +void op_send_close_client(grpc_op *op_array, size_t i); + +void op_recv_initial_metadata(grpc_op *op_array, size_t i, + grpc_metadata_array** arr); + +void op_recv_message(grpc_op *op_array, size_t i, + grpc_byte_buffer **payload_recv); + +void op_recv_status_client(grpc_op *op_array, size_t i, + grpc_metadata_array** arr, + grpc_status_code* status, + char **details, size_t* details_capacity); + +void op_recv_close_server(grpc_op *op_array, size_t i, int *was_cancelled); + +void op_send_status_server(grpc_op *op_array, size_t i, + size_t metadata_count, grpc_metadata* m, + grpc_status_code status, char *details); + +grpc_status_code* create_status_code_ptr(); + +void destroy_status_code_ptr(grpc_status_code* p); + +grpc_call_details* create_call_details(); + +void destroy_call_details(grpc_call_details* cd); + +void grpc_channel_watch_connectivity_state_(grpc_channel *channel, + grpc_connectivity_state + last_observed_state, + gpr_timespec* deadline, + grpc_completion_queue *cq, + void *tag); + +#endif //GRPC_HASKELL diff --git a/src/Network/GRPC/Core.chs b/src/Network/GRPC/Core.chs deleted file mode 100644 index cab926a..0000000 --- a/src/Network/GRPC/Core.chs +++ /dev/null @@ -1,73 +0,0 @@ -module Network.GRPC.Core where - --- TODO Remove wrapped function once once https://github.com/haskell/c2hs/issues/117 gets in - -import Foreign.C.String -import Foreign.C.Types -import Foreign.Ptr -import Foreign.Storable -import Foreign.Marshal.Utils - -import Network.GRPC.Core.Time -import Network.GRPC.Core.Constants - -#include -#include -#include - -{#context prefix = "grpc" #} - -{#pointer *gpr_timespec as CTimeSpecPtr -> CTimeSpec #} -{#enum grpc_status_code as StatusCode {underscoreToCase} deriving (Eq)#} - -{#pointer *grpc_completion_queue as CompletionQueue newtype #} -{#pointer *grpc_channel as Channel newtype #} -{#pointer *grpc_server as Server newtype #} -{#pointer *grpc_call as Call newtype #} -{#pointer *grpc_call_details as CallDetails newtype #} -{#pointer *grpc_metadata_array as MetadataArray newtype #} - --- {#enum grpc_arg_type as ArgType {underscoreToCase} deriving (Eq)#} - -newtype ChannelArgs = ChannelArgs [Arg] - --- TODO Storable ChannelArgs - -{#pointer *grpc_channel_args as ChannelArgsPtr -> ChannelArgs #} - -data Arg = Arg { argKey :: String, argValue :: ArgValue } -data ArgValue = ArgString String | ArgInt Int - -{#enum grpc_call_error as CallError {underscoreToCase} deriving (Eq)#} - -{#pointer *grpc_byte_buffer as ByteBuffer newtype #} -{#pointer *grpc_byte_buffer_reader as ByteBufferReader newtype #} - -{#enum grpc_completion_type as CompletionType {underscoreToCase} deriving (Eq)#} -{#pointer *grpc_event as Event newtype #} -{#enum grpc_op_type as OpType {underscoreToCase} deriving (Eq)#} -{#pointer *grpc_op as Op newtype #} - -{#fun grpc_init as ^ {} -> `()'#} -{#fun grpc_shutdown as ^ {} -> `()'#} - -{#fun grpc_completion_queue_create as ^ {`Ptr ()'} -> `CompletionQueue'#} - -{#fun grpc_completion_queue_next_ as ^ {`CompletionQueue', `CTimeSpecPtr', `Ptr ()'} -> `Event'#} -{#fun grpc_completion_queue_pluck_ as ^ {`CompletionQueue', `Ptr ()', `CTimeSpecPtr', `Ptr ()'} -> `Event'#} - -{#fun grpc_completion_queue_shutdown as ^ {`CompletionQueue'} -> `()'#} -{#fun grpc_completion_queue_destroy as ^ {`CompletionQueue'} -> `()'#} - -{#fun grpc_channel_create_call_ as ^ {`Channel', `Call', fromIntegral `PropagationMask', `CompletionQueue', `String', `String', `CTimeSpecPtr', `Ptr ()'} -> `Call'#} -{#fun grpc_insecure_channel_create as ^ {`String', `ChannelArgsPtr', `Ptr ()'} -> `Channel'#} -{#fun grpc_channel_destroy as ^ {`Channel'} -> `()'#} - -{#fun grpc_call_start_batch as ^ {`Call', `Op', `Int', `Ptr ()', `Ptr ()'} -> `CallError'#} -{#fun grpc_call_cancel as ^ {`Call', `Ptr ()'} -> `()'#} -{#fun grpc_call_cancel_with_status as ^ {`Call', `StatusCode', `String', `Ptr ()'} -> `()'#} -{#fun grpc_call_destroy as ^ {`Call'} -> `()'#} - --- Server stuff - ---{#fun grpc_server_request_call as ^ {`Server',with* `Call' peek*, `CallDetails', `MetadataArray', `CompletionQueue', `CompletionQueue', `Ptr ()'} -> `CallError'#} diff --git a/src/Network/GRPC/Core/Time.chs b/src/Network/GRPC/Core/Time.chs deleted file mode 100644 index 907b3f8..0000000 --- a/src/Network/GRPC/Core/Time.chs +++ /dev/null @@ -1,24 +0,0 @@ -module Network.GRPC.Core.Time where - -import Control.Applicative -import Control.Monad -import Foreign.C.Types -import Foreign.Storable -import System.Clock - -#include - -{#context prefix = "grp" #} - -newtype CTimeSpec = CTimeSpec { timeSpec :: TimeSpec } - -instance Storable CTimeSpec where - sizeOf _ = {#sizeof gpr_timespec #} - alignment _ = {#alignof gpr_timespec #} - peek p = fmap CTimeSpec $ TimeSpec - <$> liftM fromIntegral ({#get gpr_timespec->tv_sec #} p) - <*> liftM fromIntegral ({#get gpr_timespec->tv_nsec #} p) - poke p x = do - {#set gpr_timespec.tv_sec #} p (fromIntegral $ sec $ timeSpec x) - {#set gpr_timespec.tv_nsec #} p (fromIntegral $ nsec $ timeSpec x) - diff --git a/src/Network/GRPC/Unsafe.chs b/src/Network/GRPC/Unsafe.chs new file mode 100644 index 0000000..f8e4d18 --- /dev/null +++ b/src/Network/GRPC/Unsafe.chs @@ -0,0 +1,234 @@ +module Network.GRPC.Unsafe where + +import Control.Monad + +import Foreign.C.Types +import Foreign.Ptr +import Foreign.Storable + +{#import Network.GRPC.Unsafe.Time#} +import Network.GRPC.Unsafe.Constants +{#import Network.GRPC.Unsafe.ByteBuffer#} +{#import Network.GRPC.Unsafe.Op#} +{#import Network.GRPC.Unsafe.Metadata#} + +#include +#include +#include + +{#context prefix = "grpc" #} + +{#pointer *grpc_completion_queue as CompletionQueue newtype #} + +-- | Represents a connection to a server. Created on the client side. +{#pointer *grpc_channel as Channel newtype #} + +-- | Represents a server. Created on the server side. +{#pointer *grpc_server as Server newtype #} + +-- | Represents a pointer to a call. To users of the gRPC core library, this +-- type is abstract; we have no access to its fields. +{#pointer *grpc_call as Call newtype #} +{#pointer *grpc_call_details as CallDetails newtype #} + +{#fun create_call_details as ^ {} -> `CallDetails'#} +{#fun destroy_call_details as ^ {`CallDetails'} -> `()'#} + +-- instance def adapted from +-- https://mail.haskell.org/pipermail/c2hs/2007-June/000800.html +instance Storable Call where + sizeOf (Call r) = sizeOf r + alignment (Call r) = alignment r + peek p = fmap Call (peek (castPtr p)) + poke p (Call r) = poke (castPtr p) r + +-- {#enum grpc_arg_type as ArgType {underscoreToCase} deriving (Eq)#} + +newtype ChannelArgs = ChannelArgs [Arg] + +-- TODO Storable ChannelArgs + +{#pointer *grpc_channel_args as ChannelArgsPtr -> ChannelArgs #} + +data Arg = Arg { argKey :: String, argValue :: ArgValue } +data ArgValue = ArgString String | ArgInt Int + +-- | A 'Tag' is an identifier that is used with a 'CompletionQueue' to signal +-- that the corresponding operation has completed. +newtype Tag = Tag {unTag :: Ptr ()} deriving (Show, Eq) + +tag :: Int -> Tag +tag = Tag . plusPtr nullPtr + +instance Storable Tag where + sizeOf (Tag p) = sizeOf p + alignment (Tag p) = alignment p + peek p = fmap Tag (peek (castPtr p)) + poke p (Tag r) = poke (castPtr p) r + +-- | 'Reserved' is an as-yet unused void pointer param to several gRPC +-- functions. Create one with 'reserved'. +newtype Reserved = Reserved {unReserved :: Ptr ()} + +reserved :: Reserved +reserved = Reserved nullPtr + +{#enum grpc_call_error as CallError {underscoreToCase} deriving (Show, Eq)#} + +-- | Represents the type of a completion event on a 'CompletionQueue'. +-- 'QueueShutdown' only occurs if the queue is shutting down (e.g., when the +-- server is stopping). 'QueueTimeout' occurs when we reached the deadline +-- before receiving an 'OpComplete'. +{#enum grpc_completion_type as CompletionType {underscoreToCase} + deriving (Show, Eq)#} + +-- | Represents one event received over a 'CompletionQueue'. +data Event = Event {eventCompletionType :: CompletionType, + eventSuccess :: Bool, + eventTag :: Tag} + deriving (Show, Eq) + +instance Storable Event where + sizeOf _ = {#sizeof grpc_event#} + alignment _ = {#alignof grpc_event#} + peek p = Event <$> liftM (toEnum . fromIntegral) ({#get grpc_event->type#} p) + <*> liftM (> 0) ({#get grpc_event->success#} p) + <*> liftM Tag ({#get grpc_event->tag#} p) + poke p (Event c s t) = do + {#set grpc_event.type#} p $ fromIntegral $ fromEnum c + {#set grpc_event.success#} p $ if s then 1 else 0 + {#set grpc_event.tag#} p (unTag t) + +castPeek :: Storable b => Ptr a -> IO b +castPeek p = peek (castPtr p) + +{#enum grpc_connectivity_state as ConnectivityState {underscoreToCase} + deriving (Show, Eq)#} + +{#fun grpc_init as ^ {} -> `()'#} + +{#fun grpc_shutdown as ^ {} -> `()'#} + +{#fun grpc_version_string as ^ {} -> `String' #} + +-- | Create a new 'CompletionQueue'. See the docs for +-- 'grpcCompletionQueueShutdown' for instructions on how to clean up afterwards. +{#fun grpc_completion_queue_create as ^ + {unReserved `Reserved'} -> `CompletionQueue'#} + +-- | Block until we get the next event off the given 'CompletionQueue', +-- using the given 'CTimeSpecPtr' as a deadline specifying the max amount of +-- time to block. +{#fun grpc_completion_queue_next_ as ^ + {`CompletionQueue', `CTimeSpecPtr',unReserved `Reserved'} + -> `Event' castPeek*#} + +-- | Block until we get the next event with the given 'Tag' off the given +-- 'CompletionQueue'. NOTE: No more than 'maxCompletionQueuePluckers' can call +-- this function concurrently! +{#fun grpc_completion_queue_pluck_ as ^ + {`CompletionQueue',unTag `Tag', `CTimeSpecPtr',unReserved `Reserved'} + -> `Event' castPeek*#} + +-- | Stops a completion queue. After all events are drained, +-- 'grpcCompletionQueueNext' will yield 'QueueShutdown' and then it is safe to +-- call 'grpcCompletionQueueDestroy'. After calling this, we must ensure no +-- new work is pushed to the queue. +{#fun grpc_completion_queue_shutdown as ^ {`CompletionQueue'} -> `()'#} + +-- | Destroys a 'CompletionQueue'. See 'grpcCompletionQueueShutdown' for how to +-- use safely. Caller must ensure no threads are calling +-- 'grpcCompletionQueueNext'. +{#fun grpc_completion_queue_destroy as ^ {`CompletionQueue'} -> `()'#} + +-- | Sets up a call on the client. The first string is the endpoint name (e.g. +-- @"/foo"@) and the second is the host name. In my tests so far, the host name +-- here doesn't seem to be used... it looks like the host and port specified in +-- 'grpcInsecureChannelCreate' is the one that is actually used. +{#fun grpc_channel_create_call_ as ^ + {`Channel', `Call', fromIntegral `PropagationMask', `CompletionQueue', + `String', `String', `CTimeSpecPtr',unReserved `Reserved'} + -> `Call'#} + +-- | Create a channel (on the client) to the server. The first argument is +-- host and port, e.g. @"localhost:50051"@. The gRPC docs say that most clients +-- are expected to pass a 'nullPtr' for the 'ChannelArgsPtr'. We currently don't +-- expose any functions for creating channel args, since they are entirely +-- undocumented. +{#fun grpc_insecure_channel_create as ^ + {`String', `ChannelArgsPtr',unReserved `Reserved'} -> `Channel'#} + +-- | get the current connectivity state of the given channel. The 'Bool' is +-- True if we should try to connect the channel. +{#fun grpc_channel_check_connectivity_state as ^ + {`Channel', `Bool'} -> `ConnectivityState'#} + +-- | When the current connectivity state changes from the given +-- 'ConnectivityState', enqueues a success=1 tag on the given 'CompletionQueue'. +-- If the deadline is reached, enqueues a tag with success=0. +{#fun grpc_channel_watch_connectivity_state_ as ^ + {`Channel', `ConnectivityState', `CTimeSpecPtr', `CompletionQueue', + unTag `Tag'} + -> `()'#} + +{#fun grpc_channel_ping as ^ + {`Channel', `CompletionQueue', unTag `Tag',unReserved `Reserved'} -> `()' #} + +{#fun grpc_channel_destroy as ^ {`Channel'} -> `()'#} + +-- | Starts executing a batch of ops in the given 'OpArray'. Does not block. +-- When complete, an event identified by the given 'Tag' +-- will be pushed onto the 'CompletionQueue' that was associated with the given +-- 'Call' when the 'Call' was created. +{#fun grpc_call_start_batch as ^ + {`Call', `OpArray', `Int', unTag `Tag',unReserved `Reserved'} -> `CallError'#} + +{#fun grpc_call_cancel as ^ {`Call',unReserved `Reserved'} -> `()'#} + +{#fun grpc_call_cancel_with_status as ^ + {`Call', `StatusCode', `String',unReserved `Reserved'} -> `()'#} + +{#fun grpc_call_destroy as ^ {`Call'} -> `()'#} + +-- Server stuff + +{#fun grpc_server_create as ^ + {`ChannelArgsPtr',unReserved `Reserved'} -> `Server'#} + +{#fun grpc_server_register_completion_queue as ^ + {`Server', `CompletionQueue', unReserved `Reserved'} -> `()'#} + +{#fun grpc_server_add_insecure_http2_port as ^ + {`Server', `String'} -> `Int'#} + +-- | Starts a server. To shut down the server, call these in order: +-- 'grpcServerShutdownAndNotify', 'grpcServerCancelAllCalls', +-- 'grpcServerDestroy'. After these are done, shut down and destroy the server's +-- completion queue with 'grpcCompletionQueueShutdown' followed by +-- 'grpcCompletionQueueDestroy'. +{#fun grpc_server_start as ^ {`Server'} -> `()'#} + +{#fun grpc_server_shutdown_and_notify as ^ + {`Server', `CompletionQueue',unTag `Tag'} -> `()'#} + +{#fun grpc_server_cancel_all_calls as ^ + {`Server'} -> `()'#} + +-- | Destroy the server. See 'grpcServerStart' for complete shutdown +-- instructions. +{#fun grpc_server_destroy as ^ {`Server'} -> `()'#} + +-- | Request a call. +-- NOTE: You need to call 'grpcCompletionQueueNext' or +-- 'grpcCompletionQueuePluck' on the completion queue with the given +-- 'Tag' before using the 'Call' pointer again. +{#fun grpc_server_request_call as ^ + {`Server',id `Ptr Call', `CallDetails', `MetadataArray', + `CompletionQueue', `CompletionQueue',unTag `Tag'} + -> `CallError'#} + +-- | TODO: I am not yet sure how this function is supposed to be used. +{#fun grpc_server_request_registered_call as ^ + {`Server',unTag `Tag',id `Ptr Call', `CTimeSpecPtr', `MetadataArray' id, + id `Ptr ByteBuffer', `CompletionQueue', `CompletionQueue',unTag `Tag'} + -> `CallError'#} diff --git a/src/Network/GRPC/Unsafe/ByteBuffer.chs b/src/Network/GRPC/Unsafe/ByteBuffer.chs new file mode 100644 index 0000000..36bc2fc --- /dev/null +++ b/src/Network/GRPC/Unsafe/ByteBuffer.chs @@ -0,0 +1,85 @@ +module Network.GRPC.Unsafe.ByteBuffer where + +#include +#include +#include +#include + +#include + +{#import Network.GRPC.Unsafe.Slice#} +import Control.Exception (bracket) +import qualified Data.ByteString as B +import Foreign.Ptr +import Foreign.C.Types +import Foreign.Storable + +{#enum grpc_compression_algorithm as GRPCCompressionAlgorithm + {underscoreToCase} deriving (Eq) #} + +-- | Represents a pointer to a gRPC byte buffer containing 1 or more 'Slice's. +-- Must be destroyed manually with 'grpcByteBufferDestroy'. +{#pointer *grpc_byte_buffer as ByteBuffer newtype #} + +--Trivial Storable instance because 'ByteBuffer' type is a pointer. +instance Storable ByteBuffer where + sizeOf (ByteBuffer r) = sizeOf r + alignment (ByteBuffer r) = alignment r + peek p = fmap ByteBuffer (peek (castPtr p)) + poke p (ByteBuffer r) = poke (castPtr p) r + +--TODO: When I switched this to a ForeignPtr with a finalizer, I got errors +--about freeing un-malloced memory. Calling the same destroy function by hand +--works fine in the same code, though. Until I find a workaround, going to free +--everything by hand. + +-- | Represents a pointer to a ByteBufferReader. Must be destroyed manually with +-- 'byteBufferReaderDestroy'. +{#pointer *grpc_byte_buffer_reader as ByteBufferReader newtype #} + +-- | Creates a pointer to a 'ByteBuffer'. This is used to receive data when +-- creating a GRPC_OP_RECV_MESSAGE op. +{#fun create_receiving_byte_buffer as ^ {} -> `Ptr ByteBuffer' id#} + +{#fun destroy_receiving_byte_buffer as ^ {id `Ptr ByteBuffer'} -> `()'#} + +withByteBufferPtr :: (Ptr ByteBuffer -> IO a) -> IO a +withByteBufferPtr + = bracket createReceivingByteBuffer destroyReceivingByteBuffer + +-- | Takes an array of slices and the length of the array and returns a +-- 'ByteBuffer'. +{#fun grpc_raw_byte_buffer_create as ^ {`Slice', `CULong'} -> `ByteBuffer'#} + +{#fun grpc_raw_compressed_byte_buffer_create as ^ + {`Slice', `CULong', `GRPCCompressionAlgorithm'} -> `ByteBuffer'#} + +{#fun grpc_byte_buffer_copy as ^ {`ByteBuffer'} -> `ByteBuffer'#} + +{#fun grpc_byte_buffer_length as ^ {`ByteBuffer'} -> `CULong'#} + +{#fun grpc_byte_buffer_destroy as ^ {`ByteBuffer'} -> `()'#} + +{#fun byte_buffer_reader_create as ^ {`ByteBuffer'} -> `ByteBufferReader'#} + +{#fun byte_buffer_reader_destroy as ^ {`ByteBufferReader'} -> `()'#} + +{#fun grpc_byte_buffer_reader_next as ^ + {`ByteBufferReader', `Slice'} -> `CInt'#} + +-- | Returns a 'Slice' containing the entire contents of the 'ByteBuffer' being +-- read by the given 'ByteBufferReader'. +{#fun grpc_byte_buffer_reader_readall_ as ^ {`ByteBufferReader'} -> `Slice'#} + +{#fun grpc_raw_byte_buffer_from_reader as ^ + {`ByteBufferReader'} -> `ByteBuffer'#} + +withByteStringAsByteBuffer :: B.ByteString -> (ByteBuffer -> IO a) -> IO a +withByteStringAsByteBuffer bs f = do + bracket (byteStringToSlice bs) freeSlice $ \slice -> do + bracket (grpcRawByteBufferCreate slice 1) grpcByteBufferDestroy f + +copyByteBufferToByteString :: ByteBuffer -> IO B.ByteString +copyByteBufferToByteString bb = do + bracket (byteBufferReaderCreate bb) byteBufferReaderDestroy $ \bbr -> do + bracket (grpcByteBufferReaderReadall bbr) freeSlice sliceToByteString diff --git a/src/Network/GRPC/Core/Constants.hsc b/src/Network/GRPC/Unsafe/Constants.hsc similarity index 81% rename from src/Network/GRPC/Core/Constants.hsc rename to src/Network/GRPC/Unsafe/Constants.hsc index 86989f0..c0ff5b2 100644 --- a/src/Network/GRPC/Core/Constants.hsc +++ b/src/Network/GRPC/Unsafe/Constants.hsc @@ -1,6 +1,6 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} -module Network.GRPC.Core.Constants where +module Network.GRPC.Unsafe.Constants where #include "grpc/grpc.h" #include "grpc/impl/codegen/propagation_bits.h" @@ -20,6 +20,9 @@ writeBufferHint = #const GRPC_WRITE_BUFFER_HINT writeNoCompress :: Int writeNoCompress = #const GRPC_WRITE_NO_COMPRESS +maxCompletionQueuePluckers :: Int +maxCompletionQueuePluckers = #const GRPC_MAX_COMPLETION_QUEUE_PLUCKERS + newtype PropagationMask = PropagationMask {unPropagationMask :: Int} deriving (Show, Eq, Ord, Integral, Enum, Real, Num) @@ -37,3 +40,6 @@ propagateCensusTracingContext = propagateCancellation :: PropagationMask propagateCancellation = PropagationMask $ #const GRPC_PROPAGATE_CANCELLATION + +propagateDefaults :: PropagationMask +propagateDefaults = PropagationMask $ #const GRPC_PROPAGATE_DEFAULTS diff --git a/src/Network/GRPC/Unsafe/Metadata.chs b/src/Network/GRPC/Unsafe/Metadata.chs new file mode 100644 index 0000000..4603940 --- /dev/null +++ b/src/Network/GRPC/Unsafe/Metadata.chs @@ -0,0 +1,70 @@ +module Network.GRPC.Unsafe.Metadata where + +import Control.Exception +import Control.Monad +import Data.ByteString (ByteString, useAsCString, packCString) +import Foreign.C.String +import Foreign.Ptr +import Foreign.Storable + + +#include +#include +#include +#include + +-- | Represents a pointer to one or more metadata key/value pairs. This type +-- is intended to be used when sending metadata. +{#pointer *grpc_metadata as MetadataKeyValPtr newtype#} + +-- | Represents a pointer to a grpc_metadata_array. Must be destroyed with +-- 'metadataArrayDestroy'. This type is intended for receiving metadata. +-- This can be populated by passing it to e.g. 'grpcServerRequestCall'. +-- TODO: we need a function for getting a 'MetadataKeyValPtr' +-- and length from this type. +{#pointer *grpc_metadata_array as MetadataArray newtype#} + +instance Storable MetadataArray where + sizeOf (MetadataArray r) = sizeOf r + alignment (MetadataArray r) = alignment r + peek p = fmap MetadataArray (peek (castPtr p)) + poke p (MetadataArray r) = poke (castPtr p) r + +-- | Create an empty 'MetadataArray'. Returns a pointer to it so that we can +-- pass it to the appropriate op creation functions. +{#fun metadata_array_create as ^ {} -> `Ptr MetadataArray' id#} + +{#fun metadata_array_destroy as ^ {id `Ptr MetadataArray'} -> `()'#} + +-- Note: I'm pretty sure we must call out to C to allocate these +-- because they are nested structs. +-- | Allocates space for exactly n metadata key/value pairs. +{#fun metadata_alloc as ^ {`Int'} -> `MetadataKeyValPtr'#} + +{#fun metadata_free as ^ {`MetadataKeyValPtr'} -> `()'#} + +-- | Sets a metadata key/value pair at the given index in the +-- 'MetadataKeyValPtr'. No error checking is performed to ensure the index is +-- in bounds! +{#fun set_metadata_key_val as setMetadataKeyVal + {useAsCString* `ByteString', useAsCString* `ByteString', + `MetadataKeyValPtr', `Int'} -> `()'#} + +{#fun get_metadata_key as getMetadataKey' + {`MetadataKeyValPtr', `Int'} -> `CString'#} + +{#fun get_metadata_val as getMetadataVal' + {`MetadataKeyValPtr', `Int'} -> `CString'#} + +--TODO: The test suggests this is leaking. +withMetadataArrayPtr :: (Ptr MetadataArray -> IO a) -> IO a +withMetadataArrayPtr = bracket metadataArrayCreate metadataArrayDestroy + +withMetadataKeyValPtr :: Int -> (MetadataKeyValPtr -> IO a) -> IO a +withMetadataKeyValPtr i f = bracket (metadataAlloc i) metadataFree f + +getMetadataKey :: MetadataKeyValPtr -> Int -> IO ByteString +getMetadataKey m = getMetadataKey' m >=> packCString + +getMetadataVal :: MetadataKeyValPtr -> Int -> IO ByteString +getMetadataVal m = getMetadataVal' m >=> packCString diff --git a/src/Network/GRPC/Unsafe/Op.chs b/src/Network/GRPC/Unsafe/Op.chs new file mode 100644 index 0000000..9f6346e --- /dev/null +++ b/src/Network/GRPC/Unsafe/Op.chs @@ -0,0 +1,102 @@ +module Network.GRPC.Unsafe.Op where + +import Control.Exception +import Control.Monad +import qualified Data.ByteString as B +import Foreign.C.String +import Foreign.C.Types +import Foreign.Ptr +{#import Network.GRPC.Unsafe.ByteBuffer#} +{#import Network.GRPC.Unsafe.Metadata#} + +#include +#include +#include +#include + +{#enum grpc_op_type as OpType {underscoreToCase} deriving (Eq)#} +{#enum grpc_status_code as StatusCode {underscoreToCase} deriving (Eq)#} + +-- NOTE: We don't alloc the space for the enum in Haskell because enum size is +-- implementation-dependent. See: +-- http://stackoverflow.com/questions/1113855/is-the-sizeofenum-sizeofint-always +-- | Allocates space for a 'StatusCode' and returns a pointer to it. Used to +-- receive a status code from the server with 'opRecvStatusClient'. +{#fun create_status_code_ptr as ^ {} -> `Ptr StatusCode' castPtr#} + +{#fun destroy_status_code_ptr as ^ {castPtr `Ptr StatusCode'} -> `()' #} + +-- | Represents an array of ops to be passed to 'grpcCallStartBatch'. +-- Create an array with 'opArrayCreate', then create individual ops in the array +-- using the op* functions. For these functions, the first two arguments are +-- always the OpArray to mutate and the index in the array at which to create +-- the new op. After processing the batch and getting out any results, call +-- 'opArrayDestroy'. +{#pointer *grpc_op as OpArray newtype #} + +-- | Creates an empty 'OpArray' with space for the given number of ops. +{#fun op_array_create as ^ {`Int'} -> `OpArray'#} + +-- | Destroys an 'OpArray' of the given size. +{#fun op_array_destroy as ^ {`OpArray', `Int'} -> `()'#} + +-- | brackets creating and destroying an 'OpArray' with the given size. +withOpArray :: Int -> (OpArray -> IO a) -> IO a +withOpArray n f = bracket (opArrayCreate n) (flip opArrayDestroy n) f + +-- | Creates an op of type GRPC_OP_SEND_INITIAL_METADATA at the specified +-- index of the given 'OpArray', containing the given +-- metadata. The metadata is copied and can be destroyed after calling this +-- function. +{#fun op_send_initial_metadata as ^ + {`OpArray', `Int', `MetadataKeyValPtr', `Int'} -> `()'#} + +-- | Creates an op of type GRPC_OP_SEND_INITIAL_METADATA at the specified +-- index of the given 'OpArray'. The op will contain no metadata. +{#fun op_send_initial_metadata_empty as ^ {`OpArray', `Int'} -> `()'#} + +-- | Creates an op of type GRPC_OP_SEND_MESSAGE at the specified index of +-- the given 'OpArray'. The given 'ByteBuffer' is +-- copied and can be destroyed after calling this function. +{#fun op_send_message as ^ {`OpArray', `Int', `ByteBuffer'} -> `()'#} + +-- | Creates an 'Op' of type GRPC_OP_SEND_CLOSE_FROM_CLIENT at the specified +-- index of the given 'OpArray'. +{#fun op_send_close_client as ^ {`OpArray', `Int'} -> `()'#} + +-- | Creates an op of type GRPC_OP_RECV_INITIAL_METADATA at the specified +-- index of the given 'OpArray', and ties the given +-- 'MetadataArray' pointer to that op so that the received metadata can be +-- accessed. It is the user's responsibility to destroy the 'MetadataArray'. +{#fun op_recv_initial_metadata as ^ + {`OpArray', `Int',id `Ptr MetadataArray'} -> `()'#} + +-- | Creates an op of type GRPC_OP_RECV_MESSAGE at the specified index of the +-- given 'OpArray', and ties the given +-- 'ByteBuffer' pointer to that op so that the received message can be +-- accessed. It is the user's responsibility to destroy the 'ByteBuffer'. +{#fun op_recv_message as ^ {`OpArray', `Int',id `Ptr ByteBuffer'} -> `()'#} + +-- | Creates an op of type GRPC_OP_RECV_STATUS_ON_CLIENT at the specified +-- index of the given 'OpArray', and ties all the +-- input pointers to that op so that the results of the receive can be +-- accessed. It is the user's responsibility to free all the input args after +-- this call. +{#fun op_recv_status_client as ^ + {`OpArray', `Int',id `Ptr MetadataArray', castPtr `Ptr StatusCode', + castPtr `Ptr CString', `Int'} + -> `()'#} + +-- | Creates an op of type GRPC_OP_RECV_CLOSE_ON_SERVER at the specified index +-- of the given 'OpArray', and ties the input +-- pointer to that op so that the result of the receive can be accessed. It is +-- the user's responsibility to free the pointer. +{#fun op_recv_close_server as ^ {`OpArray', `Int', id `Ptr CInt'} -> `()'#} + +-- | Creates an op of type GRPC_OP_SEND_STATUS_FROM_SERVER at the specified +-- index of the given 'OpArray'. The given +-- Metadata and string are copied when creating the op, and can be safely +-- destroyed immediately after calling this function. +{#fun op_send_status_server as ^ + {`OpArray', `Int', `Int', `MetadataKeyValPtr', `StatusCode', `String'} + -> `()'#} diff --git a/src/Network/GRPC/Unsafe/Slice.chs b/src/Network/GRPC/Unsafe/Slice.chs new file mode 100644 index 0000000..160a15d --- /dev/null +++ b/src/Network/GRPC/Unsafe/Slice.chs @@ -0,0 +1,54 @@ +module Network.GRPC.Unsafe.Slice where + +#include +#include + +import qualified Data.ByteString as B +import Control.Applicative +import Data.Word +import Foreign.C.String +import Foreign.C.Types +import Foreign.Ptr +import Foreign.Marshal.Alloc + +-- | A 'Slice' is gRPC's string type. We can easily convert these to and from +-- ByteStrings. This type is a pointer to a C type. +{#pointer *gpr_slice as Slice newtype #} + +-- TODO: we could also represent this type as 'Ptr Slice', by doing this: +-- newtype Slice = Slice {#type gpr_slice#} +-- This would have no practical effect, but it would communicate intent more +-- clearly by emphasizing that the type is indeed a pointer and that the data +-- it is pointing to might change/be destroyed after running IO actions. To make +-- the change, we would just need to change all occurrences of 'Slice' to +-- 'Ptr Slice' and add 'castPtr' in and out marshallers. +-- This seems like the right way to do it, but c2hs doesn't make it easy, so +-- maybe the established idiom is to do what c2hs does. + +-- | Get the length of a slice. +{#fun gpr_slice_length_ as ^ {`Slice'} -> `CULong'#} + +-- | Returns a pointer to the start of the character array contained by the +-- slice. +{#fun gpr_slice_start_ as ^ {`Slice'} -> `Ptr CChar' castPtr #} + +{#fun gpr_slice_from_copied_string_ as ^ {`CString'} -> `Slice'#} + +-- | Properly cleans up all memory used by a 'Slice'. Danger: the Slice should +-- not be used after this function is called on it. +{#fun free_slice as ^ {`Slice'} -> `()'#} + +-- | Copies a 'Slice' to a ByteString. +-- TODO: there are also non-copying unsafe ByteString construction functions. +-- We could gain some speed by using them. +-- idea would be something :: (ByteString -> Response) -> IO () that handles +-- getting and freeing the slice behind the scenes. +sliceToByteString :: Slice -> IO B.ByteString +sliceToByteString slice = do + len <- fmap fromIntegral $ gprSliceLength slice + str <- gprSliceStart slice + B.packCStringLen (str, len) + +-- | Copies a 'ByteString' to a 'Slice'. +byteStringToSlice :: B.ByteString -> IO Slice +byteStringToSlice bs = B.useAsCString bs gprSliceFromCopiedString diff --git a/src/Network/GRPC/Unsafe/Time.chs b/src/Network/GRPC/Unsafe/Time.chs new file mode 100644 index 0000000..6da98c3 --- /dev/null +++ b/src/Network/GRPC/Unsafe/Time.chs @@ -0,0 +1,48 @@ +module Network.GRPC.Unsafe.Time where + +import Control.Applicative +import Control.Monad +import Foreign.C.Types +import Foreign.Storable +import System.Clock + +#include +#include + +{#context prefix = "grp" #} + +newtype CTimeSpec = CTimeSpec { timeSpec :: TimeSpec } + +instance Storable CTimeSpec where + sizeOf _ = {#sizeof gpr_timespec #} + alignment _ = {#alignof gpr_timespec #} + peek p = fmap CTimeSpec $ TimeSpec + <$> liftM fromIntegral ({#get gpr_timespec->tv_sec #} p) + <*> liftM fromIntegral ({#get gpr_timespec->tv_nsec #} p) + poke p x = do + {#set gpr_timespec.tv_sec #} p (fromIntegral $ sec $ timeSpec x) + {#set gpr_timespec.tv_nsec #} p (fromIntegral $ nsec $ timeSpec x) + +{#enum gpr_clock_type as ClockType {underscoreToCase} deriving (Eq) #} + +-- | A pointer to a CTimeSpec. Must be destroyed manually with +-- 'timespecDestroy'. +{#pointer *gpr_timespec as CTimeSpecPtr -> CTimeSpec #} + +{#fun timespec_destroy as ^ {`CTimeSpecPtr'} -> `()'#} + +{#fun gpr_inf_future_ as ^ {`ClockType'} -> `CTimeSpecPtr'#} + +-- | Get the current time for the given 'ClockType'. Warning: 'GprTimespan' will +-- cause a crash. Probably only need to use GprClockMonotonic, which returns 0. +{#fun gpr_now_ as ^ {`ClockType'} -> `CTimeSpecPtr'#} + +{#fun gpr_time_to_millis_ as ^ {`CTimeSpecPtr'} -> `Int'#} + +-- | Returns a GprClockMonotonic representing a deadline n seconds in the +-- future. +{#fun seconds_to_deadline as ^ {`Int'} -> `CTimeSpecPtr'#} + +-- | Returns a GprClockMonotonic representing a deadline n milliseconds +-- in the future. +{#fun millis_to_deadline as ^ {`Int'} -> `CTimeSpecPtr'#} diff --git a/tests/Properties.hs b/tests/Properties.hs index 377b6b5..e866a5f 100644 --- a/tests/Properties.hs +++ b/tests/Properties.hs @@ -1,2 +1,217 @@ +{-# LANGUAGE OverloadedStrings #-} + +import Control.Concurrent.Async +import Network.GRPC.Unsafe +import Network.GRPC.Unsafe.Slice +import Network.GRPC.Unsafe.ByteBuffer +import Network.GRPC.Unsafe.Time +import Network.GRPC.Unsafe.Metadata +import Network.GRPC.Unsafe.Op +import Network.GRPC.Unsafe.Constants +import qualified Data.ByteString as B +import Data.Time.Clock.POSIX +import GHC.Exts +import Foreign.Marshal.Alloc +import Foreign.Storable +import Foreign.Ptr +import Test.Tasty +import Test.Tasty.HUnit as HU + +roundtripSlice :: B.ByteString -> TestTree +roundtripSlice bs = testCase "Slice C bindings roundtrip" $ do + slice <- byteStringToSlice bs + unslice <- sliceToByteString slice + bs HU.@?= unslice + freeSlice slice + +roundtripByteBuffer :: B.ByteString -> TestTree +roundtripByteBuffer bs = testCase "ByteBuffer C bindings roundtrip" $ do + slice <- byteStringToSlice bs + buffer <- grpcRawByteBufferCreate slice 1 + reader <- byteBufferReaderCreate buffer + readSlice <- grpcByteBufferReaderReadall reader + bs' <- sliceToByteString readSlice + bs' HU.@?= bs + --clean up + freeSlice slice + byteBufferReaderDestroy reader + grpcByteBufferDestroy buffer + freeSlice readSlice + +currTimeMillis :: ClockType -> IO Int +currTimeMillis t = do + gprT <- gprNow t + tMillis <- gprTimeToMillis gprT + timespecDestroy gprT + return tMillis + +testNow :: TestTree +testNow = testCase "create and destroy various clock types" $ do + _ <- currTimeMillis GprClockMonotonic + _ <- currTimeMillis GprClockRealtime + _ <- currTimeMillis GprClockPrecise + return () + +testMetadata :: TestTree +testMetadata = testCase "metadata setter/getter C bindings roundtrip" $ do + m <- metadataAlloc 3 + setMetadataKeyVal "hello" "world" m 0 + setMetadataKeyVal "foo" "bar" m 1 + setMetadataKeyVal "Haskell" "Curry" m 2 + k0 <- getMetadataKey m 0 + v0 <- getMetadataVal m 0 + k1 <- getMetadataKey m 1 + v1 <- getMetadataVal m 1 + k2 <- getMetadataKey m 2 + v2 <- getMetadataVal m 2 + k0 HU.@?= "hello" + v0 HU.@?= "world" + k1 HU.@?= "foo" + v1 HU.@?= "bar" + k2 HU.@?= "Haskell" + v2 HU.@?= "Curry" + metadataFree m + +assertCqEventComplete :: Event -> IO () +assertCqEventComplete e = do + eventCompletionType e HU.@?= OpComplete + eventSuccess e HU.@?= True + +testPayloadClient :: IO () +testPayloadClient = do + client <- grpcInsecureChannelCreate "localhost:50051" nullPtr reserved + cq <- grpcCompletionQueueCreate reserved + withMetadataArrayPtr $ \initialMetadataRecv -> do + withMetadataArrayPtr $ \trailingMetadataRecv -> do + withByteBufferPtr $ \clientRecvBB -> do + deadline <- secondsToDeadline 5 + pluckDeadline <- secondsToDeadline 10 + clientCall <- grpcChannelCreateCall + client (Call nullPtr) propagateDefaults cq + "/foo" "localhost" deadline reserved + --send request + withOpArray 6 $ \ops -> do + opSendInitialMetadataEmpty ops 0 + withByteStringAsByteBuffer "hello world" $ \requestPayload -> do + opSendMessage ops 1 requestPayload + opSendCloseClient ops 2 + opRecvInitialMetadata ops 3 initialMetadataRecv + opRecvMessage ops 4 clientRecvBB + statusCodePtr <- createStatusCodePtr + let cstringCapacity = 32 + cStringPtr <- malloc + cstring <- mallocBytes cstringCapacity + poke cStringPtr cstring + opRecvStatusClient ops 5 trailingMetadataRecv statusCodePtr + cStringPtr + cstringCapacity + --send client request + requestError <- grpcCallStartBatch clientCall ops 6 (tag 1) reserved + clientRequestCqEvent <- grpcCompletionQueuePluck + cq (tag 1) pluckDeadline reserved + assertCqEventComplete clientRequestCqEvent + requestError HU.@?= CallOk + free cstring + free cStringPtr + destroyStatusCodePtr statusCodePtr + --verify response received + responseRecv <- peek clientRecvBB + responseRecvBS <- copyByteBufferToByteString responseRecv + responseRecvBS HU.@?= "hello you" + grpcCompletionQueueShutdown cq + grpcCallDestroy clientCall + --TODO: the grpc test drains the cq here + grpcCompletionQueueDestroy cq + grpcChannelDestroy client + +testPayloadServer :: IO () +testPayloadServer = do + server <- grpcServerCreate nullPtr reserved + cq <- grpcCompletionQueueCreate reserved + grpcServerRegisterCompletionQueue server cq reserved + _ <- grpcServerAddInsecureHttp2Port server "localhost:50051" + grpcServerStart server + serverCallPtr <- malloc + withMetadataArrayPtr $ \requestMetadataRecv -> do + withByteBufferPtr $ \recvBufferPtr -> do + callDetails <- createCallDetails + requestMetadataRecv' <- peek requestMetadataRecv + recvRequestError <- grpcServerRequestCall + server serverCallPtr callDetails + requestMetadataRecv' cq cq (tag 101) + pluckDeadline' <- secondsToDeadline 10 + requestCallCqEvent <- grpcCompletionQueuePluck cq (tag 101) + pluckDeadline' + reserved + assertCqEventComplete requestCallCqEvent + recvRequestError HU.@?= CallOk + destroyCallDetails callDetails + --receive request + withOpArray 2 $ \recvOps -> do + opSendInitialMetadataEmpty recvOps 0 + opRecvMessage recvOps 1 recvBufferPtr + serverCall <- peek serverCallPtr + recvBatchError <- grpcCallStartBatch serverCall recvOps 2 + (tag 102) reserved + recvBatchError HU.@?= CallOk + pluckDeadline'' <- secondsToDeadline 10 + recvCqEvent <- grpcCompletionQueuePluck cq (tag 102) + pluckDeadline'' + reserved + assertCqEventComplete recvCqEvent + --send response + withOpArray 3 $ \respOps -> do + withByteStringAsByteBuffer "hello you" $ \respbb -> do + cancelledPtr <- malloc + opRecvCloseServer respOps 0 cancelledPtr + opSendMessage respOps 1 respbb + opSendStatusServer respOps 2 0 (MetadataKeyValPtr nullPtr) + GrpcStatusOk "ok" + serverCall <- peek serverCallPtr + respBatchError <- grpcCallStartBatch serverCall respOps 3 + (tag 103) reserved + respBatchError HU.@?= CallOk + pluckDeadline''' <- secondsToDeadline 10 + respCqEvent <- grpcCompletionQueuePluck cq (tag 103) + pluckDeadline''' + reserved + assertCqEventComplete respCqEvent + --verify data was received + serverRecv <- peek recvBufferPtr + serverRecvBS <- copyByteBufferToByteString serverRecv + serverRecvBS HU.@?= "hello world" + --shut down + grpcServerShutdownAndNotify server cq (tag 0) + pluckDeadline'''' <- secondsToDeadline 10 + shutdownEvent <- grpcCompletionQueuePluck cq (tag 0) pluckDeadline'''' + reserved + assertCqEventComplete shutdownEvent + grpcServerCancelAllCalls server + grpcServerDestroy server + grpcCompletionQueueShutdown cq + grpcCompletionQueueDestroy cq + free serverCallPtr + +-- | Straightforward translation of the gRPC core test end2end/tests/payload.c +-- This is intended to test the low-level C bindings, so we use only a few +-- minimal abstractions on top of it. +testPayload :: TestTree +testPayload = testCase "low-level C bindings request/response " $ do + grpcInit + withAsync testPayloadServer $ \a1 -> do + withAsync testPayloadClient $ \a2 -> do + () <- wait a1 + wait a2 + grpcShutdown + putStrLn "Done." + +unitTests :: TestTree +unitTests = testGroup "Unit tests" + [testPayload, + roundtripSlice "Hello, world!", + roundtripByteBuffer "Hwaet! We gardena in geardagum...", + testMetadata, + testNow] + main :: IO () -main = return () +main = defaultMain unitTests