open Core
open Import
module Kernel_transport = Rpc_kernel.Transport
module Header = Kernel_transport.Header
module Handler_result = Kernel_transport.Handler_result
module Send_result = Kernel_transport.Send_result

(* A value paired with the largest payload size it is allowed to carry.  The type is
   private so that every value goes through [create] and its validation. *)
module With_limit : sig
  type 'a t = private
    { t : 'a
    ; max_message_size : int
    }
  [@@deriving sexp_of]

  (* Raises if [max_message_size] is negative. *)
  val create : 'a -> max_message_size:int -> 'a t

  (* [true] iff [0 <= payload_len <= max_message_size]. *)
  val message_size_ok : _ t -> payload_len:int -> bool

  (* Raises unless [message_size_ok] holds. *)
  val check_message_size : _ t -> payload_len:int -> unit
end = struct
  type 'a t =
    { t : 'a
    ; max_message_size : int
    }
  [@@deriving sexp_of]

  let create t ~max_message_size =
    if max_message_size < 0
    then
      failwithf
        "Rpc_transport.With_limit.create got negative max message size: %d"
        max_message_size
        ();
    { t; max_message_size }
  ;;

  let message_size_ok t ~payload_len =
    payload_len >= 0 && payload_len <= t.max_message_size
  ;;

  let check_message_size t ~payload_len =
    if not (message_size_ok t ~payload_len)
    then
      failwiths
        ~here:[%here]
        "Rpc_transport: message too small or too big"
        (`Message_size payload_len, `Max_message_size t.max_message_size)
        [%sexp_of: [ `Message_size of int ] * [ `Max_message_size of int ]]
  ;;
end

(* Transport reader backed by an Async [Reader.t] with a message size limit. *)
module Unix_reader = struct
  open With_limit

  type t = Reader.t With_limit.t [@@deriving sexp_of]

  let create ~reader ~max_message_size = With_limit.create reader ~max_message_size
  let close t = Reader.close t.t
  let is_closed t = Reader.is_closed t.t

  (* Wait for every deferred in [l], then return [ret_val]. *)
  let all_unit_then_return l ret_val =
    match l with
    | [] -> return ret_val (* avoid deferred operations in the common case *)
    | _ ->
      let%map () = Deferred.all_unit l in
      ret_val
  ;;

  (* Read framed messages until end of file or until [on_message] returns [Stop].

     Each message is [Header.length] bytes of header followed by the payload whose length
     is read from the header.  [on_message] is called once per complete message with the
     position and length of its payload; [on_end_of_batch] is called when the current
     chunk holds no further complete message.

     Returns [Ok x] when [on_message] stopped with [x], [Error `Eof] on end of file (with
     or without unconsumed data).  Raises via [With_limit.check_message_size] if a header
     announces a payload length outside the allowed range. *)
  let read_forever t ~on_message ~on_end_of_batch =
    (* The chunk has no complete message left: report how much was consumed and how many
       bytes are needed for the next message, after waiting on pending [Wait]s. *)
    let finish_loop ~consumed ~need ~wait_before_reading =
      on_end_of_batch ();
      all_unit_then_return wait_before_reading (`Consumed (consumed, `Need need))
    in
    let rec loop buf ~pos ~len ~consumed ~wait_before_reading =
      if len < Header.length
      then finish_loop ~consumed ~need:Header.length ~wait_before_reading
      else (
        let payload_len = Header.unsafe_get_payload_length buf ~pos in
        let total_len = Header.length + payload_len in
        With_limit.check_message_size t ~payload_len;
        if len < total_len
        then finish_loop ~consumed ~need:total_len ~wait_before_reading
        else (
          let consumed = consumed + total_len in
          let result : _ Handler_result.t =
            on_message buf ~pos:(pos + Header.length) ~len:payload_len
          in
          match result with
          | Stop x ->
            all_unit_then_return wait_before_reading (`Stop_consumed (x, consumed))
          | Continue ->
            loop
              buf
              ~pos:(pos + total_len)
              ~len:(len - total_len)
              ~consumed
              ~wait_before_reading
          | Wait d ->
            (* Only remember deferreds that are still pending. *)
            let wait_before_reading =
              if Deferred.is_determined d
              then wait_before_reading
              else d :: wait_before_reading
            in
            loop
              buf
              ~pos:(pos + total_len)
              ~len:(len - total_len)
              ~consumed
              ~wait_before_reading))
    in
    let handle_chunk buf ~pos ~len =
      loop buf ~pos ~len ~consumed:0 ~wait_before_reading:[]
    in
    match%map Reader.read_one_chunk_at_a_time t.t ~handle_chunk with
    | `Eof | `Eof_with_unconsumed_data _ -> Error `Eof
    | `Stopped x -> Ok x
  ;;
