open! Core
open! Async_kernel
open Async_unix
module Unix = Core_unix
open! Async_kernel_require_explicit_time_source
module Logger = Log.Make_global ()

(* Outcome reported to a caller waiting on a flush. *)
module Flush_result = struct
  type t =
    | Flushed
    | Remote_closed
    | Error
  [@@deriving sexp_of]
end

(* A pending flush request. [pos] is the value [bytes_written] must reach before [ivar]
   is filled with [Flushed] (see [dequeue_flushes]). *)
type flush =
  { pos : Int63.t
  ; ivar : Flush_result.t Ivar.t
  }
[@@deriving sexp_of]

type close_state =
  | Open
  | Start_close
  | Closed
[@@deriving sexp_of]

(* [Active]: a write loop is currently scheduled or running. [Inactive]: no write loop is
   running, one may be started. [Stopped]: the writer was shut down by [stop_writer] and
   must not be restarted. *)
type writer_state =
  | Active
  | Stopped
  | Inactive
[@@deriving sexp_of]

type t =
  { fd : Fd.t
  ; write_timeout : Time_ns.Span.t
  ; buf : Bytebuffer.t
  ; monitor : Monitor.t
  ; flushes : flush Queue.t
  ; mutable close_state : close_state
  ; close_started : unit Ivar.t
  ; close_finished : unit Ivar.t
  ; remote_closed : unit Ivar.t
  ; mutable writer_state : writer_state
  ; mutable bytes_written : Int63.t
  ; time_source : Time_source.t
  }
[@@deriving sexp_of]

let default_write_timeout = Time_ns.Span.of_min 2.

(* Create a channel writing to [fd].

   - [buf_len] defaults to 64KiB and must be strictly positive.
   - [write_timeout] defaults to [default_write_timeout] and must be strictly positive.
   - [time_source] defaults to the wall clock.
   - [max_buffer_size] is forwarded unchanged to [Bytebuffer.create].

   Raises (via [raise_s]) if [buf_len] or [write_timeout] is not strictly positive. The
   initial [Fd.with_file_descr_exn] call is made with [~nonblocking:true] and an ignored
   callback; presumably this switches the descriptor to non-blocking mode up front --
   confirm against the Async_unix documentation. *)
let create ?max_buffer_size ?buf_len ?write_timeout ?time_source fd =
  Fd.with_file_descr_exn fd ignore ~nonblocking:true;
  let buf_len =
    match buf_len with
    | None -> 64 * 1024
    | Some v -> v
  in
  if buf_len <= 0 then raise_s [%sexp "Buffer size must be greater than 0"];
  let time_source =
    match time_source with
    | None -> Time_source.wall_clock ()
    | Some t -> Time_source.read_only t
  in
  let write_timeout =
    match write_timeout with
    | Some v -> v
    | None -> default_write_timeout
  in
  (* Zero is rejected as well as negative spans, so the message says so explicitly. *)
  if Time_ns.Span.( <= ) write_timeout Time_ns.Span.zero
  then raise_s [%message "Write timeout must be greater than zero"];
  { fd
  ; flushes = Queue.create ()
  ; write_timeout
  ; writer_state = Inactive
  ; buf = Bytebuffer.create ?max_buffer_size buf_len
  ; monitor = Monitor.create ()
  ; close_state = Open
  ; remote_closed = Ivar.create ()
  ; close_started = Ivar.create ()
  ; close_finished = Ivar.create ()
  ; bytes_written = Int63.zero
  ; time_source
  }
;;

(* Drain the queue of pending flushes, filling every one of them with [error]. *)
let wakeup_flushes_with_error t error =
  while not (Queue.is_empty t.flushes) do
    Ivar.fill (Queue.dequeue_exn t.flushes).ivar error
  done
;;

(* A channel counts as closed as soon as closing has started. *)
let is_closed t =
  match t.close_state with
  | Open -> false
  | Closed | Start_close -> true
;;

(* Return a deferred describing when the currently buffered bytes are written.

   - Empty buffer: resolves immediately with [Flushed].
   - Non-empty buffer on a closed (or closing) channel: resolves immediately with [Error].
   - Otherwise a flush request is queued at position
     [bytes_written + Bytebuffer.length buf] and its ivar is returned.

   This only registers interest; it does not start the write loop. *)
let flushed_or_fail t =
  if Bytebuffer.length t.buf = 0
  then return Flush_result.Flushed
  else if is_closed t
  then return Flush_result.Error
  else (
    let flush =
      { pos = Int63.( + ) t.bytes_written (Int63.of_int (Bytebuffer.length t.buf))
      ; ivar = Ivar.create ()
      }
    in
    Queue.enqueue t.flushes flush;
    Ivar.read flush.ivar)
;;

(* Like [flushed_or_fail], but a failed flush ([Error] or [Remote_closed]) yields a
   deferred that never becomes determined instead of reporting the failure. *)
