open Core
open Import

(* Aliases captured before [Reader] and [Writer] are redefined further down in this
   file. *)
module Async_reader = Reader
module Async_writer = Writer
module Kernel_transport = Rpc_kernel.Transport
module Header = Kernel_transport.Header
module Handler_result = Kernel_transport.Handler_result
module Send_result = Kernel_transport.Send_result

(* A value paired with the maximum payload size allowed for a single message. The record
   is [private] in the signature, so the only way to build one is [create], which
   validates the limit. *)
module With_limit : sig
  type 'a t = private
    { t : 'a
    ; max_message_size : int
    }
  [@@deriving sexp_of]

  (* Raises if [max_message_size] is negative. *)
  val create : 'a -> max_message_size:int -> 'a t

  (* [true] iff [0 <= payload_len <= max_message_size]. *)
  val message_size_ok : _ t -> payload_len:int -> bool

  (* Raises unless [message_size_ok] holds for [payload_len]. *)
  val check_message_size : _ t -> payload_len:int -> unit
end = struct
  type 'a t =
    { t : 'a
    ; max_message_size : int
    }
  [@@deriving sexp_of]

  let create t ~max_message_size =
    if max_message_size < 0
    then
      failwithf
        "Rpc_transport.With_limit.create got negative max message size: %d"
        max_message_size
        ();
    { t; max_message_size }
  ;;

  let message_size_ok t ~payload_len =
    payload_len >= 0 && payload_len <= t.max_message_size
  ;;

  let check_message_size t ~payload_len =
    if not (message_size_ok t ~payload_len)
    then
      failwiths
        "Rpc_transport: message too small or too big"
        (`Message_size payload_len, `Max_message_size t.max_message_size)
        [%sexp_of: [ `Message_size of int ] * [ `Max_message_size of int ]]
  ;;
end

(* Reading side of the transport: a [Reader.t] together with a message size limit. *)
module Unix_reader = struct
  open With_limit

  type t = Reader.t With_limit.t [@@deriving sexp_of]

  let create ~reader ~max_message_size = With_limit.create reader ~max_message_size
  let close t = Reader.close t.t
  let is_closed t = Reader.is_closed t.t

  (* Wait for every deferred in [l], then yield [ret_val]. *)
  let all_unit_then_return l ret_val =
    match l with
    | [] -> return ret_val (* avoid deferred operations in the common case *)
    | _ -> Deferred.all_unit l >>| fun () -> ret_val
  ;;

  (* Repeatedly split incoming chunks into [header + payload] messages and pass each
     payload to [on_message].

     - [on_end_of_batch] is called each time the current chunk has no further complete
       message in it.
     - [on_message] returns a [Handler_result.t]: [Stop x] ends reading with [Ok x];
       [Continue] moves on to the next message; [Wait d] also moves on, but [d] is
       awaited before the chunk handler returns.
     - The result is [Error `Eof] if the underlying reader reports end of file.
     - Raises (via [With_limit.check_message_size]) if a header announces a payload
       length outside [0, max_message_size]. *)
  let read_forever t ~on_message ~on_end_of_batch =
    let finish_loop ~consumed ~need ~wait_before_reading =
      on_end_of_batch ();
      all_unit_then_return wait_before_reading (`Consumed (consumed, `Need need))
    in
    let rec loop buf ~pos ~len ~consumed ~wait_before_reading =
      if len < Header.length
      then
        (* Not even a full header yet: ask for at least that many bytes. *)
        finish_loop ~consumed ~need:Header.length ~wait_before_reading
      else (
        let payload_len = Header.unsafe_get_payload_length buf ~pos in
        (* Validate the length read from the wire before doing arithmetic with it. *)
        With_limit.check_message_size t ~payload_len;
        let total_len = Header.length + payload_len in
        if len < total_len
        then
          (* Header is complete but the payload is not: ask for the whole message. *)
          finish_loop ~consumed ~need:total_len ~wait_before_reading
        else (
          let consumed = consumed + total_len in
          let result : _ Handler_result.t =
            on_message buf ~pos:(pos + Header.length) ~len:payload_len
          in
          match result with
          | Stop x ->
            all_unit_then_return wait_before_reading (`Stop_consumed (x, consumed))
          | Continue ->
            loop
              buf
              ~pos:(pos + total_len)
              ~len:(len - total_len)
              ~consumed
              ~wait_before_reading
          | Wait d ->
            (* Only keep deferreds that are still pending, so the common case keeps
               [wait_before_reading] empty. *)
            let wait_before_reading =
              if Deferred.is_determined d
              then wait_before_reading
              else d :: wait_before_reading
            in
            loop
              buf
              ~pos:(pos + total_len)
              ~len:(len - total_len)
              ~consumed
              ~wait_before_reading))
    in
    let handle_chunk buf ~pos ~len =
      loop buf ~pos ~len ~consumed:0 ~wait_before_reading:[]
    in
    Reader.read_one_chunk_at_a_time t.t ~handle_chunk
    >>| function
    | `Eof | `Eof_with_unconsumed_data _ -> Error `Eof
    | `Stopped x -> Ok x
  ;;
end

