open! Core
open! Import

module type Writer0 = sig
  (** [Writer] is Async's main API for output to a file descriptor. It is the analog of
[Core.Out_channel].
Each writer has an internal buffer, to which [Writer.write*] adds data. Each writer
uses an Async cooperative thread that makes [write()] system calls to move the data
from the writer's buffer to an OS buffer via the file descriptor.
There is no guarantee that the data sink on the other side of the writer can keep up
with the rate at which you are writing. If it cannot, the OS buffer will fill up and
the writer's cooperative thread will be unable to send any bytes. In that case, calls
to [Writer.write*] will grow the writer's buffer without bound, as long as your
program produces data. One solution to this problem is to call [Writer.flushed] and
not continue until that becomes determined, which will only happen once the bytes in
the writer's buffer have been successfully transferred to the OS buffer. Another
solution is to check [Writer.bytes_to_write] and not produce any more data if that is
beyond some bound.
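
A minimal sketch of the first solution, assuming [Core] and [Async] are opened and a
hypothetical list [chunks] of strings to send. Each chunk is written only after the
previous one has reached the OS buffer, so the writer's buffer holds at most one chunk.
Note that [Writer.flushed] never becomes determined if a write fails, so real code
would also watch the writer's monitor.
{[
  (* [write_with_pushback] and [chunks] are illustrative names, not part of this API. *)
  let write_with_pushback (writer : Writer.t) (chunks : string list) : unit Deferred.t =
    Deferred.List.iter ~how:`Sequential chunks ~f:(fun chunk ->
      Writer.write writer chunk;
      Writer.flushed writer)
]}
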
There are two kinds of errors that one can handle with writers. First, a writer can be
[close]d, which will cause future [write]s (and other operations) to synchronously
raise an exception. Second, the writer's cooperative thread can fail due to a
[write()] system call failing. This will cause an exception to be sent to the writer's
monitor, which will be a child of the monitor in effect when the writer is created.
One can deal with such asynchronous exceptions in the usual way, by handling the
stream returned by [Monitor.detach_and_get_error_stream (Writer.monitor writer)]. *)

  module Id : Unique_id

  module Line_ending : sig
    type t =
      | Dos
      | Unix
    [@@deriving sexp_of]
  end

  type t [@@deriving sexp_of]

  include Invariant.S with type t := t

  (** [stdout] and [stderr] are writers for file descriptors 1 and 2. They are lazy because
we don't want to create them in all programs that happen to link with Async.
When either [stdout] or [stderr] is created, they both are created. Furthermore, if
they point to the same inode, then they will be the same writer to [Fd.stdout]. This
can be confusing, because [fd (force stderr)] will be [Fd.stdout], not [Fd.stderr].
And subsequent modifications of [Fd.stderr] will have no effect on [Writer.stderr].
Unfortunately, the sharing is necessary because Async uses OS threads to do [write()]
syscalls using the writer buffer. When calling a program that redirects stdout and
stderr to the same file, as in:
{v
foo.exe >/tmp/z.file 2>&1
v}
if [Writer.stdout] and [Writer.stderr] weren't the same writer, then they could have
threads simultaneously writing to the same file, which could easily cause data
loss. *)
  val stdout : t Lazy.t

  val stderr : t Lazy.t

  type buffer_age_limit =
    [ `At_most of Time.Span.t
    | `Unlimited
    ]
  [@@deriving bin_io, sexp]

  (** [create ?buf_len ?syscall ?buffer_age_limit fd] creates a new writer. The file
descriptor [fd] should not be in use for writing by anything else.
By default, a write system call occurs at the end of a cycle in which bytes were
written. One can supply [~syscall:(`Periodic span)] to get better performance. This
batches writes together, doing the write system call periodically according to the
supplied span.
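
For instance, a sketch that batches write system calls to roughly one every 50
milliseconds (the span is an arbitrary choice for illustration, and [create_batched] is
a hypothetical name):
{[
  let create_batched (fd : Fd.t) : Writer.t =
    (* Data written during the interval is sent with a single periodic syscall. *)
    Writer.create ~syscall:(`Periodic (Time.Span.of_ms 50.)) fd
]}
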
A writer can asynchronously fail if the underlying write syscall returns an error,
e.g., [EBADF], [EPIPE], [ECONNRESET], ....
[buffer_age_limit] specifies how backed up you can get before raising an exception.
The default is [`Unlimited] for files, and 2 minutes for other kinds of file
descriptors. You can supply [`Unlimited] to turn off buffer-age checks.
[raise_when_consumer_leaves] specifies whether the writer should raise an exception
when the consumer receiving bytes from the writer leaves, i.e., in Unix, the write
syscall returns [EPIPE] or [ECONNRESET]. If [not raise_when_consumer_leaves], then
the writer will silently drop all writes after the consumer leaves, and the writer
will eventually fail with a writer-buffer-older-than error if the application remains
open long enough.
[line_ending] determines how [newline] and [write_line] terminate lines by default.
If [line_ending = Unix] then end of line is ["\n"]; if [line_ending = Dos] then end of
line is ["\r\n"]. Note that [line_ending = Dos] is not equivalent to opening the file
in text mode because any "\n" characters being printed by other means (e.g., [write
"\n"]) are still written verbatim (in Unix style).
[time_source] is useful in tests to trigger [buffer_age_limit]-related conditions, or
simply to have the result of (for example) [flushed_time_ns] agree with your test's
synthetic time. It is also used to schedule the [`Periodic] syscalls.
[buf_len] specifies the initial size of the internal buffer. This buffer will be
automatically resized up if more data is written than the buffer has room for, e.g.
using the [write*] functions. Note that buffers at least 128 KiB in size will be
allocated with mmap (see [bigstring_unix_stubs.c]); buffers smaller than that will
go on the C heap directly, which can cause C heap fragmentation in programs that
allocate lots of buffers, e.g. RPC servers, possibly resulting in unintuitive higher
overall program memory usage. *)
  val create
    :  ?buf_len:int (** default is 130 KiB *)
    -> ?syscall:[ `Per_cycle | `Periodic of Time.Span.t ]
    -> ?buffer_age_limit:buffer_age_limit
    -> ?raise_when_consumer_leaves:bool (** default is [true] *)
    -> ?line_ending:Line_ending.t (** default is [Unix] *)
    -> ?time_source:[> read ] Time_source.T1.t
         (** default is [Time_source.wall_clock ()] *)
    -> Fd.t
    -> t

  val raise_when_consumer_leaves : t -> bool

  (** [set_raise_when_consumer_leaves t bool] sets the [raise_when_consumer_leaves] flag of
[t], which determines how [t] responds to a write system call failing with [EPIPE] or
[ECONNRESET] (see [create]). *)
  val set_raise_when_consumer_leaves : t -> bool -> unit

  (** [set_buffer_age_limit t buffer_age_limit] replaces the existing buffer age limit with
the new one. This is useful for stdout and stderr, which are lazily created in a
context that does not allow applications to specify [buffer_age_limit]. *)
  val set_buffer_age_limit : t -> buffer_age_limit -> unit

  (** [consumer_left t] returns a deferred that becomes determined when [t] attempts to
write to a pipe that broke because the consumer on the other side left. *)
  val consumer_left : t -> unit Deferred.t

  val of_out_channel : Out_channel.t -> Fd.Kind.t -> t

  (** [open_file file] opens [file] for writing and returns a writer for it. It uses
[Unix_syscalls.openfile] to open the file. See [create] for the meanings of the
arguments. *)
  val open_file
    :  ?info:Info.t (** for errors. Defaults to the file path. *)
    -> ?append:bool (** default is [false], meaning truncate instead *)
    -> ?buf_len:int
    -> ?syscall:[ `Per_cycle | `Periodic of Time.Span.t ]
    -> ?perm:int (** default is [0o666] *)
    -> ?line_ending:Line_ending.t (** default is [Unix] *)
    -> ?time_source:[> read ] Time_source.T1.t
         (** default is [Time_source.wall_clock ()] *)
    -> string
    -> t Deferred.t

  (** [with_file file ~f] opens [file] for writing, creates a writer [t], and runs [f t] to
obtain a deferred [d]. When [d] becomes determined, the writer is closed. When the
close completes, the result of [with_file] becomes determined with the value of [d].
There is no need to call [Writer.flushed] to ensure that [with_file] waits for the
writer to be flushed before closing it. [Writer.close] will already wait for the
flush.
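
A short usage sketch, assuming [Core] and [Async] are opened ([write_greeting] is an
illustrative name, not part of this API):
{[
  let write_greeting (file : string) : unit Deferred.t =
    Writer.with_file file ~f:(fun writer ->
      Writer.write_line writer "hello";
      (* No explicit flush is needed: closing the writer waits for the flush. *)
      return ())
]}
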
[exclusive = true] uses a filesystem lock to try to make sure that the file is not
modified during a concurrent read or write operation. This is an advisory lock, which
means that the reader must be cooperating by taking a relevant lock when reading (see
[Reader.with_file]). This is unrelated and should not be confused with the [O_EXCL]
flag in the [open] system call. Note that the implementation uses [Unix.lockf], which has
known pitfalls. It's recommended that you avoid the [exclusive] flag in favor of
using a library dedicated to dealing with file locks where the pitfalls can be
documented in detail.
*)
  val with_file
    :  ?perm:int (** default is [0o666] *)
    -> ?append:bool (** default is [false], meaning truncate instead *)
    -> ?syscall:[ `Per_cycle | `Periodic of Time.Span.t ]
    -> ?exclusive:bool (** default is [false] *)
    -> ?line_ending:Line_ending.t (** default is [Unix] *)
    -> ?time_source:[> read ] Time_source.T1.t
         (** default is [Time_source.wall_clock ()] *)
    -> string
    -> f:(t -> 'a Deferred.t)
    -> 'a Deferred.t

  (** [id] returns an id for this writer that is unique among all other writers. *)
  val id : t -> Id.t

  (** [fd] returns the [Fd.t] used to create this writer. *)
  val fd : t -> Fd.t

  (** [set_fd t fd] sets the [fd] used by [t] for its underlying system calls. It first
waits until everything being sent to the current [fd] is flushed. Of course, one must
understand how the writer works and what one is doing to use this. *)
  val set_fd : t -> Fd.t -> unit Deferred.t

  (** [write_gen t a] writes [a] to writer [t], with [length] specifying the number of bytes
needed and [blit_to_bigstring] blitting [a] directly into [t]'s buffer. If one
has a type that has [length] and [blit_to_bigstring] functions, like:
{[
  module A : sig
    type t

    val length : t -> int
    val blit_to_bigstring : (t, Bigstring.t) Blit.blit
  end
]}
then one can use [write_gen] to implement a custom analog of [Writer.write], like:
{[
  module Write_a : sig
    val write : ?pos:int -> ?len:int -> A.t -> Writer.t -> unit
  end = struct
    let write ?pos ?len a writer =
      Writer.write_gen
        ~length:A.length
        ~blit_to_bigstring:A.blit_to_bigstring
        ?pos ?len writer a
  end
]}
In some cases it may be difficult to write only part of a value:
{[
  module B : sig
    type t

    val length : t -> int
    val blit_to_bigstring : t -> Bigstring.t -> pos:int -> unit
  end
]}
In these cases, use [write_gen_whole] instead. It never requires writing only part of
a value, although it is potentially less space-efficient. It may waste portions of
previously-allocated write buffers if they are too small.
{[
  module Write_b : sig
    val write : B.t -> Writer.t -> unit
  end = struct
    let write b writer =
      Writer.write_gen_whole
        ~length:B.length
        ~blit_to_bigstring:B.blit_to_bigstring
        writer b
  end
]}
Note: [write_gen] and [write_gen_whole] give you access to the writer's internal
buffer. You should not capture it; doing so might lead to errors of the segfault
kind. *)
  val write_gen
    :  ?pos:int
    -> ?len:int
    -> t
    -> 'a
    -> blit_to_bigstring:
         (src:'a -> src_pos:int -> dst:Bigstring.t -> dst_pos:int -> len:int -> unit)
    -> length:('a -> int)
    -> unit

  val write_gen_whole
    :  t
    -> 'a
    -> blit_to_bigstring:('a -> Bigstring.t -> pos:int -> unit)
    -> length:('a -> int)
    -> unit

  (** [write_direct t ~f] gives [t]'s internal buffer to [f]. [pos] and [len] define the
portion of the buffer that can be filled. [f] must return a pair [(x, written)] where
[written] is the number of bytes written to the buffer at [pos]. [write_direct]
raises if [written < 0 || written > len]. [write_direct] returns [Some x], or [None]
if the writer is stopped. By using [write_direct] only, one can ensure that the
writer's internal buffer never grows. Look at the [write_direct] expect tests for an
example of how this can be used to construct a [write_string]-like function that never
grows the internal buffer. *)
  val write_direct : t -> f:(Bigstring.t -> pos:int -> len:int -> 'a * int) -> 'a option

  (** [write ?pos ?len t s] adds a job to the writer's queue of pending writes. The
contents of the string are copied to an internal buffer before [write] returns, so
clients can do whatever they want with [s] after that. *)
  val write_bytes : ?pos:int -> ?len:int -> t -> Bytes.t -> unit

  val write : ?pos:int -> ?len:int -> t -> string -> unit
  val write_bigstring : ?pos:int -> ?len:int -> t -> Bigstring.t -> unit
  val write_iobuf : ?pos:int -> ?len:int -> t -> ([> read ], _) Iobuf.t -> unit
  val write_substring : t -> Substring.t -> unit
  val write_bigsubstring : t -> Bigsubstring.t -> unit
  val writef : t -> ('a, unit, string, unit) format4 -> 'a

  (** [to_formatter] returns an OCaml-formatter that one can print to using
{!Format.fprintf}. Note that flushing the formatter will only submit all buffered
data to the writer, but does {e not} guarantee flushing to the operating system. *)
  val to_formatter : t -> Format.formatter

  (** [write_char t c] writes the character. *)
  val write_char : t -> char -> unit

  (** [newline t] writes the end-of-line terminator. [line_ending] can override [t]'s
[line_ending]. *)
  val newline : ?line_ending:Line_ending.t -> t -> unit

  (** [write_line t s ?line_ending] is [write t s; newline t ?line_ending]. *)
  val write_line : ?line_ending:Line_ending.t -> t -> string -> unit

  (** [write_byte t i] writes one 8-bit integer (as the single character with that code).
The given integer is taken modulo 256. *)
  val write_byte : t -> int -> unit

  module Terminate_with : sig
    type t =
      | Newline
      | Space_if_needed
    [@@deriving sexp_of]
  end

  (** [write_sexp t sexp] writes to [t] the string representation of [sexp], possibly
followed by a terminating character as per [Terminate_with]. With
[~terminate_with:Newline], the terminating character is a newline. With
[~terminate_with:Space_if_needed], if a space is needed to ensure that the sexp reader
knows that it has reached the end of the sexp, then the terminating character will be
a space; otherwise, no terminating character is added. A terminating space is needed
if the string representation doesn't end in [')'] or ['"']. *)
  val write_sexp
    :  ?hum:bool (** default is [false] *)
    -> ?terminate_with:Terminate_with.t (** default is [Space_if_needed] *)
    -> t
    -> Sexp.t
    -> unit

  (** [write_bin_prot] writes out a value using its bin_prot sizer/writer pair. The format
is the "size-prefixed binary protocol", in which the length of the data is written
before the data itself. This is the format that [Reader.read_bin_prot] reads. *)
  val write_bin_prot : t -> 'a Bin_prot.Type_class.writer -> 'a -> unit

  (** Writes out a value using its bin_prot writer. Unlike [write_bin_prot], this doesn't
prefix the output with the size of the bin_prot blob. [size] is the expected size.
This function will raise if the bin_prot writer writes an amount other than [size]
bytes. *)
  val write_bin_prot_no_size_header
    :  t
    -> size:int
    -> 'a Bin_prot.Write.writer
    -> 'a
    -> unit

  (** Unlike the [write_] functions, all functions starting with [schedule_] require
flushing or closing of the writer after returning before it is safe to modify the
bigstrings which were directly or indirectly passed to these functions. The reason is
that these bigstrings will be read from directly when writing; their contents are not
copied to internal buffers.
This is important if users need to send the same large data string to a huge number of
clients simultaneously (e.g., on a cluster), because these functions then avoid
needlessly exhausting memory by sharing the data. *)

  (** [schedule_bigstring t bstr] schedules a write of bigstring [bstr]. It is not safe to
change the bigstring until the writer has been successfully flushed or closed after
this operation. *)
  val schedule_bigstring : t -> ?pos:int -> ?len:int -> Bigstring.t -> unit

  val schedule_bigsubstring : t -> Bigsubstring.t -> unit

  (** [schedule_iobuf_peek] is like [schedule_bigstring], but for an iobuf. It is not safe
to change the iobuf until the writer has been successfully flushed or closed after
this operation. *)
  val schedule_iobuf_peek : t -> ?pos:int -> ?len:int -> ([> read ], _) Iobuf.t -> unit

  (** [schedule_iobuf_consume] is like [schedule_iobuf_peek]. Once the result is determined,
the iobuf will be fully consumed (or advanced by [min len (Iobuf.length iobuf)] if
[len] is specified), and the writer will be flushed. *)
  val schedule_iobuf_consume
    :  t
    -> ?len:int
    -> ([> read ], Iobuf.seek) Iobuf.t
    -> unit Deferred.t

  module Destroy_or_keep : sig
    type t =
      | Destroy
      | Keep
    [@@deriving sexp_of]
  end

  (** [schedule_iovec t iovec] schedules a write of I/O-vector [iovec]. It is not safe to
change the bigstrings underlying the I/O-vector until the writer has been successfully
flushed or closed after this operation. *)
  val schedule_iovec
    :  ?destroy_or_keep:Destroy_or_keep.t (** default is [Keep] *)
    -> t
    -> Bigstring.t Unix.IOVec.t
    -> unit

  (** [schedule_iovecs t iovecs] is like {!schedule_iovec}, but takes a whole queue [iovecs] of
I/O-vectors as argument. The queue is guaranteed to be empty when this function
returns and can be modified. It is not safe to change the bigstrings underlying the
I/O-vectors until the writer has been successfully flushed or closed after this
operation. *)
  val schedule_iovecs : t -> Bigstring.t Unix.IOVec.t Queue.t -> unit

  module Flush_result : sig
    type t =
      | Error
      (** [Error] is accompanied by a detailed error being sent to the writer's monitor. *)
      | Consumer_left
      (** [Consumer_left] is returned when the consumer leaves (see {!consumer_left}) and
          {!raise_when_consumer_leaves} is set to [false]. If that flag is set to [true],
          then you get an [Error] instead. *)
      | Force_closed
      (** [Force_closed] is returned when [Writer.close] is forced before all data is
          flushed (see [?force_close]). *)
      | Flushed of Time_ns.t
      (** The time just after the [write()] system call returned or
          the time [flushed_*] was called if all the writes were already flushed by then. *)
    [@@deriving sexp_of]
  end

  (** [flushed_or_failed_with_result t] returns a deferred that will become determined when
all prior writes complete (i.e. the [write()] system call returns), or when any of
them fail.
Handling the [Error] case can be tricky due to the following race: the result gets
determined concurrently with the exception propagation through the writer's monitor.
The caller needs to make sure that the program behavior does not depend on which
signal propagates first.
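
A sketch of inspecting the result, assuming [Async] is opened ([flushed_ok] is an
illustrative name, not part of this API). It only reports whether the flush succeeded;
the [Error] case still needs to be handled through the writer's monitor:
{[
  let flushed_ok (writer : Writer.t) : bool Deferred.t =
    Writer.flushed_or_failed_with_result writer
    >>| function
    | Writer.Flush_result.Flushed _ -> true
    | Writer.Flush_result.Error
    | Writer.Flush_result.Consumer_left
    | Writer.Flush_result.Force_closed -> false
]}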
*)
  val flushed_or_failed_with_result : t -> Flush_result.t Deferred.t

  (** [flushed_or_failed_unit t] returns a deferred that will become
determined when all prior writes complete, or when any of them fail.
Unlike {!flushed_or_failed_with_result}, its return value gives you no indication of
which happened. In the [Error] case, the result will be determined in parallel with
the error propagating to the writer's monitor. The caller should robustly handle
either side winning that race.
*)
  val flushed_or_failed_unit : t -> unit Deferred.t

  (** [flushed t] returns a deferred that will become determined when all prior writes
complete (i.e. the [write()] system call returns). If a prior write fails, then the
deferred will never become determined.
It is OK to call [flushed t] after [t] has been closed. *)
  val flushed : t -> unit Deferred.t

  val flushed_time : t -> Time.t Deferred.t
  val flushed_time_ns : t -> Time_ns.t Deferred.t

  (** [fsync t] calls [flushed t] before calling [Unix.fsync] on the underlying
file descriptor. *)
  val fsync : t -> unit Deferred.t

  (** [fdatasync t] calls [flushed t] before calling [Unix.fdatasync] on the
underlying file descriptor. *)
  val fdatasync : t -> unit Deferred.t

  (** [send] writes a string to the writer that can be read back using [Reader.recv]. *)
  val send : t -> string -> unit

  (** [monitor t] returns the writer's monitor. *)
  val monitor : t -> Monitor.t

  (** [close ?force_close t] waits for the writer to be flushed, and then calls [Unix.close]
on the underlying file descriptor. [force_close] causes the [Unix.close] to happen
even if the flush hangs. By default [force_close] is [Deferred.never ()] for files
and [after (sec 5)] for other types of file descriptors (e.g., sockets). If the close
is forced, data in the writer's buffer may not be written to the file descriptor. You
can check this by calling [bytes_to_write] after [close] finishes.
WARNING: [force_close] will not reliably stop any write that is in progress.
If there are any in-flight system calls, it will wait for them to finish, which
includes [writev], which can legitimately block forever.
[close] will raise an exception if the [Unix.close] on the underlying file descriptor
fails.
You must call [close] on a writer in order to close the underlying file descriptor.
Not doing so will cause a file descriptor leak. It also will cause a space leak,
because until the writer is closed, it is held on to in order to flush the writer on
shutdown.
It is an error to call other operations on [t] after [close t] has been called, except
that calls of [close] subsequent to the original call to [close] will return the same
deferred as the original call.
[close_started t] becomes determined as soon as [close] is called.

[close_finished t] becomes determined after [t]'s underlying file descriptor has been
closed, i.e., it is the same as the result of [close]. [close_finished] differs from
[close] in that it does not have the side effect of initiating a close.

[is_closed t] returns [true] iff [close t] has been called.

[is_open t] is [not (is_closed t)].

[with_close t ~f] runs [f ()], and closes [t] after [f] finishes or raises. *)
  val close : ?force_close:unit Deferred.t -> t -> unit Deferred.t

  val close_started : t -> unit Deferred.t
  val close_finished : t -> unit Deferred.t
  val is_closed : t -> bool
  val is_open : t -> bool
  val with_close : t -> f:(unit -> 'a Deferred.t) -> 'a Deferred.t

  (** [can_write t] returns [true] if calls to [write*] functions on [t] are allowed. If
[is_open t] then [can_write t]. But one can have [is_closed t] and [can_write t],
during the time after [close t] before closing has finished. *)
  val can_write : t -> bool

  (** Errors raised within the writer can stop the background job that flushes out the
writer's buffers. [is_stopped_permanently] returns [true] when the background job has
stopped. [stopped_permanently] becomes determined when the background job has
stopped. *)
  val is_stopped_permanently : t -> bool

  val stopped_permanently : t -> unit Deferred.t

  (** In addition to flushing its internal buffer prior to closing, a writer keeps track of
producers that are feeding it data, so that when [Writer.close] is called, it does the
following:
+ requests that the writer's producers flush their data to it
+ flushes the writer's internal buffer
+ calls [Unix.close] on the writer's underlying file descriptor
[with_flushed_at_close t ~flushed ~f] calls [f] and adds [flushed] to the set of
producers that should be flushed-at-close, for the duration of [f]. *)
  val with_flushed_at_close
    :  t
    -> flushed:(unit -> unit Deferred.t)
    -> f:(unit -> 'a Deferred.t)
    -> 'a Deferred.t

  (** [bytes_to_write t] returns how many bytes have been requested to write but have not
yet been written. *)
  val bytes_to_write : t -> int

  (** [bytes_written t] returns how many bytes have been written. *)
  val bytes_written : t -> Int63.t

  (** [bytes_received t] returns how many bytes have been received by the writer. As long
as the writer is running, [bytes_received = bytes_written + bytes_to_write]. *)
  val bytes_received : t -> Int63.t

  (** [with_file_atomic ?temp_file ?perm ?fsync ?replace_special file ~f] creates a writer
to a temp file, feeds that writer to [f], and when the result of [f] becomes
determined, atomically moves (using [Unix.rename]) the temp file to [file]. If [file]
currently exists and is a regular file (see below regarding [replace_special]) it will
be replaced, even if it is read-only.
The temp file will be [file] (or [temp_file] if supplied) suffixed by a unique random
sequence of six characters. The temp file will be removed if an exception is raised to
the monitor of [f] before the result of [f] becomes determined. However, if the
program exits for some other reason, the temp file may not be cleaned up; so it may be
prudent to choose a temp file that can be easily found by cleanup tools.
If [fsync] is [true], the temp file will be flushed to disk before it takes the place
of the target file, thus guaranteeing that the target file will always be in a sound
state, even after a machine crash. Since synchronization is extremely slow, this is
not the default. Think carefully about the event of machine crashes and whether you
may need this option!
If [replace_special] is [false] (the default) an existing special [file] (block or
character device, socket or FIFO) will not be replaced by a regular file, the
temporary file is not created and an exception is raised. To explicitly replace an
existing special [file], [replace_special] must be passed as [true]. Note that
if [file] exists and is a directory, the rename will fail; if [file] exists and is
a symbolic link, the link will be replaced, not the target (as per [Unix.rename]).
We intend for [with_file_atomic] to mimic the behavior of the [open] system call, so
if [file] does not exist, we will apply the current umask to [perm] (the effective
permissions become [perm land lnot umask], see [man 2 open]). However, if [file] does
exist and [perm] is specified, we do something different from the [open] system call: we
override the permission with [perm], ignoring the umask. This means that if you
create and then immediately overwrite the file with [with_file_atomic ~perm], then the
umask will be honored the first time and ignored the second time. If [perm] is not
specified, then any existing file permissions are preserved.
If [f] closes the writer passed to it, [with_file_atomic] raises and does not create
[file].
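
A usage sketch, assuming [Core] and [Async] are opened ([save_config] is an
illustrative name, not part of this API). It passes [~fsync:true] to trade speed for
crash safety, as described above:
{[
  let save_config (file : string) (contents : string) : unit Deferred.t =
    Writer.with_file_atomic ~fsync:true file ~f:(fun writer ->
      Writer.write writer contents;
      (* Do not close [writer] here: [with_file_atomic] raises if [f] closes it. *)
      return ())
]}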
*)
  val with_file_atomic
    :  ?temp_file:string
    -> ?perm:Unix.file_perm
    -> ?fsync:bool (** default is [false] *)
    -> ?replace_special:bool (** default is [false] *)
    -> ?time_source:[> read ] Time_source.T1.t
         (** default is [Time_source.wall_clock ()] *)
    -> string
    -> f:(t -> 'a Deferred.t)
    -> 'a Deferred.t

  (** [save] is a special case of [with_file_atomic] that atomically writes the given
string to the specified file. *)
  val save
    :  ?temp_file:string
    -> ?perm:Unix.file_perm
    -> ?fsync:bool (** default is [false] *)
    -> ?replace_special:bool (** default is [false] *)
    -> string
    -> contents:string
    -> unit Deferred.t

  (** [save_lines file lines] writes all lines in [lines] to [file], with each line followed
by a newline. *)
  val save_lines
    :  ?temp_file:string
    -> ?perm:Unix.file_perm
    -> ?fsync:bool (** default is [false] *)
    -> ?replace_special:bool (** default is [false] *)
    -> string
    -> string list
    -> unit Deferred.t

  (** [save_sexp] is a special case of [with_file_atomic] that atomically writes the
given sexp to the specified file.
[save_sexp file sexp] writes [sexp] to [file], followed by a newline. To read a file
produced using [save_sexp], one would typically use [Reader.load_sexp], which deals
with the additional whitespace and works nicely with converting the sexp to a
value. *)
  val save_sexp
    :  ?temp_file:string
    -> ?perm:Unix.file_perm
    -> ?fsync:bool (** default is [false] *)
    -> ?replace_special:bool (** default is [false] *)
    -> ?hum:bool (** default is [true] *)
    -> string
    -> Sexp.t
    -> unit Deferred.t

  (** [save_sexps] works similarly to [save_sexp], but saves a sequence of sexps instead,
separated by newlines. There is a corresponding [Reader.load_sexps] for reading back
in. *)
  val save_sexps
    :  ?temp_file:string
    -> ?perm:Unix.file_perm
    -> ?fsync:bool (** default is [false] *)
    -> ?replace_special:bool (** default is [false] *)
    -> ?hum:bool (** default is [true] *)
    -> string
    -> Sexp.t list
    -> unit Deferred.t

  (** [save_sexps_conv] is like [save_sexps], but converts to sexps internally, one at a
time. This avoids allocating the list of sexps up front, which can be costly. The
default values of the parameters are the same as [save_sexps]. *)
  val save_sexps_conv
    :  ?temp_file:string
    -> ?perm:int
    -> ?fsync:bool (** default is [false] *)
    -> ?replace_special:bool (** default is [false] *)
    -> ?hum:bool
    -> string
    -> 'a list
    -> ('a -> Sexp.t)
    -> unit Deferred.t

  (** [save_bin_prot file bin_writer a] is a special case of [with_file_atomic] that writes
[a] to [file] using its bin_writer, in the size-prefixed format, like
[write_bin_prot]. To read a file produced using [save_bin_prot], one would typically
use [Reader.load_bin_prot]. *)
  val save_bin_prot
    :  ?temp_file:string
    -> ?perm:Unix.file_perm
    -> ?fsync:bool (** default is [false] *)
    -> ?replace_special:bool (** default is [false] *)
    -> string
    -> 'a Bin_prot.Type_class.writer
    -> 'a
    -> unit Deferred.t

  (** [transfer' t pipe_r f] repeatedly reads values from [pipe_r] and feeds them to [f],
which should in turn write them to [t]. It provides pushback to [pipe_r] by not
reading when [t] cannot keep up with the data being pushed in.
By default, each read from [pipe_r] reads all the values in [pipe_r]. One can supply
[max_num_values_per_read] to limit the number of values per read.
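
For example, a sketch that forwards lines from a pipe to a writer, reading at most 100
values at a time. It assumes [Core] and [Async] are opened; [forward_lines] and the
limit of 100 are illustrative choices, not part of this API:
{[
  let forward_lines (writer : Writer.t) (pipe_r : string Pipe.Reader.t) : unit Deferred.t =
    Writer.transfer' ~max_num_values_per_read:100 writer pipe_r (fun q ->
      (* Each batch [q] holds at most 100 lines; write them all, then ask for more. *)
      Queue.iter q ~f:(fun line -> Writer.write_line writer line);
      return ())
]}
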
The [transfer'] stops and the result becomes determined when [stop] becomes
determined, when [pipe_r] reaches its EOF, when [t] is closed, or when [t]'s consumer
leaves. In the latter two cases, [transfer'] closes [pipe_r].
[transfer'] causes [Pipe.flushed] on [pipe_r]'s writer to ensure that the bytes have
been flushed to [t] before returning. It also waits on [Pipe.upstream_flushed] at
shutdown.
[transfer t pipe_r f] is equivalent to:
{[
  transfer' t pipe_r (fun q -> Queue.iter q ~f; return ()) ]} *)
  val transfer'
    :  ?stop:unit Deferred.t
    -> ?max_num_values_per_read:int
    -> t
    -> 'a Pipe.Reader.t
    -> ('a Queue.t -> unit Deferred.t)
    -> unit Deferred.t

  val transfer
    :  ?stop:unit Deferred.t
    -> ?max_num_values_per_read:int
    -> t
    -> 'a Pipe.Reader.t
    -> ('a -> unit)
    -> unit Deferred.t

  (** [pipe t] returns the writing end of a pipe attached to [t] that pushes back when [t]
cannot keep up with the data being pushed in. Closing the pipe does not close [t]. *)
  val pipe : t -> string Pipe.Writer.t

  (** [behave_nicely_in_pipeline ~writers ()] causes the program to exit with status 141
(indicating SIGPIPE) if any of the consumers of [writers] go away. It also sets the
buffer age to unlimited, in case there is a human (e.g., using [less]) on the other
side of the pipeline.
This can be called at the toplevel of a program, before [Command.run] for instance
(this function doesn't start the Async scheduler). *)
  val behave_nicely_in_pipeline
    :  ?writers:t list (** defaults to [stdout; stderr] *)
    -> unit
    -> unit

  (** [set_synchronous_out_channel t out_channel] waits until [bytes_to_write t = 0], and
then mutates [t] so that all future writes to [t] synchronously call
[Out_channel.output*] functions to send data to the OS immediately.
[set_synchronous_out_channel] is used by expect tests to ensure that the interleaving
between calls to [Core.printf] (and similar IO functions) and [Async.printf] generates
output with the same interleaving. [set_synchronous_out_channel] is idempotent. *)
  val set_synchronous_out_channel : t -> Out_channel.t -> unit Deferred.t

  (** [using_synchronous_backing_out_channel t = true] if writes to [t] are being done
synchronously, e.g., due to [set_synchronous_out_channel],
[set_synchronous_backing_out_channel], or [use_synchronous_stdout_and_stderr]. *)
  val using_synchronous_backing_out_channel : t -> bool

  (** [clear_synchronous_out_channel t] restores [t] to its normal state, with the
background writer asynchronously feeding data to the OS.
[clear_synchronous_out_channel] is idempotent. *)
  val clear_synchronous_out_channel : t -> unit

  val with_synchronous_out_channel
    :  t
    -> Out_channel.t
    -> f:(unit -> 'a Deferred.t)
    -> 'a Deferred.t

  (** [use_synchronous_stdout_and_stderr ()] causes all subsequent writes to
stdout and stderr to occur synchronously (after any pending writes have
flushed).
This ensures [printf]-family writes happen immediately, which avoids two
common sources of confusion:
{ul
{li unexpected interleaving of [Core.printf] and [Async.printf] calls; and}
{li [Async.printf] calls that don't get flushed before an application exits}}
The disadvantages are:
{ul
{li this makes writes blocking, which can delay unrelated asynchronous jobs until
the consumer stops pushing back; and}
{li the errors raised by write are different and it won't respect
{!behave_nicely_in_pipeline} anymore}} *)
  val use_synchronous_stdout_and_stderr : unit -> unit Deferred.t

  (** [Backing_out_channel] generalizes [Out_channel] to a narrow interface that can be used
to collect strings, etc. *)
  module Backing_out_channel : sig
    type t [@@deriving sexp_of]

    val create
      :  output:(bigstring -> pos:int -> len:int -> unit)
      -> flush:(unit -> unit)
      -> sexp:(unit -> Sexp.t)
      -> t

    val of_out_channel : Out_channel.t -> t
  end

  val set_synchronous_backing_out_channel : t -> Backing_out_channel.t -> unit Deferred.t

  val with_synchronous_backing_out_channel
    :  t
    -> Backing_out_channel.t
    -> f:(unit -> 'a Deferred.t)
    -> 'a Deferred.t

  (**/**)

  module Private : sig
    module Check_buffer_age : sig
      module Internal_for_unit_test : sig
        val check_now : check_invariants:bool -> time_source:Time_source.t -> unit
        val num_active_checks_for : Time_source.t -> int option
      end
    end

    val set_bytes_received : t -> Int63.t -> unit
    val set_bytes_written : t -> Int63.t -> unit
  end
end

module type Writer = sig
  include Writer0

  (** [of_pipe info pipe_w] returns a writer [t] such that data written to [t] will appear
on [pipe_w]. If either [t] or [pipe_w] is closed, the other is closed as well.
[of_pipe] is implemented by attaching [t] to the write-end of a Unix pipe, and
shuttling bytes from the read-end of the Unix pipe to [pipe_w]. *)
  val of_pipe
    :  ?time_source:[> read ] Time_source.T1.t
         (** default is [Time_source.wall_clock ()] *)
    -> Info.t
    -> string Pipe.Writer.t
    -> (t * [ `Closed_and_flushed_downstream of unit Deferred.t ]) Deferred.t
end