open Core
open Import
module Transport = Rpc_transport
module Low_latency_transport = Rpc_transport_low_latency
module Any = Rpc_kernel.Any
module Description = Rpc_kernel.Description
module Implementation = Rpc_kernel.Implementation
module Implementations = Rpc_kernel.Implementations
module One_way = Rpc_kernel.One_way
module Pipe_rpc = Rpc_kernel.Pipe_rpc
module Rpc = Rpc_kernel.Rpc
module State_rpc = Rpc_kernel.State_rpc
module Pipe_close_reason = Rpc_kernel.Pipe_close_reason

(* [Connection] re-exports [Rpc_kernel.Connection] and shadows several of its
   functions with variants that take a reader/writer pair (or a TCP address)
   instead of a ready-made transport. *)
module Connection = struct
  include Rpc_kernel.Connection

  (* unfortunately, copied from reader0.ml *)
  let default_max_message_size = 100 * 1024 * 1024

  (* Shadows the included [create]: builds a transport from [reader] and
     [writer] with [Transport.of_reader_writer] and passes [handshake_timeout]
     through [Time_ns.Span.of_span_float_round_nearest] before delegating. *)
  let create
        ?implementations
        ~connection_state
        ?(max_message_size = default_max_message_size)
        ?handshake_timeout
        ?heartbeat_config
        ?description
        reader
        writer
    =
    create
      ?implementations
      ~connection_state
      ?handshake_timeout:
        (Option.map handshake_timeout ~f:Time_ns.Span.of_span_float_round_nearest)
      ?heartbeat_config
      ?description
      (Transport.of_reader_writer reader writer ~max_message_size)
  ;;

  (* Peeks at [reader] using the included [contains_magic_prefix] bin-prot
     reader.  Any error, or end of file, is reported as [false]. *)
  let contains_magic_prefix reader =
    Deferred.Or_error.try_with (fun () ->
      Reader.peek_bin_prot reader contains_magic_prefix)
    >>| function
    | Error _ | Ok `Eof -> false
    | Ok (`Ok b) -> b
  ;;

  (* Reader/writer flavour of the included [with_close]; same argument
     adaptation as [create] above. *)
  let with_close
        ?implementations
        ?(max_message_size = default_max_message_size)
        ?handshake_timeout
        ?heartbeat_config
        ~connection_state
        reader
        writer
        ~dispatch_queries
        ~on_handshake_error
    =
    with_close
      ?implementations
      ?handshake_timeout:
        (Option.map handshake_timeout ~f:Time_ns.Span.of_span_float_round_nearest)
      ?heartbeat_config
      ~connection_state
      (Transport.of_reader_writer reader writer ~max_message_size)
      ~dispatch_queries
      ~on_handshake_error
  ;;

  (* Reader/writer flavour of the included [server_with_close]; same argument
     adaptation as [create] above. *)
  let server_with_close
        ?(max_message_size = default_max_message_size)
        ?handshake_timeout
        ?heartbeat_config
        reader
        writer
        ~implementations
        ~connection_state
        ~on_handshake_error
    =
    server_with_close
      ?handshake_timeout:
        (Option.map handshake_timeout ~f:Time_ns.Span.of_span_float_round_nearest)
      ?heartbeat_config
      (Transport.of_reader_writer reader writer ~max_message_size)
      ~implementations
      ~connection_state
      ~on_handshake_error
  ;;

  (* Runs [f] under [try_with] and races it against the next error reported on
     the monitor of [transport]'s writer.  Whichever finishes first determines
     the result: [Error e] for a monitor error, otherwise [try_with]'s result. *)
  let collect_errors (transport : Transport.t) ~f =
    let monitor = Transport.Writer.monitor transport.writer in
    (* don't propagate errors up, we handle them here *)
    ignore (Monitor.detach_and_get_error_stream monitor);
    choose
      [ choice (Monitor.get_next_error monitor) (fun e -> Error e)
      ; choice (try_with ~name:"Rpc.Connection.collect_errors" f) Fn.id
      ]
  ;;

  type transport_maker = Fd.t -> max_message_size:int -> Transport.t

  type on_handshake_error =
    [ `Raise
    | `Ignore
    | `Call of Exn.t -> unit
    ]

  let default_transport_maker fd ~max_message_size = Transport.of_fd fd ~max_message_size

  (* Serves a single connection over [transport].  After a successful
     handshake it waits for [close_finished]; on a handshake error it acts
     according to [on_handshake_error].  In every case [transport] is closed
     afterwards, and any error gathered by [collect_errors] is then re-raised
     through [Result.ok_exn]. *)
  let serve_with_transport
        ~handshake_timeout
        ~heartbeat_config
        ~implementations
        ~description
        ~connection_state
        ~on_handshake_error
        transport
    =
    collect_errors transport ~f:(fun () ->
      Rpc_kernel.Connection.create
        ?handshake_timeout:
          (Option.map handshake_timeout ~f:Time_ns.Span.of_span_float_round_nearest)
        ?heartbeat_config
        ~implementations
        ~description
        ~connection_state
        transport
      >>= function
      | Ok t -> close_finished t
      | Error handshake_error ->
        (match on_handshake_error with
         | `Call f -> f handshake_error
         | `Raise -> raise handshake_error
         | `Ignore -> ());
        Deferred.unit)
    >>= fun res ->
    Transport.close transport >>| fun () -> Result.ok_exn res
  ;;

  (* Creates a TCP server with [Tcp.Server.create_sock] whose handler serves
     each accepted socket with [serve_with_transport].  The handler returns
     immediately, without serving, when [Socket.getpeername] raises or when
     [auth] rejects the peer address. *)
  let serve
        ~implementations
        ~initial_connection_state
        ~where_to_listen
        ?max_connections
        ?backlog
        ?(max_message_size = default_max_message_size)
        ?(make_transport = default_transport_maker)
        ?handshake_timeout
        ?heartbeat_config
        ?(auth = fun _ -> true)
        ?(on_handshake_error = `Ignore)
        ?(on_handler_error = `Ignore)
        ()
    =
    Tcp.Server.create_sock
      ?max_connections
      ?backlog
      where_to_listen
      ~on_handler_error
      (fun inet socket ->
         match (Socket.getpeername socket :> Socket.Address.t) with
         | exception _could_raise_if_the_socket_disconnects_quickly -> Deferred.unit
         | client_addr ->
           if not (auth inet)
           then Deferred.unit
           else (
             let description =
               let server_addr = (Socket.getsockname socket :> Socket.Address.t) in
               Info.create_s
                 [%message
                   "TCP server"
                     (server_addr : Socket.Address.t)
                     (client_addr : Socket.Address.t)]
             in
             let connection_state = initial_connection_state inet in
             serve_with_transport
               ~handshake_timeout
               ~heartbeat_config
               ~implementations
               ~description
               ~connection_state
               ~on_handshake_error
               (make_transport ~max_message_size (Socket.fd socket))))
  ;;

  (* Connects to [where_to_connect] over TCP and performs the RPC handshake.
     On success returns the peer address together with the connection.

     [handshake_timeout] bounds the TCP connect and the handshake together: a
     deadline is fixed up front, and the handshake is given whatever time is
     left once the connect has finished.

     When the handshake fails, the transport is closed before the error is
     returned. *)
  let client'
        ?implementations
        ?(max_message_size = default_max_message_size)
        ?(make_transport = default_transport_maker)
        ?(handshake_timeout =
          Time_ns.Span.to_span_float_round_nearest
            Async_rpc_kernel.Async_rpc_kernel_private.default_handshake_timeout)
        ?heartbeat_config
        ?description
        where_to_connect
    =
    let finish_handshake_by =
      Time_ns.add
        (Time_ns.now ())
        (Time_ns.Span.of_span_float_round_nearest handshake_timeout)
    in
    Monitor.try_with (fun () ->
      Tcp.connect_sock ~timeout:handshake_timeout where_to_connect)
    >>=? fun sock ->
    match Socket.getpeername sock with
    | exception exn_could_be_raised_if_the_socket_is_disconnected_now ->
      Socket.shutdown sock `Both;
      (* Shutting the socket down does not close its file descriptor.  No
         transport has been built around it yet, so nothing else would close
         it; do so here to avoid leaking the descriptor. *)
      don't_wait_for (Fd.close (Socket.fd sock));
      Deferred.Result.fail exn_could_be_raised_if_the_socket_is_disconnected_now
    | sock_peername ->
      let description =
        match description with
        | None ->
          Info.create
            "Client connected via TCP"
            where_to_connect
            [%sexp_of: _ Tcp.Where_to_connect.t]
        | Some desc ->
          Info.tag_arg
            desc
            "via TCP"
            where_to_connect
            [%sexp_of: _ Tcp.Where_to_connect.t]
      in
      (* Remaining budget after the time already spent connecting. *)
      let handshake_timeout = Time_ns.diff finish_handshake_by (Time_ns.now ()) in
      let transport = make_transport (Socket.fd sock) ~max_message_size in
      (* The two branches are intentionally kept separate rather than merged
         into a single [create] call. *)
      (match implementations with
       | None ->
         let { Client_implementations.connection_state; implementations } =
           Client_implementations.null ()
         in
         Rpc_kernel.Connection.create
           transport
           ~handshake_timeout
           ?heartbeat_config
           ~implementations
           ~description
           ~connection_state
       | Some { Client_implementations.connection_state; implementations } ->
         Rpc_kernel.Connection.create
           transport
           ~handshake_timeout
           ?heartbeat_config
           ~implementations
           ~description
           ~connection_state)
      >>= (function
        | Ok t -> return (Ok (sock_peername, t))
        | Error _ as error -> Transport.close transport >>= fun () -> return error)
  ;;

  (* Same as [client'], but drops the peer address from the result. *)
  let client
        ?implementations
        ?max_message_size
        ?make_transport
        ?handshake_timeout
        ?heartbeat_config
        ?description
        where_to_connect
    =
    client'
      ?implementations
      ?max_message_size
      ?make_transport
      ?handshake_timeout
      ?heartbeat_config
      ?description
      where_to_connect
    >>|? snd
  ;;

  (* Opens a connection with [client'], runs [f ~remote_server t] under
     [try_with], then closes the connection whether or not [f] raised, and
     returns [try_with]'s result. *)
  let with_client'
        ?implementations
        ?max_message_size
        ?make_transport
        ?handshake_timeout
        ?heartbeat_config
        where_to_connect
        f
    =
    client'
      ?implementations
      ?max_message_size
      ?make_transport
      ?handshake_timeout
      ?heartbeat_config
      where_to_connect
    >>=? fun (remote_server, t) ->
    try_with (fun () -> f ~remote_server t)
    >>= fun result ->
    close t ~reason:(Info.of_string "Rpc.Connection.with_client finished")
    >>| fun () -> result
  ;;

  (* Same as [with_client'], for callbacks that do not need the peer address. *)
  let with_client
        ?implementations
        ?max_message_size
        ?make_transport
        ?handshake_timeout
        ?heartbeat_config
        where_to_connect
        f
    =
    with_client'
      ?implementations
      ?max_message_size
      ?make_transport
      ?handshake_timeout
      ?heartbeat_config
      where_to_connect
      (fun ~remote_server:_ -> f)
  ;;
end