(* RPC connection: handshake, heartbeats, query dispatch and orderly shutdown on top of
   a [Transport.t] (a reader/writer pair). *)

open Core_kernel
open Async_kernel

module Time_ns = Core_kernel.Core_kernel_private.Time_ns_alternate_sexp

module P = Protocol
module Reader = Transport.Reader
module Writer = Transport.Writer

(* Protocol header exchanged during the handshake.  Only [v1] and [negotiate] are
   exposed; [negotiate] is [P.Header.negotiate] with [~allow_legacy_peer:true]. *)
module Header : sig
  type t [@@deriving bin_type_class]
  val v1 : t
  val negotiate : us:t -> peer:t -> int Or_error.t
end = struct
  include P.Header

  let negotiate = negotiate ~allow_legacy_peer:true

  (* Header advertising protocol [Rpc] with [1] as the only supported version. *)
  let v1 = Protocol_version_header.create ~protocol:Rpc ~supported_versions:[1]
end

(* The ways in which [do_handshake] can fail, plus the exception that carries such a
   failure together with the connection description. *)
module Handshake_error = struct
  module T = struct
    type t =
      | Eof
      | Transport_closed
      | Timeout
      | Reading_header_failed of Error.t
      | Negotiation_failed of Error.t
      | Negotiated_unexpected_version of int
    [@@deriving sexp]
  end
  include T
  include Sexpable.To_stringable (T)

  exception Handshake_error of (t * Info.t) [@@deriving sexp]

  (* Wrap [t] and the connection description into the [Handshake_error] exception
     value (the exception is built, not raised). *)
  let to_exn ~connection_description t =
    Handshake_error (t, connection_description)
end

(* Heartbeat settings: [send_every] is the period passed to [every] in
   [run_after_handshake]; [timeout] is the silence threshold checked in [heartbeat]. *)
module Heartbeat_config = struct
  type t =
    { timeout    : Time_ns.Span.t
    ; send_every : Time_ns.Span.t
    }
  [@@deriving sexp, bin_io]

  let create ~timeout ~send_every = { timeout; send_every }

  let default =
    { timeout    = Time_ns.Span.of_sec 30.
    ; send_every = Time_ns.Span.of_sec 10.
    }
end

(* Callback registered per open query.  Its result tells [handle_response] whether the
   handler stays registered ([`keep], [`wait]) or is removed ([`remove],
   [`remove_and_wait]), and whether the read loop should wait on a deferred. *)
type response_handler
  =  Nat0.t P.Response.t
  -> read_buffer : Bigstring.t
  -> read_buffer_pos_ref : int ref
  -> [ `keep
     | `wait of unit Deferred.t
     | `remove of unit Rpc_result.t
     | `remove_and_wait of unit Deferred.t
     ]

type t =
  { description         : Info.t
  ; heartbeat_config    : Heartbeat_config.t
  ; heartbeat_callbacks : (unit -> unit) list ref
  ; reader              : Reader.t
  ; writer              : Writer.t
  ; open_queries        : (P.Query_id.t, response_handler sexp_opaque) Hashtbl.t
  ; close_started       : Info.t Ivar.t
  ; close_finished      : unit Ivar.t
  (* There's a circular dependency between connections and their implementation instances
     (the latter depends on the connection state, which is given access to the connection
     when it is created). *)
  ; implementations_instance : Implementations.Instance.t Set_once.t
  }
[@@deriving sexp_of]

let description t = t.description

(* A connection counts as closed as soon as [close_started] has been filled. *)
let is_closed t = Ivar.is_full t.close_started