let flushed t =
  match%bind flushed_or_fail t with
  | Flush_result.Flushed -> Deferred.unit
  | Error | Remote_closed -> Deferred.never ()
;;

let close_started t = Ivar.read t.close_started
let close_finished t = Ivar.read t.close_finished

(* Start closing the channel without waiting for completion. Does nothing if closing has
   already started. Once either the 5 second timer or the flush deferred is determined,
   the state becomes [Closed], the fd is closed and [close_finished] is filled.

   NOTE(review): [close_state] is set to [Start_close] before [flushed_or_fail] is
   called, so that call resolves immediately ([Flushed] for an empty buffer, [Error]
   otherwise) and the 5 second timer never decides the outcome. Confirm whether close
   is meant to wait for buffered data. *)
let close' t =
  match t.close_state with
  | Closed | Start_close -> ()
  | Open ->
    t.close_state <- Start_close;
    Ivar.fill t.close_started ();
    Deferred.any_unit
      [ Time_source.after t.time_source (Time_ns.Span.of_sec 5.)
      ; Deferred.ignore_m (flushed_or_fail t)
      ]
    >>> fun () ->
    t.close_state <- Closed;
    Fd.close t.fd >>> fun () -> Ivar.fill t.close_finished ()
;;

(* Start closing the channel and return a deferred determined once the fd is closed. *)
let close t =
  close' t;
  close_finished t
;;

(* Fail every pending flush with [reason], mark the writer as [Stopped] and start
   closing the channel. *)
let stop_writer t reason =
  wakeup_flushes_with_error t reason;
  t.writer_state <- Stopped;
  close' t
;;

let monitor t = t.monitor
let remote_closed t = Ivar.read t.remote_closed
let is_open = Fn.non is_closed

(* Fill, in queue order, every pending flush whose target position has been reached by
   [bytes_written]. Stops at the first flush that is still ahead. *)
let dequeue_flushes t =
  while
    (not (Queue.is_empty t.flushes))
    && Int63.( <= ) (Queue.peek_exn t.flushes).pos t.bytes_written
  do
    Ivar.fill (Queue.dequeue_exn t.flushes).ivar Flush_result.Flushed
  done
;;

