Scheduler
Running tasks on a set number of forked processes.
The main content of this module is:
- add_task to add tasks to a queue;
- run to run those tasks in separate worker processes;
- Message to send messages to and from workers;
- Timer to schedule delayed functions.
The scheduler (i.e. run) maintains a pool of workers. Workers receive tasks from the scheduler, execute them and send the result back to the scheduler. Tasks can be given a limited amount of time to run. New tasks can be added (with add_task) while the scheduler is running in response to events such as:
Examples of messages include:
Messages are meant to be used while a task is running. The return value of a task is also sent to the scheduler as a message, but this is handled transparently.
Examples of use cases for this library include:
make -j).
Message sending functions are meant to be run from a specific context:
These message sending functions take a value of type worker_context or scheduler_context as proof that they are called from the right context. (It is possible to leak a context into another context, but doing so is a programming error.)
Values used by message sending functions meant to be called from a worker.
Values used by message sending functions meant to be called from the scheduler.
Get the current worker context.
Returns None if not currently in a worker process.
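The "context as proof" idea can be illustrated in plain OCaml with an abstract type: code outside the defining module cannot build a value of that type, so only code that legitimately obtained one can call the functions that require it. This is a hypothetical sketch, not the module's implementation: the module Ctx and the functions get_worker_context, enter_worker, send_to_scheduler and sent are illustrative names, and messages are collected in a list instead of being written to a pipe.

```ocaml
(* Hypothetical sketch of the "context as proof" pattern.
   [worker_context] is abstract outside [Ctx], so callers cannot forge one. *)
module Ctx : sig
  type worker_context

  (* Returns [None] unless [enter_worker] was called in this process. *)
  val get_worker_context : unit -> worker_context option

  (* Hypothetical: a real implementation would call this right after fork. *)
  val enter_worker : unit -> unit

  (* Requires a [worker_context] as proof that we are in a worker. *)
  val send_to_scheduler : worker_context -> string -> unit

  (* Messages "sent" so far, oldest first (stands in for a real pipe). *)
  val sent : unit -> string list
end = struct
  type worker_context = unit

  let in_worker = ref false
  let outbox : string list ref = ref []

  let get_worker_context () = if !in_worker then Some () else None
  let enter_worker () = in_worker := true
  let send_to_scheduler (_ : worker_context) msg = outbox := msg :: !outbox
  let sent () = List.rev !outbox
end

(* Only sends a message when running inside a worker. *)
let report msg =
  match Ctx.get_worker_context () with
  | None -> ()
  | Some ctx -> Ctx.send_to_scheduler ctx msg

let () =
  report "ignored: not in a worker yet";
  Ctx.enter_worker ();
  report "progress: 50%"
```

Because the only way to get a worker_context is through get_worker_context, the first report silently does nothing and only the second message is recorded.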
val add_task :
?sigterm:int ->
?term_timeout:float ->
?kill_timeout:float ->
?on_term_timeout:(unit -> unit) ->
?on_kill_timeout:(unit -> unit) ->
?on_start:(scheduler_context -> unit) ->
?on_message:(scheduler_context -> Message.t -> unit) ->
?on_finish:(('a, string) result -> unit) ->
'a Message.typ ->
(worker_context -> 'a) ->
unit
Add a task to the queue.
Usage: add_task typ execute
typ is a type description for values returned by execute. When a worker is ready to execute this task, this worker runs execute. Note that execute is sent to the worker after being serialized with Marshal, so any variables captured by this closure must also be serializable with Marshal.
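Since execute travels through Marshal, everything it captures must survive a round trip. The sketch below uses only the standard library to check this for some hypothetical captured data (the config record is an illustrative assumption), and shows that Marshal rejects a functional value unless the Marshal.Closures flag is passed, which a closure such as execute therefore requires.

```ocaml
(* Hypothetical data that a task closure might capture. *)
type config = { name : string; retries : int; ports : int list }

let captured = { name = "test-42"; retries = 3; ports = [ 8080; 8081 ] }

(* Round-trip through Marshal, as happens when a value crosses
   from the scheduler to a worker process. *)
let round_trip (v : 'a) : 'a = Marshal.from_string (Marshal.to_string v []) 0

let copy = round_trip captured

(* Without the [Closures] flag, Marshal refuses functional values. *)
let closure_rejected =
  match Marshal.to_string (fun x -> x + 1) [] with
  | _ -> false
  | exception Invalid_argument _ -> true
```

Closures marshaled with Marshal.Closures can only be read back by the same program binary, which is the case for workers created by forking the scheduler.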
add_task can be called before run, or while run is running (i.e. from an event handler like on_start, on_message, on_finish; or the on_empty_queue argument of run).
If term_timeout is specified, sigterm is sent to the worker if the task has not finished (successfully or not) after term_timeout seconds. sigterm defaults to Sys.sigterm.
If kill_timeout is specified but term_timeout is not, SIGKILL is sent to the worker if the task has not finished (successfully or not) after kill_timeout seconds.
If both term_timeout and kill_timeout are specified, sigterm is sent first, and if the task has still not ended kill_timeout seconds after sigterm was sent, SIGKILL is sent as well. In that case, kill_timeout is relative to the time sigterm was sent, not to the time the task started.
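The three timeout rules above can be summarized by computing, for a task that never finishes on its own, when each signal is sent. This is a hypothetical helper written only to illustrate the documented rules; signal_schedule and the Term/Kill constructors are not part of the module.

```ocaml
(* Hypothetical helper: when are signals sent to a task that never
   finishes on its own? Times are in seconds since the task started. *)
type signal = Term | Kill

let signal_schedule ?term_timeout ?kill_timeout () : (float * signal) list =
  match term_timeout, kill_timeout with
  | None, None -> [] (* no time limit *)
  | Some t, None -> [ (t, Term) ] (* only [sigterm] *)
  | None, Some k -> [ (k, Kill) ] (* SIGKILL, relative to the start *)
  | Some t, Some k ->
      (* [kill_timeout] is counted from the moment [sigterm] was sent *)
      [ (t, Term); (t +. k, Kill) ]
```

For example, with term_timeout 10 and kill_timeout 5, SIGKILL arrives 15 seconds after the task started, not 5.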
on_start is triggered when the task is sent to a worker. It takes a scheduler_context argument that can be used to send a message to this worker, typically with additional information that was not known when the task was queued, such as a free port number that the worker can use.
on_message is triggered for each message that is sent from the worker. It also takes a scheduler_context argument to be able to respond.
on_finish is triggered with:
- Ok result when the task returns successfully, in which case result is the return value of execute;
- Error error_message when the task fails, in which case error_message can be the result of Printexc.to_string (if execute raised an exception) or something else (e.g. if the worker died).
Clear the queue of tasks.
This has no effect on tasks that are already running, because they have been removed from the queue.
val run :
?worker_idle_timeout:float ->
?worker_kill_timeout:float ->
?on_worker_kill_timeout:(unit -> unit) ->
?on_empty_queue:(unit -> unit) ->
?on_message:(Message.t -> unit) ->
?on_unexpected_worker_exit:(Unix.process_status -> unit) ->
fork:(unit -> int) ->
int ->
unit
Run tasks until the queue is empty.
on_empty_queue is called when a worker is available and the task queue is empty. It can in particular use add_task to fill the queue.
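To show how on_empty_queue can keep the scheduler busy, here is a hypothetical in-process model: no worker is forked and tasks run one after the other, as if there were a single worker. The add_task and run below only mirror the documented names; they are not the module's functions.

```ocaml
(* Hypothetical in-process model of the task queue: no workers are forked,
   tasks simply run one after the other. It only illustrates how
   [on_empty_queue] can refill the queue with an [add_task]-like call. *)
let queue : (unit -> unit) Queue.t = Queue.create ()

let add_task f = Queue.add f queue

(* Runs tasks until the queue is still empty after calling [on_empty_queue]. *)
let run ~on_empty_queue () =
  let rec loop () =
    if Queue.is_empty queue then on_empty_queue ();
    match Queue.take_opt queue with
    | None -> () (* nothing was added: stop *)
    | Some task -> task (); loop ()
  in
  loop ()

(* Usage: produce tasks in batches of 2, three batches in total. *)
let log : (int * int) list ref = ref []
let batches_left = ref 3

let () =
  run
    ~on_empty_queue:(fun () ->
      if !batches_left > 0 then begin
        let batch = !batches_left in
        decr batches_left;
        add_task (fun () -> log := (batch, 1) :: !log);
        add_task (fun () -> log := (batch, 2) :: !log)
      end)
    ()
```

Generating tasks lazily this way avoids building the whole queue up front; the model stops as soon as on_empty_queue leaves the queue empty.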
on_message is called when a worker emits a message while not executing a task. This can happen in particular if you use at_exit. Messages received from a worker which is running a task are passed to the on_message of the corresponding add_task call instead.
When a worker exits:
- on_finish is triggered with Error);
- on_unexpected_worker_exit is called so that you can emit a warning.
If worker_idle_timeout is specified, workers stop if they have not been given any task for worker_idle_timeout seconds. This can be useful to prevent workers from running forever, although in general they should detect that the scheduler is dead when they receive end of file while trying to read their next task.
If worker_kill_timeout is specified, SIGKILL is sent to workers that are still running worker_kill_timeout seconds after they were told to stop. This only applies when they were told to stop because the task queue is empty. When this happens, on_worker_kill_timeout is called.
fork should normally be Unix.fork, but if tasks may use Lwt, it should be Lwt_unix.fork instead. You can also wrap fork to run some code on each fork, for instance to initialize some global variables when a worker starts.
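A wrapper that runs initialization code in each new worker can be written once and applied to any fork function. In this sketch fork_with_init is a hypothetical helper; it relies only on the usual fork convention of returning 0 in the child and the child's pid in the parent. Fake fork functions are used so that the example runs without the unix library.

```ocaml
(* Hypothetical helper: wrap any [fork] function (normally [Unix.fork]
   or [Lwt_unix.fork]) so that [init] runs in each new worker process.
   [fork] returns 0 in the child and the child's pid in the parent. *)
let fork_with_init ~(fork : unit -> int) ~(init : unit -> unit) () : int =
  let pid = fork () in
  if pid = 0 then init (); (* child side: we are in a fresh worker *)
  pid

(* Test doubles instead of real forks, so that this runs anywhere. *)
let initialized = ref 0
let init () = incr initialized
let child = fork_with_init ~fork:(fun () -> 0) ~init ()
let parent = fork_with_init ~fork:(fun () -> 1234) ~init ()
```

With the real library, the partial application fork_with_init ~fork:Unix.fork ~init has type unit -> int and could be passed as the fork argument of run.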
The last argument is the maximum number of tasks to run in parallel.
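As an idealized model of this limit, assume each queued task starts, in queue order, as soon as one of the parallel slots becomes free, and that worker startup costs nothing. The hypothetical start_times function below computes the resulting start time of each task from its duration; it is a simulation, not how run is implemented.

```ocaml
(* Hypothetical model of a pool limited to [max_parallel] tasks
   (assumes [max_parallel >= 1]): each task, given by its duration in
   seconds, starts in queue order as soon as a slot becomes free.
   Returns the start time of each task. *)
let start_times ~max_parallel (durations : float list) : float list =
  (* [free_at.(i)] is the time at which slot [i] becomes free. *)
  let free_at = Array.make max_parallel 0. in
  let schedule acc duration =
    (* Pick the slot that becomes free first (lowest index on ties). *)
    let best = ref 0 in
    Array.iteri (fun i t -> if t < free_at.(!best) then best := i) free_at;
    let start = free_at.(!best) in
    free_at.(!best) <- start +. duration;
    start :: acc
  in
  List.rev (List.fold_left schedule [] durations)
```

With two slots and durations of 3, 1, 1 and 2 seconds, the short tasks share the second slot while the first task is still running, so they start at 0, 0, 1 and 2 seconds.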
This function is blocking. It returns once:
- stop);
- add_task and Timer.on_delay.
Stop the current run.
This function is meant to be called from the event handlers (on_ functions) of tasks. It does nothing if run is not currently running.
This function is not blocking. It causes run to stop starting new tasks. It also causes run to consider that all current tasks are past their term_timeout, even if they do not actually have such a timeout, except that on_term_timeout is not triggered. In other words, all workers receive SIGTERM if they have not already received it.
Calling add_task still adds tasks to the queue but they will not be started unless you call run again.
This function does not cancel timers.
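The difference between a real term timeout and stopping can be pictured with some hypothetical bookkeeping: each running task records whether sigterm was already sent, and stopping sends it to every task that has not received it yet without calling on_term_timeout. The running record, term_timeout_expired and stop_all are illustrative assumptions, not the module's internals.

```ocaml
(* Hypothetical bookkeeping for a running task. *)
type running = {
  id : int;
  mutable sigterm_sent : bool;
  on_term_timeout : unit -> unit;
}

(* What a term timeout does: send [sigterm] and notify the task. *)
let term_timeout_expired ~send task =
  if not task.sigterm_sent then begin
    send task;
    task.sigterm_sent <- true;
    task.on_term_timeout ()
  end

(* What stopping does: behave as if every term timeout had expired,
   except that [on_term_timeout] is not triggered. *)
let stop_all ~send tasks =
  List.iter
    (fun task ->
      if not task.sigterm_sent then begin
        send task;
        task.sigterm_sent <- true
      end)
    tasks

(* Usage: task 1 reaches its own term timeout, then everything is stopped. *)
let signalled : int list ref = ref []
let notified = ref 0
let send task = signalled := task.id :: !signalled
let make id = { id; sigterm_sent = false; on_term_timeout = (fun () -> incr notified) }
let t1 = make 1 and t2 = make 2 and t3 = make 3

let () =
  term_timeout_expired ~send t1;
  stop_all ~send [ t1; t2; t3 ]
```

Each task receives the signal exactly once, and only the task whose own timeout expired triggers on_term_timeout.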
Convert a process status to a human-readable string.
Example results:
- "exited with code 0"
- "was killed by SIGTERM"
- "was stopped by unknown signal (-100)"
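A conversion producing strings of this shape can be sketched as follows. To keep the example independent of the unix library, it uses a local copy of the constructors of Unix.process_status; the function name show_process_status is hypothetical, and only a few signals are named. OCaml represents portable signals with negative constants such as Sys.sigterm, which is why an unknown signal is shown with its raw number.

```ocaml
(* Local copy of [Unix.process_status], so that this sketch only needs
   the standard library. *)
type process_status =
  | WEXITED of int
  | WSIGNALED of int
  | WSTOPPED of int

(* OCaml's portable signal numbers ([Sys.sigterm], ...) are negative
   constants; only a few are named here. *)
let signal_name signal =
  if signal = Sys.sigterm then "SIGTERM"
  else if signal = Sys.sigkill then "SIGKILL"
  else if signal = Sys.sigint then "SIGINT"
  else if signal = Sys.sigstop then "SIGSTOP"
  else Printf.sprintf "unknown signal (%d)" signal

(* Hypothetical name: the module's actual function may be named differently. *)
let show_process_status = function
  | WEXITED code -> Printf.sprintf "exited with code %d" code
  | WSIGNALED signal -> "was killed by " ^ signal_name signal
  | WSTOPPED signal -> "was stopped by " ^ signal_name signal
```

The three example results listed above correspond to WEXITED 0, WSIGNALED Sys.sigterm and WSTOPPED (-100).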