end

(* Transport writer backed by an Async [Writer.t] with a message size limit. *)
module Unix_writer = struct
  open With_limit

  type t = Writer.t With_limit.t [@@deriving sexp_of]

  let create ~writer ~max_message_size =
    (* Prevent exceptions in the writer when the other side disconnects. Note that "stale
       data in buffer" exceptions are not an issue when the consumer leaves, since
       [Rpc_kernel.Connection] takes care of closing the transport when the consumer
       leaves. *)
    Writer.set_raise_when_consumer_leaves writer false;
    With_limit.create writer ~max_message_size
  ;;

  let close t = Writer.close t.t
  let is_closed t = Writer.is_closed t.t
  let monitor t = Writer.monitor t.t
  let bytes_to_write t = Writer.bytes_to_write t.t

  (* Determined as soon as either the writer starts closing or its consumer leaves. *)
  let stopped t = Deferred.any [ Writer.close_started t.t; Writer.consumer_left t.t ]
  let flushed t = Writer.flushed t.t
  let ready_to_write = flushed

  (* Bin-prot style writer for the header: stores the payload length [x] at [pos] and
     returns the position just past the header. *)
  let bin_write_payload_length buf ~pos x =
    Header.unsafe_set_payload_length buf ~pos x;
    pos + Header.length
  ;;

  (* Write the header and the bin-prot encoding of [x].  The length stored in the header
     is the encoded size of [x] plus [followup_len], so the caller is expected to write
     exactly [followup_len] more bytes afterwards.

     Returns [Closed] if the writer is closed, [Message_too_big] if the payload length
     fails [message_size_ok] (nothing is written in either case), [Sent ()] otherwise. *)
  let send_bin_prot_internal t (bin_writer : _ Bin_prot.Type_class.writer) x ~followup_len
    : _ Send_result.t
    =
    if not (Writer.is_closed t.t)
    then (
      let data_len = bin_writer.size x in
      let payload_len = data_len + followup_len in
      if message_size_ok t ~payload_len
      then (
        Writer.write_bin_prot_no_size_header
          t.t
          ~size:Header.length
          bin_write_payload_length
          payload_len;
        Writer.write_bin_prot_no_size_header t.t ~size:data_len bin_writer.write x;
        Sent ())
      else Message_too_big { size = payload_len; max_message_size = t.max_message_size })
    else Closed
  ;;

  let send_bin_prot t bin_writer x = send_bin_prot_internal t bin_writer x ~followup_len:0

  (* Send [x] followed by [len] bytes of [buf] starting at [pos], written with
     [Writer.write_bigstring]. *)
  let send_bin_prot_and_bigstring t bin_writer x ~buf ~pos ~len : _ Send_result.t =
    match send_bin_prot_internal t bin_writer x ~followup_len:len with
    | Sent () ->
      Writer.write_bigstring t.t buf ~pos ~len;
      Sent ()
    | error -> error
  ;;

  (* Same as [send_bin_prot_and_bigstring] but hands [buf] to [Writer.schedule_bigstring];
     on success the result carries [Writer.flushed], which the caller can wait on.
     NOTE(review): presumably [buf] must not be modified until that deferred is
     determined -- confirm against the [Writer.schedule_bigstring] documentation. *)
  let send_bin_prot_and_bigstring_non_copying t bin_writer x ~buf ~pos ~len
    : _ Send_result.t
    =
    match send_bin_prot_internal t bin_writer x ~followup_len:len with
    | Sent () ->
      Writer.schedule_bigstring t.t buf ~pos ~len;
      Sent (Writer.flushed t.t)
    (* The alias on constructor patterns lets [r] be returned at a different [Sent]
       payload type than the one matched on. *)
    | (Closed | Message_too_big _) as r -> r
  ;;
end

(* Default message size limit: the integer value of the environment variable
   ASYNC_RPC_MAX_MESSAGE_SIZE when it is set, 100 * 1024 * 1024 otherwise.  Evaluated
   lazily, so a malformed value only raises (from [Int.of_string]) when first forced. *)
let default_max_message_size =
  Lazy.from_fun (fun () ->
    match Sys.getenv "ASYNC_RPC_MAX_MESSAGE_SIZE" with
    | None ->
      (* unfortunately, copied from reader0.ml *)
      100 * 1024 * 1024
    | Some max_message_size -> Int.of_string max_message_size)
;;

module Reader = struct
  include Kernel_transport.Reader

  (* Pack an Async reader as a kernel transport reader. *)
  let of_reader ?(max_message_size = force default_max_message_size) reader =
    pack (module Unix_reader) (Unix_reader.create ~reader ~max_message_size)
  ;;
