Finish up bindings to most core gRPC functions (#1)

* grpc_server_request_call

* basic slice functionality

* rename function to emphasize side effects

* add docs

* ByteBuffer function bindings

* replace unsafeCoerce with more specific function, add docs, tests.

* add newtypes for Tag and Reserved void pointers

* manually fix request_registered_call binding

* use nocode keyword to fix Ptr () problems

* decouple copying Slice from freeing slice

* Add time ops

* remove nocode decls

* Start Op module, fix c2hs preprocessing order

* metadata manipulation operations

* metadata free function, test

* helper functions for constructing ops of each type

* bindings for op creation functions

* finish up Op creation functions, implement Op destruction, add docs.

* tweak documentation

* rework Op creation functions to work with an array of ops, for ease of use with grpc_call_start_batch

* forgot to change return types

* wrap hook lines, fix types to op creation functions

* implement part of the payload test

* hideous, but working, end to end test

* bindings for connectivity state checks, split test into two threads

* various cleanup

* rename Core to Unsafe for emphasis, clean up tests more

* add requested comment

* remove slice_unref binding, use sliceFree when converting buffer to bytestring
This commit is contained in:
Connor Clark 2016-05-13 09:12:37 -07:00
parent 8fc5ff2c18
commit 90a527f4e0
13 changed files with 1230 additions and 107 deletions

View File

@ -1,7 +1,10 @@
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <grpc_haskell.h>
grpc_event *grpc_completion_queue_next_(grpc_completion_queue *cq,
@ -30,3 +33,293 @@ grpc_call *grpc_channel_create_call_(grpc_channel *channel,
completion_queue, method, host,
*deadline, reserved);
}
size_t gpr_slice_length_(gpr_slice *slice){
return GPR_SLICE_LENGTH(*slice);
}
uint8_t *gpr_slice_start_(gpr_slice *slice){
return GPR_SLICE_START_PTR(*slice);
}
gpr_slice* gpr_slice_from_copied_string_(const char *source){
gpr_slice* retval = malloc(sizeof(gpr_slice));
//note: 'gpr_slice_from_copied_string' handles allocating space for 'source'.
*retval = gpr_slice_from_copied_string(source);
return retval;
}
void free_slice(gpr_slice *slice){
gpr_slice_unref(*slice);
free(slice);
}
grpc_byte_buffer **create_receiving_byte_buffer(){
grpc_byte_buffer **retval = malloc(sizeof(grpc_byte_buffer*));
*retval = NULL;
return retval;
}
void destroy_receiving_byte_buffer(grpc_byte_buffer **bb){
grpc_byte_buffer_destroy(*bb);
free(bb);
}
grpc_byte_buffer_reader *byte_buffer_reader_create(grpc_byte_buffer *buffer){
grpc_byte_buffer_reader *reader = malloc(sizeof(grpc_byte_buffer_reader));
grpc_byte_buffer_reader_init(reader, buffer);
return reader;
}
void byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader){
grpc_byte_buffer_reader_destroy(reader);
free(reader);
}
gpr_slice *grpc_byte_buffer_reader_readall_(grpc_byte_buffer_reader *reader){
gpr_slice *retval = malloc(sizeof(gpr_slice));
*retval = grpc_byte_buffer_reader_readall(reader);
return retval;
}
void timespec_destroy(gpr_timespec* t){
free(t);
}
gpr_timespec* gpr_inf_future_(gpr_clock_type t){
gpr_timespec *retval = malloc(sizeof(gpr_timespec));
*retval = gpr_inf_future(t);
return retval;
}
gpr_timespec* gpr_now_(gpr_clock_type t){
gpr_timespec *retval = malloc(sizeof(gpr_timespec));
*retval = gpr_now(t);
return retval;
}
int32_t gpr_time_to_millis_(gpr_timespec* t){
return gpr_time_to_millis(*t);
}
gpr_timespec* seconds_to_deadline(int64_t seconds){
gpr_timespec *retval = malloc(sizeof(gpr_timespec));
*retval = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(seconds * 1e3, GPR_TIMESPAN));
return retval;
}
gpr_timespec* millis_to_deadline(int64_t millis){
gpr_timespec *retval = malloc(sizeof(gpr_timespec));
*retval = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(millis * 1e3, GPR_TIMESPAN));
return retval;
}
grpc_metadata_array** metadata_array_create(){
grpc_metadata_array **retval = malloc(sizeof(grpc_metadata_array*));
*retval = malloc(sizeof(grpc_metadata_array));
grpc_metadata_array_init(*retval);
return retval;
}
void metadata_array_destroy(grpc_metadata_array **arr){
grpc_metadata_array_destroy(*arr);
free(*arr);
free(arr);
}
grpc_metadata* metadata_alloc(size_t n){
grpc_metadata *retval = malloc(sizeof(grpc_metadata)*n);
return retval;
}
void metadata_free(grpc_metadata* m){
free(m);
}
void set_metadata_key_val(char *key, char *val, grpc_metadata *arr, size_t i){
grpc_metadata *p = arr + i;
p->key = key;
p->value = val;
p->value_length = strlen(val);
}
const char* get_metadata_key(grpc_metadata *arr, size_t i){
grpc_metadata *p = arr + i;
return p->key;
}
const char* get_metadata_val(grpc_metadata *arr, size_t i){
grpc_metadata *p = arr + i;
return p->value;
}
grpc_op* op_array_create(size_t n){
return malloc(n*sizeof(grpc_op));
}
void op_array_destroy(grpc_op* op_array, size_t n){
for(int i = 0; i < n; i++){
grpc_op* op = op_array + i;
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
if(op->data.send_initial_metadata.count > 0){
metadata_free(op->data.send_initial_metadata.metadata);
}
break;
case GRPC_OP_SEND_MESSAGE:
grpc_byte_buffer_destroy(op->data.send_message);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
free(op->data.send_status_from_server.trailing_metadata);
free(op->data.send_status_from_server.status_details);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
break;
case GRPC_OP_RECV_MESSAGE:
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
break;
}
}
free(op_array);
}
void op_send_initial_metadata(grpc_op *op_array, size_t i,
grpc_metadata *arr, size_t n_metadata){
grpc_op *op = op_array + i;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = n_metadata;
op->data.send_initial_metadata.metadata
= malloc(n_metadata*sizeof(grpc_metadata));
memcpy(op->data.send_initial_metadata.metadata, arr,
n_metadata*sizeof(grpc_metadata));
op->flags = 0;
op->reserved = NULL;
}
void op_send_initial_metadata_empty(grpc_op *op_array, size_t i){
grpc_op *op = op_array + i;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
}
void op_send_message(grpc_op *op_array, size_t i,
grpc_byte_buffer *payload){
grpc_op *op = op_array + i;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = grpc_byte_buffer_copy(payload);
op->flags = 0;
op->reserved = NULL;
}
void op_send_close_client(grpc_op *op_array, size_t i){
grpc_op *op = op_array + i;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
}
void op_recv_initial_metadata(grpc_op *op_array, size_t i,
grpc_metadata_array** arr){
grpc_op *op = op_array + i;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = *arr;
op->flags = 0;
op->reserved = NULL;
}
void op_recv_message(grpc_op *op_array, size_t i,
grpc_byte_buffer **payload_recv){
grpc_op *op = op_array + i;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = payload_recv;
op->flags = 0;
op->reserved = NULL;
}
void op_recv_status_client(grpc_op *op_array, size_t i,
grpc_metadata_array** arr,
grpc_status_code* status,
char **details, size_t* details_capacity){
grpc_op *op = op_array + i;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = *arr;
op->data.recv_status_on_client.status = status;
op->data.recv_status_on_client.status_details = details;
op->data.recv_status_on_client.status_details_capacity = details_capacity;
op->flags = 0;
op->reserved = NULL;
}
void op_recv_close_server(grpc_op *op_array, size_t i, int *was_cancelled){
grpc_op *op = op_array + i;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = was_cancelled;
op->flags = 0;
op->reserved = NULL;
}
void op_send_status_server(grpc_op *op_array, size_t i,
size_t metadata_count, grpc_metadata* m,
grpc_status_code status, char *details){
grpc_op *op = op_array + i;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = metadata_count;
op->data.send_status_from_server.trailing_metadata
= malloc(sizeof(grpc_metadata)*metadata_count);
memcpy(op->data.send_status_from_server.trailing_metadata, m,
metadata_count*sizeof(grpc_metadata));
op->data.send_status_from_server.status = status;
op->data.send_status_from_server.status_details
= malloc(sizeof(char)*(strlen(details) + 1));
strcpy(op->data.send_status_from_server.status_details, details);
op->flags = 0;
op->reserved = NULL;
}
void op_send_ok_status_server(grpc_op *op_array, size_t i){
grpc_op *op = op_array + i;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "OK";
op->flags = 0;
op->reserved = NULL;
}
grpc_status_code* create_status_code_ptr(){
return malloc(sizeof(grpc_status_code));
}
void destroy_status_code_ptr(grpc_status_code* p){
free(p);
}
grpc_call_details* create_call_details(){
grpc_call_details* retval = malloc(sizeof(grpc_call_details));
grpc_call_details_init(retval);
return retval;
}
void destroy_call_details(grpc_call_details* cd){
grpc_call_details_destroy(cd);
free(cd);
}
void grpc_channel_watch_connectivity_state_(grpc_channel *channel,
grpc_connectivity_state
last_observed_state,
gpr_timespec* deadline,
grpc_completion_queue *cq,
void *tag){
grpc_channel_watch_connectivity_state(channel, last_observed_state, *deadline,
cq, tag);
}