(* The writer, or [Error `Closed] once closing has started. *)
let writer t = if is_closed t then Error `Closed else Ok t.writer

let bytes_to_write t = Writer.bytes_to_write t.writer

let flushed t = Writer.flushed t.writer

(* Unwrap a transport send result: returns the payload of [Sent] and raises for
   [Closed] and [Message_too_big]. *)
let handle_send_result : t -> 'a Transport.Send_result.t -> 'a = fun t r ->
  match r with
  | Sent x -> x
  | Closed ->
    (* All of the places we call [handle_send_result] check whether [t] is closed (usually
       via the [writer] function above). This checks whether [t.writer] is closed, which
       should not happen unless [t] is closed. *)
    failwiths "RPC connection got closed writer" t sexp_of_t
  | Message_too_big _ ->
    raise_s [%sexp
      "Message cannot be sent",
      { reason     = (r : _ Transport.Send_result.t)
      ; connection = (t : t)
      }
    ]

(* Send [query].  When [response_handler] is [Some _] it is stored in [open_queries]
   under the query's id before the message is written.  Returns [Error `Closed] without
   sending if the connection is closed, otherwise [Ok ()]. *)
let dispatch t ~response_handler ~bin_writer_query ~query =
  match writer t with
  | Error `Closed as r -> r
  | Ok writer ->
    Option.iter response_handler ~f:(fun response_handler ->
      Hashtbl.set t.open_queries ~key:query.P.Query.id ~data:response_handler);
    Writer.send_bin_prot writer
      (P.Message.bin_writer_needs_length (Writer_with_length.of_writer bin_writer_query))
      (Query query)
    |> handle_send_result t;
    Ok ()
;;

(* Shared implementation of the bigstring dispatchers.  A fresh query id is created,
   the optional handler is registered under it, and [do_send] writes a [Query] header
   (whose [data] is [len]) followed by [buf] from [pos] for [len] bytes. *)
let make_dispatch_bigstring do_send t ~tag ~version buf ~pos ~len ~response_handler =
  match writer t with
  | Error `Closed -> Error `Closed
  | Ok writer ->
    let id = P.Query_id.create () in
    let header : Nat0.t P.Message.t =
      Query { tag; version; id; data = Nat0.of_int_exn len }
    in
    Option.iter response_handler ~f:(fun response_handler ->
      Hashtbl.set t.open_queries ~key:id ~data:response_handler);
    let result =
      do_send writer P.Message.bin_writer_nat0_t header ~buf ~pos ~len
      |> handle_send_result t
    in
    Ok result
;;

let dispatch_bigstring =
  make_dispatch_bigstring Writer.send_bin_prot_and_bigstring
;;

let schedule_dispatch_bigstring =
  make_dispatch_bigstring Writer.send_bin_prot_and_bigstring_non_copying
;;

(* Route a response to the handler registered for [response.id].  An unknown id stops
   the read loop with [Unknown_query_id].  After a [`remove], only [Ok ()] and
   [Unimplemented_rpc] let the loop continue; every other error stops it. *)
let handle_response t (response : _ P.Response.t) ~read_buffer ~read_buffer_pos_ref
  : _ Transport.Handler_result.t =
  match Hashtbl.find t.open_queries response.id with
  | None -> Stop (Error (Rpc_error.Unknown_query_id response.id))
  | Some response_handler ->
    match response_handler response ~read_buffer ~read_buffer_pos_ref with
    | `keep -> Continue
    | `wait wait -> Wait wait
    | `remove_and_wait wait ->
      Hashtbl.remove t.open_queries response.id;
      Wait wait
    | `remove removal_circumstances ->
      Hashtbl.remove t.open_queries response.id;
      begin match removal_circumstances with
      | Ok () -> Continue
      | Error e ->
        match e with
        | Unimplemented_rpc _ -> Continue
        | Bin_io_exn _
        | Connection_closed
        | Write_error _
        | Uncaught_exn _
        | Unknown_query_id _ -> Stop (Error e)
      end
;;

(* Dispatch one decoded message: heartbeats are ignored here, responses go to
   [handle_response], and queries go to the implementations instance (which must have
   been set, otherwise [Set_once.get_exn] raises). *)
let handle_msg t (msg : _ P.Message.t) ~read_buffer ~read_buffer_pos_ref
  : _ Transport.Handler_result.t =
  match msg with
  | Heartbeat -> Continue
  | Response response ->
    handle_response t response ~read_buffer ~read_buffer_pos_ref
  | Query query ->
    let instance = Set_once.get_exn t.implementations_instance [%here] in
    Implementations.Instance.handle_query instance
      ~query ~read_buffer ~read_buffer_pos_ref
;;

