123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337openCoreopenAsync_kernel(* The reason for defining this module type explicitly is so that we can internally keep
track of what is and isn't exposed. *)moduletypeS=sigtypet[@@derivingsexp_of]moduleHeartbeat_config:sigtypet[@@derivingsexp,bin_io](** Each side of the connection has its own heartbeat config. It sends a heartbeat
every [send_every]. If it doesn't receive any messages for [timeout], whether it's
a heartbeat or not, it drops the connection. It only checks whether [timeout] has
elapsed when it sends heartbeats, so effectively [timeout] is rounded up to the
nearest multiple of [send_every]. *)valcreate:?timeout:Time_ns.Span.t->?send_every:Time_ns.Span.t->unit->tvaltimeout:t->Time_ns.Span.tvalsend_every:t->Time_ns.Span.tendmoduleClient_implementations:sigtypeconnection:=ttypet=|T:{connection_state:connection->'s;implementations:'sImplementations.t}->tvalnull:unit->tend(** Initiate an Rpc connection on the given transport. [implementations] should be the
bag of implementations that the calling side implements; it defaults to
[Implementations.null] (i.e., "I implement no RPCs").
[protocol_version_headers] should contain the pre-shared protocol version headers,
if applicable. The client and server must agree on whether these were shared and, if
so, what they contained, or there may be a protocol error.
[connection_state] will be called once, before [create]'s result is determined, on
the same connection that [create] returns. Its output will be provided to the
[implementations] when queries arrive.
WARNING: If specifying a custom [heartbeat_config], make sure that both ends of the
Rpc connection use compatible settings for timeout and send frequency. Otherwise,
your Rpc connections might close unexpectedly.
[max_metadata_size] will limit how many bytes of metadata this peer can send along
with each query. It defaults to 1k. User-provided metadata exceeding that size will
be truncated.
WARNING: setting this value too high allows this connection to send large amounts of
data to the callee, unnoticed, which can severely degrade performance.
[description] can be used to give some extra information about the connection, which
will then show up in error messages and the connection's sexp. If you have lots of
connections in your program, this can be useful for distinguishing them.
[time_source] can be given to define the time_source for which the heartbeating
events will be scheduled. Defaults to wall-clock.
[identification] can be used to send an additional information to the peer. This is
intended to be used for identifying the identity of the /process/ as opposed to the
identity of the user. We use a bigstring to leave the option for clients to
interpret as structured data of their choosing. *)valcreate:?implementations:'sImplementations.t->?protocol_version_headers:Protocol_version_header.Pair.t->connection_state:(t->'s)->?handshake_timeout:Time_ns.Span.t->?heartbeat_config:Heartbeat_config.t->?max_metadata_size:Byte_units.t->?description:Info.t->?time_source:Synchronous_time_source.t->?identification:Bigstring.t->Transport.t->(t,Exn.t)Result.tDeferred.t(** As of Feb 2017, the RPC protocol started to contain a magic number so that one can
identify RPC communication. The bool returned by [contains_magic_prefix] says
whether this magic number was observed. *)valcontains_magic_prefix:boolBin_prot.Type_class.readervaldescription:t->Info.t(** After [add_heartbeat_callback t f], [f ()] will be called after every subsequent
heartbeat received by [t]. *)valadd_heartbeat_callback:t->(unit->unit)->unit(** Changes the heartbeat timeout and restarts the timer by setting [last_seen_alive] to
the current time. *)valreset_heartbeat_timeout:t->Time_ns.Span.t->unit(** The last time either any message has been received or [reset_heartbeat_timeout] was
called. *)vallast_seen_alive:t->Time_ns.t(** [close] starts closing the connection's transport, and returns a deferred that
becomes determined when its close completes. It is ok to call [close] multiple
times on the same [t]; calls subsequent to the initial call will have no effect, but
will return the same deferred as the original call.
Before closing the underlying transport's writer, [close] waits for all streaming
responses to be [Pipe.upstream_flushed] with a timeout of
[streaming_responses_flush_timeout].
The [reason] for closing the connection will be passed to callers of [close_reason].
*)valclose:?streaming_responses_flush_timeout:Time_ns.Span.t(* default: 5 seconds *)->?reason:Info.t->t->unitDeferred.t(** [close_finished] becomes determined after the close of the connection's transport
completes, i.e. the same deferred that [close] returns. [close_finished] differs
from [close] in that it does not have the side effect of initiating a close. *)valclose_finished:t->unitDeferred.t(** [close_reason ~on_close t] becomes determined when close starts or finishes
based on [on_close], but additionally returns the reason that the connection was
closed. *)valclose_reason:t->on_close:[`started|`finished]->Info.tDeferred.t(** [is_closed t] returns [true] iff [close t] has been called. [close] may be called
internally upon errors or timeouts. *)valis_closed:t->bool(** [bytes_to_write] and [flushed] just call the similarly named function on the
[Transport.Writer.t] within a connection. *)valbytes_to_write:t->int(** [bytes_written] just calls the similarly named functions on the [Transport.Writer.t]
within a connection. *)valbytes_written:t->Int63.t(** [bytes_read] just calls the similarly named function on the [Transport.Reader.t]
within a connection. *)valbytes_read:t->Int63.tvalflushed:t->unitDeferred.t(** Peer menu will become determined before any other messages are received. The menu is
sent automatically on creation of a connection. If the peer is using an older
version, the value is immediately determined to be [None]. If the connection is
closed before the menu is received, an error is returned.
It is expected that one will call {!Versioned_rpc.Connection_with_menu.create}
instead of this function and that will request the menu via rpc if it gets [None].
*)valpeer_menu:t->Menu.toptionOr_error.tDeferred.t(** Like {!peer_menu} but returns an rpc result *)valpeer_menu':t->Menu.toptionRpc_result.tDeferred.tvalmy_menu:t->Menu.toption(** Peer identification will become determined before any other messages are received.
If the peer is using an older version, the peer id is immediately determined to be
[None]. If the connection is closed before the menu is received, [None] is returned.
*)valpeer_identification:t->Bigstring.toptionDeferred.t(** [with_close] tries to create a [t] using the given transport. If a handshake error
is the result, it calls [on_handshake_error], for which the default behavior is to
raise an exception. If no error results, [dispatch_queries] is called on [t].
After [dispatch_queries] returns, if [server] is None, the [t] will be closed and
the deferred returned by [dispatch_queries] will be determined immediately.
Otherwise, we'll wait until the other side closes the connection and then close [t]
and determine the deferred returned by [dispatch_queries].
When the deferred returned by [with_close] becomes determined, [Transport.close] has
finished.
NOTE: Because this connection is closed when the [Deferred.t] returned by
[dispatch_queries] is determined, you should be careful when using this with
[Pipe_rpc]. For example, simply returning the pipe when you get it will close the
pipe immediately. You should instead either use the pipe inside [dispatch_queries]
and not determine its result until you are done with the pipe, or use a different
function like [create]. *)valwith_close:?implementations:'sImplementations.t->?protocol_version_headers:Protocol_version_header.Pair.t->?handshake_timeout:Time_ns.Span.t->?heartbeat_config:Heartbeat_config.t->?description:Info.t->?time_source:Synchronous_time_source.t->connection_state:(t->'s)->Transport.t->dispatch_queries:(t->'aDeferred.t)->on_handshake_error:[`Raise|`CallofExn.t->'aDeferred.t]->'aDeferred.t(** Runs [with_close] but dispatches no queries. The implementations are required
because this function doesn't let you dispatch any queries (i.e., act as a client),
it would be pointless to call it if you didn't want to act as a server.*)valserver_with_close:?handshake_timeout:Time_ns.Span.t->?heartbeat_config:Heartbeat_config.t->?description:Info.t->?time_source:Synchronous_time_source.t->Transport.t->implementations:'sImplementations.t->connection_state:(t->'s)->on_handshake_error:[`Raise|`Ignore|`CallofExn.t->unitDeferred.t]->unitDeferred.tendmoduletypeS_private=sigopenProtocolincludeS(* Internally, we use a couple of extra functions on connections that aren't exposed to
users. *)valcompute_metadata:t->Description.t->Query_id.t->Rpc_metadata.toptiontyperesponse_with_determinable_status=|Pipe_eof|Expert_indeterminate|Determinable:'aRpc_result.t*'aImplementation.F.error_mode->response_with_determinable_statustyperesponse_handler_action=|Keep|WaitofunitDeferred.t|Removeof(response_with_determinable_status,Rpc_error.tGel.t)result|Expert_remove_and_waitofunitDeferred.ttyperesponse_handler=Nat0.tResponse.t->read_buffer:Bigstring.t->read_buffer_pos_ref:intref->response_handler_actionvalsexp_of_t_hum_writer:t->Sexp.tmoduleDispatch_error:sigtypet=|Closed|Message_too_bigofTransport.Send_result.message_too_big[@@derivingsexp_of]endvaldispatch:t->kind:Tracing_event.Sent_response_kind.tTracing_event.Kind.t->response_handler:response_handleroption->bin_writer_query:'aBin_prot.Type_class.writer->query:'aQuery.t->(unit,Dispatch_error.t)Result.tvaldispatch_bigstring:t->tag:Rpc_tag.t->version:int->metadata:unit->Bigstring.t->pos:int->len:int->response_handler:response_handleroption->(unit,Dispatch_error.t)Result.tvalschedule_dispatch_bigstring:t->tag:Rpc_tag.t->version:int->metadata:unit->Bigstring.t->pos:int->len:int->response_handler:response_handleroption->(unitDeferred.t,Dispatch_error.t)Result.tvalschedule_dispatch_bigstring_with_metadata:t->tag:Rpc_tag.t->version:int->metadata:stringoption->Bigstring.t->pos:int->len:int->response_handler:response_handleroption->(unitDeferred.t,Dispatch_error.t)Result.tvaldefault_handshake_timeout:Time_ns.Span.t(** Allows getting information from the RPC that may be used for tracing or metrics. The
interface is not yet stable. *)valevents:t->(Tracing_event.t->unit)Bus.Read_only.t(** The header that would be sent at the beginning of a connection. This can be used to
pre-share this part of the handshake (see the [protocol_version_headers] argument to
[create]). *)valdefault_handshake_header:Protocol_version_header.t(** Allows some extra information to be passed between clients and servers (e.g. for
tracing). The [when_sending] function is called to compute the metadata that is sent
along with rpc queries. It is called in the same async context as the dispatch
function. The [on_receive] function is called before an rpc implementation to modify
the execution context for that implementation. Metadata is not sent if the server is
running an old version of the Async_rpc protocol. Handlers should not change the
monitor on the execution context (this will have no effect on where errors are sent
for rpc implementations).
The passed [query_id] may be used to correlate with a listener on the {!events} bus. *)valset_metadata_hooks:t->when_sending:(Description.t->query_id:Int63.t->Rpc_metadata.toption)->on_receive:(Description.t->query_id:Int63.t->Rpc_metadata.toption->Execution_context.t->Execution_context.t)->[`Ok|`Already_set](** True if future calls to [set_metadata_hooks] will return [`Already_set]. *)valhave_metadata_hooks_been_set:t->boolmoduleFor_testing:sigmoduleHeader:sigtypet[@@derivingbin_io,sexp_of]valv1:tvalv2:tvalv3:tendvalwith_async_execution_context:context:Header.t->f:(unit->'a)->'aendend