123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585openCoreopenAsync_kernelmoduleTime_ns=Core_private.Time_ns_alternate_sexpmoduleP=ProtocolmoduleReader=Transport.ReadermoduleWriter=Transport.WritermoduleHeader:sigtypet[@@derivingbin_type_class]valv1:tvalnegotiate:us:t->peer:t->intOr_error.tend=structincludeP.Headerletnegotiate=negotiate~allow_legacy_peer:trueletv1=Protocol_version_header.create_exn~protocol:Rpc~supported_versions:[1]endmoduleHandshake_error=structmoduleT=structtypet=|Eof|Transport_closed|Timeout|Reading_header_failedofError.t|Negotiation_failedofError.t|Negotiated_unexpected_versionofint[@@derivingsexp]endincludeTincludeSexpable.To_stringable(T)exceptionHandshake_errorof(t*Info.t)[@@derivingsexp]letto_exn~connection_descriptiont=Handshake_error(t,connection_description)endmoduleHeartbeat_config=structtypet={timeout:Time_ns.Span.t;send_every:Time_ns.Span.t}[@@derivingsexp,bin_io,fields]letcreate?(timeout=Time_ns.Span.of_sec30.)?(send_every=Time_ns.Span.of_sec10.)()={timeout;send_every};;moduleRuntime=structtypet={mutabletimeout:Time_ns.Span.t;send_every:Time_ns.Span.t}[@@derivingsexp_of]endletto_runtime{timeout;send_every}={Runtime.timeout;send_every}endtyperesponse_handler=Nat0.tP.Response.t->read_buffer:Bigstring.t->read_buffer_pos_ref:intref->[`keep|`waitofunitDeferred.t|`removeofunitRpc_result.t|`remove_and_waitofunitDeferred.t]typet={description:Info.t;heartbeat_config:Heartbeat_config.Runtime.t;mutableheartbeat_callbacks:(unit->unit)array;mutablelast_seen_alive:Time_ns.t;reader:Reader.t;writer:Writer.t;open_queries:(P.Query_id.t,(response_handler[@sexp.opaque]))Hashtbl.t;close_started:Info.tIvar.t;close_finished:unitIvar.t(* There's a circular dependency between connections and their implementation instances
(the latter depends on the connection state, which is given access to the connection
when it is created). *);implementations_instance:Implementations.Instance.tSet_once.t;time_source:Synchronous_time_source.t;heartbeat_event:Synchronous_time_source.Event.tSet_once.t}[@@derivingsexp_of]letsexp_of_t_hum_writert=[%sexp{description:Info.t=t.description;writer:Writer.t=t.writer}];;letdescriptiont=t.descriptionletis_closedt=Ivar.is_fullt.close_startedletwritert=ifis_closedt||not(Writer.can_sendt.writer)thenError`ClosedelseOkt.writer;;letbytes_to_writet=Writer.bytes_to_writet.writerletflushedt=Writer.flushedt.writerlethandle_send_result:t->'aTransport.Send_result.t->'a=funtr->matchrwith|Sentx->x|Closed->(* All of the places we call [handle_send_result] check whether [t] is closed
(usually via the [writer] function above). This checks whether [t.writer] is
closed, which should not happen unless [t] is closed. *)failwiths~here:[%here]"RPC connection got closed writer"tsexp_of_t_hum_writer|Message_too_big_->raise_s[%sexp"Message cannot be sent",{reason=(r:_Transport.Send_result.t);connection=(t:t_hum_writer)}];;letdispatcht~response_handler~bin_writer_query~query=matchwritertwith|Error`Closedasr->r|Okwriter->Option.iterresponse_handler~f:(funresponse_handler->Hashtbl.sett.open_queries~key:query.P.Query.id~data:response_handler);Writer.send_bin_protwriter(P.Message.bin_writer_needs_length(Writer_with_length.of_writerbin_writer_query))(Queryquery)|>handle_send_resultt;Ok();;letmake_dispatch_bigstringdo_sendt~tag~versionbuf~pos~len~response_handler=matchwritertwith|Error`Closed->Error`Closed|Okwriter->letid=P.Query_id.create()inletheader:Nat0.tP.Message.t=Query{tag;version;id;data=Nat0.of_int_exnlen}inOption.iterresponse_handler~f:(funresponse_handler->Hashtbl.sett.open_queries~key:id~data:response_handler);letresult=do_sendwriterP.Message.bin_writer_nat0_theader~buf~pos~len|>handle_send_resulttinOkresult;;letdispatch_bigstring=make_dispatch_bigstringWriter.send_bin_prot_and_bigstringletschedule_dispatch_bigstring=make_dispatch_bigstringWriter.send_bin_prot_and_bigstring_non_copying;;lethandle_responset(response:_P.Response.t)~read_buffer~read_buffer_pos_ref:_Transport.Handler_result.t=matchHashtbl.findt.open_queriesresponse.idwith|None->Stop(Error(Rpc_error.Unknown_query_idresponse.id))|Someresponse_handler->(matchresponse_handlerresponse~read_buffer~read_buffer_pos_refwith|`keep->Continue|`waitwait->Waitwait|`remove_and_waitwait->Hashtbl.removet.open_queriesresponse.id;Waitwait|`removeremoval_circumstances->Hashtbl.removet.open_queriesresponse.id;(matchremoval_circumstanceswith|Ok()->Continue|Errore->(matchewith|Unimplemented_rpc_->Continue|Bin_io_exn_|Connection_closed|Write_error_|Uncaught_exn_|Unknown_query_id_->Stop(Errore))));;lethandle_msgt(msg:_P.Message.t)~read_buffer~read_buffer_pos_ref~close_connection_monitor:_Transport.Handler_result.t=matchmsgwith|Heartbeat->Array.itert.heartbeat_callbacks~f:(funf->f());Continue|Responseresponse->handle_responsetresponse~read_buffer~read_buffer_pos_ref|Queryquery->letinstance=Set_once.get_exnt.implementations_instance[%here]inImplementations.Instance.handle_queryinstance~close_connection_monitor~query~read_buffer~read_buffer_pos_ref;;letclose_reasont~on_close=letreason=Ivar.readt.close_startedinmatchon_closewith|`started->reason|`finished->let%bind()=Ivar.readt.close_finishedinreason;;letclose_finishedt=Ivar.readt.close_finishedletadd_heartbeat_callbacktf=(* Adding heartbeat callbacks is relatively rare, but the callbacks are triggered a lot.
The array representation makes the addition quadratic for the sake of keeping the
triggering cheap. *)t.heartbeat_callbacks<-Array.append[|f|]t.heartbeat_callbacks;;letreset_heartbeat_timeoutttimeout=t.heartbeat_config.timeout<-timeout;t.last_seen_alive<-Synchronous_time_source.nowt.time_source;;letlast_seen_alivet=t.last_seen_aliveletabort_heartbeatingt=Option.iter(Set_once.gett.heartbeat_event)~f:(funevent->matchSynchronous_time_source.Event.abortt.time_sourceeventwith|Ok|Previously_unscheduled->()|Currently_happening->Synchronous_time_source.run_aftert.time_sourceTime_ns.Span.zero(fun()->Synchronous_time_source.Event.abort_exnt.time_sourceevent));;letclose?(streaming_responses_flush_timeout=Time_ns.Span.of_int_sec5)~reasont=ifnot(is_closedt)then(abort_heartbeatingt;Ivar.fillt.close_startedreason;(matchSet_once.gett.implementations_instancewith|None->Deferred.unit|Someinstance->letflushed=Implementations.Instance.flushinstanceinifDeferred.is_determinedflushedthen(Implementations.Instance.stopinstance;flushed)else(let%map()=Deferred.any_unit[flushed;Writer.stoppedt.writer;Time_source.after(Time_source.of_synchronoust.time_source)streaming_responses_flush_timeout]inImplementations.Instance.stopinstance))>>>fun()->Writer.closet.writer>>>fun()->Reader.closet.reader>>>fun()->Ivar.fillt.close_finished());close_finishedt;;leton_messaget~close_connection_monitor=letfbuf~pos~len:_:_Transport.Handler_result.t=letpos_ref=refposinletnat0_msg=P.Message.bin_read_nat0_tbuf~pos_refinmatchhandle_msgtnat0_msg~read_buffer:buf~read_buffer_pos_ref:pos_ref~close_connection_monitorwith|Continue->Continue|Wait_asres->res|Stopresult->letreason=letmsg="Rpc message handling loop stopped"inmatchresultwith|Ok()->Info.of_stringmsg|Errore->Info.createmsge(Rpc_error.sexp_of_t~get_connection_close_reason:(fun()->[%sexp"Connection.on_message resulted in Connection_closed error. This is \
weird."]))indon't_wait_for(closet~reason);StopreasoninStaged.stagef;;letheartbeat_nowt=letsince_last_heartbeat=Time_ns.diff(Synchronous_time_source.nowt.time_source)t.last_seen_aliveinifTime_ns.Span.(>)since_last_heartbeatt.heartbeat_config.timeoutthen(letreason()=sprintf!"No heartbeats received for %{sexp:Time_ns.Span.t}."t.heartbeat_config.timeoutindon't_wait_for(closet~reason:(Info.of_thunkreason)))else(matchwritertwith|Error`Closed->()|Okwriter->Writer.send_bin_protwriterP.Message.bin_writer_nat0_tHeartbeat|>handle_send_resultt);;letdefault_handshake_timeout=Time_ns.Span.of_sec30.letcleanupt~reasonexn=don't_wait_for(close~reasont);ifnot(Hashtbl.is_emptyt.open_queries)then(leterror=matchexnwith|Rpc_error.Rpc(error,(_:Info.t))->error|exn->Uncaught_exn(Exn.sexp_of_texn)in(* clean up open streaming responses *)(* an unfortunate hack; ok because the response handler will have nothing
to read following a response where [data] is an error *)letdummy_buffer=Bigstring.create1inletdummy_ref=ref0inHashtbl.iterit.open_queries~f:(fun~key:query_id~data:response_handler->ignore(response_handler~read_buffer:dummy_buffer~read_buffer_pos_ref:dummy_ref{id=query_id;data=Errorerror}));Hashtbl.cleart.open_queries;Bigstring.unsafe_destroydummy_buffer);;letschedule_heartbeatst=t.last_seen_alive<-Synchronous_time_source.nowt.time_source;letheartbeat_from_now_on=(* [at_intervals] will schedule the first heartbeat the first time the time_source is
advanced *)Synchronous_time_source.Event.at_intervalst.time_sourcet.heartbeat_config.send_every(fun()->heartbeat_nowt)inSet_once.set_exnt.heartbeat_event[%here]heartbeat_from_now_on;;letrun_after_handshaket~implementations~connection_state~writer_monitor_exns=letinstance=Implementations.instantiateimplementations~writer:t.writer~connection_description:t.description~connection_close_started:(Ivar.readt.close_started)~connection_state:(connection_statet)inSet_once.set_exnt.implementations_instance[%here]instance;letclose_connection_monitor=Monitor.create~name:"RPC close connection monitor"()inMonitor.detach_and_iter_errorsclose_connection_monitor~f:(funexn->letreason=Info.create_s[%message"Uncaught exception in implementation"(exn:Exn.t)]indon't_wait_for(close~reasont));letmonitor=Monitor.create~name:"RPC connection loop"()inletreasonnameexn=exn,Info.tag(Info.of_exnexn)~tag:("exn raised in RPC connection "^name)inStream.iter(Stream.interleave(Stream.of_list[Stream.map~f:(reason"loop")(Monitor.detach_and_get_error_streammonitor);Stream.map~f:(reason"Writer.t")writer_monitor_exns]))~f:(fun(exn,reason)->cleanuptexn~reason);within~monitor(fun()->schedule_heartbeatst;Reader.read_forevert.reader~on_message:(Staged.unstage(on_messaget~close_connection_monitor))~on_end_of_batch:(fun()->t.last_seen_alive<-Synchronous_time_source.nowt.time_source)>>>function|Okreason->cleanupt~reason(Rpc_error.Rpc(Connection_closed,t.description))(* The protocol is such that right now, the only outcome of the other side closing the
connection normally is that we get an eof. *)|Error(`Eof|`Closed)->cleanupt~reason:(Info.of_string"EOF or connection closed")(Rpc_error.Rpc(Connection_closed,t.description)));;letdo_handshaket~handshake_timeout=matchwritertwith|Error`Closed->return(ErrorHandshake_error.Transport_closed)|Okwriter->Writer.send_bin_protwriterHeader.bin_t.writerHeader.v1|>handle_send_resultt;(* If we use [max_connections] in the server, then this read may just hang until the
server starts accepting new connections (which could be never). That is why a
timeout is used *)letresult=Monitor.try_with~rest:`Log~run:`Now(fun()->Reader.read_one_message_bin_prott.readerHeader.bin_t.reader)in(match%mapTime_source.with_timeout(Time_source.of_synchronoust.time_source)handshake_timeoutresultwith|`Timeout->(* There's a pending read, the reader is basically useless now, so we clean it
up. *)don't_wait_for(closet~reason:(Info.of_string"Handshake timeout"));ErrorHandshake_error.Timeout|`Result(Errorexn)->letreason=Info.of_string"[Reader.read_one_message_bin_prot] raised"indon't_wait_for(closet~reason);Error(Reading_header_failed(Error.of_exnexn))|`Result(Ok(Error`Eof))->ErrorEof|`Result(Ok(Error`Closed))->ErrorTransport_closed|`Result(Ok(Okpeer))->(matchHeader.negotiate~us:Header.v1~peerwith|Errore->Error(Negotiation_failede)|Ok1->Ok()|Oki->Error(Negotiated_unexpected_versioni)));;letcontains_magic_prefix=Protocol_version_header.contains_magic_prefix~protocol:Rpcletcreate?implementations~connection_state?(handshake_timeout=default_handshake_timeout)?(heartbeat_config=Heartbeat_config.create())?(description=Info.of_string"<created-directly>")?(time_source=Synchronous_time_source.wall_clock())({reader;writer}:Transport.t)=letimplementations=matchimplementationswith|None->Implementations.null()|Somes->sinlett={description;heartbeat_config=Heartbeat_config.to_runtimeheartbeat_config;heartbeat_callbacks=[||];last_seen_alive=Synchronous_time_source.nowtime_source;reader;writer;open_queries=Hashtbl.Poly.create~size:10();close_started=Ivar.create();close_finished=Ivar.create();implementations_instance=Set_once.create();time_source;heartbeat_event=Set_once.create()}inletwriter_monitor_exns=Monitor.detach_and_get_error_stream(Writer.monitorwriter)inupon(Writer.stoppedwriter)(fun()->don't_wait_for(closet~reason:(Info.of_string"RPC transport stopped")));match%mapdo_handshaket~handshake_timeoutwith|Ok()->run_after_handshaket~implementations~connection_state~writer_monitor_exns;Okt|Errorerror->Error(Handshake_error.to_exn~connection_description:descriptionerror);;letwith_close?implementations?handshake_timeout?heartbeat_config?description?time_source~connection_statetransport~dispatch_queries~on_handshake_error=lethandle_handshake_error=matchon_handshake_errorwith|`Callf->f|`Raise->raiseinlet%bindt=create?implementations?handshake_timeout?heartbeat_config?description?time_source~connection_statetransportinmatchtwith|Errore->let%bind()=Transport.closetransportinhandle_handshake_errore|Okt->Monitor.protect~run:`Schedule~rest:`Log~finally:(fun()->closet~reason:(Info.of_string"Rpc.Connection.with_close finished"))(fun()->let%bindresult=dispatch_queriestinlet%map()=matchimplementationswith|None->Deferred.unit|Some_->close_finishedtinresult);;letserver_with_close?handshake_timeout?heartbeat_config?description?time_sourcetransport~implementations~connection_state~on_handshake_error=leton_handshake_error=matchon_handshake_errorwith|`Callf->`Callf|`Raise->`Raise|`Ignore->`Call(fun_->Deferred.unit)inwith_close?handshake_timeout?heartbeat_config?description?time_sourcetransport~implementations~connection_state~on_handshake_error~dispatch_queries:(fun_->Deferred.unit);;letclose?streaming_responses_flush_timeout?(reason=Info.of_string"Rpc.Connection.close")t=close?streaming_responses_flush_timeout~reasont;;moduleClient_implementations=structtypenonrec'st={connection_state:t->'s;implementations:'sImplementations.t}letnull()={connection_state=(fun_->());implementations=Implementations.null()};;end