(* The close reason, available either when closing starts or, for [`finished], only
   after [close_finished] is determined. *)
let close_reason t ~on_close =
  let reason = Ivar.read t.close_started in
  match on_close with
  | `started -> reason
  | `finished ->
    Ivar.read t.close_finished
    >>= fun () ->
    reason

let close_finished t = Ivar.read t.close_finished

(* Prepend [f] to the callbacks run at the end of each read batch (see
   [run_after_handshake]). *)
let add_heartbeat_callback t f =
  t.heartbeat_callbacks := f :: !(t.heartbeat_callbacks)
;;

(* Start closing the connection if that has not already happened, and return a deferred
   that is determined once closing has finished.

   Sequence: fill [close_started]; if an implementations instance exists, wait for it to
   flush -- bounded by [streaming_responses_flush_timeout] and by the writer stopping --
   then stop it; close the writer; close the reader; fill [close_finished]. *)
let close ?(streaming_responses_flush_timeout = Time_ns.Span.of_int_sec 5) ~reason t =
  if not (is_closed t) then begin
    Ivar.fill t.close_started reason;
    begin
      match Set_once.get t.implementations_instance with
      | None -> Deferred.unit
      | Some instance ->
        let flushed = Implementations.Instance.flush instance in
        if Deferred.is_determined flushed then begin
          Implementations.Instance.stop instance;
          flushed
        end else begin
          Deferred.any_unit
            [ flushed
            ; Clock_ns.after streaming_responses_flush_timeout
            ; Writer.stopped t.writer
            ]
          >>| fun () ->
          Implementations.Instance.stop instance
        end
    end
    >>> fun () ->
    Writer.close t.writer
    >>> fun () ->
    Reader.close t.reader
    >>> fun () ->
    Ivar.fill t.close_finished ();
  end;
  close_finished t;
;;

(* Build the staged per-message callback handed to [Reader.read_forever].  Each message
   is decoded with [P.Message.bin_read_nat0_t] and passed to [handle_msg]; a [Stop]
   result starts closing the connection with a reason derived from the result. *)
let on_message t =
  let f buf ~pos ~len:_ : _ Transport.Handler_result.t =
    let pos_ref = ref pos in
    let nat0_msg = P.Message.bin_read_nat0_t buf ~pos_ref in
    match
      handle_msg t nat0_msg ~read_buffer:buf ~read_buffer_pos_ref:pos_ref
    with
    | Continue -> Continue
    | Wait _ as res -> res
    | Stop result ->
      let reason =
        let msg = "Rpc message handling loop stopped" in
        match result with
        | Ok () -> Info.of_string msg
        | Error e ->
          Info.create msg e
            (Rpc_error.sexp_of_t
               ~get_connection_close_reason:(fun () ->
                 [%sexp
                   "Connection.on_message resulted in Connection_closed error. \
                    This is weird."]))
      in
      don't_wait_for (close t ~reason);
      Stop reason
  in
  Staged.stage f
;;

(* One heartbeat tick.  Does nothing on a closed connection.  Otherwise, if more than
   [heartbeat_config.timeout] has elapsed since [last_heartbeat], the connection is
   closed; if not, a [Heartbeat] message is sent. *)
let heartbeat t ~last_heartbeat =
  if not (is_closed t) then begin
    let since_last_heartbeat = Time_ns.diff (Time_ns.now ()) last_heartbeat in
    if Time_ns.Span.(>) since_last_heartbeat t.heartbeat_config.timeout then begin
      let reason () =
        sprintf !"No heartbeats received for %{sexp:Time_ns.Span.t}."
          t.heartbeat_config.timeout
      in
      don't_wait_for (close t ~reason:(Info.of_thunk reason));
    end else begin
      Writer.send_bin_prot t.writer P.Message.bin_writer_nat0_t Heartbeat
      |> handle_send_result t
    end
  end

let default_handshake_timeout = Time_ns.Span.of_sec 30.

(* Start closing the connection and fail every still-open query: each registered
   handler is called once with an [Error] response derived from [exn], after which the
   table of open queries is cleared. *)
let cleanup t ~reason exn =
  don't_wait_for (close ~reason t);
  if not (Hashtbl.is_empty t.open_queries)
  then begin
    let error =
      match exn with
      | Rpc_error.Rpc (error, (_ : Info.t)) -> error
      | exn -> Uncaught_exn (Exn.sexp_of_t exn)
    in
    (* clean up open streaming responses *)
    (* an unfortunate hack; ok because the response handler will have nothing
       to read following a response where [data] is an error *)
    let dummy_buffer = Bigstring.create 1 in
    let dummy_ref = ref 0 in
    Hashtbl.iteri t.open_queries
      ~f:(fun ~key:query_id ~data:response_handler ->
        ignore (
          response_handler
            ~read_buffer:dummy_buffer
            ~read_buffer_pos_ref:dummy_ref
            { id   = query_id
            ; data = Error error
            }
        ));
    Hashtbl.clear t.open_queries;
    Bigstring.unsafe_destroy dummy_buffer;
  end
;;

(* Start the steady-state machinery once the handshake has succeeded: instantiate the
   implementations, route errors from the loop monitor and the writer monitor to
   [cleanup], start the periodic heartbeat, and run the read loop. *)
let run_after_handshake t ~implementations ~connection_state =
  let instance =
    Implementations.instantiate implementations ~writer:t.writer
      ~connection_description:t.description
      ~connection_close_started:(Ivar.read t.close_started)
      ~connection_state:(connection_state t)
  in
  Set_once.set_exn t.implementations_instance [%here] instance;
  let monitor = Monitor.create ~name:"RPC connection loop" () in
  let reason name exn =
    (exn, Info.tag (Info.of_exn exn) ~tag:("exn raised in RPC connection " ^ name))
  in
  Stream.iter
    (Stream.interleave (Stream.of_list (
       [ Stream.map ~f:(reason "loop")
           (Monitor.detach_and_get_error_stream monitor)
       ; Stream.map ~f:(reason "Writer.t")
           (Monitor.detach_and_get_error_stream (Writer.monitor t.writer))
       ])))
    ~f:(fun (exn, reason) -> cleanup t exn ~reason);
  within ~monitor (fun () ->
    let last_heartbeat = ref (Time_ns.now ()) in
    every ~stop:(Ivar.read t.close_started >>| fun (_ : Info.t) -> ())
      t.heartbeat_config.send_every
      (fun () ->
         (* Make sure not to do this after calling [close] -- this function could be
            called between when [t.close_started] is determined and when [stop] is, since
            they happen in different Async jobs. *)
         if not (Ivar.is_full t.close_started)
         then heartbeat t ~last_heartbeat:!last_heartbeat);
    Reader.read_forever t.reader
      ~on_message:(Staged.unstage (on_message t))
      ~on_end_of_batch:(fun () ->
        last_heartbeat := Time_ns.now ();
        List.iter !(t.heartbeat_callbacks) ~f:(fun f -> f ()))
    >>> function
    | Ok reason ->
      cleanup t ~reason (Rpc_error.Rpc (Connection_closed, t.description))
    (* The protocol is such that right now, the only outcome of the other side closing the
       connection normally is that we get an eof. *)
    | Error (`Eof | `Closed) ->
      cleanup t ~reason:(Info.of_string "EOF or connection closed")
        (Rpc_error.Rpc (Connection_closed, t.description)))
;;

(* Exchange protocol headers with the peer.  Sends [Header.v1], then reads the peer's
   header under [handshake_timeout].  Succeeds only when negotiation yields version 1;
   every other outcome is reported as a [Handshake_error.t]. *)
let do_handshake t ~handshake_timeout =
  if Writer.is_closed t.writer then
    return (Error Handshake_error.Transport_closed)
  else begin
    Writer.send_bin_prot t.writer Header.bin_t.writer Header.v1
    |> handle_send_result t;
    (* If we use [max_connections] in the server, then this read may just hang until the
       server starts accepting new connections (which could be never). That is why a
       timeout is used *)
    let result =
      Monitor.try_with ~run:`Now (fun () ->
        Reader.read_one_message_bin_prot t.reader Header.bin_t.reader)
    in
    Clock_ns.with_timeout handshake_timeout result
    >>| function
    | `Timeout ->
      (* There's a pending read, the reader is basically useless now, so we clean it
         up. *)
      don't_wait_for (close t ~reason:(Info.of_string "Handshake timeout"));
      Error Handshake_error.Timeout
    | `Result (Error exn) ->
      let reason = Info.of_string "[Reader.read_one_message_bin_prot] raised" in
      don't_wait_for (close t ~reason);
      Error (Reading_header_failed (Error.of_exn exn))
    | `Result (Ok (Error `Eof)) -> Error Eof
    | `Result (Ok (Error `Closed)) -> Error Transport_closed
    | `Result (Ok (Ok peer)) ->
      match Header.negotiate ~us:Header.v1 ~peer with
      | Error e -> Error (Negotiation_failed e)
      | Ok 1 -> Ok ()
      | Ok i -> Error (Negotiated_unexpected_version i)
  end
;;

let contains_magic_prefix =
  Protocol_version_header.contains_magic_prefix ~protocol:Rpc
;;

(* Create a connection over the given transport and perform the handshake.  Yields
   [Ok t] with the read loop and heartbeats already running, or [Error exn] where [exn]
   is a [Handshake_error.Handshake_error].  When [implementations] is omitted,
   [Implementations.null ()] is used.  The connection is also closed whenever the
   transport writer stops. *)
let create
      ?implementations
      ~connection_state
      ?(handshake_timeout = default_handshake_timeout)
      ?(heartbeat_config = Heartbeat_config.default)
      ?(description = Info.of_string "<created-directly>")
      ({ reader; writer } : Transport.t)
  =
  let implementations =
    match implementations with
    | None -> Implementations.null ()
    | Some s -> s
  in
  let t =
    { description
    ; heartbeat_config
    ; heartbeat_callbacks = ref []
    ; reader
    ; writer
    ; open_queries   = Hashtbl.Poly.create ~size:10 ()
    ; close_started  = Ivar.create ()
    ; close_finished = Ivar.create ()
    ; implementations_instance = Set_once.create ()
    }
  in
  upon (Writer.stopped writer) (fun () ->
    don't_wait_for (close t ~reason:(Info.of_string "RPC transport stopped")));
  do_handshake t ~handshake_timeout
  >>| function
  | Ok () ->
    run_after_handshake t ~implementations ~connection_state;
    Ok t
  | Error error ->
    Error (Handshake_error.to_exn ~connection_description:description error)
;;

(* Create a connection, run [dispatch_queries] on it, and close it afterwards (also when
   [dispatch_queries] raises, via [Monitor.protect]).  On handshake failure the
   transport is closed and the error is passed to the [`Call] handler or raised.  When
   [implementations] were supplied, the result is only returned once the connection has
   finished closing. *)
