open! Core
open! Async_kernel
open Async_unix
module Unix = Core_unix

(** Tunables for an output channel: initial/maximum buffer sizes and how long the writer
    may wait for the file descriptor to become writable. *)
module Config = struct
  (* Same as the default value of [buffer_age_limit] for [Async_unix.Writer] *)
  let default_write_timeout = Time_ns.Span.of_min 2.
  let default_initial_buffer_size = 64 * 1024
  let default_max_buffer_size = Int.max_value

  type t =
    { initial_buffer_size : int
    ; write_timeout : Time_ns.Span.t
    ; max_buffer_size : int
    }
  [@@deriving sexp_of]

  (** Returns [t] unchanged when it is well formed. Raises when the initial buffer size is
      not positive, the write timeout is not strictly positive, or the initial buffer size
      exceeds the maximum buffer size. *)
  let validate t =
    if t.initial_buffer_size <= 0
       || Time_ns.Span.( <= ) t.write_timeout Time_ns.Span.zero
       || t.initial_buffer_size > t.max_buffer_size
    then raise_s [%sexp "Shuttle.Config.validate: invalid config", { t : t }];
    t
  ;;

  (** Builds a validated configuration; every omitted argument falls back to the matching
      [default_*] value above. Raises on an invalid combination (see [validate]). *)
  let create
    ?(buf_len = default_initial_buffer_size)
    ?(write_timeout = default_write_timeout)
    ?(max_buffer_size = default_max_buffer_size)
    ()
    =
    validate { initial_buffer_size = buf_len; write_timeout; max_buffer_size }
  ;;
end

(** Outcome reported to a caller that is waiting on a flush. *)
module Flush_result = struct
  type t =
    | Flushed
    | Remote_closed
    | Error
  [@@deriving sexp_of]
end

(** A pending flush request: [ivar] is filled with [Flushed] once [bytes_written] reaches
    [pos], or with an error result when the writer is stopped. *)
type flush =
  { pos : Int63.t
  ; ivar : Flush_result.t Ivar.t
  }
[@@deriving sexp_of]

type close_state =
  | Open
  | Start_close
  | Closed
[@@deriving sexp_of]

type writer_state =
  | Active
  | Stopped
  | Inactive
[@@deriving sexp_of]

type t =
  { fd : Fd.t
  ; config : Config.t
  ; buf : Bytebuffer.t
  ; monitor : Monitor.t
  ; flushes : flush Queue.t
  ; mutable close_state : close_state
  ; close_started : unit Ivar.t
  ; close_finished : unit Ivar.t
  ; remote_closed : unit Ivar.t
  ; mutable writer_state : writer_state
  ; mutable bytes_written : Int63.t
  }
[@@deriving sexp_of]

(** Creates an open channel with an inactive writer on top of [fd]. The optional
    arguments are forwarded to [Config.create], which raises if they are inconsistent. *)
let create ?max_buffer_size ?buf_len ?write_timeout fd =
  let config = Config.create ?max_buffer_size ?buf_len ?write_timeout () in
  { fd
  ; config
  ; flushes = Queue.create ()
  ; writer_state = Inactive
  ; buf =
      Bytebuffer.create
        ~max_buffer_size:config.max_buffer_size
        config.initial_buffer_size
  ; monitor = Monitor.create ()
  ; close_state = Open
  ; remote_closed = Ivar.create ()
  ; close_started = Ivar.create ()
  ; close_finished = Ivar.create ()
  ; bytes_written = Int63.zero
  }
;;

(** Drains the queue of pending flushes, filling each one with [error]. *)
let wakeup_flushes_with_error t error =
  while not (Queue.is_empty t.flushes) do
    Ivar.fill (Queue.dequeue_exn t.flushes).ivar error
  done
;;

(** Fails every pending flush with [reason] and marks the writer as [Stopped]. *)
let stop_writer t reason =
  wakeup_flushes_with_error t reason;
  t.writer_state <- Stopped
;;

let monitor t = t.monitor
let remote_closed t = Ivar.read t.remote_closed

(** A channel counts as closed as soon as [close] has started, not only once the file
    descriptor has actually been closed. *)
let is_closed t =
  match t.close_state with
  | Open -> false
  | Closed | Start_close -> true
;;

let close_started t = Ivar.read t.close_started
let close_finished t = Ivar.read t.close_finished
let is_open = Fn.non is_closed

(** Returns a deferred describing the fate of the bytes currently buffered:
    - [Flushed] immediately when the buffer is empty;
    - [Error] immediately when the channel is closed (or closing);
    - otherwise a flush request is queued and resolved by the writer loop.

    This only registers interest; it does not start the writer (see [flush_or_fail]). *)
let flushed_or_fail t =
  if Bytebuffer.length t.buf = 0
  then return Flush_result.Flushed
  else if is_closed t
  then return Flush_result.Error
  else (
    let flush =
      { pos = Int63.( + ) t.bytes_written (Int63.of_int (Bytebuffer.length t.buf))
      ; ivar = Ivar.create ()
      }
    in
    Queue.enqueue t.flushes flush;
    Ivar.read flush.ivar)
;;

