123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350openCoreopenImportmoduleTransport=Rpc_transportmoduleLow_latency_transport=Rpc_transport_low_latencymoduleAny=Rpc_kernel.AnymoduleDescription=Rpc_kernel.DescriptionmoduleOn_exception=Rpc_kernel.On_exceptionmoduleImplementation=Rpc_kernel.ImplementationmoduleImplementations=Rpc_kernel.ImplementationsmoduleOne_way=Rpc_kernel.One_waymodulePipe_rpc=Rpc_kernel.Pipe_rpcmoduleRpc=Rpc_kernel.RpcmoduleState_rpc=Rpc_kernel.State_rpcmodulePipe_close_reason=Rpc_kernel.Pipe_close_reasonmoduleConnection=structincludeRpc_kernel.Connectionletcreate?implementations~connection_state?max_message_size?handshake_timeout?heartbeat_config?descriptionreaderwriter=create?implementations~connection_state?handshake_timeout:(Option.maphandshake_timeout~f:Time_ns.Span.of_span_float_round_nearest)?heartbeat_config?description(Transport.of_reader_writerreaderwriter?max_message_size);;letcontains_magic_prefixreader=match%mapDeferred.Or_error.try_with~run:`Schedule~rest:`Log(fun()->Reader.peek_bin_protreadercontains_magic_prefix)with|Error_|Ok`Eof->false|Ok(`Okb)->b;;letwith_close?implementations?max_message_size?handshake_timeout?heartbeat_config?description~connection_statereaderwriter~dispatch_queries~on_handshake_error=with_close?implementations?handshake_timeout:(Option.maphandshake_timeout~f:Time_ns.Span.of_span_float_round_nearest)?heartbeat_config?description~connection_state(Transport.of_reader_writerreaderwriter?max_message_size)~dispatch_queries~on_handshake_error;;letserver_with_close?max_message_size?handshake_timeout?heartbeat_config?descriptionreaderwriter~implementations~connection_state~on_handshake_error=server_with_close?handshake_timeout:(Option.maphandshake_timeout~f:Time_ns.Span.of_span_float_round_nearest)?heartbeat_config?description(Transport.of_reader_writerreaderwriter?max_message_size)~implementations~connection_state~on_handshake_error;;letcollect_errors(transport:Transport.t)~f=letmonitor=Transport.Writer.monitortransport.writerin(* don't propagate errors up, we handle them here *)ignore(Monitor.detach_and_get_error_streammonitor);choose[choice(Monitor.get_next_errormonitor)(fune->Errore);choice(Monitor.try_with~run:`Schedule~rest:`Log~name:"Rpc.Connection.collect_errors"f)Fn.id];;typetransport_maker=Fd.t->max_message_size:int->Transport.ttypeon_handshake_error=[`Raise|`Ignore|`CallofExn.t->unit]letserve_with_transport~handshake_timeout~heartbeat_config~implementations~description~connection_state~on_handshake_errortransport=let%bindres=collect_errorstransport~f:(fun()->match%bindRpc_kernel.Connection.create?handshake_timeout:(Option.maphandshake_timeout~f:Time_ns.Span.of_span_float_round_nearest)?heartbeat_config~implementations~description~connection_statetransportwith|Okt->close_finishedt|Errorhandshake_error->(matchon_handshake_errorwith|`Callf->fhandshake_error|`Raise->raisehandshake_error|`Ignore->());Deferred.unit)inlet%map()=Transport.closetransportinResult.ok_exnres;;letmake_serve_funcserve_with_transport_handler~implementations~initial_connection_state~where_to_listen?max_connections?backlog?drop_incoming_connections?time_source?max_message_size?make_transport?handshake_timeout?heartbeat_config?auth?(on_handshake_error=`Ignore)?on_handler_error()=serve_with_transport_handler~where_to_listen?max_connections?backlog?drop_incoming_connections?time_source?max_message_size?make_transport?auth?on_handler_error(fun~client_addr~server_addrtransport->letdescription=letserver_addr=(server_addr:>Socket.Address.t)inletclient_addr=(client_addr:>Socket.Address.t)inInfo.create_s[%message"TCP server"(server_addr:Socket.Address.t)(client_addr:Socket.Address.t)]inserve_with_transport~handshake_timeout~heartbeat_config~implementations~description~connection_state:(funconn->initial_connection_stateclient_addrconn)~on_handshake_errortransport);;(* eta-expand [implementations] to avoid value restriction. *)letserve~implementations=make_serve_funcRpc_transport.Tcp.serve~implementations(* eta-expand [implementations] to avoid value restriction. *)letserve_inet~implementations=make_serve_funcRpc_transport.Tcp.serve_inet~implementations;;letdefault_handshake_timeout_float=Time_ns.Span.to_span_float_round_nearestAsync_rpc_kernel.Async_rpc_kernel_private.default_handshake_timeout;;letclient'?implementations?max_message_size?make_transport?handshake_timeout:(handshake_timeout_float=default_handshake_timeout_float)?heartbeat_config?descriptionwhere_to_connect=lethandshake_timeout=Time_ns.Span.of_span_float_round_nearesthandshake_timeout_floatinletfinish_handshake_by=Time_ns.add(Time_ns.now())handshake_timeoutinmatch%bindRpc_transport.Tcp.connect?max_message_size?make_transport~tcp_connect_timeout:handshake_timeoutwhere_to_connectwith|Error_aserror->returnerror|Ok(transport,sock_peername)->letdescription=matchdescriptionwith|None->Info.create"Client connected via TCP"where_to_connect[%sexp_of:_Tcp.Where_to_connect.t]|Somedesc->Info.tag_argdesc"via TCP"where_to_connect[%sexp_of:_Tcp.Where_to_connect.t]inlethandshake_timeout=Time_ns.difffinish_handshake_by(Time_ns.now())inlet%bindrpc_connection=matchimplementationswith|None->let{Client_implementations.connection_state;implementations}=Client_implementations.null()inRpc_kernel.Connection.createtransport~handshake_timeout?heartbeat_config~implementations~description~connection_state|Some{Client_implementations.connection_state;implementations}->Rpc_kernel.Connection.createtransport~handshake_timeout?heartbeat_config~implementations~description~connection_statein(matchrpc_connectionwith|Okt->return(Ok(sock_peername,t))|Error_aserror->let%bind()=Transport.closetransportinreturnerror);;letclient?implementations?max_message_size?make_transport?handshake_timeout?heartbeat_config?descriptionwhere_to_connect=client'?implementations?max_message_size?make_transport?handshake_timeout?heartbeat_config?descriptionwhere_to_connect>>|?snd;;letwith_client'?implementations?max_message_size?make_transport?handshake_timeout?heartbeat_config?descriptionwhere_to_connectf=client'?implementations?max_message_size?make_transport?handshake_timeout?heartbeat_config?descriptionwhere_to_connect>>=?fun(remote_server,t)->let%bindresult=Monitor.try_with~run:`Schedule~rest:`Log(fun()->f~remote_servert)inlet%map()=closet~reason:(Info.of_string"Rpc.Connection.with_client finished")inresult;;letwith_client?implementations?max_message_size?make_transport?handshake_timeout?heartbeat_configwhere_to_connectf=with_client'?implementations?max_message_size?make_transport?handshake_timeout?heartbeat_configwhere_to_connect(fun~remote_server:_->f);;end