let with_close
      ?implementations
      ?handshake_timeout
      ?heartbeat_config
      ~connection_state
      transport
      ~dispatch_queries
      ~on_handshake_error
  =
  let handle_handshake_error =
    match on_handshake_error with
    | `Call f -> f
    | `Raise -> raise
  in
  create ?implementations ?handshake_timeout ?heartbeat_config ~connection_state transport
  >>= fun t ->
  match t with
  | Error e ->
    Transport.close transport
    >>= fun () ->
    handle_handshake_error e
  | Ok t ->
    Monitor.protect
      ~finally:(fun () ->
        close t ~reason:(Info.of_string "Rpc.Connection.with_close finished"))
      (fun () ->
         dispatch_queries t
         >>= fun result ->
         (match implementations with
          | None -> Deferred.unit
          | Some _ -> close_finished t)
         >>| fun () ->
         result)
;;

(* Server-side variant of [with_close]: dispatches no queries of its own and
   additionally accepts [`Ignore], which maps handshake errors to [Deferred.unit]. *)
let server_with_close
      ?handshake_timeout
      ?heartbeat_config
      transport
      ~implementations
      ~connection_state
      ~on_handshake_error
  =
  let on_handshake_error =
    match on_handshake_error with
    | `Call f -> `Call f
    | `Raise -> `Raise
    | `Ignore -> `Call (fun _ -> Deferred.unit)
  in
  with_close ?handshake_timeout ?heartbeat_config transport ~implementations
    ~connection_state ~on_handshake_error
    ~dispatch_queries:(fun _ -> Deferred.unit)
;;

(* Public [close]: shadows the internal one above and makes [reason] optional. *)
let close
      ?streaming_responses_flush_timeout
      ?(reason = Info.of_string "Rpc.Connection.close")
      t
  =
  close ?streaming_responses_flush_timeout ~reason t

(* Implementations bundled with the function that derives their connection state from
   a connection. *)
module Client_implementations = struct
  type nonrec 's t =
    { connection_state : t -> 's
    ; implementations  : 's Implementations.t
    }

  let null () =
    { connection_state = (fun _ -> ())
    ; implementations  = Implementations.null ()
    }
end