open Core
open Import
include Writer0

(** [of_pipe info pipe_w] opens a Unix pipe with [Unix.pipe info], builds a writer on
    its write end and a [Reader.t] on its read end, and forwards what that reader
    receives into [pipe_w].

    The deferred result is the new writer paired with
    [`Closed_and_flushed_downstream d]. [d] is determined only once
    [Reader.transfer] has finished, both the internal reader and the writer have been
    closed, and [pipe_w] has been closed (unless it already was). *)
let of_pipe info pipe_w =
  let%map `Reader read_end, `Writer write_end = Unix.pipe info in
  let rd = Reader.create read_end in
  let wr = create write_end in
  (* Reports [EPIPE] to the writer's monitor; used when the transfer stops while the
     writer is still open. *)
  let report_consumer_left () =
    Monitor.send_exn (monitor wr) (Unix.Unix_error (EPIPE, "Writer.of_pipe", ""))
  in
  if Debug.writer
  then
    Debug.log
      "Writer.of_pipe"
      (pipe_w, rd, wr)
      [%sexp_of: string Pipe.Writer.t * Reader.t * t];
  (* Move bytes from [rd] into [pipe_w]. Closing [wr] makes [rd] see EOF, which ends
     the transfer; closing [pipe_w] ends the transfer as well. *)
  let downstream_done =
    let%bind () = Reader.transfer rd pipe_w in
    (* The transfer is over. If the writer has not been closed and it is configured
       to raise when the consumer leaves, surface that as an [EPIPE] error. *)
    let consumer_left_early = raise_when_consumer_leaves wr && not (is_closed wr) in
    if consumer_left_early then report_consumer_left ();
    let%map (), () = Deferred.both (Reader.close rd) (close wr) in
    if Pipe.is_closed pipe_w then () else Pipe.close pipe_w
  in
  wr, `Closed_and_flushed_downstream downstream_done
;;

(** Low-level hooks: direct setters for the writer's byte counters, plus a re-export
    of [Check_buffer_age]. *)
module Private = struct
  (** Overwrites the [bytes_received] field of [writer] with [count]. *)
  let set_bytes_received writer count = writer.bytes_received <- count

  (** Overwrites the [bytes_written] field of [writer] with [count]. *)
  let set_bytes_written writer count = writer.bytes_written <- count

  module Check_buffer_age = Check_buffer_age
end