open Core
open Import
open Require_explicit_time_source
module Core_unix = Core.Unix
module Unix = Unix_syscalls
module IOVec = Core.Unix.IOVec
module Id = Unique_id.Int63 ()

let io_stats = Io_stats.create ()
let debug = Debug.writer

module Line_ending = struct
  type t =
    | Dos
    | Unix
  [@@deriving sexp_of]
end

module Check_buffer_age' = struct
  type 'a t =
    { writer : 'a
    ; maximum_age : Time_ns.Span.t
    ; mutable bytes_received_at_now_minus_maximum_age : Int63.t
    ; (* The 2 following queues hold the not-yet-written bytes received by the writer in
         the last [maximum_age] period of time, with the time they were received at.
         [Queue.length bytes_received_queue = Queue.length times_received_queue]. *)
      bytes_received_queue : Int63.t Queue.t
    ; times_received_queue : Time_ns.t Queue.t
    ; (* Number of bytes "seen" by the checker.  [t.writer.bytes_received - t.bytes_seen]
         represents the number of bytes received by the writer since the last time the
         checker ran. *)
      mutable bytes_seen : Int63.t
    ; (* The buffer-age check is responsible for filling in [too_old] if it detects an age
         violation. *)
      mutable too_old : unit Ivar.t
    ; (* The buffer-age checks are stored in one of these data structures per time source,
         and we keep a reference to our parent one in this [t] so we can easily remove
         ourselves from it when closing the writer. *)
      for_this_time_source : 'a per_time_source
    }

  and 'a per_time_source =
    { active_checks : ('a t[@sexp.opaque]) Bag.t
    ; closed : unit Ivar.t
    }
  [@@deriving fields, sexp_of]
end

module Open_flags = Unix.Open_flags

type open_flags = (Open_flags.t, exn) Result.t [@@deriving sexp_of]

module Backing_out_channel = Backing_out_channel

module Destroy_or_keep = struct
  type t =
    | Destroy
    | Keep
  [@@deriving sexp_of]
end

module Scheduled = struct
  type t = (Bigstring.t IOVec.t * Destroy_or_keep.t) Deque.t

  (* Sum of the lengths of all iovecs in [t]. *)
  let length (t : t) = Deque.fold t ~init:0 ~f:(fun n (iovec, _) -> n + iovec.len)
end

type t =
  { id : Id.t
  ; mutable fd : Fd.t
  ; (* The writer uses a background job to flush data.  The job runs within
       [inner_monitor], which has a handler that wraps all errors to include [sexp_of_t
       t], and sends them to [monitor]. *)
    monitor : Monitor.t
  ; inner_monitor : Monitor.t
  ; mutable background_writer_state : [ `Running | `Not_running | `Stopped_permanently ]
  ; background_writer_stopped : unit Ivar.t
  ; (* [syscall] determines the batching approach that the writer uses to batch data
       together and flush it using the underlying write syscall. *)
    syscall : [ `Per_cycle | `Periodic of Time.Span.t ]
  ; (* Counts since the writer was created. *)
    mutable bytes_received : Int63.t
  ; mutable bytes_written : Int63.t
  ; (* Bytes that we have received but not yet written are stored in two places:
       [scheduled] and [buf].  The bytes that we need to write are the concatenation of
       the sequence of iovecs in [scheduled] followed by the bytes in [buf] from
       [scheduled_back] to [back].  Note that iovecs in [scheduled] can point to regions
       in [buf], even the current [buf] in the writer. *)
    (* [scheduled] holds iovecs that we plan to write. *)
    scheduled : Scheduled.t
  ; (* [scheduled_bytes] is the sum of the lengths of the iovecs in [scheduled] *)
    mutable scheduled_bytes : int
  ; (* [buf] has three regions:
       [0, scheduled_back)             received and scheduled
       [scheduled_back, back)          received but not scheduled
       [back, Bigstring.length buf)    free space *)
    mutable buf : Bigstring.t
  ; mutable scheduled_back : int
  ; mutable back : int
  ; time_source : Time_source.t
  ; flushes : (Time_ns.t Ivar.t * Int63.t) Queue.t
  ; (* [close_state] tracks the state of the writer as it is being closed.  Initially,
       [close_state] is [`Open].  When [close] is called, [close_state] transitions to
       [`Closed_and_flushing].  Once the writer is flushed and we're actually going to
       close [fd], it transitions to [`Closed].
       The distinction between [`Closed] and [`Closed_and_flushing] is necessary because
       we want to allow [write]s to happen while [`Closed_and_flushing], but not when
       [`Closed].  This is necessary to allow upstream producers to flush their data to
       the writer when it is closed. *)
    mutable close_state : [ `Open | `Closed_and_flushing | `Closed ]
  ; (* [close_finished] is filled when the close() system call on [fd] finishes. *)
    close_finished : unit Ivar.t
  ; (* [close_started] is filled when [close] is called. *)
    close_started : unit Ivar.t
  ; (* [producers_to_flush_at_close] holds all upstream producers feeding data to this
       writer, and thus should be flushed when we close this writer, before flushing
       the writer itself. *)
    producers_to_flush_at_close : (unit -> unit Deferred.t) Bag.t
  ; (* [flush_at_shutdown_elt] holds the element in [writers_to_flush_at_shutdown] for
       this writer.  Being in that bag is what causes this writer to be automatically
       closed when [shutdown] is called, and for shutdown to wait for the close to finish.
       [flush_at_shutdown_elt] is [Some] for the lifetime of the writer, until the
       close finishes, at which point it transitions to [None]. *)
    mutable flush_at_shutdown_elt : t Bag.Elt.t option
  ; mutable check_buffer_age : t Check_buffer_age'.t Bag.Elt.t option
  ; (* The "consumer" of a writer is whomever is reading the bytes that the writer
       is writing.  E.g. if the writer's file descriptor is a socket, then it is whomever
       is on the other side of the socket connection.  If the consumer leaves, Unix will
       indicate this by returning EPIPE or ECONNRESET to a write() syscall.  We keep
       track of this with the [consumer_left] ivar, which is exposed in writer.mli.
       We also allow the user to configure what action the writer takes when the
       consumer leaves.  By default, it raises, but that can be disabled. *)
    consumer_left : unit Ivar.t
  ; mutable raise_when_consumer_leaves : bool (* default is [true] *)
  ; (* [open_flags] is the open-file-descriptor bits of [fd].  It is created when [t] is
       created, and starts a deferred computation that calls [Unix.fcntl_getfl].
       [open_flags] is used to report an error when [fd] is not writable.  [Fd] treats the
       call to [fcntl_getfl] as an active system call, which prevents [Unix.close fd] from
       completing until [fcntl_getfl] finishes.  This prevents a file-descriptor or thread
       leak even though client code doesn't explicitly wait on [open_flags]. *)
    open_flags : open_flags Deferred.t
  ; line_ending : Line_ending.t
  ; (* If specified, subsequent writes are synchronously redirected here. *)
    mutable backing_out_channel : Backing_out_channel.t option
  }
[@@deriving fields]

(* The default sexp of a writer shows only its file descriptor. *)
let sexp_of_t t = [%sexp (t.fd : Fd.t_hum)]

type t_internals = t

(* Detailed sexp used in error messages.  Fields whose rendering is not stable across
   runs are omitted when running inline tests; [scheduled], [buf] and [flushes] are
   never shown. *)
let sexp_of_t_internals
      { id
      ; fd
      ; monitor
      ; inner_monitor
      ; background_writer_state
      ; background_writer_stopped
      ; syscall
      ; bytes_received
      ; bytes_written
      ; scheduled = _
      ; scheduled_bytes
      ; buf = _
      ; scheduled_back
      ; back
      ; time_source
      ; flushes = _
      ; close_state
      ; close_finished
      ; close_started
      ; producers_to_flush_at_close
      ; flush_at_shutdown_elt
      ; check_buffer_age
      ; consumer_left
      ; raise_when_consumer_leaves
      ; open_flags
      ; line_ending
      ; backing_out_channel
      }
  =
  let suppress_in_test x = if am_running_inline_test then None else Some x in
  let monitor_name_in_test monitor =
    if am_running_inline_test
    then [%sexp (Monitor.name monitor : Info.t)]
    else [%sexp (monitor : Monitor.t)]
  in
  let time_source =
    if phys_equal time_source (Time_source.wall_clock ()) then None else Some time_source
  in
  (* [open_flags] are non-deterministic across CentOS versions and have been suppressed in
     tests.  Older Linux kernels (CentOS 6) expose O_CLOEXEC via fcntl(fd, F_GETFL), but
     newer (CentOS 7) ones don't. *)
  [%sexp
    { id = (suppress_in_test id : (Id.t option[@sexp.option]))
    ; fd = (suppress_in_test fd : (Fd.t option[@sexp.option]))
    ; monitor = (monitor_name_in_test monitor : Sexp.t)
    ; inner_monitor = (monitor_name_in_test inner_monitor : Sexp.t)
    ; background_writer_state : [ `Running | `Not_running | `Stopped_permanently ]
    ; background_writer_stopped : unit Ivar.t
    ; syscall : [ `Per_cycle | `Periodic of Time.Span.t ]
    ; bytes_received : Int63.t
    ; bytes_written : Int63.t
    ; scheduled_bytes : int
    ; scheduled_back : int
    ; back : int
    ; time_source : (Time_source.t option[@sexp.option])
    ; close_state : [ `Open | `Closed_and_flushing | `Closed ]
    ; close_finished : unit Ivar.t
    ; close_started : unit Ivar.t
    ; num_producers_to_flush_at_close = (Bag.length producers_to_flush_at_close : int)
    ; flush_at_shutdown_elt =
        (suppress_in_test flush_at_shutdown_elt
         : ((t[@sexp.opaque]) Bag.Elt.t option option[@sexp.option]))
    ; check_buffer_age =
        (suppress_in_test check_buffer_age
         : ((t[@sexp.opaque]) Check_buffer_age'.t Bag.Elt.t option option[@sexp.option]))
    ; consumer_left : unit Ivar.t
    ; raise_when_consumer_leaves : bool
    ; open_flags = (suppress_in_test open_flags : (open_flags Deferred.t option[@sexp.option]))
    ; line_ending : Line_ending.t
    ; backing_out_channel : (Backing_out_channel.t option[@sexp.option])
    }]
;;

type writer = t [@@deriving sexp_of]

let set_raise_when_consumer_leaves t bool = t.raise_when_consumer_leaves <- bool

(* Bytes received but not yet written: those in [scheduled] plus the unscheduled region
   of [buf]. *)
let bytes_to_write t = t.scheduled_bytes + t.back - t.scheduled_back

let is_stopped_permanently t =
  match t.background_writer_state with
  | `Stopped_permanently -> true
  | `Running | `Not_running -> false
;;

(* Checks the internal consistency of [t]; on failure raises an error that includes the
   failing exception and [t]'s internals. *)
let invariant t : unit =
  try
    let check f field = f (Field.get field t) in
    Fields.iter
      ~id:ignore
      ~fd:ignore
      ~monitor:ignore
      ~inner_monitor:ignore
      ~buf:ignore
      ~background_writer_state:
        (check (function
           | `Stopped_permanently ->
             assert (bytes_to_write t = 0);
             assert (Ivar.is_full t.background_writer_stopped)
           | `Running | `Not_running ->
             assert (Bigstring.length t.buf > 0);
             assert (
               Int63.(t.bytes_received - t.bytes_written = of_int (bytes_to_write t)));
             assert (Ivar.is_empty t.background_writer_stopped)))
      ~background_writer_stopped:ignore
      ~syscall:ignore
      ~bytes_written:
        (check (fun bytes_written ->
           assert (Int63.(zero <= bytes_written && bytes_written <= t.bytes_received))))
      ~bytes_received:ignore
      ~scheduled:
        (check (fun (scheduled : Scheduled.t) ->
           Deque.iter scheduled ~f:(fun (iovec, kind) ->
             (* An iovec pointing into the current [buf] must never be destroyed. *)
             if phys_equal t.buf iovec.buf
             then
               assert (
                 match kind with
                 | Keep -> true
                 | Destroy -> false))))
      ~scheduled_bytes:
        (check (fun scheduled_bytes ->
           assert (scheduled_bytes = Scheduled.length t.scheduled)))
      ~scheduled_back:
        (check (fun scheduled_back ->
           assert (0 <= scheduled_back && scheduled_back <= t.back)))
      ~back:(check (fun back -> assert (back <= Bigstring.length t.buf)))
      ~time_source:ignore
      ~flushes:ignore
      ~close_state:ignore
      ~close_finished:
        (check (fun close_finished ->
           match t.close_state with
           | `Open | `Closed_and_flushing -> assert (Ivar.is_empty close_finished)
           | `Closed -> ()))
      ~close_started:
        (check (fun close_started ->
           [%test_result: bool]
             (Ivar.is_empty close_started)
             ~expect:
               (match t.close_state with
                | `Open -> true
                | `Closed | `Closed_and_flushing -> false)))
      ~producers_to_flush_at_close:ignore
      ~flush_at_shutdown_elt:
        (check (fun o ->
           assert (Bool.equal (is_none o) (Ivar.is_full t.close_finished));
           Option.iter o ~f:(fun elt -> assert (phys_equal t (Bag.Elt.value elt)))))
      ~check_buffer_age:ignore
      ~consumer_left:
        (check (fun consumer_left ->
           if Ivar.is_full consumer_left then assert (is_stopped_permanently t)))
      ~raise_when_consumer_leaves:ignore
      ~open_flags:ignore
      ~line_ending:ignore
      ~backing_out_channel:(check (Option.invariant Backing_out_channel.invariant))
  with
  | exn ->
    raise_s [%message "writer invariant failed" (exn : exn) ~writer:(t : t_internals)]
;;

module Check_buffer_age : sig
  type t = writer Check_buffer_age'.t Bag.Elt.t option

  val dummy : t
  val create : writer -> maximum_age:[ `At_most of Time.Span.t | `Unlimited ] -> t
  val destroy : t -> unit
  val too_old : t -> unit Deferred.t

  module Internal_for_unit_test : sig
    val check_now : check_invariants:bool -> time_source:Time_source.t -> unit
    val num_active_checks_for : Time_source.t -> int option
  end
end = struct
  open Check_buffer_age'

  type t = writer Check_buffer_age'.t Bag.Elt.t option

  let elt_invariant t : unit =
    Invariant.invariant [%here] t [%sexp_of: _ t] (fun () ->
      let check f field = f (Field.get field t) in
      assert (Queue.length t.bytes_received_queue = Queue.length t.times_received_queue);
      Fields.iter
        ~writer:ignore
        ~maximum_age:ignore
        ~too_old:
          (check (fun ivar ->
             let imply a b = (not a) || b in
             assert (
               imply
                 Int63.O.(
                   t.bytes_received_at_now_minus_maximum_age > t.writer.bytes_written)
                 (Ivar.is_full ivar))))
        ~bytes_received_queue:
          (check (fun q ->
             (* Byte counts must be strictly increasing, and the last one must equal
                [bytes_seen]. *)
             let n =
               Queue.fold
                 q
                 ~init:t.bytes_received_at_now_minus_maximum_age
                 ~f:(fun prev count ->
                   assert (Int63.( < ) prev count);
                   count)
             in
             assert (Int63.( <= ) n t.writer.bytes_received);
             assert (Int63.( = ) n t.bytes_seen)))
        ~times_received_queue:
          (check (fun q ->
             match Queue.to_list q with
             | [] -> ()
             | times ->
               [%test_result: Time_ns.t list]
                 ~expect:times
                 (List.sort times ~compare:Time_ns.compare);
               assert (
                 Time_ns.Span.( <= )
                   (Time_ns.diff (List.last_exn times) (List.hd_exn times))
                   t.maximum_age)))
        ~bytes_received_at_now_minus_maximum_age:ignore
        ~bytes_seen:ignore
        ~for_this_time_source:ignore)
  ;;

  let dummy = None

  (* [sync] prunes history by removing all the entries from [*_received_queue]s that
     correspond to bytes already written or that were received more than [maximum_age]
     before [now]. *)
  let rec sync e ~now =
    if not (Queue.is_empty e.bytes_received_queue)
    then (
      let bytes_received = Queue.peek_exn e.bytes_received_queue in
      let time_received = Queue.peek_exn e.times_received_queue in
      let bytes_are_written = Int63.( <= ) bytes_received e.writer.bytes_written in
      let bytes_are_too_old =
        Time_ns.Span.( > ) (Time_ns.diff now time_received) e.maximum_age
      in
      if bytes_are_too_old
      then e.bytes_received_at_now_minus_maximum_age <- bytes_received;
      if bytes_are_written || bytes_are_too_old
      then (
        ignore (Queue.dequeue_exn e.bytes_received_queue : Int63.t);
        ignore (Queue.dequeue_exn e.times_received_queue : Time_ns.t);
        sync e ~now))
  ;;

  module Per_time_source = struct
    type t = writer Check_buffer_age'.per_time_source

    (* Runs one buffer-age check for [e]: prunes history, records newly received bytes,
       and fills or resets [e.too_old] according to whether unwritten data is older than
       [e.maximum_age]. *)
    let process_active_check e =
      let now = Time_source.now e.writer.time_source in
      sync e ~now;
      let bytes_received = e.writer.bytes_received in
      let bytes_written = e.writer.bytes_written in
      if Int63.O.(bytes_received > e.bytes_seen)
      then (
        e.bytes_seen <- bytes_received;
        if Int63.O.(bytes_received > bytes_written)
        then (
          Queue.enqueue e.bytes_received_queue e.writer.bytes_received;
          Queue.enqueue e.times_received_queue now));
      let too_old =
        Int63.O.(e.bytes_received_at_now_minus_maximum_age > bytes_written)
      in
      match Ivar.is_full e.too_old, too_old with
      | true, true | false, false -> ()
      | true, false -> e.too_old <- Ivar.create ()
      | false, true ->
        Ivar.fill e.too_old ();
        let writer = e.writer in
        (* [Monitor.send_exn] enqueues jobs but does not run user code, and so cannot
           modify [e]. *)
        Monitor.send_exn
          e.writer.monitor
          (Exn.create_s
             [%message
               "writer buffer has data older than"
                 ~maximum_age:(e.maximum_age : Time_ns.Span.t)
                 ~beginning_of_buffer:
                   (Bigstring.to_string
                      writer.buf
                      ~pos:0
                      ~len:(Int.min 1024 (Bigstring.length writer.buf))
                    : string)
                 (writer : writer)])
    ;;

    let create () = { active_checks = Bag.create (); closed = Ivar.create () }
    let check t = Bag.iter t.active_checks ~f:process_active_check

    let internal_check_now_for_unit_test t ~check_invariants =
      if check_invariants then Bag.iter t.active_checks ~f:elt_invariant;
      check t
    ;;
  end

  module Time_source_key = Hashable.Make_plain (struct
      type t = Time_source.t [@@deriving sexp_of]

      let hash_fold_t state t = Time_source.Id.hash_fold_t state (Time_source.id t)
      let hash t = Time_source.Id.hash (Time_source.id t)
      let compare t1 t2 = Time_source.Id.compare (Time_source.id t1) (Time_source.id t2)
    end)

  (* [by_time_source] holds the set of [Per_time_source.t]'s with nonempty [active_checks]. *)
  let by_time_source : Per_time_source.t Time_source_key.Table.t =
    Time_source_key.Table.create ()
  ;;

  module Internal_for_unit_test = struct
    let num_active_checks_for time_source =
      Option.map (Hashtbl.find by_time_source time_source) ~f:(fun pt ->
        Bag.length pt.active_checks)
    ;;

    let check_now ~check_invariants ~time_source =
      Per_time_source.internal_check_now_for_unit_test
        (Hashtbl.find_exn by_time_source time_source)
        ~check_invariants
    ;;
  end

  (* Registers a buffer-age check for [writer].  The first check created for a given
     time source also starts a once-per-second job that runs all checks for that time
     source until its [closed] ivar is filled. *)
  let create writer ~maximum_age =
    match maximum_age with
    | `Unlimited -> None
    | `At_most maximum_age ->
      let time_source = writer.time_source in
      let for_this_time_source =
        Hashtbl.find_or_add by_time_source time_source ~default:(fun () ->
          let pt = Per_time_source.create () in
          Time_source.every
            time_source
            Time_ns.Span.second
            ~stop:(Ivar.read pt.closed)
            ~continue_on_error:false
            (fun () -> Per_time_source.check pt);
          pt)
      in
      Some
        (Bag.add
           for_this_time_source.active_checks
           { writer
           ; bytes_received_queue = Queue.create ()
           ; times_received_queue = Queue.create ()
           ; maximum_age = Time_ns.Span.of_span_float_round_nearest maximum_age
           ; bytes_seen = Int63.zero
           ; bytes_received_at_now_minus_maximum_age = Int63.zero
           ; too_old = Ivar.create ()
           ; for_this_time_source
           })
  ;;

  (* Unregisters the check; when it was the last one for its time source, the per-time-source
     record is removed from [by_time_source] and its periodic job is stopped. *)
  let destroy t =
    match t with
    | None -> ()
    | Some elt ->
      let t = Bag.Elt.value elt in
      let per_time_source = t.for_this_time_source in
      Bag.remove per_time_source.active_checks elt;
      if Bag.is_empty per_time_source.active_checks
      then (
        Hashtbl.remove by_time_source t.writer.time_source;
        Ivar.fill_if_empty per_time_source.closed ())
  ;;

  let too_old t =
    match t with
    | None -> Deferred.never ()
    | Some elt -> Ivar.read (Bag.Elt.value elt).too_old
  ;;
end

(* Becomes determined, with the time of the flush, once every byte received so far has
   been written.  Never becomes determined if the writer finished closing first. *)
let flushed_time_ns t =
  if Int63.O.(t.bytes_written = t.bytes_received)
  then return (Time_source.now t.time_source)
  else if Ivar.is_full t.close_finished
  then Deferred.never ()
  else Deferred.create (fun ivar -> Queue.enqueue t.flushes (ivar, t.bytes_received))
;;

let flushed_time t =
  Deferred.map (flushed_time_ns t) ~f:Time_ns.to_time_float_round_nearest
;;

(* Like [flushed_time_ns] but discards the time.  With a backing out channel, flushes
   that channel synchronously instead. *)
let flushed t =
  match t.backing_out_channel with
  | Some backing_out_channel ->
    Backing_out_channel.flush backing_out_channel;
    return ()
  | None ->
    if Int63.O.(t.bytes_written = t.bytes_received)
    then return ()
    else if Ivar.is_full t.close_finished
    then Deferred.never ()
    else
      (* The time is discarded, so wait on the [Time_ns] variant directly rather than
         converting it to a float-based time first. *)
      Deferred.ignore_m (flushed_time_ns t)
;;

let set_backing_out_channel t backing_out_channel =
  t.backing_out_channel <- Some backing_out_channel
;;

(* Installs [backing_out_channel] once there are no pending bytes, waiting on [flushed]
   as many times as necessary. *)
let set_synchronous_backing_out_channel t backing_out_channel =
  let rec wait_until_no_bytes_to_write () =
    if bytes_to_write t = 0
    then (
      set_backing_out_channel t backing_out_channel;
      return ())
    else (
      let%bind () = flushed t in
      wait_until_no_bytes_to_write ())
  in
  wait_until_no_bytes_to_write ()
;;

let set_synchronous_out_channel t out_channel =
  set_synchronous_backing_out_channel t (Backing_out_channel.of_out_channel out_channel)
;;

let using_synchronous_backing_out_channel t = Option.is_some t.backing_out_channel

let clear_synchronous_out_channel t =
  if is_some t.backing_out_channel
  then (
    assert (bytes_to_write t = 0);
    t.backing_out_channel <- None)
;;

(* Runs [f] with [backing_out_channel] installed, restoring the previous backing out
   channel afterwards, whether [f] succeeds or raises. *)
let with_synchronous_backing_out_channel t backing_out_channel ~f =
  let saved_backing_out_channel = t.backing_out_channel in
  (* This code will flush a bit more eagerly than it needs to if
     [with_synchronous_backing_out_channel t oc] is called recursively on the same [t] and
     [oc].  The flush is caused by [set_synchronous_backing_out_channel].  In theory this
     could happen but in practice is exceedingly unlikely. *)
  Monitor.protect
    (fun () ->
       let%bind () = set_synchronous_backing_out_channel t backing_out_channel in
       f ())
    ~finally:(fun () ->
      t.backing_out_channel <- saved_backing_out_channel;
      return ())
;;

let with_synchronous_out_channel t out_channel ~f =
  with_synchronous_backing_out_channel
    t
    ~f
    (Backing_out_channel.of_out_channel out_channel)
;;

(* Replaces the writer's file descriptor after waiting for [flushed t]. *)
let set_fd t fd =
  let%map () = flushed t in
  t.fd <- fd
;;

let consumer_left t = Ivar.read t.consumer_left
let close_finished t = Ivar.read t.close_finished
let close_started t = Ivar.read t.close_started

let is_closed t =
  match t.close_state with
  | `Open -> false
  | `Closed | `Closed_and_flushing -> true
;;

let is_open t = not (is_closed t)
let writers_to_flush_at_shutdown : t Bag.t = Bag.create ()

(* Waits for the first of: the consumer leaving, all producers and the writer itself
   being flushed, [force] becoming determined, or the buffer-age check firing. *)
let final_flush ?force t =
  let producers_flushed =
    (* Note that each element of [producers_to_flush_at_close] checks that the upstream
       producer is flushed, which includes checking that [t] itself is flushed once the
       producer has written everything to [t].  So, there is no need to call [flushed t]
       after the producer is flushed. *)
    Deferred.List.iter
      ~how:`Parallel
      ~f:(fun f -> f ())
      (Bag.to_list t.producers_to_flush_at_close)
  in
  let force =
    match force with
    | Some fc -> fc
    | None ->
      (* We used to use [after (sec 5.)] as the default value for [force] for all kinds
         of underlying fds.  This was problematic, because it silently caused data in
         the writer's buffer to be dropped when it kicked in.  We care about data
         getting out only for the files, when we want to get data to disk.  When we
         close socket writers, we usually just want to drop the connection, so using
         [after (sec 5.)] makes sense. *)
      (match Fd.kind t.fd with
       | File -> Deferred.never ()
       | Char | Fifo | Socket _ ->
         Time_source.after t.time_source (Time_ns.Span.of_sec 5.))
  in
  Deferred.any_unit
    [ (* If the consumer leaves, there's no more writing we can do. *)
      consumer_left t
    ; Deferred.all_unit [ producers_flushed; flushed t ]
    ; force
    ; (* The buffer-age check might fire while we're waiting. *)
      Check_buffer_age.too_old t.check_buffer_age
    ]
;;

(* Closes the writer: runs [final_flush], then closes [fd].  Calling [close] again is a
   no-op.  The result is determined when the close of [fd] has finished. *)
let close ?force_close t =
  if debug then Debug.log "Writer.close" t [%sexp_of: t];
  (match t.close_state with
   | `Closed_and_flushing | `Closed -> ()
   | `Open ->
     t.close_state <- `Closed_and_flushing;
     Ivar.fill t.close_started ();
     final_flush t ?force:force_close
     >>> fun () ->
     t.close_state <- `Closed;
     Check_buffer_age.destroy t.check_buffer_age;
     (match t.flush_at_shutdown_elt with
      | None -> assert false
      | Some elt -> Bag.remove writers_to_flush_at_shutdown elt);
     Unix.close t.fd
     >>> fun () ->
     (* [invariant] and the field's documentation require [flush_at_shutdown_elt] to be
        [None] exactly when [close_finished] is full, so reset it here. *)
     t.flush_at_shutdown_elt <- None;
     Ivar.fill t.close_finished ());
  close_finished t
;;

let () =
  Shutdown.at_shutdown (fun () ->
    if debug then Debug.log_string "Writer.at_shutdown";
    Deferred.List.iter
      ~how:`Parallel
      (Bag.to_list writers_to_flush_at_shutdown)
      ~f:(fun t -> Deferred.any_unit [ final_flush t; close_finished t ]))
;;

(* Fills, in order, every pending flush ivar whose target byte count has been written. *)
let fill_flushes { bytes_written; flushes; time_source; _ } =
  if not (Queue.is_empty flushes)
  then (
    let now = Time_source.now time_source in
    let rec loop () =
      match Queue.peek flushes with
      | None -> ()
      | Some (ivar, z) ->
        if Int63.(z <= bytes_written)
        then (
          Ivar.fill ivar now;
          ignore (Queue.dequeue flushes : (Time_ns.t Ivar.t * Int63.t) option);
          loop ())
    in
    loop ())
;;

(* Stops the background writer for good and drops all buffered data and pending
   flushes. *)
let stop_permanently t =
  t.background_writer_state <- `Stopped_permanently;
  Deque.clear t.scheduled;
  t.scheduled_bytes <- 0;
  t.buf <- Bigstring.create 0;
  t.scheduled_back <- 0;
  t.back <- 0;
  Ivar.fill_if_empty t.background_writer_stopped ();
  Queue.clear t.flushes
;;

let stopped_permanently t = Ivar.read t.background_writer_stopped

let die t sexp =
  stop_permanently t;
  raise_s sexp
;;

type buffer_age_limit =
  [ `At_most of Time.Span.t
  | `Unlimited
  ]
[@@deriving bin_io, sexp]

(* Creates a writer on [fd].  Raises [Invalid_argument] if [buf_len <= 0].  When
   [buffer_age_limit] is not given, it is [`Unlimited] for files and two minutes for
   other kinds of fd. *)
let create
      ?buf_len
      ?(syscall = `Per_cycle)
      ?buffer_age_limit
      ?(raise_when_consumer_leaves = true)
      ?(line_ending = Line_ending.Unix)
      ?time_source
      fd
  =
  let time_source =
    match time_source with
    | Some x -> Time_source.read_only x
    | None -> Time_source.wall_clock ()
  in
  let buffer_age_limit =
    match buffer_age_limit with
    | Some z -> z
    | None ->
      (match Fd.kind fd with
       | File -> `Unlimited
       | Char | Fifo | Socket _ -> `At_most (Time.Span.of_min 2.))
  in
  let buf_len =
    match buf_len with
    | None -> 65 * 1024 * 2 (* largest observed single write call * 2 *)
    | Some buf_len ->
      if buf_len <= 0 then invalid_arg "Writer.create: buf_len <= 0" else buf_len
  in
  let id = Id.create () in
  let monitor =
    Monitor.create
      ()
      ?name:(if am_running_inline_test then Some "Writer.monitor" else None)
  in
  let inner_monitor =
    Monitor.create
      ()
      ?name:(if am_running_inline_test then Some "Writer.inner_monitor" else None)
  in
  let consumer_left = Ivar.create () in
  let open_flags = try_with (fun () -> Unix.fcntl_getfl fd) in
  let t =
    { id
    ; fd
    ; syscall
    ; monitor
    ; inner_monitor
    ; buf = Bigstring.create buf_len
    ; back = 0
    ; scheduled_back = 0
    ; scheduled = Deque.create ()
    ; scheduled_bytes = 0
    ; bytes_received = Int63.zero
    ; bytes_written = Int63.zero
    ; time_source
    ; flushes = Queue.create ()
    ; background_writer_state = `Not_running
    ; background_writer_stopped = Ivar.create ()
    ; close_state = `Open
    ; close_finished = Ivar.create ()
    ; close_started = Ivar.create ()
    ; producers_to_flush_at_close = Bag.create ()
    ; flush_at_shutdown_elt = None
    ; check_buffer_age = Check_buffer_age.dummy
    ; consumer_left
    ; raise_when_consumer_leaves
    ; open_flags
    ; line_ending
    ; backing_out_channel = None
    }
  in
  (* Errors raised in [inner_monitor] are wrapped with the writer's internals and
     forwarded to [monitor]. *)
  Monitor.detach_and_iter_errors inner_monitor ~f:(fun (exn : Exn.t) ->
    Monitor.send_exn
      monitor
      (Exn.create_s
         [%message
           "Writer error from inner_monitor"
             ~_:(Monitor.extract_exn exn : Exn.t)
             ~writer:(t : t_internals)]));
  t.check_buffer_age <- Check_buffer_age.create t ~maximum_age:buffer_age_limit;
  t.flush_at_shutdown_elt <- Some (Bag.add writers_to_flush_at_shutdown t);
  t
;;

let set_buffer_age_limit t maximum_age =
  Check_buffer_age.destroy t.check_buffer_age;
  t.check_buffer_age <- Check_buffer_age.create t ~maximum_age
;;

let of_out_channel oc kind = create (Fd.of_out_channel oc kind)

let can_write t =
  match t.close_state with
  | `Open | `Closed_and_flushing -> true
  | `Closed -> false
;;

let ensure_can_write t =
  if not (can_write t) then raise_s [%message "attempt to use closed writer" ~_:(t : t)]
;;

let open_file
      ?(append = false)
      ?buf_len
      ?syscall
      ?(perm = 0o666)
      ?line_ending
      ?time_source
      file
  =
  (* Writing to NFS needs the [`Trunc] flag to avoid leaving extra junk at the end of
     a file. *)
  let mode = [ `Wronly; `Creat ] in
  let mode = (if append then `Append else `Trunc) :: mode in
  Unix.openfile file ~mode ~perm >>| create ?buf_len ?syscall ?line_ending ?time_source
;;

let with_close t ~f = Monitor.protect f ~finally:(fun () -> close t)

(* Runs [f] while holding a write lock on [t.fd]; the lock is released after [flushed t],
   whether [f] succeeds or raises. *)
let with_writer_exclusive t f =
  let%bind () = Unix.lockf t.fd `Write in
  Monitor.protect f ~finally:(fun () ->
    let%map () = flushed t in
    Unix.unlockf t.fd)
;;

let with_file ?perm ?append ?(exclusive = false) ?line_ending ?time_source file ~f =
  let%bind t = open_file ?perm ?append ?line_ending ?time_source file in
  with_close t ~f:(fun () ->
    if exclusive then with_writer_exclusive t (fun () -> f t) else f t)
;;

let got_bytes t n = t.bytes_received <- Int63.(t.bytes_received + of_int n)

(* Appends [iovec] to [scheduled] (unless the writer is stopped permanently).  Requires
   that [buf] has no unscheduled bytes. *)
let add_iovec t kind (iovec : _ IOVec.t) ~count_bytes_as_received =
  assert (t.scheduled_back = t.back);
  if count_bytes_as_received then got_bytes t iovec.len;
  if not (is_stopped_permanently t)
  then (
    t.scheduled_bytes <- t.scheduled_bytes + iovec.len;
    Deque.enqueue_back t.scheduled (iovec, kind));
  assert (t.scheduled_back = t.back)
;;

(* Moves the unscheduled region of [buf], if any, into [scheduled] as a single iovec. *)
let schedule_unscheduled t kind =
  let need_to_schedule = t.back - t.scheduled_back in
  assert (need_to_schedule >= 0);
  if need_to_schedule > 0
  then (
    let pos = t.scheduled_back in
    t.scheduled_back <- t.back;
    add_iovec
      t
      kind
      (IOVec.of_bigstring t.buf ~pos ~len:need_to_schedule)
      ~count_bytes_as_received:false (* they were already counted *))
;;

let dummy_iovec = IOVec.empty IOVec.bigstring_kind

(* Builds the iovec array for the next write, holding at most [IOVec.max_iovecs]
   entries.  Returns the array, whether any entry is memory-mapped, and the total
   length of the entries. *)
let mk_iovecs t =
  schedule_unscheduled t Keep;
  let n_iovecs = Int.min (Deque.length t.scheduled) (Lazy.force IOVec.max_iovecs) in
  let iovecs = Array.create ~len:n_iovecs dummy_iovec in
  let contains_mmapped_ref = ref false in
  let iovecs_len = ref 0 in
  with_return (fun r ->
    let i = ref 0 in
    Deque.iter t.scheduled ~f:(fun (iovec, _) ->
      if !i >= n_iovecs then r.return ();
      if (not !contains_mmapped_ref) && Bigstring.is_mmapped iovec.buf
      then contains_mmapped_ref := true;
      iovecs_len := !iovecs_len + iovec.len;
      iovecs.(!i) <- iovec;
      incr i));
  iovecs, !contains_mmapped_ref, !iovecs_len
;;

(* Size of I/O- or blit operation for which a helper thread should be used.  This number
(a power of two) is somewhat empirically motivated, but there is no reason why it
should be the best. *)letthread_io_cutoff=262_144letis_running=function|`Running->true|_->false;;(* If the writer was closed, we should be quiet. But if it wasn't, then someone was
monkeying around with the fd behind our back, and we should complain. *)letfd_closedt=ifnot(is_closedt)thendiet[%message"writer fd unexpectedly closed "];;letrecstart_writet=ifdebugthenDebug.log"Writer.start_write"t[%sexp_of:t];assert(is_runningt.background_writer_state);letiovecs,contains_mmapped,iovecs_len=mk_iovecstinlethandle_write_result=function|`Already_closed->fd_closedt|`Okn->ifn>=0thenwrite_finishedtnelsediet[%message"write system call returned negative result"(n:int)]|`Error(Unix.Unix_error((EWOULDBLOCK|EAGAIN),_,_))->write_when_readyt|`Error(Unix.Unix_error(EBADF,_,_))->diet[%message"write got EBADF"]|`Error(Unix.Unix_error((EPIPE|ECONNRESET|EHOSTUNREACH|ENETDOWN|ENETRESET|ENETUNREACH|ETIMEDOUT),_,_)asexn)->(* [t.consumer_left] is empty since once we reach this point, we stop the writer
permanently, and so will never reach here again. *)assert(Ivar.is_emptyt.consumer_left);Ivar.fillt.consumer_left();stop_permanentlyt;ift.raise_when_consumer_leavesthenraiseexn|`Errorexn->diet[%message""~_:(exn:Exn.t)]inletshould_write_in_thread=(not(Fd.supports_nonblockt.fd))(* Though the write will not block in this case, a memory-mapped bigstring in an
I/O-vector may cause a page fault, which would cause the async scheduler thread
to block. So, we write in a separate thread, and the [Bigstring.writev] releases
the OCaml lock, allowing the async scheduler thread to continue. *)||iovecs_len>thread_io_cutoff||contains_mmappedinifshould_write_in_threadthenFd.syscall_in_threadt.fd~name:"writev"(funfile_descr->Bigstring.writevfile_descriovecs)>>>handle_write_resultelsehandle_write_result(Fd.syscallt.fd~nonblocking:true(funfile_descr->Bigstring.writev_assume_fd_is_nonblockingfile_descriovecs))andwrite_when_readyt=ifdebugthenDebug.log"Writer.write_when_ready"t[%sexp_of:t];assert(is_runningt.background_writer_state);Fd.ready_tot.fd`Write>>>function|`Bad_fd->diet[%message"writer ready_to got Bad_fd"]|`Closed->fd_closedt|`Ready->start_writetandwrite_finishedtbytes_written=ifdebugthenDebug.log"Writer.write_finished"(bytes_written,t)[%sexp_of:int*t];assert(is_runningt.background_writer_state);letint63_bytes_written=Int63.of_intbytes_writteninIo_stats.updateio_stats~kind:(Fd.kindt.fd)~bytes:int63_bytes_written;t.bytes_written<-Int63.(int63_bytes_written+t.bytes_written);ifInt63.(t.bytes_written>t.bytes_received)thendiet[%message"writer wrote more bytes than it received"];fill_flushest;t.scheduled_bytes<-t.scheduled_bytes-bytes_written;(* Remove processed iovecs from t.scheduled. *)letrecremove_donebytes_written=assert(bytes_written>=0);matchDeque.dequeue_frontt.scheduledwith|None->ifbytes_written>0thendiet[%message"writer wrote nonzero amount but IO_queue is empty"]|Some({buf;pos;len},kind)->ifbytes_written>=lenthen((* Current I/O-vector completely written. Internally generated buffers get
destroyed immediately unless they are still in use for buffering. *)(matchkindwith|Destroy->Bigstring.unsafe_destroybuf|Keep->());remove_done(bytes_written-len))else((* Partial I/O: update partially written I/O-vector and retry I/O. *)letnew_iovec=IOVec.of_bigstringbuf~pos:(pos+bytes_written)~len:(len-bytes_written)inDeque.enqueue_frontt.scheduled(new_iovec,kind))inremove_donebytes_written;(* See if there's anything else to do. *)schedule_unscheduledtKeep;ifDeque.is_emptyt.scheduledthen(t.back<-0;t.scheduled_back<-0;t.background_writer_state<-`Not_running)else(matcht.syscallwith|`Per_cycle->write_when_readyt|`Periodicspan->Time_source.aftert.time_source(Time_ns.Span.of_span_float_round_nearestspan)>>>fun_->start_writet);;letmaybe_start_writert=matcht.background_writer_statewith|`Stopped_permanently|`Running->()|`Not_running->ifbytes_to_writet>0then(t.background_writer_state<-`Running;(* We schedule the background writer thread to run with low priority, so that it
runs at the end of the cycle and that all of the calls to Writer.write will
usually be batched into a single system call. *)schedule~monitor:t.inner_monitor~priority:Priority.low(fun()->t.open_flags>>>funopen_flags->letcan_write_fd=matchopen_flagswith|Error_->false|Okflags->Unix.Open_flags.can_writeflagsinifnotcan_write_fdthenraise_s[%message"not allowed to write due to file-descriptor flags"(open_flags:open_flags)];start_writet));;letgive_buftdesired=assert(desired>0);assert(not(is_stopped_permanentlyt));got_bytestdesired;letbuf_len=Bigstring.lengtht.bufinletavailable=buf_len-t.backinifdesired<=availablethen((* Data fits into buffer *)letpos=t.backint.back<-t.back+desired;t.buf,pos)elseif(* Preallocated buffer too small; schedule buffered writes. We create a new buffer of
exactly the desired size if the desired size is more than half the buffer length.
If we only created a new buffer when the desired size was greater than the buffer
length, then multiple consecutive writes of slightly more than half the buffer
length would each waste slightly less than half of the buffer. Although, it is
still the case that multiple consecutive writes of slightly more than one quarter
of the buffer length will waste slightly less than one quarter of the buffer. *)desired>buf_len/2then(schedule_unscheduledtKeep;(* Preallocation size too small; allocate dedicated buffer *)letbuf=Bigstring.createdesiredinadd_iovectDestroy(IOVec.of_bigstring~len:desiredbuf)~count_bytes_as_received:false;(* we already counted them above *)buf,0)else(schedule_unscheduledtDestroy;(* Preallocation size sufficient; preallocate new buffer *)letbuf=Bigstring.createbuf_lenint.buf<-buf;t.scheduled_back<-0;t.back<-desired;buf,0);;(* If [blit_to_bigstring] raises, [write_gen_unchecked] may leave some unexpected bytes in
the bigstring. However it leaves [t.back] and [t.bytes_received] in agreement. *)

(* [write_gen_internal t src ~src_pos ~src_len ~allow_partial_write ~blit_to_bigstring]
   copies [src_len] bytes of [src], starting at [src_pos], into the writer.

   - If the writer is stopped permanently, only [got_bytes] is called and nothing is
     copied.
   - If a backing out channel is set, the bytes go straight to it and [t.bytes_written]
     is advanced.
   - Otherwise the bytes are blitted into [t.buf] when they fit.  When they do not fit
     and [allow_partial_write] is true, the free tail of [t.buf] is filled first and
     the remainder goes into the buffer returned by [give_buf]; when
     [allow_partial_write] is false the whole value goes into the [give_buf] buffer.
     In the branches that use [give_buf], [got_bytes] is not called for those bytes
     here (presumably [give_buf] accounts for them -- confirm against its
     definition). *)
let write_gen_internal
  (type a)
  t
  src
  ~src_pos
  ~src_len
  ~allow_partial_write
  ~(blit_to_bigstring : (a, Bigstring.t) Blit.blit)
  =
  if is_stopped_permanently t
  then got_bytes t src_len
  else (
    match t.backing_out_channel with
    | Some backing_out_channel ->
      got_bytes t src_len;
      Backing_out_channel.output
        backing_out_channel
        ~blit_to_bigstring
        ~src
        ~src_len
        ~src_pos;
      t.bytes_written <- Int63.(t.bytes_written + of_int src_len)
    | None ->
      let available = Bigstring.length t.buf - t.back in
      if available >= src_len
      then (
        got_bytes t src_len;
        let dst_pos = t.back in
        t.back <- dst_pos + src_len;
        blit_to_bigstring ~src ~src_pos ~len:src_len ~dst:t.buf ~dst_pos)
      else if allow_partial_write
      then (
        (* Fill what is left of [t.buf], then put the rest in a fresh region. *)
        got_bytes t available;
        let dst_pos = t.back in
        t.back <- dst_pos + available;
        blit_to_bigstring ~src ~src_pos ~len:available ~dst:t.buf ~dst_pos;
        let remaining = src_len - available in
        let dst, dst_pos = give_buf t remaining in
        blit_to_bigstring
          ~src
          ~src_pos:(src_pos + available)
          ~len:remaining
          ~dst
          ~dst_pos)
      else (
        let dst, dst_pos = give_buf t src_len in
        blit_to_bigstring ~src ~src_pos ~dst ~dst_pos ~len:src_len);
      maybe_start_writer t)
;;

(* [write_direct t ~f] hands [f] the free tail of [t.buf] (starting at [t.back]) and
   expects back a result together with the number of bytes [f] wrote.  Returns [None]
   without calling [f] if the writer is stopped permanently, and [Some x] otherwise.
   Raises if [f] reports a byte count outside [0, len].

   NOTE(review): the message literal below contains a line break exactly as it appears
   in the text under review; confirm whether that break is intended. *)
let write_direct t ~f =
  if is_stopped_permanently t
  then None
  else (
    let pos = t.back in
    let len = Bigstring.length t.buf - pos in
    let x, written = f t.buf ~pos ~len in
    if written < 0 || written > len
    then
      raise_s
        [%message
          "[write_direct]'s [~f] argument returned invalid 
[written]"
            (written : int)
            (len : int)
            ~writer:(t : t)];
    t.back <- pos + written;
    got_bytes t written;
    maybe_start_writer t;
    Some x)
;;

(* Resolves the optional [?pos]/[?len] against [length src] and writes that range,
   allowing the value to be split across buffers.  Exceptions are not wrapped. *)
let write_gen_unchecked ?pos ?len t src ~blit_to_bigstring ~length =
  let src_pos, src_len =
    Ordered_collection_common.get_pos_len_exn () ?pos ?len ~total_length:(length src)
  in
  write_gen_internal t src ~src_pos ~src_len ~allow_partial_write:true ~blit_to_bigstring
;;

(* Writes all of [src] in one piece ([allow_partial_write:false]).  The supplied
   [blit_to_bigstring] only takes a destination position, so the adapter asserts that
   it is asked for the whole source. *)
let write_gen_whole_unchecked t src ~blit_to_bigstring ~length =
  let src_len = length src in
  write_gen_internal
    t
    src
    ~src_pos:0
    ~src_len
    ~allow_partial_write:false
    ~blit_to_bigstring:(fun ~src ~src_pos ~dst ~dst_pos ~len ->
      assert (src_pos = 0);
      assert (len = src_len);
      blit_to_bigstring src dst ~pos:dst_pos)
;;

(* Writes a range of a [Bytes.t]. *)
let write_bytes ?pos ?len t src =
  write_gen_unchecked
    ?pos
    ?len
    t
    src
    ~blit_to_bigstring:Bigstring.From_bytes.blit
    ~length:Bytes.length
;;

(* Writes a range of a [string]. *)
let write ?pos ?len t src =
  write_gen_unchecked
    ?pos
    ?len
    t
    src
    ~blit_to_bigstring:Bigstring.From_string.blit
    ~length:String.length
;;

(* Writes a range of a [Bigstring.t] by copying it into the writer. *)
let write_bigstring ?pos ?len t src =
  write_gen_unchecked
    ?pos
    ?len
    t
    src
    ~blit_to_bigstring:Bigstring.blit
    ~length:Bigstring.length
;;

(* Writes a range of an iobuf's window; the iobuf is viewed read-only / no-seek and is
   only peeked at. *)
let write_iobuf ?pos ?len t iobuf =
  let iobuf = Iobuf.read_only (Iobuf.no_seek iobuf) in
  write_gen_unchecked
    ?pos
    ?len
    t
    iobuf
    ~blit_to_bigstring:Iobuf.Peek.To_bigstring.blit
    ~length:Iobuf.length
;;

(* Writes the bytes designated by a [Substring.t]. *)
let write_substring t substring =
  write_bytes
    t
    (Substring.base substring)
    ~pos:(Substring.pos substring)
    ~len:(Substring.length substring)
;;

(* Writes the bytes designated by a [Bigsubstring.t]. *)
let write_bigsubstring t bigsubstring =
  write_bigstring
    t
    (Bigsubstring.base bigsubstring)
    ~pos:(Bigsubstring.pos bigsubstring)
    ~len:(Bigsubstring.length bigsubstring)
;;

(* printf-style writing: formats to a string, then [write]s it. *)
let writef t = ksprintf (fun s -> write t s)

(* Like [write_gen_unchecked], but any exception stops the writer permanently and is
   re-raised wrapped in a descriptive message. *)
let write_gen ?pos ?len t src ~blit_to_bigstring ~length =
  try write_gen_unchecked ?pos ?len t src ~blit_to_bigstring ~length with
  | exn ->
    stop_permanently t;
    raise_s [%message "Writer.write_gen: error writing value" (exn : exn)]
;;

(* Like [write_gen_whole_unchecked], but any exception stops the writer permanently
   and is re-raised wrapped in a descriptive message.

   NOTE(review): the message literal below contains a line break exactly as it appears
   in the text under review; confirm whether that break is intended. *)
let write_gen_whole t src ~blit_to_bigstring ~length =
  try write_gen_whole_unchecked t src ~blit_to_bigstring ~length with
  | exn ->
    stop_permanently t;
    raise_s
      [%message
        "Writer.write_gen_whole: error writing 
value" (exn : exn)]
;;

(* Builds a [Format] formatter whose output function writes to [t]; the flush
   function is [ignore].

   The output callback receives a string plus a [pos]/[len] range.  We pass that range
   straight to [write], which blits from the string directly, rather than first
   copying the entire string into a fresh [Bytes.t] just to wrap it in a [Substring].
   The range is still validated (by [write]) after [ensure_can_write]. *)
let to_formatter t =
  Format.make_formatter
    (fun str pos len ->
      ensure_can_write t;
      write ~pos ~len t str)
    ignore
;;

(* Writes a single character, following the same three cases as
   [write_gen_internal]: stopped writer, backing out channel, or in-memory buffer. *)
let write_char t c =
  if is_stopped_permanently t
  then got_bytes t 1
  else (
    (* Check for the common case that the char can simply be put in the buffer. *)
    match t.backing_out_channel with
    | Some backing_out_channel ->
      got_bytes t 1;
      Backing_out_channel.output_char backing_out_channel c;
      t.bytes_written <- Int63.(t.bytes_written + of_int 1)
    | None ->
      if Bigstring.length t.buf - t.back >= 1
      then (
        got_bytes t 1;
        t.buf.{t.back} <- c;
        t.back <- t.back + 1)
      else (
        let dst, dst_pos = give_buf t 1 in
        dst.{dst_pos} <- c);
      maybe_start_writer t)
;;

(* Writes a line terminator: "\n" for [Unix], "\r\n" for [Dos].  [?line_ending]
   defaults to the writer's own [t.line_ending]. *)
let newline ?line_ending t =
  let line_ending =
    match line_ending with
    | Some x -> x
    | None -> t.line_ending
  in
  (match line_ending with
   | Unix -> ()
   | Dos -> write_char t '\r');
  write_char t '\n'
;;

(* Writes [s] followed by a line terminator. *)
let write_line ?line_ending t s =
  write t s;
  newline t ?line_ending
;;

(* Writes the byte [i % 256]. *)
let write_byte t i = write_char t (char_of_int (i % 256))

(* How [write_sexp] terminates the sexp it writes. *)
module Terminate_with = struct
  type t =
    | Newline
    | Space_if_needed
  [@@deriving sexp_of]
end

(* Serializes [sexp] into a shared [Buffer.t], copies it into a shared, growable
   [Bytes.t] scratch area, and writes it to [t], followed by the requested
   terminator.  Both scratch values are created once and captured by the returned
   closure, so they are reused across calls. *)
let write_sexp_internal =
  let initial_size = 10 * 1024 in
  let buffer = lazy (Buffer.create initial_size) in
  let blit_str = ref (Bytes.create 0) in
  fun ~(terminate_with : Terminate_with.t) ?(hum = false) t sexp ->
    let buffer = Lazy.force buffer in
    Buffer.clear buffer;
    if hum
    then Sexp.to_buffer_hum ~buf:buffer ~indent:!Sexp.default_indent sexp
    else Sexp.to_buffer ~buf:buffer sexp;
    let len = Buffer.length buffer in
    let blit_str_len = Bytes.length !blit_str in
    (* Grow the scratch area to at least [len], at least [initial_size], and at least
       double its previous size. *)
    if len > blit_str_len
    then blit_str := Bytes.create (max len (max initial_size (2 * blit_str_len)));
    Buffer.blit ~src:buffer ~src_pos:0 ~dst:!blit_str ~dst_pos:0 ~len;
    write_bytes t !blit_str ~len;
    match terminate_with with
    | Newline -> newline t
    | Space_if_needed ->
      (* If the string representation doesn't start/end with paren or double quote, we add
         a space after it to ensure that the parser can recognize the end of the sexp. *)
      let c = Bytes.get !blit_str 0 in
      if not Char.O.(c = '(' || c = '"') then write_char t ' '
;;

(* Writes a sexp; [?terminate_with] defaults to [Space_if_needed]. *)
let write_sexp ?hum ?(terminate_with = Terminate_with.Space_if_needed) t sexp =
  write_sexp_internal t sexp ?hum ~terminate_with
;;

(* Writes [v] in bin_prot format preceded by a size header.  The bytes are counted
   via [got_bytes] only when the writer is stopped permanently; otherwise they are
   written into a [give_buf] region of exactly the required size. *)
let write_bin_prot t (writer : _ Bin_prot.Type_class.writer) v =
  let len = writer.size v in
  assert (len > 0);
  let tot_len = len + Bin_prot.Utils.size_header_length in
  if is_stopped_permanently t
  then got_bytes t tot_len
  else (
    let buf, start_pos = give_buf t tot_len in
    ignore (Bigstring.write_bin_prot buf ~pos:start_pos writer v : int);
    maybe_start_writer t)
;;

(* Writes [v] with the caller-supplied [write] function into a region of [size]
   bytes, without a size header.  Raises if [write] did not produce exactly [size]
   bytes. *)
let write_bin_prot_no_size_header t ~size write v =
  if is_stopped_permanently t
  then got_bytes t size
  else (
    let buf, start_pos = give_buf t size in
    let end_pos = write buf ~pos:start_pos v in
    let written = end_pos - start_pos in
    if written <> size
    then
      raise_s
        [%message
          "Writer.write_bin_prot_no_size_header bug!" (written : int) (size : int)];
    maybe_start_writer t)
;;

(* Writes the decimal length of [s], a newline, and then [s] itself. *)
let send t s =
  write t (string_of_int (String.length s) ^ "\n");
  write t s
;;

(* Schedules [iovec] to be written without copying it.  [schedule_unscheduled] is
   called first (presumably to queue any bytes already sitting in the buffer ahead of
   this iovec -- confirm against its definition).  [?destroy_or_keep] defaults to
   [Keep]. *)
let schedule_iovec ?(destroy_or_keep = Destroy_or_keep.Keep) t iovec =
  schedule_unscheduled t Keep;
  add_iovec t destroy_or_keep iovec ~count_bytes_as_received:true;
  maybe_start_writer t
;;

(* Schedules every iovec in the queue (all as [Keep]) and then empties the queue. *)
let schedule_iovecs t iovecs =
  schedule_unscheduled t Keep;
  Queue.iter iovecs ~f:(add_iovec t Keep ~count_bytes_as_received:true);
  Queue.clear iovecs;
  maybe_start_writer t
;;

(* Schedules a range of a bigstring as an iovec. *)
let schedule_bigstring ?destroy_or_keep t ?pos ?len bstr =
  schedule_iovec t (IOVec.of_bigstring ?pos ?len bstr) ?destroy_or_keep
;;

(* Schedules the range designated by a [Bigsubstring.t]. *)
let schedule_bigsubstring t bigsubstring =
  schedule_bigstring
    t
    (Bigsubstring.base bigsubstring)
    ~pos:(Bigsubstring.pos bigsubstring)
    ~len:(Bigsubstring.length bigsubstring)
;;

(* Schedules a shared iovec over the iobuf's data without advancing the iobuf. *)
let schedule_iobuf_peek t ?pos ?len iobuf =
  schedule_iovec t (Iobuf.Expert.to_iovec_shared ?pos ?len iobuf)
;;

(* Schedules a shared iovec over the iobuf's data and returns a deferred that advances
   the iobuf by the scheduled length once [flushed_time t] is determined. *)
let schedule_iobuf_consume t ?len iobuf =
  let iovec = Iobuf.Expert.to_iovec_shared ?len iobuf in
  let len = iovec.len in
  schedule_iovec t iovec;
  let%map _ = flushed_time t in
  Iobuf.advance iobuf len
;;

(* The code below ensures that no calls happen on a closed writer. 
*)

(* Each definition below shadows the same-named function defined earlier in the file
   and calls [ensure_can_write] on the writer before delegating to that earlier
   definition.  Labelled and optional arguments are forwarded unchanged. *)

(* Checks the writer, waits for [flushed], then fsyncs the writer's fd. *)
let fsync writer =
  ensure_can_write writer;
  let%bind () = flushed writer in
  Unix.fsync writer.fd
;;

(* Checks the writer, waits for [flushed], then fdatasyncs the writer's fd. *)
let fdatasync writer =
  ensure_can_write writer;
  let%bind () = flushed writer in
  Unix.fdatasync writer.fd
;;

let write_bin_prot writer bin_writer value =
  ensure_can_write writer;
  write_bin_prot writer bin_writer value
;;

let send writer str =
  ensure_can_write writer;
  send writer str
;;

let schedule_iovec ?destroy_or_keep writer iovec =
  ensure_can_write writer;
  schedule_iovec ?destroy_or_keep writer iovec
;;

let schedule_iovecs writer iovecs =
  ensure_can_write writer;
  schedule_iovecs writer iovecs
;;

(* Note: this guarded version does not expose [?destroy_or_keep]. *)
let schedule_bigstring writer ?pos ?len bigstring =
  ensure_can_write writer;
  schedule_bigstring writer ?pos ?len bigstring
;;

let schedule_bigsubstring writer bigsubstring =
  ensure_can_write writer;
  schedule_bigsubstring writer bigsubstring
;;

let schedule_iobuf_peek writer ?pos ?len iobuf =
  ensure_can_write writer;
  schedule_iobuf_peek writer ?pos ?len iobuf
;;

let schedule_iobuf_consume writer ?len iobuf =
  ensure_can_write writer;
  schedule_iobuf_consume writer ?len iobuf
;;

let write_gen ?pos ?len writer src ~blit_to_bigstring ~length =
  ensure_can_write writer;
  write_gen ?pos ?len writer src ~blit_to_bigstring ~length
;;

let write_bytes ?pos ?len writer bytes =
  ensure_can_write writer;
  write_bytes ?pos ?len writer bytes
;;

let write ?pos ?len writer str =
  ensure_can_write writer;
  write ?pos ?len writer str
;;

let write_line ?line_ending writer str =
  ensure_can_write writer;
  write_line writer str ?line_ending
;;

let writef writer =
  ensure_can_write writer;
  writef writer
;;

let write_sexp ?hum ?terminate_with writer sexp =
  ensure_can_write writer;
  write_sexp ?hum ?terminate_with writer sexp
;;

let write_iobuf ?pos ?len writer iobuf =
  ensure_can_write writer;
  write_iobuf ?pos ?len writer iobuf
;;

let write_bigstring ?pos ?len writer bigstring =
  ensure_can_write writer;
  write_bigstring ?pos ?len writer bigstring
;;

let write_bigsubstring writer bigsubstring =
  ensure_can_write writer;
  write_bigsubstring writer bigsubstring
;;

let write_substring writer substring =
  ensure_can_write writer;
  write_substring writer substring
;;

let write_byte writer byte =
  ensure_can_write writer;
  write_byte writer byte
;;

let write_char writer chr =
  ensure_can_write writer;
  write_char writer chr
;;

let newline ?line_ending writer =
  ensure_can_write writer;
  newline ?line_ending writer
;;

(* Lazily builds the pair of writers used for stdout and stderr.

   - Under the test runner, a single writer backed by [Out_channel.stdout] serves as
     both.
   - Otherwise, if fds 1 and 2 report the same (device, inode) pair, a single writer
     is shared between them.
   - Otherwise each fd gets its own writer.

   Raises if [Scheduler.within_v] yields [None]. *)
let stdout_and_stderr =
  lazy
    (let make_writers () =
       let stdout = Fd.stdout () in
       let stderr = Fd.stderr () in
       let shared = create stdout in
       let dev_and_ino fd =
         let stats = Core.Unix.fstat (Fd.file_descr_exn fd) in
         stats.st_dev, stats.st_ino
       in
       if am_test_runner
       then (
         (* Tests use synchronous output so that results are deterministic, in
            particular when libraries printing via Core and via Async are mixed. *)
         set_backing_out_channel
           shared
           (Backing_out_channel.of_out_channel Out_channel.stdout);
         shared, shared)
       else if [%compare.equal: int * int] (dev_and_ino stdout) (dev_and_ino stderr)
       then
         (* When both fds refer to the same file they must go through one writer; the
            interface file for this module explains why. *)
         shared, shared
       else shared, create stderr
     in
     (* The writers are created inside [Monitor.main] so that it becomes the parent of
        their monitors. *)
     match Scheduler.within_v ~monitor:Monitor.main make_writers with
     | Some writers -> writers
     | None -> raise_s [%message [%here] "unable to create stdout/stderr"])
;;

(* The stdout writer: first component of [stdout_and_stderr]. *)
let stdout = lazy (fst (Lazy.force stdout_and_stderr))

(* The stderr writer: second component of [stdout_and_stderr]. *)
let stderr = lazy (snd (Lazy.force stdout_and_stderr))

(* Switches the stdout writer, and the stderr writer when it is a physically distinct
   value, to synchronous output on the matching [Out_channel]; the result is
   determined when all switches are. *)
let use_synchronous_stdout_and_stderr () =
  let out_writer, err_writer = Lazy.force stdout_and_stderr in
  let err_pairs =
    (* [stderr] is only handled separately when it is not the same writer. *)
    if phys_equal out_writer err_writer then [] else [ err_writer, Out_channel.stderr ]
  in
  let pairs = (out_writer, Out_channel.stdout) :: err_pairs in
  Deferred.all_unit
    (List.map pairs ~f:(fun (writer, channel) ->
       set_synchronous_out_channel writer channel))
;;

(* This test is here rather than in a [test] directory because we want it to run
immediately after [stdout] and [stderr] are defined, so that they haven't yet been
   forced. *)
let%expect_test "stdout and stderr are always the same in tests" =
  (* Neither lazy value may have been forced before this point. *)
  print_s [%message (Lazy.is_val stdout : bool)];
  [%expect {| ("Lazy.is_val stdout" false) |}];
  print_s [%message (Lazy.is_val stderr : bool)];
  [%expect {| ("Lazy.is_val stderr" false) |}];
  let module U = Core.Unix in
  let saved_stderr = U.dup U.stderr in
  (* Make sure fd 1 and 2 have different inodes at the point that we force them. *)
  let pipe_r, pipe_w = U.pipe () in
  U.dup2 ~src:pipe_w ~dst:U.stderr;
  U.close pipe_r;
  U.close pipe_w;
  let stdout = Lazy.force stdout in
  let stderr = Lazy.force stderr in
  (* Put the real stderr back before printing the result. *)
  U.dup2 ~src:saved_stderr ~dst:U.stderr;
  U.close saved_stderr;
  print_s [%message (phys_equal stdout stderr : bool)];
  [%expect {| ("phys_equal stdout stderr" true) |}]
;;

(* For each writer (default: the forced [stdout] and [stderr]): lifts the buffer-age
   limit, disables raising when the consumer leaves, and instead calls
   [Shutdown.shutdown 0] once [consumer_left] is determined. *)
let behave_nicely_in_pipeline ?writers () =
  let configure writer =
    set_buffer_age_limit writer `Unlimited;
    set_raise_when_consumer_leaves writer false;
    don't_wait_for
      (let%map () = consumer_left writer in
       Shutdown.shutdown 0)
  in
  let targets =
    match writers with
    | Some given -> given
    | None -> List.map [ stdout; stderr ] ~f:force
  in
  List.iter targets ~f:configure
;;

(* Returns [perm] with the bits of the process umask cleared.  The umask is read by
   setting it to 0 and immediately restoring the value that call returned. *)
let apply_umask perm =
  let current_umask = Core_unix.umask 0 in
  let (_ : int) = Core_unix.umask current_umask in
  perm land lnot current_umask
;;

(* [with_file_atomic file ~f] runs [f] on a writer for a temporary file created with
   [Unix.mkstemp] (prefix [?temp_file], default [file]), sets the temporary file's
   permissions, optionally fsyncs, closes the writer via [with_close], and finally
   renames the temporary file onto [file].

   Permissions: if [file] could not be stat'ed, [?perm] (default 0o666) masked by the
   umask; otherwise [?perm], defaulting to the existing file's permissions.

   If the rename fails, the temporary file is unlinked and an error is raised that
   carries the rename exception (and the unlink exception, if that failed too). *)
let with_file_atomic ?temp_file ?perm ?fsync:(do_fsync = false) ?time_source file ~f =
  let%bind current_file_permissions =
    match%map Monitor.try_with (fun () -> Unix.stat file) with
    | Error _ -> None
    | Ok stats -> Some stats.perm
  in
  let%bind temp_file, fd = Unix.mkstemp (Option.value temp_file ~default:file) in
  let writer = create ?time_source fd in
  let%bind result =
    with_close writer ~f:(fun () ->
      let%bind f_result = f writer in
      let new_permissions =
        match current_file_permissions with
        | Some existing ->
          (* Overwriting: honour the requested permissions, falling back to what the
             file already had. *)
          Option.value perm ~default:existing
        | None ->
          (* Creating: the requested (or default) permissions are subject to the
             umask. *)
          apply_umask (Option.value perm ~default:0o666)
      in
      let%bind () = Unix.fchmod fd ~perm:new_permissions in
      let%map () = if do_fsync then fsync writer else return () in
      f_result)
  in
  match%bind Monitor.try_with (fun () -> Unix.rename ~src:temp_file ~dst:file) with
  | Ok () -> return result
  | Error rename_exn ->
    let fail v sexp_of_v =
      raise_s
        [%message
          "Writer.with_file_atomic could not create file" (file : string) ~_:(v : v)]
    in
    (match%map Monitor.try_with (fun () -> Unix.unlink temp_file) with
     | Ok () -> fail rename_exn [%sexp_of: exn]
     | Error cleanup_exn ->
       fail
         (rename_exn, `Cleanup_failed cleanup_exn)
         [%sexp_of: exn * [ `Cleanup_failed of exn ]])
;;

(* Atomically replaces [file] with [contents]. *)
let save ?temp_file ?perm ?fsync file ~contents =
  with_file_atomic ?temp_file ?perm ?fsync file ~f:(fun writer ->
    write writer contents;
    return ())
;;

(* Atomically replaces [file] with [lines], each followed by a line terminator. *)
let save_lines ?temp_file ?perm ?fsync file lines =
  let write_one writer line =
    write writer line;
    newline writer
  in
  with_file_atomic ?temp_file ?perm ?fsync file ~f:(fun writer ->
    List.iter lines ~f:(write_one writer);
    return ())
;;

(* Atomically replaces [file] with one sexp followed by a newline; [?hum] defaults to
   [true]. *)
let save_sexp ?temp_file ?perm ?fsync ?(hum = true) file sexp =
  with_file_atomic ?temp_file ?perm ?fsync file ~f:(fun writer ->
    write_sexp_internal writer sexp ~hum ~terminate_with:Newline;
    return ())
;;

(* Atomically replaces [file] with the given sexps, each followed by a newline;
   [?hum] defaults to [true]. *)
let save_sexps ?temp_file ?perm ?fsync ?(hum = true) file sexps =
  let write_one writer sexp =
    write_sexp_internal writer sexp ~hum ~terminate_with:Newline
  in
  with_file_atomic ?temp_file ?perm ?fsync file ~f:(fun writer ->
    List.iter sexps ~f:(write_one writer);
    return ())
;;

(* Atomically replaces [file] with the bin_prot serialization of [a]. *)
let save_bin_prot ?temp_file ?perm ?fsync file bin_writer a =
  with_file_atomic ?temp_file ?perm ?fsync file ~f:(fun writer ->
    write_bin_prot writer bin_writer a;
    return ())
;;

(* Registers [flushed] in [t.producers_to_flush_at_close] for the duration of [f],
   removing it again when [f] finishes, whether normally or with an error
   ([Monitor.protect]). *)
let with_flushed_at_close t ~flushed ~f =
  let registration = Bag.add t.producers_to_flush_at_close flushed in
  Monitor.protect f ~finally:(fun () ->
    Bag.remove t.producers_to_flush_at_close registration;
    return ())
;;

(* Moves batches from [pipe_r] to the writer until one of: end of pipe, [stop], the
   writer being closed, or the consumer leaving.  In the last two cases [pipe_r] is
   closed for reading.  [write_f] receives each batch and a continuation [~cont] to
   call once the batch has been handed to the writer. *)
let make_transfer ?(stop = Deferred.never ()) ?max_num_values_per_read t pipe_r write_f =
  let consumer =
    Pipe.add_consumer pipe_r ~downstream_flushed:(fun () ->
      let%map () = flushed t in
      `Ok)
  in
  let end_of_pipe_r = Ivar.create () in
  let read_batch () =
    match max_num_values_per_read with
    | Some max_queue_length -> Pipe.read_now' pipe_r ~consumer ~max_queue_length
    | None -> Pipe.read_now' pipe_r ~consumer
  in
  (* [Pipe.iter] is not usable here only because it has no
     [?max_num_values_per_read]. *)
  let rec pump () =
    let must_stop =
      Ivar.is_full t.consumer_left || (not (can_write t)) || Deferred.is_determined stop
    in
    if must_stop
    then
      (* Nothing to do: the [choose] in [run] becomes determined and [run] reacts
         appropriately. *)
      ()
    else (
      match read_batch () with
      | `Eof -> Ivar.fill end_of_pipe_r ()
      | `Nothing_available -> Pipe.values_available pipe_r >>> fun _ -> pump ()
      | `Ok batch ->
        write_f batch ~cont:(fun () ->
          Pipe.Consumer.values_sent_downstream consumer;
          flushed t >>> pump))
  in
  let run () =
    (* Concurrency between [pump] and [choose] is essential.  Even when [pump] is
       blocked, e.g. waiting on [flushed], the result of [run] can still be determined
       by a [choice] other than [end_of_pipe_r]. *)
    pump ();
    let outcome =
      choose
        [ choice (Ivar.read end_of_pipe_r) (fun () -> `End_of_pipe_r)
        ; choice stop (fun () -> `Stop)
        ; choice (close_finished t) (fun () -> `Writer_closed)
        ; choice (consumer_left t) (fun () -> `Consumer_left)
        ]
    in
    match%map outcome with
    | `End_of_pipe_r | `Stop -> ()
    | `Writer_closed | `Consumer_left -> Pipe.close_read pipe_r
  in
  with_flushed_at_close t ~f:run ~flushed:(fun () ->
    Deferred.ignore_m (Pipe.upstream_flushed pipe_r))
;;

(* Transfers with a synchronous per-element [write_f]. *)
let transfer ?stop ?max_num_values_per_read t pipe_r write_f =
  let write_batch batch ~cont =
    Queue.iter batch ~f:write_f;
    cont ()
  in
  make_transfer ?stop ?max_num_values_per_read t pipe_r write_batch
;;

(* Transfers with a deferred per-batch [write_f]; the next step runs once the deferred
   returned by [write_f] is determined. *)
let transfer' ?stop ?max_num_values_per_read t pipe_r write_f =
  let write_batch batch ~cont = write_f batch >>> cont in
  make_transfer ?stop ?max_num_values_per_read t pipe_r write_batch
;;

(* Returns the write end of a fresh pipe whose strings are written to [t]. *)
let pipe t =
  let reader, pipe_w = Pipe.create () in
  don't_wait_for (transfer t reader (fun str -> write t str));
  pipe_w
;;