View File

@ -15,14 +15,20 @@ extra-source-files: cbits, include
library
build-depends:
base
, clock
base ==4.8.*
, clock ==0.6.*
, bytestring ==0.10.*
c-sources:
cbits/grpc_haskell.c
exposed-modules:
Network.GRPC.Core
Network.GRPC.Core.Constants
Network.GRPC.Core.Time
-- NOTE: the order of these matters to c2hs.
Network.GRPC.Unsafe.Constants
Network.GRPC.Unsafe.Time
Network.GRPC.Unsafe.Slice
Network.GRPC.Unsafe.ByteBuffer
Network.GRPC.Unsafe.Metadata
Network.GRPC.Unsafe.Op
Network.GRPC.Unsafe
extra-libraries:
grpc
includes:
@ -30,6 +36,9 @@ library
, grpc/grpc.h
, grpc/status.h
, grpc/support/time.h
, grpc/impl/codegen/compression_types.h
, grpc/impl/codegen/slice_buffer.h
, grpc/impl/codegen/slice.h
build-tools: c2hs
default-language: Haskell2010
ghc-options: -Wall -fwarn-incomplete-patterns
@ -38,11 +47,16 @@ library
test-suite test
build-depends:
base
, QuickCheck
base ==4.8.*
, grpc-haskell
, bytestring ==0.10.*
, unix
, time
, async
, tasty >= 0.11 && <0.12
, tasty-hunit >= 0.9 && <0.10
default-language: Haskell2010
ghc-options: -Wall -fwarn-incomplete-patterns -O2 -threaded -rtsopts
ghc-options: -Wall -fwarn-incomplete-patterns -g -threaded
hs-source-dirs: tests
main-is: Properties.hs
type: exitcode-stdio-1.0

View File

