123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392openCoreopenAsync(** {0 Multispool}
Multispool allows multiple, separate processes to cooperate via a filesystem-based
queue. The design was influenced by various UNIX tools that use typical POSIX-y
write/fsync/close/rename semantics to provide atomic filesystem operations (Maildir,
in particular--see http://www.qmail.org/man/man5/maildir.html for an overview).
One or more processes may place files in a queue, wait for files to appear in queues
(and handle them), or iterate over files in a queue.
{1 Usage Summary}
A spool is physically represented by a directory, and logically as a module created by
applying the [Multispool] functor. A spool deals in a data type that implements the
[Spoolable] interface. This interface tells the spool how to encode and decode items
for on-disk storage and how to map queue names to directories on disk. See
../test/lib/widget.ml for an example [Spoolable] implementation.
An existing spool is opened with [load] and a new one is created with [create].
Use [enqueue] to add items to a queue, optionally reserving a name beforehand with
[reserve_name].
If you want to wait on entries to appear, use [Queue_reader.iter]. If you want to
make periodic passes over all entries in a queue, use [Queue_reader.iter_available].
Lower-level functionality is available in the [Expert] module.
{1 Implementation Details}
Enqueueing attempts to create a file with a unique name within the .registry/
directory by open(2)ing the file with the O_CREAT and O_EXCL flag (which will fail if
the name exists). If it fails, a new name is generated via [Spoolable.Name_generator]
and the process repeats. Once a file is created in .registry/, it remains as a name
"reservation" and a file (with the same name) is created in the desired queue. Keeping
the empty file in .registry/ ensures that no other process can create the same file
name while this name is in use within the spool. *)moduleName_generator=struct(** Generates filenames for enqueued [Spoolable]s. [t] is a user-supplied input to name
generation via [name]. *)moduletypeS=sigmoduleUnique_name:sigtypetvalto_string:t->stringendtypetvalnext:t->attempt:int->Unique_name.tendendmoduleSpoolable=structmoduletypeS=sig(** [Spoolable.Metadata.t] should be smallish since it is read and written more
frequently than [Spoolable.Data.t]. *)moduleMetadata:sigtypet(** [of_string] and [to_string] are used to persist and read [t] on disk. *)includeStringable.Swithtypet:=tend(** [Spoolable.Data.t] is where the "real" data lives and it allows for data-specific
[load] and [save] functionality. *)moduleData:sigtypetvalload:string->tDeferred.Or_error.tvalsave:?temp_file:string->t->string->unitDeferred.Or_error.tend(** [Queue.t] is an enumerable type that represents the available queues and the
mapping to directory names on-disk. *)moduleQueue:sigtypet[@@derivingsexp,enumerate,compare]valto_dirname:t->stringendmoduleName_generator:Name_generator.S(** All operations that touch disk are passed through [Throttle.enqueue] *)moduleThrottle:sigvalenqueue:(unit->'aDeferred.t)->'aDeferred.tendendendmoduletypeS=sigtypet[@@derivingsexp_of]typespool=tmoduleSpoolable:Spoolable.Svaldir:t->string(** Open a [Multispool.t]. This function will fail by default if the spool directory
does not exist, does not look like a spool, or does not contain the set of
directories named after the strings returned by [Spoolable.Queue.to_dir]. Pass
[~create_if_missing:()] to create the necessary directories.
Note that, even if [~create_if_missing:()] is specified, this function will still
fail if the supplied directory is non-empty and not already a spool. *)valload:?create_if_missing:unit->string->tDeferred.Or_error.t(** Open a [Multispool.t] with no spool directory validation. *)valload_unsafe:string->t(** Open a [Multispool.t] and create the spool directory if necessary. This is
functionally identical to [load ?create_if_missing:()]. *)valcreate:string->tDeferred.Or_error.t(** Provide access to a [Spoolable.Data.t]. [Data_file.t] functions as a "handle" to
the underlying data so the user can choose when to read a [Spoolable.Data.t]. *)moduleData_file:sigtypetvalpath:t->stringvalload:t->Spoolable.Data.tDeferred.Or_error.tvalsave:t->contents:Spoolable.Data.t->unitDeferred.Or_error.tvalstat:t->Unix.Stats.tDeferred.Or_error.tend(** An [Entry] is associated with a particular queue *)moduleEntry:sigtypet[@@derivingsexp_of]valstat:t->Unix.Stats.tDeferred.Or_error.tvalspool:t->spoolvalqueue:t->Spoolable.Queue.tvalname:t->string(** create an [Entry.t] from a file name on disk. There is no validation done to
ensure that the corresponding entry exists in the spool. The validation is
performed when using the [Entry.t]. *)valcreate:spool->Spoolable.Queue.t->name:string->t(** Direct operations that provide no validation or exclusive access guarantees. *)moduleDirect:sig(** No checkout is performed and the data is read directly from the queue file. If
you need to later update this data, consider revalidating the contents after
checkout and before writing. *)valcontents:t->Spoolable.Metadata.tDeferred.Or_error.t(** Get the data_file associated with an [Entry.t]. It is unsafe to operate on this
directly ouside of a checkout, much like [contents]. *)valdata_file:t->Data_file.t(** Save contents directly to the file path derived from [t]. There are no
validation or exclusive access guarantees. This will atomically clobber over any
existing file. *)valsave:t->Spoolable.Metadata.t->unitDeferred.Or_error.t(** Delete an [Entry.t] from disk along with its registry file and data_file. There
are no validation or exclusive access guarantees. *)valremove:t->unitDeferred.Or_error.tendendvallist:t->Spoolable.Queue.t->Entry.tlistDeferred.Or_error.tmoduleUnique_name:sigvalreserve:spool->Spoolable.Name_generator.t->Spoolable.Name_generator.Unique_name.tDeferred.Or_error.tend(** Add a [Spoolable] to a queue. An [Entry.t] is returned, but it may make sense to
ignore it. *)valenqueue:t->Spoolable.Queue.t->Spoolable.Metadata.t->Spoolable.Data.t->[`ReserveofSpoolable.Name_generator.t|`UseofSpoolable.Name_generator.Unique_name.t]->Entry.tDeferred.Or_error.t(** Do something with the contents of an entry within [f]. Use [with_entry] if you
expect to be the only user of an [Entry.t] and it is an error if the Entry.t is
grabbed by another process (or otherwise disappears). See [checkout] for a
lower-level interface. *)valwith_entry:f:(Spoolable.Metadata.t->Data_file.t->([`SaveofSpoolable.Metadata.t*Spoolable.Queue.t|`Remove]*'a)Deferred.t)->Entry.t->'aDeferred.Or_error.t(** Like [with_entry], but use [with_entry'] if you expect that another process might
race to grab an [Entry.t] and want straightforward handling. See [checkout'] for a
lower-level interface.*)valwith_entry':f:(Spoolable.Metadata.t->Data_file.t->([`SaveofSpoolable.Metadata.t*Spoolable.Queue.t|`Remove]*'a)Deferred.t)->Entry.t->[`Okof'a|`Not_found]Deferred.Or_error.t(** Interface for iteration and waiting on queue activity. Multiple processes will not
interfere with one another. *)moduleQueue_reader:sigtypetvalcreate:spool->Spoolable.Queue.t->tDeferred.Or_error.t(** Iterate over entries in a queue and call [f] on each, and wait for a new entry if
the list is exhausted. *)valiter:?stop:unitDeferred.t->f:(Spoolable.Metadata.t->Data_file.t->[`SaveofSpoolable.Metadata.t*Spoolable.Queue.t|`Remove]Deferred.t)->t->unitDeferred.Or_error.t(** Iterate over entries in a queue and call [f] on each, if any are available. Do
not wait. *)valiter_available:f:(Spoolable.Metadata.t->Data_file.t->[`SaveofSpoolable.Metadata.t*Spoolable.Queue.t|`Remove]Deferred.t)->t->unitDeferred.Or_error.tendmoduleExpert:sig(** A spooled entry that is checked out, independent of any particular queue. No
other process using this interface will be able to interfere with a
[Checked_out_entry.t] (unlike an [Entry.t], which may be stolen out from under
you). *)moduleChecked_out_entry:sigtypetvalname:t->stringvalqueue:t->Spoolable.Queue.tvalcontents:t->Spoolable.Metadata.tvalupdate:t->f:(Spoolable.Metadata.t->Spoolable.Metadata.t)->tvaldata_file:t->Data_file.t(** Atomically return a [Checked_out_entry.t] to a queue. The [Checked_out_entry.t]
should be forgotten after this. *)valsave:t->Spoolable.Queue.t->unitDeferred.Or_error.t(** Delete a [Checked_out_entry.t] (along with its registry file and data_file).
The [Checked_out_entry.t] should be forgotten after this. *)valremove:t->unitDeferred.Or_error.tend(** Check out an [Entry.t]. Use [checkout] if you expect to be the only user of an
[Entry.t] and it is an error if the Entry.t is grabbed by another process (or does
not exist). See [with_entry] for a higher-level interface. *)valcheckout:Entry.t->Checked_out_entry.tDeferred.Or_error.t(** Check out an [Entry.t]. Use [checkout'] if you expect that another process might
race to grab an [Entry.t]. See [with_entry'] for a higher-level interface. *)valcheckout':Entry.t->[`Not_found|`OkofChecked_out_entry.t]Deferred.Or_error.t(** Get a hold of all currently checked out entries in the given [queue].
This operation breaks the invariant that each [t] has a single owner. It should
only be used in cases where it is easy to reason about what processes are
potentially manipulating the spool. *)vallist_checkouts_unsafe:spool->Spoolable.Queue.t->Checked_out_entry.tlistDeferred.Or_error.tmoduleQueue_reader:sig(** Wait for and dequeue the next entry that appears. *)valdequeue:?stop:unitDeferred.t->Queue_reader.t->[`Stopped|`Checked_outofChecked_out_entry.t*Queue_reader.t]Deferred.Or_error.t(** Dequeue the next entry that that is available, if any. Do not wait. *)valdequeue_available:Queue_reader.t->([`Nothing_available|`Checked_outofChecked_out_entry.t]*Queue_reader.t)Deferred.Or_error.tendendend(** [Multispool.Monitor] provides consistency checks for a spool. *)moduleMonitor=structmoduletypeS=sigtypetmoduleSpoolable:Spoolable.SmoduleFile_with_mtime:sigtypet={filename:string;mtime:Time_float.t}[@@derivingsexp_of]endmoduleDir:sigtypet=|Registry|Tmp|Data|QueueofSpoolable.Queue.t|Queue_checkoutofSpoolable.Queue.tvalname_on_disk:t->stringendmoduleProblem:sigtypet=|Too_oldofFile_with_mtime.t*Dir.t|OrphanedofFile_with_mtime.t*Dir.t|DuplicatedofFile_with_mtime.t*Dir.tlist[@@derivingsexp_of,compare]includeComparable.S_plainwithtypet:=tendmoduleEvent:sigtypet=|StartofTime_float.t*Problem.t|EndofTime_float.t*Problem.t[@@derivingsexp_of,compare]includeComparable.S_plainwithtypet:=tendmoduleLimits:sigtypet={max_checked_out_age:Time_float.Span.t(* default: 10 minutes *);max_tmp_file_age:Time_float.Span.t(* default: 10 minutes *);max_queue_ages:(Spoolable.Queue.t*Time_float.Span.t)list}[@@derivingsexp]valcreate:?max_checked_out_age:Time_float.Span.t->?max_tmp_file_age:Time_float.Span.t->?max_queue_ages:(Spoolable.Queue.t*Time_float.Span.t)list->unit->tendmoduleSpec:sigtypet={spool_dir:string;limits:Limits.t}[@@derivingsexp]valcreate:spool_dir:string->limits:Limits.t->tvalparam:tCommand.Param.tend(** Does not create a spool. *)valcreate:Spec.t->tDeferred.Or_error.tvalrun_once:t->Problem.tlistDeferred.Or_error.tmoduleDaemon:sigtypemonitor=ttypet={check_every:Time_float.Span.t(* default: 15 seconds *);alert_after_cycles:int(* default: 2 cycles *)}valcreate:?check_every:Time_float.Span.t->?alert_after_cycles:int->unit->tvalparam:tCommand.Param.tvalstart:t->monitor:monitor->f:(Event.t->unitDeferred.t)->unitendendend