open Core
open Async_kernel

(* Accessors for the payload-length field. [length] is 8, the width in bytes of the
   64-bit value that the two accessors below read and write in little-endian order. *)
module Header = struct
  let length = 8

  (* Thin wrapper over the unsafe [Bigstring] 64-bit little-endian getter. *)
  let unsafe_get_payload_length bigstring ~pos =
    Bigstring.unsafe_get_int64_le_exn bigstring ~pos
  ;;

  (* Thin wrapper over the unsafe [Bigstring] 64-bit little-endian setter. *)
  let unsafe_set_payload_length bigstring ~pos len =
    Bigstring.unsafe_set_int64_le bigstring ~pos len
  ;;
end

module Handler_result = Transport_intf.Handler_result

module Reader = struct
  module type S = Transport_intf.Reader

  (* A reader implementation bundled with a value of its own state type; the state
     type ['a] is hidden by the constructor. *)
  type t = T : (module S with type t = 'a) * 'a -> t

  let pack impl state = T (impl, state)

  (* Each forwarding function below carries an explicit result annotation. This pins
     the result to a non-function type, which avoids creating closures. *)
  let sexp_of_t (T ((module Impl), state)) : Sexp.t = Impl.sexp_of_t state
  let close (T ((module Impl), state)) : unit Deferred.t = Impl.close state
  let is_closed (T ((module Impl), state)) : bool = Impl.is_closed state

  let read_forever (T ((module Impl), state)) ~on_message ~on_end_of_batch : _ Deferred.t =
    Impl.read_forever state ~on_message ~on_end_of_batch
  ;;

  (* Reads with [read_forever], decoding the first message with [bin_reader] and
     stopping with the decoded value. Raises via [failwithf] when the number of bytes
     the decoder consumed differs from the [len] handed to the callback. *)
  let read_one_message_bin_prot t (bin_reader : _ Bin_prot.Type_class.reader) =
    read_forever
      t
      ~on_end_of_batch:ignore
      ~on_message:(fun buf ~pos ~len ->
        let pos_ref = ref pos in
        let value = bin_reader.read buf ~pos_ref in
        (* Number of bytes the decoder advanced past [pos]. *)
        let consumed = !pos_ref - pos in
        if consumed = len
        then Stop value
        else
          failwithf
            "message length (%d) did not match expected length (%d)"
            consumed
            len
            ())
  ;;
end

module Send_result = Transport_intf.Send_result

module Writer = struct
  module type S = Transport_intf.Writer

  (* [stopped] holds the result of calling the implementation's [stopped] once, at
     [pack] time. It is cached because it is often the [Deferred.any] of several other
     deferreds and [can_send] should stay simple. *)
  type 'a writer =
    { impl : (module S with type t = 'a)
    ; t : 'a
    ; stopped : unit Deferred.t
    }

  type t = T : 'a writer -> t

  let pack (type a) (module Impl : S with type t = a) state =
    T { impl = (module Impl); t = state; stopped = Impl.stopped state }
  ;;

  (* Forwarding functions: unpack the implementation and delegate to it. *)
  let sexp_of_t (T { impl = (module Impl); t = state; _ }) : Sexp.t = Impl.sexp_of_t state
  let close (T { impl = (module Impl); t = state; _ }) : unit Deferred.t = Impl.close state
  let is_closed (T { impl = (module Impl); t = state; _ }) : bool = Impl.is_closed state
  let monitor (T { impl = (module Impl); t = state; _ }) : Monitor.t = Impl.monitor state

  let bytes_to_write (T { impl = (module Impl); t = state; _ }) : int =
    Impl.bytes_to_write state
  ;;

  let flushed (T { impl = (module Impl); t = state; _ }) : unit Deferred.t =
    Impl.flushed state
  ;;

  let ready_to_write (T { impl = (module Impl); t = state; _ }) : unit Deferred.t =
    Impl.ready_to_write state
  ;;

  let send_bin_prot (T { impl = (module Impl); t = state; _ }) bin_writer value
    : _ Send_result.t
    =
    Impl.send_bin_prot state bin_writer value
  ;;

  let send_bin_prot_and_bigstring
    (T { impl = (module Impl); t = state; _ })
    bin_writer
    value
    ~buf
    ~pos
    ~len
    : _ Send_result.t
    =
    Impl.send_bin_prot_and_bigstring state bin_writer value ~buf ~pos ~len
  ;;

  let send_bin_prot_and_bigstring_non_copying
    (T { impl = (module Impl); t = state; _ })
    bin_writer
    value
    ~buf
    ~pos
    ~len
    : _ Send_result.t
    =
    Impl.send_bin_prot_and_bigstring_non_copying state bin_writer value ~buf ~pos ~len
  ;;

  (* Returns the deferred cached in the record rather than asking the implementation. *)
  let stopped (T { stopped = cached; _ }) = cached

  (* True only when the implementation is not closed and the cached [stopped] deferred
     is not yet determined. *)
  let can_send (T { impl = (module Impl); t = state; stopped = cached }) =
    (not (Impl.is_closed state)) && not (Deferred.is_determined cached)
  ;;

  (* Drains [pipe], applying [f] to every value, reading at most
     [max_num_values_per_read] values per batch. The result is determined when either
     the pipe reports end-of-file or [stopped t] is determined, whichever comes first;
     at that point the read end of [pipe] is closed. *)
  let transfer t ?(max_num_values_per_read = 1_000) pipe f =
    (* The pipe consumer reports values as flushed once this writer is flushed. *)
    let downstream_flushed () = Deferred.map (flushed t) ~f:(fun () -> `Ok) in
    let consumer = Pipe.add_consumer pipe ~downstream_flushed in
    let end_of_pipe =
      Deferred.create (fun finished ->
        (* When [can_send t] is false the loop simply ends without filling
           [finished]. *)
        let rec pump () =
          if can_send t
          then (
            match
              Pipe.read_now' pipe ~consumer ~max_queue_length:max_num_values_per_read
            with
            | `Eof -> Ivar.fill finished ()
            | `Nothing_available ->
              Deferred.upon (Pipe.values_available pipe) (fun (`Ok | `Eof) -> pump ())
            | `Ok batch ->
              Queue.iter batch ~f;
              Pipe.Consumer.values_sent_downstream consumer;
              (* Wait until the writer is ready before reading the next batch. *)
              Deferred.upon (ready_to_write t) pump)
        in
        pump ())
    in
    Deferred.map
      (Deferred.any [ end_of_pipe; stopped t ])
      ~f:(fun () -> Pipe.close_read pipe)
  ;;
end

(* A reader paired with a writer. *)
type t =
  { reader : Reader.t
  ; writer : Writer.t
  }
[@@deriving sexp_of]

(* Closes the writer first and, once that completes, closes the reader. *)
let close t = Writer.close t.writer >>= fun () -> Reader.close t.reader