@ -1,4 +1,11 @@
#ifndef GRPC_HASKELL
#define GRPC_HASKELL
#include <grpc/grpc.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/impl/codegen/time.h>
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
grpc_event *grpc_completion_queue_next_(grpc_completion_queue *cq,
gpr_timespec *deadline,
@ -14,3 +21,95 @@ grpc_call *grpc_channel_create_call_(grpc_channel *channel,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec *deadline, void *reserved);
size_t gpr_slice_length_(gpr_slice *slice);
uint8_t *gpr_slice_start_(gpr_slice *slice);
gpr_slice* gpr_slice_from_copied_string_(const char *source);
void free_slice(gpr_slice *slice);
grpc_byte_buffer **create_receiving_byte_buffer();
void destroy_receiving_byte_buffer(grpc_byte_buffer **bb);
grpc_byte_buffer_reader *byte_buffer_reader_create(grpc_byte_buffer *buffer);
void byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
gpr_slice* grpc_byte_buffer_reader_readall_(grpc_byte_buffer_reader *reader);
void timespec_destroy(gpr_timespec* t);
gpr_timespec* gpr_inf_future_(gpr_clock_type t);
gpr_timespec* gpr_now_(gpr_clock_type t);
int32_t gpr_time_to_millis_(gpr_timespec* t);
gpr_timespec* seconds_to_deadline(int64_t seconds);
gpr_timespec* millis_to_deadline(int64_t millis);
grpc_metadata_array** metadata_array_create();
void metadata_array_destroy(grpc_metadata_array **arr);
grpc_metadata* metadata_alloc(size_t n);
void metadata_free(grpc_metadata* m);
void set_metadata_key_val(char *key, char *val, grpc_metadata *arr, size_t i);
const char* get_metadata_key(grpc_metadata *arr, size_t i);
const char* get_metadata_val(grpc_metadata *arr, size_t i);
grpc_op* op_array_create(size_t n);
void op_array_destroy(grpc_op* op_array, size_t n);
void op_send_initial_metadata(grpc_op *op_array, size_t i,
grpc_metadata *arr, size_t n_metadata);
void op_send_initial_metadata_empty(grpc_op *op_array, size_t i);
void op_send_message(grpc_op *op_array, size_t i,
grpc_byte_buffer *payload);
void op_send_close_client(grpc_op *op_array, size_t i);
void op_recv_initial_metadata(grpc_op *op_array, size_t i,
grpc_metadata_array** arr);
void op_recv_message(grpc_op *op_array, size_t i,
grpc_byte_buffer **payload_recv);
void op_recv_status_client(grpc_op *op_array, size_t i,
grpc_metadata_array** arr,
grpc_status_code* status,
char **details, size_t* details_capacity);
void op_recv_close_server(grpc_op *op_array, size_t i, int *was_cancelled);
void op_send_status_server(grpc_op *op_array, size_t i,
size_t metadata_count, grpc_metadata* m,
grpc_status_code status, char *details);
grpc_status_code* create_status_code_ptr();
void destroy_status_code_ptr(grpc_status_code* p);
grpc_call_details* create_call_details();
void destroy_call_details(grpc_call_details* cd);
void grpc_channel_watch_connectivity_state_(grpc_channel *channel,
grpc_connectivity_state
last_observed_state,
gpr_timespec* deadline,
grpc_completion_queue *cq,
void *tag);
#endif //GRPC_HASKELL

View File

@ -1,73 +0,0 @@
module Network.GRPC.Core where
-- TODO Remove wrapped function once once https://github.com/haskell/c2hs/issues/117 gets in
import Foreign.C.String
import Foreign.C.Types
import Foreign.Ptr
import Foreign.Storable
import Foreign.Marshal.Utils
import Network.GRPC.Core.Time
import Network.GRPC.Core.Constants
#include <grpc/grpc.h>
#include <grpc/status.h>
#include <grpc_haskell.h>
{#context prefix = "grpc" #}
{#pointer *gpr_timespec as CTimeSpecPtr -> CTimeSpec #}
{#enum grpc_status_code as StatusCode {underscoreToCase} deriving (Eq)#}
{#pointer *grpc_completion_queue as CompletionQueue newtype #}
{#pointer *grpc_channel as Channel newtype #}
{#pointer *grpc_server as Server newtype #}
{#pointer *grpc_call as Call newtype #}
{#pointer *grpc_call_details as CallDetails newtype #}
{#pointer *grpc_metadata_array as MetadataArray newtype #}
-- {#enum grpc_arg_type as ArgType {underscoreToCase} deriving (Eq)#}
newtype ChannelArgs = ChannelArgs [Arg]
-- TODO Storable ChannelArgs
{#pointer *grpc_channel_args as ChannelArgsPtr -> ChannelArgs #}
data Arg = Arg { argKey :: String, argValue :: ArgValue }
data ArgValue = ArgString String | ArgInt Int
{#enum grpc_call_error as CallError {underscoreToCase} deriving (Eq)#}
{#pointer *grpc_byte_buffer as ByteBuffer newtype #}
{#pointer *grpc_byte_buffer_reader as ByteBufferReader newtype #}
{#enum grpc_completion_type as CompletionType {underscoreToCase} deriving (Eq)#}
{#pointer *grpc_event as Event newtype #}
{#enum grpc_op_type as OpType {underscoreToCase} deriving (Eq)#}
{#pointer *grpc_op as Op newtype #}
{#fun grpc_init as ^ {} -> `()'#}
{#fun grpc_shutdown as ^ {} -> `()'#}
{#fun grpc_completion_queue_create as ^ {`Ptr ()'} -> `CompletionQueue'#}
{#fun grpc_completion_queue_next_ as ^ {`CompletionQueue', `CTimeSpecPtr', `Ptr ()'} -> `Event'#}
{#fun grpc_completion_queue_pluck_ as ^ {`CompletionQueue', `Ptr ()', `CTimeSpecPtr', `Ptr ()'} -> `Event'#}
{#fun grpc_completion_queue_shutdown as ^ {`CompletionQueue'} -> `()'#}
{#fun grpc_completion_queue_destroy as ^ {`CompletionQueue'} -> `()'#}
{#fun grpc_channel_create_call_ as ^ {`Channel', `Call', fromIntegral `PropagationMask', `CompletionQueue', `String', `String', `CTimeSpecPtr', `Ptr ()'} -> `Call'#}
{#fun grpc_insecure_channel_create as ^ {`String', `ChannelArgsPtr', `Ptr ()'} -> `Channel'#}
{#fun grpc_channel_destroy as ^ {`Channel'} -> `()'#}
{#fun grpc_call_start_batch as ^ {`Call', `Op', `Int', `Ptr ()', `Ptr ()'} -> `CallError'#}
{#fun grpc_call_cancel as ^ {`Call', `Ptr ()'} -> `()'#}
{#fun grpc_call_cancel_with_status as ^ {`Call', `StatusCode', `String', `Ptr ()'} -> `()'#}
{#fun grpc_call_destroy as ^ {`Call'} -> `()'#}
-- Server stuff
--{#fun grpc_server_request_call as ^ {`Server',with* `Call' peek*, `CallDetails', `MetadataArray', `CompletionQueue', `CompletionQueue', `Ptr ()'} -> `CallError'#}

View File

@ -1,24 +0,0 @@
module Network.GRPC.Core.Time where
import Control.Applicative
import Control.Monad
import Foreign.C.Types
import Foreign.Storable
import System.Clock
#include <grpc/support/time.h>
{#context prefix = "grp" #}
newtype CTimeSpec = CTimeSpec { timeSpec :: TimeSpec }
instance Storable CTimeSpec where
sizeOf _ = {#sizeof gpr_timespec #}
alignment _ = {#alignof gpr_timespec #}
peek p = fmap CTimeSpec $ TimeSpec
<$> liftM fromIntegral ({#get gpr_timespec->tv_sec #} p)
<*> liftM fromIntegral ({#get gpr_timespec->tv_nsec #} p)
poke p x = do
{#set gpr_timespec.tv_sec #} p (fromIntegral $ sec $ timeSpec x)
{#set gpr_timespec.tv_nsec #} p (fromIntegral $ nsec $ timeSpec x)

234
src/Network/GRPC/Unsafe.chs Normal file
View File

@ -0,0 +1,234 @@
module Network.GRPC.Unsafe where
import Control.Monad
import Foreign.C.Types
import Foreign.Ptr
import Foreign.Storable
{#import Network.GRPC.Unsafe.Time#}
import Network.GRPC.Unsafe.Constants
{#import Network.GRPC.Unsafe.ByteBuffer#}
{#import Network.GRPC.Unsafe.Op#}
{#import Network.GRPC.Unsafe.Metadata#}
#include <grpc/grpc.h>
#include <grpc/status.h>
#include <grpc_haskell.h>
{#context prefix = "grpc" #}
{#pointer *grpc_completion_queue as CompletionQueue newtype #}
-- | Represents a connection to a server. Created on the client side.
{#pointer *grpc_channel as Channel newtype #}
-- | Represents a server. Created on the server side.
{#pointer *grpc_server as Server newtype #}
-- | Represents a pointer to a call. To users of the gRPC core library, this
-- type is abstract; we have no access to its fields.
{#pointer *grpc_call as Call newtype #}
{#pointer *grpc_call_details as CallDetails newtype #}
{#fun create_call_details as ^ {} -> `CallDetails'#}
{#fun destroy_call_details as ^ {`CallDetails'} -> `()'#}
-- instance def adapted from
-- https://mail.haskell.org/pipermail/c2hs/2007-June/000800.html
instance Storable Call where
sizeOf (Call r) = sizeOf r
alignment (Call r) = alignment r
peek p = fmap Call (peek (castPtr p))
poke p (Call r) = poke (castPtr p) r
-- {#enum grpc_arg_type as ArgType {underscoreToCase} deriving (Eq)#}
newtype ChannelArgs = ChannelArgs [Arg]
-- TODO Storable ChannelArgs
{#pointer *grpc_channel_args as ChannelArgsPtr -> ChannelArgs #}
data Arg = Arg { argKey :: String, argValue :: ArgValue }
data ArgValue = ArgString String | ArgInt Int
-- | A 'Tag' is an identifier that is used with a 'CompletionQueue' to signal
-- that the corresponding operation has completed.
newtype Tag = Tag {unTag :: Ptr ()} deriving (Show, Eq)
tag :: Int -> Tag
tag = Tag . plusPtr nullPtr
instance Storable Tag where
sizeOf (Tag p) = sizeOf p
alignment (Tag p) = alignment p
peek p = fmap Tag (peek (castPtr p))
poke p (Tag r) = poke (castPtr p) r
-- | 'Reserved' is an as-yet unused void pointer param to several gRPC
-- functions. Create one with 'reserved'.
newtype Reserved = Reserved {unReserved :: Ptr ()}
reserved :: Reserved
reserved = Reserved nullPtr
{#enum grpc_call_error as CallError {underscoreToCase} deriving (Show, Eq)#}
-- | Represents the type of a completion event on a 'CompletionQueue'.
-- 'QueueShutdown' only occurs if the queue is shutting down (e.g., when the
-- server is stopping). 'QueueTimeout' occurs when we reached the deadline
-- before receiving an 'OpComplete'.
{#enum grpc_completion_type as CompletionType {underscoreToCase}
deriving (Show, Eq)#}
-- | Represents one event received over a 'CompletionQueue'.
data Event = Event {eventCompletionType :: CompletionType,
eventSuccess :: Bool,
eventTag :: Tag}
deriving (Show, Eq)
instance Storable Event where
sizeOf _ = {#sizeof grpc_event#}
alignment _ = {#alignof grpc_event#}
peek p = Event <$> liftM (toEnum . fromIntegral) ({#get grpc_event->type#} p)
<*> liftM (> 0) ({#get grpc_event->success#} p)
<*> liftM Tag ({#get grpc_event->tag#} p)
poke p (Event c s t) = do
{#set grpc_event.type#} p $ fromIntegral $ fromEnum c
{#set grpc_event.success#} p $ if s then 1 else 0
{#set grpc_event.tag#} p (unTag t)
castPeek :: Storable b => Ptr a -> IO b
castPeek p = peek (castPtr p)
{#enum grpc_connectivity_state as ConnectivityState {underscoreToCase}
deriving (Show, Eq)#}
{#fun grpc_init as ^ {} -> `()'#}
{#fun grpc_shutdown as ^ {} -> `()'#}
{#fun grpc_version_string as ^ {} -> `String' #}
-- | Create a new 'CompletionQueue'. See the docs for
-- 'grpcCompletionQueueShutdown' for instructions on how to clean up afterwards.
{#fun grpc_completion_queue_create as ^
{unReserved `Reserved'} -> `CompletionQueue'#}
-- | Block until we get the next event off the given 'CompletionQueue',
-- using the given 'CTimeSpecPtr' as a deadline specifying the max amount of
-- time to block.
{#fun grpc_completion_queue_next_ as ^
{`CompletionQueue', `CTimeSpecPtr',unReserved `Reserved'}
-> `Event' castPeek*#}
-- | Block until we get the next event with the given 'Tag' off the given
-- 'CompletionQueue'. NOTE: No more than 'maxCompletionQueuePluckers' can call
-- this function concurrently!
{#fun grpc_completion_queue_pluck_ as ^
{`CompletionQueue',unTag `Tag', `CTimeSpecPtr',unReserved `Reserved'}
-> `Event' castPeek*#}
-- | Stops a completion queue. After all events are drained,
-- 'grpcCompletionQueueNext' will yield 'QueueShutdown' and then it is safe to
-- call 'grpcCompletionQueueDestroy'. After calling this, we must ensure no
-- new work is pushed to the queue.
{#fun grpc_completion_queue_shutdown as ^ {`CompletionQueue'} -> `()'#}
-- | Destroys a 'CompletionQueue'. See 'grpcCompletionQueueShutdown' for how to
-- use safely. Caller must ensure no threads are calling
-- 'grpcCompletionQueueNext'.
{#fun grpc_completion_queue_destroy as ^ {`CompletionQueue'} -> `()'#}
-- | Sets up a call on the client. The first string is the endpoint name (e.g.
-- @"/foo"@) and the second is the host name. In my tests so far, the host name
-- here doesn't seem to be used... it looks like the host and port specified in
-- 'grpcInsecureChannelCreate' is the one that is actually used.
{#fun grpc_channel_create_call_ as ^
{`Channel', `Call', fromIntegral `PropagationMask', `CompletionQueue',
`String', `String', `CTimeSpecPtr',unReserved `Reserved'}
-> `Call'#}
-- | Create a channel (on the client) to the server. The first argument is
-- host and port, e.g. @"localhost:50051"@. The gRPC docs say that most clients
-- are expected to pass a 'nullPtr' for the 'ChannelArgsPtr'. We currently don't
-- expose any functions for creating channel args, since they are entirely
-- undocumented.
{#fun grpc_insecure_channel_create as ^
{`String', `ChannelArgsPtr',unReserved `Reserved'} -> `Channel'#}
-- | get the current connectivity state of the given channel. The 'Bool' is
-- True if we should try to connect the channel.
{#fun grpc_channel_check_connectivity_state as ^
{`Channel', `Bool'} -> `ConnectivityState'#}
-- | When the current connectivity state changes from the given
-- 'ConnectivityState', enqueues a success=1 tag on the given 'CompletionQueue'.
-- If the deadline is reached, enqueues a tag with success=0.
{#fun grpc_channel_watch_connectivity_state_ as ^
{`Channel', `ConnectivityState', `CTimeSpecPtr', `CompletionQueue',
unTag `Tag'}
-> `()'#}
{#fun grpc_channel_ping as ^
{`Channel', `CompletionQueue', unTag `Tag',unReserved `Reserved'} -> `()' #}
{#fun grpc_channel_destroy as ^ {`Channel'} -> `()'#}
-- | Starts executing a batch of ops in the given 'OpArray'. Does not block.
-- When complete, an event identified by the given 'Tag'
-- will be pushed onto the 'CompletionQueue' that was associated with the given
-- 'Call' when the 'Call' was created.
{#fun grpc_call_start_batch as ^
{`Call', `OpArray', `Int', unTag `Tag',unReserved `Reserved'} -> `CallError'#}
{#fun grpc_call_cancel as ^ {`Call',unReserved `Reserved'} -> `()'#}
{#fun grpc_call_cancel_with_status as ^
{`Call', `StatusCode', `String',unReserved `Reserved'} -> `()'#}
{#fun grpc_call_destroy as ^ {`Call'} -> `()'#}
-- Server stuff
{#fun grpc_server_create as ^
{`ChannelArgsPtr',unReserved `Reserved'} -> `Server'#}
{#fun grpc_server_register_completion_queue as ^
{`Server', `CompletionQueue', unReserved `Reserved'} -> `()'#}
{#fun grpc_server_add_insecure_http2_port as ^
{`Server', `String'} -> `Int'#}
-- | Starts a server. To shut down the server, call these in order:
-- 'grpcServerShutdownAndNotify', 'grpcServerCancelAllCalls',
-- 'grpcServerDestroy'. After these are done, shut down and destroy the server's
-- completion queue with 'grpcCompletionQueueShutdown' followed by
-- 'grpcCompletionQueueDestroy'.
{#fun grpc_server_start as ^ {`Server'} -> `()'#}
{#fun grpc_server_shutdown_and_notify as ^
{`Server', `CompletionQueue',unTag `Tag'} -> `()'#}
{#fun grpc_server_cancel_all_calls as ^
{`Server'} -> `()'#}
-- | Destroy the server. See 'grpcServerStart' for complete shutdown
-- instructions.
{#fun grpc_server_destroy as ^ {`Server'} -> `()'#}
-- | Request a call.
-- NOTE: You need to call 'grpcCompletionQueueNext' or
-- 'grpcCompletionQueuePluck' on the completion queue with the given
-- 'Tag' before using the 'Call' pointer again.
{#fun grpc_server_request_call as ^
{`Server',id `Ptr Call', `CallDetails', `MetadataArray',
`CompletionQueue', `CompletionQueue',unTag `Tag'}
-> `CallError'#}
-- | TODO: I am not yet sure how this function is supposed to be used.
{#fun grpc_server_request_registered_call as ^
{`Server',unTag `Tag',id `Ptr Call', `CTimeSpecPtr', `MetadataArray' id,
id `Ptr ByteBuffer', `CompletionQueue', `CompletionQueue',unTag `Tag'}
-> `CallError'#}

View File

@ -0,0 +1,85 @@
module Network.GRPC.Unsafe.ByteBuffer where
#include <grpc/grpc.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/slice_buffer.h>
#include <grpc_haskell.h>
{#import Network.GRPC.Unsafe.Slice#}
import Control.Exception (bracket)
import qualified Data.ByteString as B
import Foreign.Ptr
import Foreign.C.Types
import Foreign.Storable
{#enum grpc_compression_algorithm as GRPCCompressionAlgorithm
{underscoreToCase} deriving (Eq) #}
-- | Represents a pointer to a gRPC byte buffer containing 1 or more 'Slice's.
-- Must be destroyed manually with 'grpcByteBufferDestroy'.
{#pointer *grpc_byte_buffer as ByteBuffer newtype #}
--Trivial Storable instance because 'ByteBuffer' type is a pointer.
instance Storable ByteBuffer where
sizeOf (ByteBuffer r) = sizeOf r
alignment (ByteBuffer r) = alignment r
peek p = fmap ByteBuffer (peek (castPtr p))
poke p (ByteBuffer r) = poke (castPtr p) r
--TODO: When I switched this to a ForeignPtr with a finalizer, I got errors
--about freeing un-malloced memory. Calling the same destroy function by hand
--works fine in the same code, though. Until I find a workaround, going to free
--everything by hand.
-- | Represents a pointer to a ByteBufferReader. Must be destroyed manually with
-- 'byteBufferReaderDestroy'.
{#pointer *grpc_byte_buffer_reader as ByteBufferReader newtype #}
-- | Creates a pointer to a 'ByteBuffer'. This is used to receive data when
-- creating a GRPC_OP_RECV_MESSAGE op.
{#fun create_receiving_byte_buffer as ^ {} -> `Ptr ByteBuffer' id#}
{#fun destroy_receiving_byte_buffer as ^ {id `Ptr ByteBuffer'} -> `()'#}
withByteBufferPtr :: (Ptr ByteBuffer -> IO a) -> IO a
withByteBufferPtr
= bracket createReceivingByteBuffer destroyReceivingByteBuffer
-- | Takes an array of slices and the length of the array and returns a
-- 'ByteBuffer'.
{#fun grpc_raw_byte_buffer_create as ^ {`Slice', `CULong'} -> `ByteBuffer'#}
{#fun grpc_raw_compressed_byte_buffer_create as ^
{`Slice', `CULong', `GRPCCompressionAlgorithm'} -> `ByteBuffer'#}
{#fun grpc_byte_buffer_copy as ^ {`ByteBuffer'} -> `ByteBuffer'#}
{#fun grpc_byte_buffer_length as ^ {`ByteBuffer'} -> `CULong'#}
{#fun grpc_byte_buffer_destroy as ^ {`ByteBuffer'} -> `()'#}
{#fun byte_buffer_reader_create as ^ {`ByteBuffer'} -> `ByteBufferReader'#}
{#fun byte_buffer_reader_destroy as ^ {`ByteBufferReader'} -> `()'#}
{#fun grpc_byte_buffer_reader_next as ^
{`ByteBufferReader', `Slice'} -> `CInt'#}
-- | Returns a 'Slice' containing the entire contents of the 'ByteBuffer' being
-- read by the given 'ByteBufferReader'.
{#fun grpc_byte_buffer_reader_readall_ as ^ {`ByteBufferReader'} -> `Slice'#}
{#fun grpc_raw_byte_buffer_from_reader as ^
{`ByteBufferReader'} -> `ByteBuffer'#}
withByteStringAsByteBuffer :: B.ByteString -> (ByteBuffer -> IO a) -> IO a
withByteStringAsByteBuffer bs f = do
bracket (byteStringToSlice bs) freeSlice $ \slice -> do
bracket (grpcRawByteBufferCreate slice 1) grpcByteBufferDestroy f
copyByteBufferToByteString :: ByteBuffer -> IO B.ByteString
copyByteBufferToByteString bb = do
bracket (byteBufferReaderCreate bb) byteBufferReaderDestroy $ \bbr -> do
bracket (grpcByteBufferReaderReadall bbr) freeSlice sliceToByteString

View File

@ -1,6 +1,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Network.GRPC.Core.Constants where
module Network.GRPC.Unsafe.Constants where
#include "grpc/grpc.h"
#include "grpc/impl/codegen/propagation_bits.h"
@ -20,6 +20,9 @@ writeBufferHint = #const GRPC_WRITE_BUFFER_HINT
writeNoCompress :: Int
writeNoCompress = #const GRPC_WRITE_NO_COMPRESS
maxCompletionQueuePluckers :: Int
maxCompletionQueuePluckers = #const GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
newtype PropagationMask = PropagationMask {unPropagationMask :: Int}
deriving (Show, Eq, Ord, Integral, Enum, Real, Num)
@ -37,3 +40,6 @@ propagateCensusTracingContext =
propagateCancellation :: PropagationMask
propagateCancellation =
PropagationMask $ #const GRPC_PROPAGATE_CANCELLATION
propagateDefaults :: PropagationMask
propagateDefaults = PropagationMask $ #const GRPC_PROPAGATE_DEFAULTS

View File

@ -0,0 +1,70 @@
module Network.GRPC.Unsafe.Metadata where
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString, useAsCString, packCString)
import Foreign.C.String
import Foreign.Ptr
import Foreign.Storable
#include <grpc/grpc.h>
#include <grpc/status.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc_haskell.h>
-- | Represents a pointer to one or more metadata key/value pairs. This type
-- is intended to be used when sending metadata.
{#pointer *grpc_metadata as MetadataKeyValPtr newtype#}
-- | Represents a pointer to a grpc_metadata_array. Must be destroyed with
-- 'metadataArrayDestroy'. This type is intended for receiving metadata.
-- This can be populated by passing it to e.g. 'grpcServerRequestCall'.
-- TODO: we need a function for getting a 'MetadataKeyValPtr'
-- and length from this type.
{#pointer *grpc_metadata_array as MetadataArray newtype#}
instance Storable MetadataArray where
sizeOf (MetadataArray r) = sizeOf r
alignment (MetadataArray r) = alignment r
peek p = fmap MetadataArray (peek (castPtr p))
poke p (MetadataArray r) = poke (castPtr p) r
-- | Create an empty 'MetadataArray'. Returns a pointer to it so that we can
-- pass it to the appropriate op creation functions.
{#fun metadata_array_create as ^ {} -> `Ptr MetadataArray' id#}
{#fun metadata_array_destroy as ^ {id `Ptr MetadataArray'} -> `()'#}
-- Note: I'm pretty sure we must call out to C to allocate these
-- because they are nested structs.
-- | Allocates space for exactly n metadata key/value pairs.
{#fun metadata_alloc as ^ {`Int'} -> `MetadataKeyValPtr'#}
{#fun metadata_free as ^ {`MetadataKeyValPtr'} -> `()'#}
-- | Sets a metadata key/value pair at the given index in the
-- 'MetadataKeyValPtr'. No error checking is performed to ensure the index is
-- in bounds!
{#fun set_metadata_key_val as setMetadataKeyVal
{useAsCString* `ByteString', useAsCString* `ByteString',
`MetadataKeyValPtr', `Int'} -> `()'#}
{#fun get_metadata_key as getMetadataKey'
{`MetadataKeyValPtr', `Int'} -> `CString'#}
{#fun get_metadata_val as getMetadataVal'
{`MetadataKeyValPtr', `Int'} -> `CString'#}
--TODO: The test suggests this is leaking.
withMetadataArrayPtr :: (Ptr MetadataArray -> IO a) -> IO a
withMetadataArrayPtr = bracket metadataArrayCreate metadataArrayDestroy
withMetadataKeyValPtr :: Int -> (MetadataKeyValPtr -> IO a) -> IO a
withMetadataKeyValPtr i f = bracket (metadataAlloc i) metadataFree f
getMetadataKey :: MetadataKeyValPtr -> Int -> IO ByteString
getMetadataKey m = getMetadataKey' m >=> packCString
getMetadataVal :: MetadataKeyValPtr -> Int -> IO ByteString
getMetadataVal m = getMetadataVal' m >=> packCString

View File

@ -0,0 +1,102 @@
module Network.GRPC.Unsafe.Op where
import Control.Exception
import Control.Monad
import qualified Data.ByteString as B
import Foreign.C.String
import Foreign.C.Types
import Foreign.Ptr
{#import Network.GRPC.Unsafe.ByteBuffer#}
{#import Network.GRPC.Unsafe.Metadata#}
#include <grpc/grpc.h>
#include <grpc/status.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc_haskell.h>
{#enum grpc_op_type as OpType {underscoreToCase} deriving (Eq)#}
{#enum grpc_status_code as StatusCode {underscoreToCase} deriving (Eq)#}
-- NOTE: We don't alloc the space for the enum in Haskell because enum size is
-- implementation-dependent. See:
-- http://stackoverflow.com/questions/1113855/is-the-sizeofenum-sizeofint-always
-- | Allocates space for a 'StatusCode' and returns a pointer to it. Used to
-- receive a status code from the server with 'opRecvStatusClient'.
{#fun create_status_code_ptr as ^ {} -> `Ptr StatusCode' castPtr#}
{#fun destroy_status_code_ptr as ^ {castPtr `Ptr StatusCode'} -> `()' #}
-- | Represents an array of ops to be passed to 'grpcCallStartBatch'.
-- Create an array with 'opArrayCreate', then create individual ops in the array
-- using the op* functions. For these functions, the first two arguments are
-- always the OpArray to mutate and the index in the array at which to create
-- the new op. After processing the batch and getting out any results, call
-- 'opArrayDestroy'.
{#pointer *grpc_op as OpArray newtype #}
-- | Creates an empty 'OpArray' with space for the given number of ops.
{#fun op_array_create as ^ {`Int'} -> `OpArray'#}
-- | Destroys an 'OpArray' of the given size.
{#fun op_array_destroy as ^ {`OpArray', `Int'} -> `()'#}
-- | brackets creating and destroying an 'OpArray' with the given size.
withOpArray :: Int -> (OpArray -> IO a) -> IO a
withOpArray n f = bracket (opArrayCreate n) (flip opArrayDestroy n) f
-- | Creates an op of type GRPC_OP_SEND_INITIAL_METADATA at the specified
-- index of the given 'OpArray', containing the given
-- metadata. The metadata is copied and can be destroyed after calling this
-- function.
{#fun op_send_initial_metadata as ^
{`OpArray', `Int', `MetadataKeyValPtr', `Int'} -> `()'#}
-- | Creates an op of type GRPC_OP_SEND_INITIAL_METADATA at the specified
-- index of the given 'OpArray'. The op will contain no metadata.
{#fun op_send_initial_metadata_empty as ^ {`OpArray', `Int'} -> `()'#}
-- | Creates an op of type GRPC_OP_SEND_MESSAGE at the specified index of
-- the given 'OpArray'. The given 'ByteBuffer' is
-- copied and can be destroyed after calling this function.
{#fun op_send_message as ^ {`OpArray', `Int', `ByteBuffer'} -> `()'#}
-- | Creates an 'Op' of type GRPC_OP_SEND_CLOSE_FROM_CLIENT at the specified
-- index of the given 'OpArray'.
{#fun op_send_close_client as ^ {`OpArray', `Int'} -> `()'#}
-- | Creates an op of type GRPC_OP_RECV_INITIAL_METADATA at the specified
-- index of the given 'OpArray', and ties the given
-- 'MetadataArray' pointer to that op so that the received metadata can be
-- accessed. It is the user's responsibility to destroy the 'MetadataArray'.
{#fun op_recv_initial_metadata as ^
{`OpArray', `Int',id `Ptr MetadataArray'} -> `()'#}
-- | Creates an op of type GRPC_OP_RECV_MESSAGE at the specified index of the
-- given 'OpArray', and ties the given
-- 'ByteBuffer' pointer to that op so that the received message can be
-- accessed. It is the user's responsibility to destroy the 'ByteBuffer'.
{#fun op_recv_message as ^ {`OpArray', `Int',id `Ptr ByteBuffer'} -> `()'#}
-- | Creates an op of type GRPC_OP_RECV_STATUS_ON_CLIENT at the specified
-- index of the given 'OpArray', and ties all the
-- input pointers to that op so that the results of the receive can be
-- accessed. It is the user's responsibility to free all the input args after
-- this call.
{#fun op_recv_status_client as ^
{`OpArray', `Int',id `Ptr MetadataArray', castPtr `Ptr StatusCode',
castPtr `Ptr CString', `Int'}
-> `()'#}
-- | Creates an op of type GRPC_OP_RECV_CLOSE_ON_SERVER at the specified index
-- of the given 'OpArray', and ties the input
-- pointer to that op so that the result of the receive can be accessed. It is
-- the user's responsibility to free the pointer.
{#fun op_recv_close_server as ^ {`OpArray', `Int', id `Ptr CInt'} -> `()'#}
-- | Creates an op of type GRPC_OP_SEND_STATUS_FROM_SERVER at the specified
-- index of the given 'OpArray'. The given
-- Metadata and string are copied when creating the op, and can be safely
-- destroyed immediately after calling this function.
{#fun op_send_status_server as ^
{`OpArray', `Int', `Int', `MetadataKeyValPtr', `StatusCode', `String'}
-> `()'#}

View File

@ -0,0 +1,54 @@
module Network.GRPC.Unsafe.Slice where
#include <grpc/impl/codegen/slice.h>
#include <grpc_haskell.h>
import qualified Data.ByteString as B
import Control.Applicative
import Data.Word
import Foreign.C.String
import Foreign.C.Types
import Foreign.Ptr
import Foreign.Marshal.Alloc
-- | A 'Slice' is gRPC's string type. We can easily convert these to and from
-- ByteStrings. This type is a pointer to a C type.
{#pointer *gpr_slice as Slice newtype #}
-- TODO: we could also represent this type as 'Ptr Slice', by doing this:
-- newtype Slice = Slice {#type gpr_slice#}
-- This would have no practical effect, but it would communicate intent more
-- clearly by emphasizing that the type is indeed a pointer and that the data
-- it is pointing to might change/be destroyed after running IO actions. To make
-- the change, we would just need to change all occurrences of 'Slice' to
-- 'Ptr Slice' and add 'castPtr' in and out marshallers.
-- This seems like the right way to do it, but c2hs doesn't make it easy, so
-- maybe the established idiom is to do what c2hs does.
-- | Get the length of a slice.
{#fun gpr_slice_length_ as ^ {`Slice'} -> `CULong'#}
-- | Returns a pointer to the start of the character array contained by the
-- slice.
{#fun gpr_slice_start_ as ^ {`Slice'} -> `Ptr CChar' castPtr #}
{#fun gpr_slice_from_copied_string_ as ^ {`CString'} -> `Slice'#}
-- | Properly cleans up all memory used by a 'Slice'. Danger: the Slice should
-- not be used after this function is called on it.
{#fun free_slice as ^ {`Slice'} -> `()'#}
-- | Copies a 'Slice' to a ByteString.
-- TODO: there are also non-copying unsafe ByteString construction functions.
-- We could gain some speed by using them.
-- idea would be something :: (ByteString -> Response) -> IO () that handles
-- getting and freeing the slice behind the scenes.
sliceToByteString :: Slice -> IO B.ByteString
sliceToByteString slice = do
len <- fmap fromIntegral $ gprSliceLength slice
str <- gprSliceStart slice
B.packCStringLen (str, len)
-- | Copies a 'ByteString' to a 'Slice'.
byteStringToSlice :: B.ByteString -> IO Slice
byteStringToSlice bs = B.useAsCString bs gprSliceFromCopiedString

View File

@ -0,0 +1,48 @@
module Network.GRPC.Unsafe.Time where
import Control.Applicative
import Control.Monad
import Foreign.C.Types
import Foreign.Storable
import System.Clock
#include <grpc/support/time.h>
#include <grpc_haskell.h>
{#context prefix = "grp" #}
newtype CTimeSpec = CTimeSpec { timeSpec :: TimeSpec }
instance Storable CTimeSpec where
sizeOf _ = {#sizeof gpr_timespec #}
alignment _ = {#alignof gpr_timespec #}
peek p = fmap CTimeSpec $ TimeSpec
<$> liftM fromIntegral ({#get gpr_timespec->tv_sec #} p)
<*> liftM fromIntegral ({#get gpr_timespec->tv_nsec #} p)
poke p x = do
{#set gpr_timespec.tv_sec #} p (fromIntegral $ sec $ timeSpec x)
{#set gpr_timespec.tv_nsec #} p (fromIntegral $ nsec $ timeSpec x)
{#enum gpr_clock_type as ClockType {underscoreToCase} deriving (Eq) #}
-- | A pointer to a CTimeSpec. Must be destroyed manually with
-- 'timespecDestroy'.
{#pointer *gpr_timespec as CTimeSpecPtr -> CTimeSpec #}
{#fun timespec_destroy as ^ {`CTimeSpecPtr'} -> `()'#}
{#fun gpr_inf_future_ as ^ {`ClockType'} -> `CTimeSpecPtr'#}
-- | Get the current time for the given 'ClockType'. Warning: 'GprTimespan' will
-- cause a crash. Probably only need to use GprClockMonotonic, which returns 0.
{#fun gpr_now_ as ^ {`ClockType'} -> `CTimeSpecPtr'#}
{#fun gpr_time_to_millis_ as ^ {`CTimeSpecPtr'} -> `Int'#}
-- | Returns a GprClockMonotonic representing a deadline n seconds in the
-- future.
{#fun seconds_to_deadline as ^ {`Int'} -> `CTimeSpecPtr'#}
-- | Returns a GprClockMonotonic representing a deadline n milliseconds
-- in the future.
{#fun millis_to_deadline as ^ {`Int'} -> `CTimeSpecPtr'#}

View File

@ -1,2 +1,217 @@
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Network.GRPC.Unsafe
import Network.GRPC.Unsafe.Slice
import Network.GRPC.Unsafe.ByteBuffer
import Network.GRPC.Unsafe.Time
import Network.GRPC.Unsafe.Metadata
import Network.GRPC.Unsafe.Op
import Network.GRPC.Unsafe.Constants
import qualified Data.ByteString as B
import Data.Time.Clock.POSIX
import GHC.Exts
import Foreign.Marshal.Alloc
import Foreign.Storable
import Foreign.Ptr
import Test.Tasty
import Test.Tasty.HUnit as HU
roundtripSlice :: B.ByteString -> TestTree
roundtripSlice bs = testCase "Slice C bindings roundtrip" $ do
slice <- byteStringToSlice bs
unslice <- sliceToByteString slice
bs HU.@?= unslice
freeSlice slice
roundtripByteBuffer :: B.ByteString -> TestTree
roundtripByteBuffer bs = testCase "ByteBuffer C bindings roundtrip" $ do
slice <- byteStringToSlice bs
buffer <- grpcRawByteBufferCreate slice 1
reader <- byteBufferReaderCreate buffer
readSlice <- grpcByteBufferReaderReadall reader
bs' <- sliceToByteString readSlice
bs' HU.@?= bs
--clean up
freeSlice slice
byteBufferReaderDestroy reader
grpcByteBufferDestroy buffer
freeSlice readSlice
currTimeMillis :: ClockType -> IO Int
currTimeMillis t = do
gprT <- gprNow t
tMillis <- gprTimeToMillis gprT
timespecDestroy gprT
return tMillis
testNow :: TestTree
testNow = testCase "create and destroy various clock types" $ do
_ <- currTimeMillis GprClockMonotonic
_ <- currTimeMillis GprClockRealtime
_ <- currTimeMillis GprClockPrecise
return ()
testMetadata :: TestTree
testMetadata = testCase "metadata setter/getter C bindings roundtrip" $ do
m <- metadataAlloc 3
setMetadataKeyVal "hello" "world" m 0
setMetadataKeyVal "foo" "bar" m 1
setMetadataKeyVal "Haskell" "Curry" m 2
k0 <- getMetadataKey m 0
v0 <- getMetadataVal m 0
k1 <- getMetadataKey m 1
v1 <- getMetadataVal m 1
k2 <- getMetadataKey m 2
v2 <- getMetadataVal m 2
k0 HU.@?= "hello"
v0 HU.@?= "world"
k1 HU.@?= "foo"
v1 HU.@?= "bar"
k2 HU.@?= "Haskell"
v2 HU.@?= "Curry"
metadataFree m
assertCqEventComplete :: Event -> IO ()
assertCqEventComplete e = do
eventCompletionType e HU.@?= OpComplete
eventSuccess e HU.@?= True
testPayloadClient :: IO ()
testPayloadClient = do
client <- grpcInsecureChannelCreate "localhost:50051" nullPtr reserved
cq <- grpcCompletionQueueCreate reserved
withMetadataArrayPtr $ \initialMetadataRecv -> do
withMetadataArrayPtr $ \trailingMetadataRecv -> do
withByteBufferPtr $ \clientRecvBB -> do
deadline <- secondsToDeadline 5
pluckDeadline <- secondsToDeadline 10
clientCall <- grpcChannelCreateCall
client (Call nullPtr) propagateDefaults cq
"/foo" "localhost" deadline reserved
--send request
withOpArray 6 $ \ops -> do
opSendInitialMetadataEmpty ops 0
withByteStringAsByteBuffer "hello world" $ \requestPayload -> do
opSendMessage ops 1 requestPayload
opSendCloseClient ops 2
opRecvInitialMetadata ops 3 initialMetadataRecv
opRecvMessage ops 4 clientRecvBB
statusCodePtr <- createStatusCodePtr
let cstringCapacity = 32
cStringPtr <- malloc
cstring <- mallocBytes cstringCapacity
poke cStringPtr cstring
opRecvStatusClient ops 5 trailingMetadataRecv statusCodePtr
cStringPtr
cstringCapacity
--send client request
requestError <- grpcCallStartBatch clientCall ops 6 (tag 1) reserved
clientRequestCqEvent <- grpcCompletionQueuePluck
cq (tag 1) pluckDeadline reserved
assertCqEventComplete clientRequestCqEvent
requestError HU.@?= CallOk
free cstring
free cStringPtr
destroyStatusCodePtr statusCodePtr
--verify response received
responseRecv <- peek clientRecvBB
responseRecvBS <- copyByteBufferToByteString responseRecv
responseRecvBS HU.@?= "hello you"
grpcCompletionQueueShutdown cq
grpcCallDestroy clientCall
--TODO: the grpc test drains the cq here
grpcCompletionQueueDestroy cq
grpcChannelDestroy client
testPayloadServer :: IO ()
testPayloadServer = do
server <- grpcServerCreate nullPtr reserved
cq <- grpcCompletionQueueCreate reserved
grpcServerRegisterCompletionQueue server cq reserved
_ <- grpcServerAddInsecureHttp2Port server "localhost:50051"
grpcServerStart server
serverCallPtr <- malloc
withMetadataArrayPtr $ \requestMetadataRecv -> do
withByteBufferPtr $ \recvBufferPtr -> do
callDetails <- createCallDetails
requestMetadataRecv' <- peek requestMetadataRecv
recvRequestError <- grpcServerRequestCall
server serverCallPtr callDetails
requestMetadataRecv' cq cq (tag 101)
pluckDeadline' <- secondsToDeadline 10
requestCallCqEvent <- grpcCompletionQueuePluck cq (tag 101)
pluckDeadline'
reserved
assertCqEventComplete requestCallCqEvent
recvRequestError HU.@?= CallOk
destroyCallDetails callDetails
--receive request
withOpArray 2 $ \recvOps -> do
opSendInitialMetadataEmpty recvOps 0
opRecvMessage recvOps 1 recvBufferPtr
serverCall <- peek serverCallPtr
recvBatchError <- grpcCallStartBatch serverCall recvOps 2
(tag 102) reserved
recvBatchError HU.@?= CallOk
pluckDeadline'' <- secondsToDeadline 10
recvCqEvent <- grpcCompletionQueuePluck cq (tag 102)
pluckDeadline''
reserved
assertCqEventComplete recvCqEvent
--send response
withOpArray 3 $ \respOps -> do
withByteStringAsByteBuffer "hello you" $ \respbb -> do
cancelledPtr <- malloc
opRecvCloseServer respOps 0 cancelledPtr
opSendMessage respOps 1 respbb
opSendStatusServer respOps 2 0 (MetadataKeyValPtr nullPtr)
GrpcStatusOk "ok"
serverCall <- peek serverCallPtr
respBatchError <- grpcCallStartBatch serverCall respOps 3
(tag 103) reserved
respBatchError HU.@?= CallOk
pluckDeadline''' <- secondsToDeadline 10
respCqEvent <- grpcCompletionQueuePluck cq (tag 103)
pluckDeadline'''
reserved
assertCqEventComplete respCqEvent
--verify data was received
serverRecv <- peek recvBufferPtr
serverRecvBS <- copyByteBufferToByteString serverRecv
serverRecvBS HU.@?= "hello world"
--shut down
grpcServerShutdownAndNotify server cq (tag 0)
pluckDeadline'''' <- secondsToDeadline 10
shutdownEvent <- grpcCompletionQueuePluck cq (tag 0) pluckDeadline''''
reserved
assertCqEventComplete shutdownEvent
grpcServerCancelAllCalls server
grpcServerDestroy server
grpcCompletionQueueShutdown cq
grpcCompletionQueueDestroy cq
free serverCallPtr
-- | Straightforward translation of the gRPC core test end2end/tests/payload.c
-- This is intended to test the low-level C bindings, so we use only a few
-- minimal abstractions on top of it.
testPayload :: TestTree
testPayload = testCase "low-level C bindings request/response " $ do
grpcInit
withAsync testPayloadServer $ \a1 -> do
withAsync testPayloadClient $ \a2 -> do
() <- wait a1
wait a2
grpcShutdown
putStrLn "Done."
unitTests :: TestTree
unitTests = testGroup "Unit tests"
[testPayload,
roundtripSlice "Hello, world!",
roundtripByteBuffer "Hwaet! We gardena in geardagum...",
testMetadata,
testNow]
main :: IO ()
main = return ()
main = defaultMain unitTests