(* Attempt one write of the buffer to the fd and classify the outcome:

   - [`Ok n]: the write call returned [n] (asserted to be non-negative).
   - [`Poll_again]: EWOULDBLOCK, EAGAIN or EINTR; retry once the fd is writable.
   - [`Eof]: one of the listed connection-level errors; treated as the peer being gone.

   Any other exception stops the writer with [Error] and is re-raised. Note that
   [Fd.file_descr_exn] is part of the matched expression, so an exception raised by it
   also takes that last branch. *)
let write_nonblocking t =
  match Bytebuffer.write_assume_fd_is_nonblocking t.buf (Fd.file_descr_exn t.fd) with
  | n ->
    assert (n >= 0);
    `Ok n
  | exception Unix.Unix_error ((EWOULDBLOCK | EAGAIN | EINTR), _, _) -> `Poll_again
  | exception
      Unix.Unix_error
        ( ( EPIPE
          | ECONNRESET
          | EHOSTUNREACH
          | ENETDOWN
          | ENETRESET
          | ENETUNREACH
          | ETIMEDOUT )
        , _
        , _ ) -> `Eof
  | exception exn ->
    stop_writer t Flush_result.Error;
    raise exn
;;

(* Write loop. [write_everything] writes as much as it can right now and returns the
   writer to [Inactive] once the buffer is empty. [wait_and_write_everything] waits, up
   to [write_timeout], for the fd to become writable before trying again. *)
let rec write_everything t =
  if Bytebuffer.length t.buf <= 0
  then t.writer_state <- Inactive
  else (
    match write_nonblocking t with
    | `Eof ->
      Ivar.fill t.remote_closed ();
      stop_writer t Flush_result.Remote_closed
    | `Poll_again -> wait_and_write_everything t
    | `Ok n ->
      Bytebuffer.compact t.buf;
      t.bytes_written <- Int63.( + ) t.bytes_written (Int63.of_int n);
      (* Wake up any flush whose position is now covered by [bytes_written]. *)
      dequeue_flushes t;
      if Bytebuffer.length t.buf <= 0
      then t.writer_state <- Inactive
      else wait_and_write_everything t)

and wait_and_write_everything t =
  Time_source.with_timeout t.time_source t.write_timeout (Fd.ready_to t.fd `Write)
  >>> fun result ->
  match result with
  | `Result `Ready -> write_everything t
  | `Timeout ->
    Logger.sexp
      ~level:`Error
      [%message
        "Shuttle.Output_channel timed out waiting to write on file descriptor. Closing \
         the writer."
          ~timeout:(t.write_timeout : Time_ns.Span.t)
          (t : t)];
    stop_writer t Flush_result.Error
  | `Result ((`Bad_fd | `Closed) as result) ->
    Logger.sexp
      ~level:`Error
      [%sexp
        "Shuttle.Output_channel: fd changed"
        , { t : t; ready_to_result = (result : [ `Bad_fd | `Closed ]) }];
    stop_writer t Flush_result.Error
;;

(* True only while a write loop is scheduled or running. *)
let is_writing t =
  match t.writer_state with
  | Active -> true
  | Inactive -> false
  | Stopped -> false
;;

(* Start the write loop, inside the channel's monitor, if there is buffered data and the
   writer is idle.

   Only an [Inactive] writer is started. A [Stopped] writer must stay stopped: restarting
   it would flip the state back to [Active], re-enter [write_everything] on an fd that is
   being closed, and could fill [remote_closed] a second time. An [Active] writer already
   has a loop that will pick up the new data. *)
let schedule_flush t =
  match t.writer_state with
  | Active | Stopped -> ()
  | Inactive ->
    if Bytebuffer.length t.buf > 0
    then (
      t.writer_state <- Active;
      Scheduler.within ~monitor:t.monitor (fun () -> write_everything t))
;;

(* Register a flush (see [flushed]) and kick off the write loop. The flush is registered
   before scheduling so that its position covers the currently buffered bytes. *)
let flush t =
  let flush_result = flushed t in
  schedule_flush t;
  flush_result
;;

(* Same as [flush], but reports failures through [Flush_result.t] instead of never
   becoming determined. *)
let flush_or_fail t =
  let flush_result = flushed_or_fail t in
  schedule_flush t;
  flush_result
;;

(* Raise (via [raise_s]) if the writer has been stopped. *)
let ensure_can_write t =
  match t.writer_state with
  | Inactive | Active -> ()
  | Stopped -> raise_s [%sexp "Attempting to write to a closed writer", { t : t }]
;;

(* Non-raising counterpart of [ensure_can_write]. *)
let can_write t =
  match t.writer_state with
  | Inactive | Active -> true
  | Stopped -> false
;;

(* The [write*] functions below append to the in-memory buffer only; nothing reaches the
   fd until a flush is scheduled. Each of them raises if the writer has been stopped. *)
let write_bigstring t ?pos ?len buf =
  ensure_can_write t;
  Bytebuffer.add_bigstring t.buf buf ?pos ?len
;;

let schedule_bigstring t ?pos ?len buf = write_bigstring t ?pos ?len buf

let write t ?pos ?len buf =
  ensure_can_write t;
  Bytebuffer.add_string t.buf buf ?pos ?len
;;

let write_string t ?pos ?len buf = write t ?pos ?len buf
let writef t fmt = ksprintf (fun str -> write t str) fmt

let write_char t ch =
  ensure_can_write t;
  Bytebuffer.add_char t.buf ch
;;

(* Copy every string read from [reader] into the channel, flushing after each batch.

   The returned deferred is determined when the pipe reaches EOF, when the channel
   finishes closing, or when the remote side is detected as closed. In the latter two
   cases the read end of the pipe is closed. *)
let write_from_pipe t reader =
  let finished = Ivar.create () in
  let consumer =
    (* Add a consumer so the pipe will take the output_channel into account when it checks
       if the reader contents have been flushed. *)
    Pipe.add_consumer reader ~downstream_flushed:(fun () ->
      let%map () = flushed t in
      `Ok)
  in
  let rec loop () =
    (* When the channel can no longer accept data the loop simply stops; the [choose]
       below is what completes the returned deferred in that case. *)
    if can_write t && is_open t && not (Ivar.is_full t.remote_closed)
    then (
      (* use [read_now'] as [iter] doesn't allow working on chunks of values at a time. *)
      match Pipe.read_now' ~consumer reader with
      | `Eof -> Ivar.fill finished ()
      | `Nothing_available -> Pipe.values_available reader >>> fun _ -> loop ()
      | `Ok bufs ->
        Queue.iter bufs ~f:(fun buf -> write t buf);
        schedule_flush t;
        Pipe.Consumer.values_sent_downstream consumer;
        flushed t >>> loop)
  in
  loop ();
  choose
    [ choice (Ivar.read finished) (fun () -> `Finished)
    ; choice (close_finished t) (fun () -> `Closed)
    ; choice (remote_closed t) (fun () -> `Remote_closed)
    ]
  >>| function
  | `Finished -> ()
  | `Closed | `Remote_closed ->
    (* Close the pipe (both read and write ends) since the channel is closed. This is
       desirable so all future calls to [Pipe.write] fail. *)
    Pipe.close_read reader
;;

(* Return a pipe writer whose contents are forwarded to the channel in the background
   via [write_from_pipe]. *)
let pipe t =
  let reader, writer = Pipe.create () in
  don't_wait_for (write_from_pipe t reader);
  writer
;;