end

module Writer = struct
  include Kernel_transport.Writer

  (* Pack an Async writer as a kernel transport writer. *)
  let of_writer ?(max_message_size = force default_max_message_size) writer =
    pack (module Unix_writer) (Unix_writer.create ~writer ~max_message_size)
  ;;
end

type t = Kernel_transport.t =
  { reader : Reader.t
  ; writer : Writer.t
  }
[@@deriving sexp_of]

let close = Kernel_transport.close

(* Build a transport from an Async reader and writer; both sides get the same optional
   [max_message_size]. *)
let of_reader_writer ?max_message_size reader writer =
  { reader = Reader.of_reader ?max_message_size reader
  ; writer = Writer.of_writer ?max_message_size writer
  }
;;

(* Build a transport reading from and writing to [fd].  [Reader] and [Writer] are
   shadowed by the modules above, hence the [Async_unix] qualification. *)
let of_fd ?buffer_age_limit ?reader_buffer_size ~max_message_size fd =
  of_reader_writer
    ~max_message_size
    (Async_unix.Reader.create ?buf_len:reader_buffer_size fd)
    (Async_unix.Writer.create ?buffer_age_limit fd)
;;

module Tcp = struct
  let default_transport_maker fd ~max_message_size = of_fd fd ~max_message_size

  (* Shared implementation of [serve] and [serve_inet]; [tcp_creator] is the
     [Tcp.Server] creation function to use.

     For each accepted connection: if [auth client_addr] is [false] the handler returns
     immediately without creating a transport.  Otherwise a transport is made on the
     socket file descriptor and passed to [handle_transport]; once that finishes or
     raises, the transport is closed, and an exception from the handler is then
     re-raised. *)
  let make_serve_func
        tcp_creator
        ~where_to_listen
        ?max_connections
        ?backlog
        ?drop_incoming_connections
        ?time_source
        ?(max_message_size = force default_max_message_size)
        ?(make_transport = default_transport_maker)
        ?(auth = fun _ -> true)
        ?(on_handler_error = `Ignore)
        handle_transport
    =
    tcp_creator
      ?max_connections
      ?max_accepts_per_batch:None
      ?backlog
      ?drop_incoming_connections
      ?socket:None
      ?time_source
      ~on_handler_error
      where_to_listen
      (fun client_addr socket ->
         match auth client_addr with
         | false -> return ()
         | true ->
           let transport = make_transport ~max_message_size (Socket.fd socket) in
           let%bind result =
             Monitor.try_with ~run:`Schedule ~rest:`Raise (fun () ->
               handle_transport
                 ~client_addr
                 ~server_addr:(Socket.getsockname socket)
                 transport)
           in
           (* Close the transport before propagating any handler failure. *)
           let%bind () = close transport in
           (match result with
            | Ok () -> return ()
            | Error exn -> raise exn))
  ;;

  (* eta-expand [where_to_listen] to avoid value restriction. *)
  let serve ~where_to_listen = make_serve_func Tcp.Server.create_sock ~where_to_listen

  (* eta-expand [where_to_listen] to avoid value restriction. *)
  let serve_inet ~where_to_listen =
    make_serve_func Tcp.Server.create_sock_inet ~where_to_listen
  ;;

  (* Connect to [where_to_connect] and build a transport on the resulting socket.

     Returns [Ok (transport, peer_address)] on success.  Returns [Error exn] if the
     connection attempt raises, or if [Socket.getpeername] raises on the fresh socket; in
     the latter case the socket is shut down and its file descriptor closed in the
     background. *)
  let connect
        ?(max_message_size = force default_max_message_size)
        ?(make_transport = default_transport_maker)
        ?(tcp_connect_timeout =
          Async_rpc_kernel.Async_rpc_kernel_private.default_handshake_timeout)
        where_to_connect
    =
    let%bind sock =
      Monitor.try_with ~run:`Schedule ~rest:`Log (fun () ->
        Tcp.connect_sock
          ~timeout:(Time_ns.Span.to_span_float_round_nearest tcp_connect_timeout)
          where_to_connect)
    in
    match sock with
    | Error _ as error -> return error
    | Ok sock ->
      (match Socket.getpeername sock with
       | exception exn_could_be_raised_if_the_socket_is_disconnected_now ->
         Socket.shutdown sock `Both;
         don't_wait_for (Unix.close (Socket.fd sock));
         return (Error exn_could_be_raised_if_the_socket_is_disconnected_now)
       | sock_peername ->
         let transport = make_transport (Socket.fd sock) ~max_message_size in
         return (Ok (transport, sock_peername)))
  ;;
end