(* Writing side of the transport: a [Writer.t] together with a message size limit. *)
module Unix_writer = struct
  open With_limit

  type t = Writer.t With_limit.t [@@deriving sexp_of]

  let create ~writer ~max_message_size =
    (* Prevent exceptions in the writer when the other side disconnects. Note that "stale
       data in buffer" exceptions are not an issue when the consumer leaves, since
       [Rpc_kernel.Connection] takes care of closing the transport when the consumer
       leaves. *)
    Writer.set_raise_when_consumer_leaves writer false;
    With_limit.create writer ~max_message_size
  ;;

  let close t = Writer.close t.t
  let is_closed t = Writer.is_closed t.t
  let monitor t = Writer.monitor t.t
  let bytes_to_write t = Writer.bytes_to_write t.t

  (* Determined as soon as either the writer has started closing or its consumer has
     left. *)
  let stopped t = Deferred.any [ Writer.close_started t.t; Writer.consumer_left t.t ]
  let flushed t = Writer.flushed t.t
  let ready_to_write = flushed

  (* Bin-prot style writer for the message header: stores [x] as the payload length at
     [pos] and returns the position just past the header. *)
  let bin_write_payload_length buf ~pos x =
    Header.unsafe_set_payload_length buf ~pos x;
    pos + Header.length
  ;;

  (* Write the header followed by the bin-prot encoding of [x]. The length stored in the
     header is [bin_writer.size x + followup_len], so the caller is expected to write
     exactly [followup_len] more bytes afterwards when the result is [Sent ()].

     Returns [Closed] if the writer is already closed and [Message_too_big _] if the
     total payload length fails [message_size_ok]; nothing is written in either case. *)
  let send_bin_prot_internal t (bin_writer : _ Bin_prot.Type_class.writer) x ~followup_len
    : _ Send_result.t
    =
    if not (Writer.is_closed t.t)
    then (
      let data_len = bin_writer.size x in
      let payload_len = data_len + followup_len in
      if message_size_ok t ~payload_len
      then (
        Writer.write_bin_prot_no_size_header
          t.t
          ~size:Header.length
          bin_write_payload_length
          payload_len;
        Writer.write_bin_prot_no_size_header t.t ~size:data_len bin_writer.write x;
        Sent ())
      else Message_too_big { size = payload_len; max_message_size = t.max_message_size })
    else Closed
  ;;

  let send_bin_prot t bin_writer x = send_bin_prot_internal t bin_writer x ~followup_len:0

  (* Send [x] followed by [len] bytes of [buf] starting at [pos] as a single message,
     using [Writer.write_bigstring] for the trailing bytes. *)
  let send_bin_prot_and_bigstring t bin_writer x ~buf ~pos ~len : _ Send_result.t =
    match send_bin_prot_internal t bin_writer x ~followup_len:len with
    | Sent () ->
      Writer.write_bigstring t.t buf ~pos ~len;
      Sent ()
    | error -> error
  ;;

  (* Same message layout as [send_bin_prot_and_bigstring], but the trailing bytes are
     handed over with [Writer.schedule_bigstring]. On success the result carries
     [Writer.flushed t.t]. *)
  let send_bin_prot_and_bigstring_non_copying t bin_writer x ~buf ~pos ~len
    : _ Send_result.t
    =
    match send_bin_prot_internal t bin_writer x ~followup_len:len with
    | Sent () ->
      Writer.schedule_bigstring t.t buf ~pos ~len;
      Sent (Writer.flushed t.t)
    | (Closed | Message_too_big _) as r ->
      (* Re-bound rather than returned as-is: the payload type of [Sent] differs between
         the matched value and this function's result. *)
      r
  ;;
end

module Reader = struct
  include Kernel_transport.Reader

  (* Pack a [Unix_reader] built from [reader] as a [Kernel_transport.Reader.t]. *)
  let of_reader ~max_message_size reader =
    pack (module Unix_reader) (Unix_reader.create ~reader ~max_message_size)
  ;;
end

module Writer = struct
  include Kernel_transport.Writer

  (* Pack a [Unix_writer] built from [writer] as a [Kernel_transport.Writer.t]. *)
  let of_writer ~max_message_size writer =
    pack (module Unix_writer) (Unix_writer.create ~writer ~max_message_size)
  ;;
end

type t = Kernel_transport.t =
  { reader : Reader.t
  ; writer : Writer.t
  }
[@@deriving sexp_of]

let close = Kernel_transport.close

(* Build a transport from an existing reader/writer pair, applying the same
   [max_message_size] to both directions. *)
let of_reader_writer ~max_message_size reader writer =
  { reader = Reader.of_reader reader ~max_message_size
  ; writer = Writer.of_writer writer ~max_message_size
  }
;;

(* Build a transport on top of [fd], creating a fresh reader and writer for it.
   [reader_buffer_size] is passed as the reader's [buf_len]; [buffer_age_limit] is passed
   through to the writer. *)
let of_fd ?buffer_age_limit ?reader_buffer_size ~max_message_size fd =
  of_reader_writer
    ~max_message_size
    (Async_unix.Reader.create ?buf_len:reader_buffer_size fd)
    (Async_unix.Writer.create ?buffer_age_limit fd)
;;