(** Like [flushed_or_fail], but the result never becomes determined when the flush does
    not succeed. *)
let flushed t =
  match%bind flushed_or_fail t with
  | Flush_result.Flushed -> Deferred.unit
  | Error | Remote_closed -> Deferred.never ()
;;

(** Completes, in queue order, every pending flush whose target position has been reached
    by [bytes_written]. *)
let dequeue_flushes t =
  while
    (not (Queue.is_empty t.flushes))
    && Int63.( <= ) (Queue.peek_exn t.flushes).pos t.bytes_written
  do
    Ivar.fill (Queue.dequeue_exn t.flushes).ivar Flush_result.Flushed
  done
;;

(** Performs one nonblocking write attempt from the buffer to the file descriptor.

    Returns [`Ok n] with the (non-negative) value reported by the write, [`Ok 0] when the
    call would block or was interrupted, and [`Eof] for the listed connection-level
    errors. Any other exception stops the writer and is re-raised. *)
let write_nonblocking t =
  match
    Fd.syscall_exn ~nonblocking:true t.fd (fun fd ->
      Bytebuffer.write_assume_fd_is_nonblocking fd t.buf)
  with
  | n ->
    assert (n >= 0);
    `Ok n
  | exception Unix.Unix_error ((EWOULDBLOCK | EAGAIN | EINTR), _, _) -> `Ok 0
  | exception
      Unix.Unix_error
        ( ( EPIPE
          | ECONNRESET
          | EHOSTUNREACH
          | ENETDOWN
          | ENETRESET
          | ENETUNREACH
          | ETIMEDOUT )
        , _
        , _ ) -> `Eof
  | exception exn ->
    stop_writer t Flush_result.Error;
    raise exn
;;

(* NOTE(review): not referenced by any other definition in this file; kept in case the
   interface file exposes it. *)
module Single_write_result = struct
  type t =
    | Continue
    | Stop
end

(** Handles the outcome of a single write attempt.

    [`Eof] marks the remote side as closed and stops the writer. [`Ok n] accounts for [n]
    written bytes, completes the flushes that are now satisfied and either parks the
    writer (buffer drained) or waits for the descriptor to become writable again. *)
let rec process_write_result t = function
  | `Eof ->
    Ivar.fill t.remote_closed ();
    stop_writer t Flush_result.Remote_closed
  | `Ok n ->
    Bytebuffer.compact t.buf;
    t.bytes_written <- Int63.( + ) t.bytes_written (Int63.of_int n);
    dequeue_flushes t;
    if Bytebuffer.length t.buf <= 0
    then t.writer_state <- Inactive
    else wait_and_write_everything t

(** One step of the writer loop: writes directly when the descriptor supports nonblocking
    I/O, otherwise performs the write in a separate thread. *)
and write_everything t =
  if Bytebuffer.length t.buf <= 0
  then t.writer_state <- Inactive
    (* The branches below must stay under [else]: once the state reads [Inactive],
       [schedule_flush] is free to start a new writer, so no write may be issued from
       here when there was nothing to send. *)
  else if Fd.supports_nonblock t.fd
  then process_write_result t (write_nonblocking t)
  else
    Fd.syscall_in_thread t.fd ~name:"write" (fun fd -> Bytebuffer.write fd t.buf)
    >>> function
    | `Error exn ->
      stop_writer t Flush_result.Error;
      Exn.reraise exn "Error while writing"
    | `Ok _ as res -> process_write_result t res
    | `Already_closed -> process_write_result t `Eof

(** Waits, for at most [config.write_timeout], until the descriptor is writable and then
    resumes writing. A timeout is logged and stops the writer; a bad or closed descriptor
    stops the writer and raises. *)
and wait_and_write_everything t =
  Clock_ns.with_timeout t.config.write_timeout (Fd.ready_to t.fd `Write)
  >>> fun result ->
  match result with
  | `Result `Ready -> write_everything t
  | `Timeout ->
    Log.Global.sexp
      ~level:`Error
      [%message
        "Shuttle.Output_channel timed out waiting to write on file descriptor. Closing \
         the writer."
          ~timeout:(t.config.write_timeout : Time_ns.Span.t)
          (t : t)];
    stop_writer t Flush_result.Error
  | `Result ((`Bad_fd | `Closed) as result) ->
    stop_writer t Flush_result.Error;
    raise_s
      [%sexp
        "Shuttle.Output_channel: fd changed"
        , { t : t; ready_to_result = (result : [ `Bad_fd | `Closed ]) }]
;;

let is_writing t =
  match t.writer_state with
  | Active -> true
  | Inactive -> false
  | Stopped -> false
;;

(** Starts the writer loop inside the channel's monitor, unless it is already running or
    there is nothing buffered. *)
let schedule_flush t =
  if (not (is_writing t)) && Bytebuffer.length t.buf > 0
  then (
    t.writer_state <- Active;
    Scheduler.within ~monitor:t.monitor (fun () -> write_everything t))
;;

(** Registers a flush request, starts the writer if needed and returns a deferred that is
    determined once the currently buffered bytes are written (never, on failure). *)
let flush t =
  let flush_result = flushed t in
  schedule_flush t;
  flush_result
;;

(** Same as [flush], but failures are reported through the returned [Flush_result.t]. *)
let flush_or_fail t =
  let flush_result = flushed_or_fail t in
  schedule_flush t;
  flush_result
;;

(** Closes the channel. The first call gives buffered data up to five seconds to be
    written out, then closes the file descriptor; later calls only return the same
    completion deferred. The result is determined once the descriptor is closed. *)
let close t =
  (match t.close_state with
   | Closed | Start_close -> ()
   | Open ->
     (* Request (and schedule) the final flush while the channel still counts as open:
        [flushed_or_fail] answers [Error] immediately once [close_state] has left [Open],
        which would make the grace period below ineffective and drop buffered data. *)
     let final_flush =
       match t.writer_state with
       | Stopped ->
         (* A stopped writer cannot make progress, so there is nothing to wait for. *)
         Deferred.unit
       | Active | Inactive -> Deferred.ignore_m (flush_or_fail t)
     in
     t.close_state <- Start_close;
     Ivar.fill t.close_started ();
     Deferred.any_unit [ after (Time.Span.of_sec 5.); final_flush ]
     >>> fun () ->
     t.close_state <- Closed;
     Fd.close t.fd >>> fun () -> Ivar.fill t.close_finished ());
  close_finished t
;;

(** Raises if the writer has been stopped. *)
let ensure_can_write t =
  match t.writer_state with
  | Inactive | Active -> ()
  | Stopped -> raise_s [%sexp "Attempting to write to a closed writer", { t : t }]
;;

(** Non-raising counterpart of [ensure_can_write]. *)
let can_write t =
  match t.writer_state with
  | Inactive | Active -> true
  | Stopped -> false
;;

(** Appends (a slice of) a bigstring to the buffer. Raises if the writer is stopped.
    Nothing reaches the file descriptor until a flush is scheduled. *)
let write_bigstring t ?pos ?len buf =
  ensure_can_write t;
  Bytebuffer.Fill.bigstring t.buf buf ?pos ?len
;;

let schedule_bigstring t ?pos ?len buf = write_bigstring t ?pos ?len buf

(** Appends (a slice of) a string to the buffer. Raises if the writer is stopped. *)
let write t ?pos ?len buf =
  ensure_can_write t;
  Bytebuffer.Fill.string t.buf buf ?pos ?len
;;

let write_string t ?pos ?len buf = write t ?pos ?len buf
let writef t fmt = ksprintf (fun str -> write t str) fmt

(** Appends a single character to the buffer. Raises if the writer is stopped. *)
let write_char t ch =
  ensure_can_write t;
  Bytebuffer.Fill.char t.buf ch
;;

(** Copies every string read from [reader] into the channel, flushing after each batch.
    The result is determined when the pipe reaches end of file or when the channel has
    finished closing, in which case the read end of the pipe is closed as well. *)
let write_from_pipe t reader =
  let finished = Ivar.create () in
  let consumer =
    (* Add a consumer so the pipe will take the output_channel into account when it checks
       if the reader contents have been flushed. *)
    Pipe.add_consumer reader ~downstream_flushed:(fun () ->
      let%map () = flushed t in
      `Ok)
  in
  let rec loop () =
    if can_write t && is_open t
    then (
      (* use [read_now'] as [iter] doesn't allow working on chunks of values at a time. *)
      match Pipe.read_now' ~consumer reader with
      | `Eof -> Ivar.fill finished ()
      | `Nothing_available -> Pipe.values_available reader >>> fun _ -> loop ()
      | `Ok bufs ->
        Queue.iter bufs ~f:(fun buf -> write t buf);
        schedule_flush t;
        Pipe.Consumer.values_sent_downstream consumer;
        flushed t >>> loop)
  in
  loop ();
  choose
    [ choice (Ivar.read finished) (fun () -> `Finished)
    ; choice (close_finished t) (fun () -> `Closed)
    ]
  >>| function
  | `Finished -> ()
  | `Closed ->
    (* Close the pipe (both read and write ends) since the channel is closed. This is
       desirable so all future calls to [Pipe.write] fail. *)
    Pipe.close_read reader
;;

(** Returns a pipe writer whose contents are forwarded to the channel in the background
    (see [write_from_pipe]). *)
let pipe t =
  let reader, writer = Pipe.create () in
  don't_wait_for (write_from_pipe t reader);
  writer
;;

(** Opens [filename] write-only, creating it if needed (and in append mode when [append]
    is set), and wraps the resulting descriptor in a channel. *)
let open_file ?buf_len ?(append = false) filename =
  let mode =
    let base_mode = [ `Wronly; `Creat ] in
    if append then `Append :: base_mode else base_mode
  in
  let%map fd = Async.Unix.openfile ~mode filename in
  create ?buf_len fd
;;

(** Runs [f] on a channel opened with [open_file] and closes the channel afterwards,
    whether [f] succeeds or raises. *)
let with_file ?buf_len ?append filename ~f =
  let%bind t = open_file ?buf_len ?append filename in
  Monitor.protect ~finally:(fun () -> close t) (fun () -> f t)
;;