open! Core
open! Async

(** See the doc/ directory for more details *)

(** A [('worker, 'query, 'response) Function.t] is a type-safe function ['query ->
    'response Deferred.t] that can only be run on a ['worker]. Under the hood it
    represents an Async Rpc protocol that we know a ['worker] will implement. *)
module type Function = sig
  type ('worker, 'query, +'response) t

  module Direct_pipe : sig
    type nonrec ('worker, 'query, 'response) t =
      ( 'worker
      , 'query * ('response Rpc.Pipe_rpc.Pipe_message.t -> Rpc.Pipe_rpc.Pipe_response.t)
      , Rpc.Pipe_rpc.Id.t )
      t
  end

  val map : ('worker, 'query, 'a) t -> f:('a -> 'b) -> ('worker, 'query, 'b) t
  val contra_map : ('worker, 'a, 'response) t -> f:('b -> 'a) -> ('worker, 'b, 'response) t

  (** Common functions that are implemented by all workers *)

  (** This implementation will add another [Log.Output] for [Log.Global] that transfers
      log messages to the returned pipe. You can subscribe to a worker's log more than
      once and from different processes, as each call simply adds a new [Log.Output].
      Closing the pipe will remove the corresponding [Log.Output].
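
      A minimal sketch of subscribing to a worker's log from the master. [Sum_worker] is
      a hypothetical worker module built with [Rpc_parallel.Make], [conn] is assumed to be
      an open [Sum_worker.Connection.t], and [handle] is a hypothetical callback chosen by
      the caller:

      {[
        (* Assumes [open! Core], [open! Async] and ppx_let. [Sum_worker] is hypothetical. *)
        let forward_worker_log conn ~(handle : Log.Message.Stable.V2.t -> unit) =
          let%bind log =
            Sum_worker.Connection.run_exn conn ~f:Rpc_parallel.Function.async_log ~arg:()
          in
          (* Only messages logged after this call are delivered; there is no queuing. *)
          Pipe.iter_without_pushback log ~f:handle
        ;;
      ]}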

      NOTE: You will never get any log messages before this implementation has run (there
      is no queuing of log messages). As a consequence, you will never get any log
      messages written in a worker's init functions. *)
  val async_log : (_, unit, Log.Message.Stable.V2.t Pipe.Reader.t) t

  (** A given process can have multiple worker servers running (of the same or different
      worker types). This implementation closes the server on which it is run. All
      existing open connections will remain open, but no further connections to this
      worker server will be accepted.
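
      For example, a client holding a connection to a hypothetical [Sum_worker] (a worker
      built with [Rpc_parallel.Make]) could ask that worker server to stop accepting new
      connections:

      {[
        (* [conn : Sum_worker.Connection.t]. Existing connections, including this one,
           stay open. *)
        let stop_accepting_connections conn =
          Sum_worker.Connection.run_exn conn ~f:Rpc_parallel.Function.close_server ~arg:()
        ;;
      ]}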

      NOTE: calling [close_server] on a worker process that is only running one worker
      server will leave a stranded worker process if no other cleanup has been set up
      (e.g. setting up [on_client_disconnect] or [Connection.close_finished] handlers). *)
  val close_server : (_, unit, unit) t

  module For_internal_testing : sig
    val worker_server_rpc_settings : (_, unit, Rpc_settings.t) t
  end
end

module type Worker = sig
  type ('worker, 'query, 'response) _function

  (** A [Worker.t] type is defined [with bin_io] so it is possible to create functions
      that take a worker as an argument. *)
  type t [@@deriving bin_io, sexp_of]

  (** A type alias to make the [Connection] signature more readable *)
  type worker = t

  type 'a functions

  (** Accessor for the functions implemented by this worker type *)
  val functions : t functions

  type worker_state_init_arg
  type connection_state_init_arg

  module Id : Identifiable

  val id : t -> Id.t
  val rpc_settings : t -> Rpc_settings.t

  (** [serve arg] will start an Rpc server in the current process implementing all the
      functions of the given worker. *)
  val serve : worker_state_init_arg -> worker Deferred.t

  module Connection : sig
    type t [@@deriving sexp_of]

    (** The [id] of the connected worker *)
    val worker_id : t -> Id.t

    (** Run functions implemented by this worker *)
    val run
      :  t
      -> f:(worker, 'query, 'response) _function
      -> arg:'query
      -> 'response Or_error.t Deferred.t

    val run_exn
      :  t
      -> f:(worker, 'query, 'response) _function
      -> arg:'query
      -> 'response Deferred.t

    (** Connect to a given worker, returning a typed wrapper around [Rpc.Connection.t]
        that can be used to run functions. *)
    val client : worker -> connection_state_init_arg -> t Or_error.t Deferred.t

    val client_exn : worker -> connection_state_init_arg -> t Deferred.t

    (** [with_client worker init_arg ~f] connects to the [worker]'s server, initializes
        the connection state with [init_arg] and runs [f] until an exception is thrown or
        until the returned Deferred is determined.
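
        A minimal sketch, assuming a hypothetical [Sum_worker] whose connection state init
        arg is [unit] and whose functions record has a [sum : (_, int, int) Function.t]
        field:

        {[
          let sum_once worker n =
            (* The result of [f] comes back wrapped in [Or_error.t]. *)
            Sum_worker.Connection.with_client worker () ~f:(fun conn ->
              Sum_worker.Connection.run_exn conn ~f:Sum_worker.functions.sum ~arg:n)
          ;;
        ]}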

        NOTE: You should be careful when using this with [Pipe_rpc].
        See [Rpc.Connection.with_close] for more information. *)
    val with_client
      :  worker
      -> connection_state_init_arg
      -> f:(t -> 'a Deferred.t)
      -> 'a Or_error.t Deferred.t

    val close : t -> unit Deferred.t
    val close_finished : t -> unit Deferred.t
    val close_reason : t -> on_close:[ `started | `finished ] -> Info.t Deferred.t
    val is_closed : t -> bool
  end

  module Shutdown_on (M : T1) : sig
    (** This variant determines when a spawned worker will shut itself down.

        In both the [Connection_closed] and [Heartbeater_connection_timeout] cases, the
        worker will shut itself down when [Rpc.Connection.close_finished] is determined on
        some established connection. The difference is which connection.

        In the [Connection_closed] case, the connection is returned back to the caller.
        The caller can call [Worker.Connection.close] to cause worker shutdown. If you
        never intend to reconnect to your spawned worker, this is probably the variant you
        want to choose.
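
        A minimal sketch of this case, run in the master after [start_app]. It assumes a
        hypothetical [Sum_worker] whose worker and connection state init args are both
        [unit], and it assumes [Fd_redirection.t] accepts [`Dev_null]:

        {[
          let sum_then_shut_down n =
            let%bind conn =
              Sum_worker.spawn_exn
                ~on_failure:Error.raise
                ~shutdown_on:Connection_closed
                ~redirect_stdout:`Dev_null
                ~redirect_stderr:`Dev_null
                ~connection_state_init_arg:()
                ()
            in
            let%bind result =
              Sum_worker.Connection.run_exn conn ~f:Sum_worker.functions.sum ~arg:n
            in
            (* Closing the returned connection makes the worker shut itself down. *)
            let%map () = Sum_worker.Connection.close conn in
            result
          ;;
        ]}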

        In the [Heartbeater_connection_timeout] case, the connection is internal to the
        library. The [worker] is returned so the caller is free to establish
        connections and close them without triggering worker shutdown.
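
        A sketch of this case under the same assumptions (a hypothetical [Sum_worker] with
        [unit] init args). The returned [worker] can be connected to repeatedly, and
        closing one connection does not stop it:

        {[
          let reconnect_example () =
            let%bind worker =
              Sum_worker.spawn_exn
                ~on_failure:Error.raise
                ~shutdown_on:Heartbeater_connection_timeout
                ~redirect_stdout:`Dev_null
                ~redirect_stderr:`Dev_null
                ()
            in
            let%bind conn1 = Sum_worker.Connection.client_exn worker () in
            let%bind () = Sum_worker.Connection.close conn1 in
            (* The worker is still running, so a second connection succeeds. *)
            let%bind conn2 = Sum_worker.Connection.client_exn worker () in
            Sum_worker.Connection.run_exn conn2 ~f:Sum_worker.functions.sum ~arg:10
          ;;
        ]}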

        In both the above cases, worker shutdown will be triggered when the master process
        exits. It may also result from network problems or long async cycles.

        In the [Called_shutdown_function] case, the worker will only shut itself down on
        an explicit [Worker.shutdown] call. Workers spawned with this variant do not shut
        down when the master process exits. We strongly discourage use of this
        variant. *)
    type _ t =
      | Connection_closed :
          (connection_state_init_arg:connection_state_init_arg
           -> Connection.t M.t Deferred.t)
            t
      | Heartbeater_connection_timeout : worker M.t Deferred.t t
      | Called_shutdown_function : worker M.t Deferred.t t
  end

  (** The various [spawn] functions create a new worker process that implements the
      functions specified in the [Worker_spec].

      [name] will be attached to certain error messages and is useful for debugging.

      [env] extends the environment of the spawned worker process.

      [connection_timeout] is used for various internal timeouts. This may need to be
      increased if the init arg is really large (serialization and deserialization
      take more than [connection_timeout]).

      [cd] changes the current working directory of a spawned worker process.

      [shutdown_on] specifies when a worker should shut itself down.

      [on_failure err] will be called in the spawning process when the worker process
      raises a background exception. All exceptions raised before functions return will
      be returned to the caller. [on_failure] will be called in [Monitor.current ()] at
      the time of this spawn call. The worker initiates shutdown upon sending the
      exception to the master process.
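
      A sketch that sets the optional arguments described above. The name, environment
      variable and directory are hypothetical values, and [Sum_worker] is a hypothetical
      worker whose worker state init arg is [unit]:

      {[
        let spawn_configured_worker () =
          Sum_worker.spawn
            ~name:"sum-worker"
            ~env:[ "SUM_WORKER_VERBOSE", "1" ]
            ~connection_timeout:(Time.Span.of_sec 60.)
            ~cd:"/tmp"
            ~on_failure:(fun err ->
              (* Called in the spawning process on a background worker failure. *)
              eprint_s [%message "sum worker failed" (err : Error.t)])
            ~shutdown_on:Heartbeater_connection_timeout
            ~redirect_stdout:`Dev_null
            ~redirect_stderr:`Dev_null
            ()
        ;;
      ]}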

      [worker_state_init_arg] (below) will be passed to [init_worker_state] of the given
      [Worker_spec] module. This initializes a persistent worker state for all connections
      to this worker. *)
  type 'a with_spawn_args =
    ?how:How_to_run.t (** default [How_to_run.local] *)
    -> ?name:string
    -> ?env:(string * string) list
    -> ?connection_timeout:Time.Span.t (** default 10 sec *)
    -> ?cd:string (** default / *)
    -> on_failure:(Error.t -> unit)
    -> 'a

  (** The spawned worker process daemonizes. Any initialization errors that wrote to
      stderr (Rpc_parallel internal initialization, not user initialization code) will be
      captured and rewritten to the spawning process's stderr with the prefix
      "[WORKER %NAME% STDERR]".

      [redirect_stdout] and [redirect_stderr] specify stdout and stderr of the worker
      process. *)
  val spawn
    : (?umask:int (** defaults to use existing umask *)
       -> shutdown_on:'a Shutdown_on(Or_error).t
       -> redirect_stdout:Fd_redirection.t
       -> redirect_stderr:Fd_redirection.t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  val spawn_exn
    : (?umask:int (** defaults to use existing umask *)
       -> shutdown_on:'a Shutdown_on(Monad.Ident).t
       -> redirect_stdout:Fd_redirection.t
       -> redirect_stderr:Fd_redirection.t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  module Spawn_in_foreground_result : sig
    type 'a t = ('a * Process.t) Or_error.t
  end

  (** Similar to [spawn] but the worker process does not daemonize. If the process was
      spawned on a remote host, the ssh [Process.t] is returned.
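
      A sketch of spawning in the foreground and then doing the cleanup listed in the next
      paragraph. It assumes a hypothetical [Sum_worker] whose worker state init arg is
      [unit]:

      {[
        let run_in_foreground () =
          let%bind (worker, process) =
            Sum_worker.spawn_in_foreground_exn
              ~on_failure:Error.raise
              ~shutdown_on:Heartbeater_connection_timeout
              ()
          in
          (* Ask the worker to exit, then reap it so it does not become a zombie. *)
          let%bind () = Sum_worker.shutdown worker >>| ok_exn in
          let%bind exit_status = Process.wait process in
          (* Release the pipes connected to the worker's stdin, stdout and stderr. *)
          let%bind () = Writer.close (Process.stdin process) in
          let%bind () = Reader.close (Process.stdout process) in
          let%map () = Reader.close (Process.stderr process) in
          exit_status
        ;;
      ]}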

      Remember to call [Process.wait] on the returned [Process.t] to avoid a zombie
      process. Once the process exits, remember to call [Writer.close] on [Process.stdin]
      and [Reader.close] on [Process.stdout] and [Process.stderr] to close the process's
      stdin/stdout/stderr. *)
  val spawn_in_foreground
    : (shutdown_on:'a Shutdown_on(Spawn_in_foreground_result).t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  module Spawn_in_foreground_exn_result : sig
    type 'a t = 'a * Process.t
  end

  val spawn_in_foreground_exn
    : (shutdown_on:'a Shutdown_on(Spawn_in_foreground_exn_result).t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  (** [shutdown] attempts to connect to a worker. Upon success, [Shutdown.shutdown 0] is
      run in the worker. If you want strong guarantees that a worker did shut down,
      consider using [spawn_in_foreground] and inspecting the [Process.t]. *)
  val shutdown : t -> unit Or_error.t Deferred.t

  module Deprecated : sig
    (** This is nearly identical to calling
        [spawn ~shutdown_on:Heartbeater_connection_timeout] and then [Connection.client].
        The only difference is that this function handles shutting down the worker when
        [Connection.client] returns an error.

        Uses of [spawn_and_connect] that disregard [t] can likely be replaced with
        [spawn ~shutdown_on:Connection_closed]. If [t] is used for reconnecting, then you
        can use [spawn] followed by [Connection.client]. *)
    val spawn_and_connect
      : (?umask:int
         -> redirect_stdout:Fd_redirection.t
         -> redirect_stderr:Fd_redirection.t
         -> connection_state_init_arg:connection_state_init_arg
         -> worker_state_init_arg
         -> (t * Connection.t) Or_error.t Deferred.t)
          with_spawn_args

    val spawn_and_connect_exn
      : (?umask:int
         -> redirect_stdout:Fd_redirection.t
         -> redirect_stderr:Fd_redirection.t
         -> connection_state_init_arg:connection_state_init_arg
         -> worker_state_init_arg
         -> (t * Connection.t) Deferred.t)
          with_spawn_args
  end

  (** This module is used for internal testing of the rpc_parallel library. *)
  module For_internal_testing : sig
    module Spawn_in_foreground_result : sig
      type 'a t =
        ( 'a * Process.t
        , Error.t * [ `Worker_process of Unix.Exit_or_signal.t Deferred.t option ] )
        Result.t
    end

    val spawn_in_foreground
      : (shutdown_on:'a Shutdown_on(Spawn_in_foreground_result).t
         -> worker_state_init_arg
         -> 'a)
          with_spawn_args

    val master_app_rpc_settings : unit -> Rpc_settings.t
  end
end

module type Functions = sig
  type worker
  type worker_state_init_arg
  type worker_state
  type connection_state_init_arg
  type connection_state
  type 'worker functions

  val functions : worker functions

  (** [init_worker_state] is called with the [init_arg] passed to [spawn] or [serve] *)
  val init_worker_state : worker_state_init_arg -> worker_state Deferred.t

  (** [init_connection_state] is called with the [init_arg] passed to [Connection.client].

      [connection] should only be used to register [close_finished] callbacks, not to
      dispatch. *)
  val init_connection_state
    :  connection:Rpc.Connection.t
    -> worker_state:worker_state
    -> connection_state_init_arg
    -> connection_state Deferred.t
end

module type Creator = sig
  type ('worker, 'query, 'response) _function
  type ('worker, 'query, 'response) _direct
  type worker
  type worker_state
  type connection_state

  (** [create_rpc ?name ~f ~bin_input ~bin_output ()] will create an [Rpc.Rpc.t] with
      [name] if specified and use [f] as an implementation for this Rpc. It returns back a
      [_function], a type-safe Rpc protocol. *)
  val create_rpc
    :  ?name:string
    -> f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Deferred.t)
    -> bin_input:'query Bin_prot.Type_class.t
    -> bin_output:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, 'response) _function

  (** [create_pipe ?name ~f ~bin_input ~bin_output ()] will create an [Rpc.Pipe_rpc.t]
      with [name] if specified and use [f] as the implementation for this Rpc. The values
      read from the [Pipe.Reader.t] returned by [f] are streamed back to the caller.
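
      A sketch of a streaming function. It would sit inside the body of a [Functions]
      functor of a [Worker_spec] whose worker and connection states are both [unit], with
      [C] being that functor's [Creator] argument. The name [count_up] is hypothetical:

      {[
        (* Streams the integers 0 .. n - 1 back to the caller, then closes the pipe. *)
        let count_up =
          C.create_pipe
            ~f:(fun ~worker_state:() ~conn_state:() n ->
              return (Pipe.of_list (List.init n ~f:Fn.id)))
            ~bin_input:Int.bin_t
            ~bin_output:Int.bin_t
            ()
        ;;
      ]}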

      Notice that [aborted] is not exposed. The pipe is closed when the Rpc is aborted. *)
  val create_pipe
    :  ?name:string
    -> f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Pipe.Reader.t Deferred.t)
    -> bin_input:'query Bin_prot.Type_class.t
    -> bin_output:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, 'response Pipe.Reader.t) _function

  (** [create_direct_pipe ?name ~f ~bin_input ~bin_output ()] will create an
      [Rpc.Pipe_rpc.t] with [name] if specified. *)
  val create_direct_pipe
    :  ?name:string
    -> f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Rpc.Pipe_rpc.Direct_stream_writer.t
          -> unit Deferred.t)
    -> bin_input:'query Bin_prot.Type_class.t
    -> bin_output:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, 'response) _direct

  (** [create_one_way ?name ~f ~bin_input ()] will create an [Rpc.One_way.t] with [name]
      if specified and use [f] as an implementation. *)
  val create_one_way
    :  ?name:string
    -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> unit)
    -> bin_input:'query Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, unit) _function

  (** [create_reverse_pipe ?name ~f ~bin_query ~bin_update ~bin_response ()] generates a
      function allowing you to send a [query] and a pipe of [update]s to a worker. The
      worker will send back a [response]. It is up to you whether to send a [response]
      before or after finishing with the pipe; Rpc_parallel doesn't care. *)
  val create_reverse_pipe
    :  ?name:string
    -> f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'update Pipe.Reader.t
          -> 'response Deferred.t)
    -> bin_query:'query Bin_prot.Type_class.t
    -> bin_update:'update Bin_prot.Type_class.t
    -> bin_response:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query * 'update Pipe.Reader.t, 'response) _function

  (** [create_reverse_direct_pipe ?name ~f ~bin_query ~bin_update ~bin_response ()]
      generates a function allowing you to send a [query] and a direct stream of [update]s
      to a worker. The worker will send back a [response]. It is up to you whether to send
      a [response] before or after finishing with the pipe; Rpc_parallel doesn't care. *)
  val create_reverse_direct_pipe
    :  ?name:string
    -> f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'update Pipe.Reader.t
          -> 'response Deferred.t)
    -> bin_query:'query Bin_prot.Type_class.t
    -> bin_update:'update Bin_prot.Type_class.t
    -> bin_response:'response Bin_prot.Type_class.t
    -> unit
    -> ( worker
       , 'query * ('update Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Or_error.t Deferred.t)
       , 'response )
       _function

  (** [of_async_rpc ~f rpc] is the analog to [create_rpc] but instead of creating an Rpc
      protocol, it uses the supplied one. *)
  val of_async_rpc
    :  f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Deferred.t)
    -> ('query, 'response) Rpc.Rpc.t
    -> (worker, 'query, 'response) _function

  (** [of_async_pipe_rpc ~f rpc] is the analog to [create_pipe] but instead of creating a
      Pipe rpc protocol, it uses the supplied one.

      Notice that [aborted] is not exposed. The pipe is closed when the Rpc is aborted. *)
  val of_async_pipe_rpc
    :  f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Pipe.Reader.t Deferred.t)
    -> ('query, 'response, Error.t) Rpc.Pipe_rpc.t
    -> (worker, 'query, 'response Pipe.Reader.t) _function

  (** [of_async_direct_pipe_rpc ~f rpc] is the analog to [create_direct_pipe] but instead
      of creating a Pipe rpc protocol, it uses the supplied one. *)
  val of_async_direct_pipe_rpc
    :  f:
         (worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Rpc.Pipe_rpc.Direct_stream_writer.t
          -> unit Deferred.t)
    -> ('query, 'response, Error.t) Rpc.Pipe_rpc.t
    -> (worker, 'query, 'response) _direct

  (** [of_async_one_way_rpc ~f rpc] is the analog to [create_one_way] but instead of
      creating a One_way rpc protocol, it uses the supplied one. *)
  val of_async_one_way_rpc
    :  f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> unit)
    -> 'query Rpc.One_way.t
    -> (worker, 'query, unit) _function
end

(** Specification for the creation of a worker type *)
module type Worker_spec = sig
  type ('worker, 'query, 'response) _function
  type ('worker, 'query, 'response) _direct

  (** A type to encapsulate all the functions that can be run on this worker. Using a
      record type here is often the most convenient and readable. *)
  type 'worker functions

  (** State associated with each [Worker.t]. If this state is mutable, you must
      think carefully when making multiple connections to the same spawned worker. *)
  module Worker_state : sig
    type t
    type init_arg [@@deriving bin_io]
  end

  (** State associated with each connection to a [Worker.t] *)
  module Connection_state : sig
    type t
    type init_arg [@@deriving bin_io]
  end

  (** The functions that can be run on this worker type *)
  module Functions
      (C : Creator
           with type worker_state = Worker_state.t
            and type connection_state = Connection_state.t
            and type ('w, 'q, 'r) _function := ('w, 'q, 'r) _function
            and type ('w, 'q, 'r) _direct := ('w, 'q, 'r) _direct) :
    Functions
      with type worker := C.worker
       and type 'a functions := 'a functions
       and type worker_state := Worker_state.t
       and type worker_state_init_arg := Worker_state.init_arg
       and type connection_state := Connection_state.t
       and type connection_state_init_arg := Connection_state.init_arg
end

(** An RPC backend. Defines how to create RPC servers and clients for a given protocol.

    Currently, we have two backends, one in each of rpc_parallel_unauthenticated and
    rpc_parallel_krb. These backends define how to create and connect to unencrypted and
    kerberized RPC servers, respectively. Unless you are implementing your own
    rpc_parallel backend, you should use one of the two libraries above instead of
    creating your own modules with this signature. *)
module type Backend = sig
  (** The name of the backend -- e.g. "Unauthenticated Async RPC". Must be globally
      unique. Used for error messages if someone initializes Rpc_parallel multiple times
      with different backends. *)
  val name : string

  module Settings : sig
    (** Additional settings needed to create or connect to an RPC server for this backend.
        For unencrypted RPC, this is unit. For kerberized RPC, this is the client and
        server krb modes. *)
    type t [@@deriving bin_io, sexp]
  end

  val serve
    :  ?max_message_size:int
    -> ?handshake_timeout:Time.Span.t
    -> ?heartbeat_config:Rpc.Connection.Heartbeat_config.t
    -> implementations:'a Rpc.Implementations.t
    -> initial_connection_state:(Socket.Address.Inet.t -> Rpc.Connection.t -> 'a)
    -> where_to_listen:Tcp.Where_to_listen.inet
    -> Settings.t
    -> (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t

  val with_client
    :  ?implementations:'b Rpc.Connection.Client_implementations.t
    -> ?max_message_size:int
    -> ?handshake_timeout:Time.Span.t
    -> ?heartbeat_config:Rpc.Connection.Heartbeat_config.t
    -> Settings.t
    -> Socket.Address.Inet.t Tcp.Where_to_connect.t
    -> (Rpc.Connection.t -> 'a Deferred.t)
    -> 'a Or_error.t Deferred.t

  val client
    :  ?implementations:'a Rpc.Connection.Client_implementations.t
    -> ?max_message_size:int
    -> ?handshake_timeout:Time.Span.t
    -> ?heartbeat_config:Rpc.Connection.Heartbeat_config.t
    -> ?description:Info.t
    -> Settings.t
    -> Socket.Address.Inet.t Tcp.Where_to_connect.t
    -> Rpc.Connection.t Or_error.t Deferred.t
end

(** A backend packed together with its settings. Unless you are implementing your own
    rpc_parallel backend, you should use either rpc_parallel_krb or
    rpc_parallel_unauthenticated instead of constructing values of this type. *)
module type Backend_and_settings = sig
  type t = T : (module Backend with type Settings.t = 'a) * 'a -> t
end

module type Parallel = sig
  module Function : Function
  module Backend_and_settings : Backend_and_settings
  module type Backend = Backend

  module type Worker =
    Worker with type ('w, 'q, 'r) _function := ('w, 'q, 'r) Function.t

  module type Functions = Functions

  module type Creator =
    Creator
    with type ('w, 'q, 'r) _function := ('w, 'q, 'r) Function.t
     and type ('w, 'q, 'r) _direct := ('w, 'q, 'r) Function.Direct_pipe.t

  module type Worker_spec =
    Worker_spec
    with type ('w, 'q, 'r) _function := ('w, 'q, 'r) Function.t
     and type ('w, 'q, 'r) _direct := ('w, 'q, 'r) Function.Direct_pipe.t

  (** [module Worker = Make (T)]
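
      Here [T] is any module satisfying [Worker_spec]. As a minimal sketch (the names
      [Sum_worker] and [sum] are hypothetical, and ppx_jane is assumed for the deriving
      and [let%bind] syntax), a worker exposing a single function could be defined as:

      {[
        open! Core
        open! Async

        module Sum_worker = struct
          module T = struct
            (* The functions this worker exposes; a record is the usual choice. *)
            type 'worker functions = { sum : ('worker, int, int) Rpc_parallel.Function.t }

            (* No per-worker or per-connection state is needed in this sketch. *)
            module Worker_state = struct
              type init_arg = unit [@@deriving bin_io]
              type t = unit
            end

            module Connection_state = struct
              type init_arg = unit [@@deriving bin_io]
              type t = unit
            end

            module Functions
                (C : Rpc_parallel.Creator
                     with type worker_state := Worker_state.t
                      and type connection_state := Connection_state.t) =
            struct
              (* Sums the integers 0 .. n - 1 inside the worker process. *)
              let sum =
                C.create_rpc
                  ~f:(fun ~worker_state:() ~conn_state:() n ->
                    return (List.sum (module Int) (List.init n ~f:Fn.id) ~f:Fn.id))
                  ~bin_input:Int.bin_t
                  ~bin_output:Int.bin_t
                  ()
              ;;

              let functions = { sum }
              let init_worker_state () = Deferred.unit
              let init_connection_state ~connection:_ ~worker_state:_ = return
            end
          end

          include Rpc_parallel.Make (T)
        end
      ]}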

      The [Worker] module has specialized functions to spawn workers and run functions on
      workers. *)
  module Make (S : Worker_spec) :
    Worker
    with type 'a functions := 'a S.functions
     and type worker_state_init_arg := S.Worker_state.init_arg
     and type connection_state_init_arg := S.Connection_state.init_arg

  (** [start_app command] should be called from the top-level in order to start the
      parallel application. This function will parse certain environment variables and
      determine whether to start as a master or a worker.
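
      A sketch of a master program's entry point. It assumes a hypothetical [Sum_worker]
      (built with [Make], [unit] init args, a [sum] function) and a [backend_and_settings]
      value obtained from rpc_parallel_krb or rpc_parallel_unauthenticated. How that value
      is constructed is not shown here:

      {[
        let command =
          Command.async
            ~summary:"sum numbers in a worker process"
            (Command.Param.return (fun () ->
               let%bind worker =
                 Sum_worker.spawn_exn
                   ~on_failure:Error.raise
                   ~shutdown_on:Heartbeater_connection_timeout
                   ~redirect_stdout:`Dev_null
                   ~redirect_stderr:`Dev_null
                   ()
               in
               let%map sum =
                 Sum_worker.Connection.with_client worker () ~f:(fun conn ->
                   Sum_worker.Connection.run_exn conn ~f:Sum_worker.functions.sum ~arg:100)
               in
               print_s [%sexp (sum : int Or_error.t)]))
        ;;

        (* Intended to be called at the top level, e.g. [let () = main ~backend_and_settings]. *)
        let main ~backend_and_settings = Rpc_parallel.start_app backend_and_settings command
      ]}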

      [rpc_max_message_size], [rpc_handshake_timeout] and [rpc_heartbeat_config] can be
      used to alter the rpc defaults. These rpc settings will be used for all connections.
      This can be useful if you have long async jobs.

      [when_parsing_succeeds] will be passed to [Command.run] in the master process. *)
  val start_app
    :  ?rpc_max_message_size:int
    -> ?rpc_handshake_timeout:Time.Span.t
    -> ?rpc_heartbeat_config:Rpc.Connection.Heartbeat_config.t
    -> ?when_parsing_succeeds:(unit -> unit)
    -> Backend_and_settings.t
         (** Use rpc_parallel_krb or rpc_parallel_unauthenticated to avoid having to
             manually construct a custom Backend *)
    -> Command.t
    -> unit

  (** Use [State.get] to query whether the current process has been initialized as an rpc
      parallel master ([start_app] or [init_master_exn] has been called). We return a
      [State.t] rather than a [bool] so that you can require evidence at the type level.
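
      A sketch of requiring that evidence. The helper names are hypothetical and the body
      of [spawn_workers] is a placeholder:

      {[
        (* Can only be called by code that already holds a [State.t]. *)
        let spawn_workers (_ : Rpc_parallel.State.t) = Deferred.unit

        let spawn_if_master () =
          match Rpc_parallel.State.get () with
          | Some state -> spawn_workers state
          | None -> Deferred.unit
        ;;
      ]}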

      If you want to certify, as a precondition, for some function that [start_app] was
      used, require a [State.t] as an argument. If you don't need the [State.t] anymore,
      just pattern match on it. *)
  module State : sig
    type t = private [< `started ]

    val get : unit -> t option
  end

  module For_testing : sig
    (** [initialize [%here]] must be called at the top level of any files that have inline
        or expect tests that use [Rpc_parallel]. Further, these calls must come before the
        definitions of the tests, but after the definitions of any workers used in the
        tests.

        For example:
        {[
          let () = Rpc_parallel.For_testing.initialize [%here]

          let%expect_test "" =
            run_code_with_rpc_parallel ();
            [%expect {| output |}]
          ;;
        ]} *)
    val initialize
      :  Backend_and_settings.t
           (** Use rpc_parallel_krb or rpc_parallel_unauthenticated to avoid having to
               manually construct a custom Backend *)
      -> Source_code_position.t
      -> unit
  end

  (** If you want more direct control over your executable, you can use the [Expert]
      module instead of [start_app]. If you use [Expert], you are responsible for starting
      the master and worker rpc servers. [worker_command_args] will be the arguments sent
      to each spawned worker. Running your executable with these args must follow a code
      path that calls [worker_init_before_async_exn] and then [start_worker_server_exn].
      An easy way to do this is to use [worker_command]. *)
  module Expert : sig
    (** [start_master_server_exn] must be called in the single master process. It is
        necessary to be able to spawn workers. Raises if the process was spawned.

        If [pass_name] is [false], the [?name] argument to spawned workers will not be
        propagated into the worker's command line. This override is only needed to support
        the "deprecated option" for implementing worker commands described below. *)
    val start_master_server_exn
      :  ?rpc_max_message_size:int
      -> ?rpc_handshake_timeout:Time.Span.t
      -> ?rpc_heartbeat_config:Rpc.Connection.Heartbeat_config.t
      -> ?pass_name:bool (** default: true *)
      -> Backend_and_settings.t
           (** Use rpc_parallel_krb or rpc_parallel_unauthenticated to avoid having to
               manually construct a custom Backend *)
      -> worker_command_args:string list
      -> unit
      -> unit

    (** You have two options for implementing the worker process, a simple one and a
        deprecated one.

        Simple option: just make sure [worker_command] is somewhere in the command
        hierarchy of the same executable in which [start_master_server_exn] is called,
        with a subcommand path equal to [worker_command_args]. It is possible for multiple
        calls to [start_master_server_exn] to share the same [worker_command_args].
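
        A sketch of the simple option. [run_master] is a hypothetical function holding the
        master's logic (for example spawning workers). [backend] and
        [backend_and_settings] are assumed to describe the same backend:

        {[
          let command ~backend ~backend_and_settings ~run_master =
            let master =
              Command.async
                ~summary:"master"
                (Command.Param.return (fun () ->
                   (* Must run in the master before any worker is spawned. *)
                   Rpc_parallel.Expert.start_master_server_exn
                     backend_and_settings
                     ~worker_command_args:[ "worker" ]
                     ();
                   run_master ()))
            in
            (* The worker subcommand path matches [worker_command_args]. *)
            Command.group
              ~summary:"rpc_parallel application"
              [ "master", master; "worker", Rpc_parallel.Expert.worker_command backend ]
          ;;
        ]}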

        Deprecated option: implement something at least as complicated yourself using
        [worker_init_before_async_exn] and [start_worker_server_exn]. This option may go
        away in the future. *)
    val worker_command
      :  (module Backend)
           (** Use rpc_parallel_krb or rpc_parallel_unauthenticated to avoid having to
               manually construct a custom Backend *)
      -> Command.t

    module Worker_env : sig
      type t
    end

    (** [worker_init_before_async_exn] must be called in a spawned worker process before
        the async scheduler has started. You must not read from stdin before this function
        call.

        This has the side effect of calling [chdir]. *)
    val worker_init_before_async_exn : unit -> Worker_env.t

    (** [start_worker_server_exn] must be called in each spawned process. It is illegal to
        call both [start_master_server_exn] and [start_worker_server_exn] in the same
        process. Raises if the process was not spawned.

        This has the side effect of scheduling a job that completes the daemonization of
        this process (if the process should daemonize). This includes redirecting stdout
        and stderr according to [redirect_stdout] and [redirect_stderr]. All writes to
        stdout before this job runs are blackholed. All writes to stderr before this job
        runs are redirected to the spawning process's stderr. *)
    val start_worker_server_exn
      :  (module Backend)
           (** Use rpc_parallel_krb or rpc_parallel_unauthenticated to avoid having to
               manually construct a custom Backend *)
      -> Worker_env.t
      -> unit
  end
end