(* Configuration of the Async scheduler.

   A configuration is a record of optional settings ([t]).  Settings are read from the
   environment variables named by [environment_variable] and
   [environment_variable_allow_extra_fields], merged together, and any setting that is
   still missing is filled in from [default]. *)

open Core
module Time_ns = Core_private.Time_ns_alternate_sexp
open Poly

let sec = Time_ns.Span.of_sec
let concat = String.concat

(* Each of the following [Validated.Make] modules wraps a base type together with the
   validation function that is applied when a value is created. *)

(* A positive [int]. *)
module Epoll_max_ready_events = Validated.Make (struct
    include Int

    let here = [%here]
    let validate = Int.validate_positive
  end)

(* A positive [int]. *)
module Io_uring_max_submission_entries = Validated.Make (struct
    include Int

    let here = [%here]
    let validate = Int.validate_positive
  end)

(* A non-negative time span. *)
module Max_inter_cycle_timeout = Validated.Make (struct
    include Time_ns.Span

    let here = [%here]
    let validate = Time_ns.Span.validate_non_negative
  end)

(* A non-negative time span. *)
module Min_inter_cycle_timeout = Validated.Make (struct
    include Time_ns.Span

    let here = [%here]
    let validate = Time_ns.Span.validate_non_negative
  end)

(* A positive [int], with a [default] of [1 lsl 17] and a monomorphic [equal]. *)
module Max_num_open_file_descrs = struct
  include Validated.Make (struct
      include Int

      let here = [%here]
      let validate = Int.validate_positive
    end)

  let default = create_exn (1 lsl 17)
  let equal (t1 : t) t2 = t1 = t2
end

(* A positive [int]. *)
module Max_num_threads = Validated.Make (struct
    include Int

    let here = [%here]
    let validate = Int.validate_positive
  end)

(* A positive [int]. *)
module Max_num_jobs_per_priority_per_cycle = Validated.Make (struct
    include Int

    let here = [%here]
    let validate = Int.validate_positive
  end)

(* Whether to watch for delayed jobs and, if so, after what delay and by which method a
   core dump is requested.  See the [dump_core_on_job_delay] text in
   [field_descriptions] for the user-facing explanation. *)
module Dump_core_on_job_delay = struct
  module How_to_dump = struct
    type t =
      | Default
      | Call_abort
      | Call_gcore
    [@@deriving sexp]
  end

  type watch =
    { dump_if_delayed_by : Time_ns.Span.t
    ; how_to_dump : How_to_dump.t
    }
  [@@deriving sexp]

  type t =
    | Watch of watch
    | Do_not_watch
  [@@deriving sexp]
end

(* Tags selecting which groups of debug messages are enabled. *)
module Debug_tag = struct
  module T = struct
    type t =
      | All
      | Clock
      | Fd
      | File_descr_watcher
      | Finalizers
      | Interruptor
      | Monitor
      | Monitor_send_exn
      | Parallel
      | Reader
      | Scheduler
      | Shutdown
      | Thread_pool
      | Thread_safe
      | Writer
    [@@deriving compare, sexp]

    let equal = [%compare.equal: t]
  end

  include T
  include Sexpable.To_stringable (T)

  (* Every constructor of [t], in declaration order.  Must be kept in sync by hand when
     a constructor is added. *)
  let list =
    [ All
    ; Clock
    ; Fd
    ; File_descr_watcher
    ; Finalizers
    ; Interruptor
    ; Monitor
    ; Monitor_send_exn
    ; Parallel
    ; Reader
    ; Scheduler
    ; Shutdown
    ; Thread_pool
    ; Thread_safe
    ; Writer
    ]
  ;;
end

(* The available file-descriptor watcher choices. *)
module File_descr_watcher = struct
  module T = struct
    type t =
      | Epoll_if_timerfd
      | Epoll
      | Select
      | Io_uring
    [@@deriving sexp]
  end

  include T
  include Sexpable.To_stringable (T)

  (* Every constructor of [t], in declaration order. *)
  let list = [ Epoll_if_timerfd; Epoll; Select; Io_uring ]
end

(* The available io_uring driver modes. *)
module Io_uring_mode = struct
  module T = struct
    type t =
      | Disabled
      | Eventfd
      | If_available_eventfd
      | From_scheduler
    [@@deriving sexp]
  end

  include T
  include Sexpable.To_stringable (T)

  (* Every constructor of [t], in declaration order. *)
  let list = [ Disabled; Eventfd; If_available_eventfd; From_scheduler ]
end

(* The configuration record.  Every field is optional; [None] means the setting was not
   specified.  The derived [t_of_sexp] rejects unknown fields (contrast with
   [Allow_extra_fields] below). *)
type t =
  { abort_after_thread_pool_stuck_for : Time_ns.Span.t option [@sexp.option]
  ; check_invariants : bool option [@sexp.option]
  ; detect_invalid_access_from_thread : bool option [@sexp.option]
  ; dump_core_on_job_delay : Dump_core_on_job_delay.t option [@sexp.option]
  ; epoll_max_ready_events : Epoll_max_ready_events.t option [@sexp.option]
  ; io_uring_max_submission_entries : Io_uring_max_submission_entries.t option
       [@sexp.option]
  ; io_uring_mode : Io_uring_mode.t option [@sexp.option]
  ; file_descr_watcher : File_descr_watcher.t option [@sexp.option]
  ; max_inter_cycle_timeout : Max_inter_cycle_timeout.t option [@sexp.option]
  ; max_num_open_file_descrs : Max_num_open_file_descrs.t option [@sexp.option]
  ; max_num_threads : Max_num_threads.t option [@sexp.option]
  ; max_num_jobs_per_priority_per_cycle : Max_num_jobs_per_priority_per_cycle.t option
       [@sexp.option]
  ; min_inter_cycle_timeout : Min_inter_cycle_timeout.t option [@sexp.option]
  ; print_debug_messages_for : Debug_tag.t list option [@sexp.option]
  ; record_backtraces : bool option [@sexp.option]
  ; report_thread_pool_stuck_for : Time_ns.Span.t option [@sexp.option]
  ; thread_pool_cpu_affinity : Thread_pool_cpu_affinity.t option [@sexp.option]
  ; timing_wheel_config : Timing_wheel.Config.t option [@sexp.option]
  }
[@@deriving fields ~fields ~iterators:(map, fold), sexp]

(* The same record type as [t] (re-exported with [type nonrec t = t = ...]), but with a
   sexp converter derived under [@@sexp.allow_extra_fields].  The field list must be
   kept identical to that of [t]. *)
module Allow_extra_fields = struct
  type nonrec t = t =
    { abort_after_thread_pool_stuck_for : Time_ns.Span.t option [@sexp.option]
    ; check_invariants : bool option [@sexp.option]
    ; detect_invalid_access_from_thread : bool option [@sexp.option]
    ; dump_core_on_job_delay : Dump_core_on_job_delay.t option [@sexp.option]
    ; epoll_max_ready_events : Epoll_max_ready_events.t option [@sexp.option]
    ; io_uring_max_submission_entries : Io_uring_max_submission_entries.t option
         [@sexp.option]
    ; io_uring_mode : Io_uring_mode.t option [@sexp.option]
    ; file_descr_watcher : File_descr_watcher.t option [@sexp.option]
    ; max_inter_cycle_timeout : Max_inter_cycle_timeout.t option [@sexp.option]
    ; max_num_open_file_descrs : Max_num_open_file_descrs.t option [@sexp.option]
    ; max_num_threads : Max_num_threads.t option [@sexp.option]
    ; max_num_jobs_per_priority_per_cycle : Max_num_jobs_per_priority_per_cycle.t option
         [@sexp.option]
    ; min_inter_cycle_timeout : Min_inter_cycle_timeout.t option [@sexp.option]
    ; print_debug_messages_for : Debug_tag.t list option [@sexp.option]
    ; record_backtraces : bool option [@sexp.option]
    ; report_thread_pool_stuck_for : Time_ns.Span.t option [@sexp.option]
    ; thread_pool_cpu_affinity : Thread_pool_cpu_affinity.t option [@sexp.option]
    ; timing_wheel_config : Timing_wheel.Config.t option [@sexp.option]
    }
  [@@sexp.allow_extra_fields] [@@deriving sexp]
end

(* [t_of_sexp ~allow_extra_fields sexp] parses [sexp] with the converter from
   [Allow_extra_fields] when [allow_extra_fields] is [true], and with the converter
   derived on [t] otherwise.  This definition shadows the derived [t_of_sexp]. *)
let t_of_sexp ~allow_extra_fields sexp =
  match allow_extra_fields with
  | true -> Allow_extra_fields.t_of_sexp sexp
  | false -> t_of_sexp sexp
;;

(* The configuration in which no setting is specified. *)
let empty =
  { abort_after_thread_pool_stuck_for = None
  ; check_invariants = None
  ; detect_invalid_access_from_thread = None
  ; dump_core_on_job_delay = None
  ; epoll_max_ready_events = None
  ; io_uring_max_submission_entries = None
  ; io_uring_mode = None
  ; file_descr_watcher = None
  ; max_inter_cycle_timeout = None
  ; max_num_open_file_descrs = None
  ; max_num_threads = None
  ; max_num_jobs_per_priority_per_cycle = None
  ; min_inter_cycle_timeout = None
  ; print_debug_messages_for = None
  ; record_backtraces = None
  ; report_thread_pool_stuck_for = None
  ; thread_pool_cpu_affinity = None
  ; timing_wheel_config = None
  }
;;

(* Builds a configuration from optional arguments; every omitted argument yields [None]
   in the corresponding field. *)
let create
  ?abort_after_thread_pool_stuck_for
  ?check_invariants
  ?detect_invalid_access_from_thread
  ?dump_core_on_job_delay
  ?epoll_max_ready_events
  ?io_uring_max_submission_entries
  ?io_uring_mode
  ?file_descr_watcher
  ?max_inter_cycle_timeout
  ?max_num_open_file_descrs
  ?max_num_threads
  ?max_num_jobs_per_priority_per_cycle
  ?min_inter_cycle_timeout
  ?print_debug_messages_for
  ?record_backtraces
  ?report_thread_pool_stuck_for
  ?thread_pool_cpu_affinity
  ?timing_wheel_config
  ()
  =
  { abort_after_thread_pool_stuck_for
  ; check_invariants
  ; detect_invalid_access_from_thread
  ; dump_core_on_job_delay
  ; epoll_max_ready_events
  ; io_uring_max_submission_entries
  ; io_uring_mode
  ; file_descr_watcher
  ; max_inter_cycle_timeout
  ; max_num_open_file_descrs
  ; max_num_threads
  ; max_num_jobs_per_priority_per_cycle
  ; min_inter_cycle_timeout
  ; print_debug_messages_for
  ; record_backtraces
  ; report_thread_pool_stuck_for
  ; thread_pool_cpu_affinity
  ; timing_wheel_config
  }
;;

(* Returns the timing-wheel configuration used by default for the given word size: a
   coarser alarm precision with three levels on [W32], and a precision divided by
   [2^3] with four levels on [W64]. *)
let default_timing_wheel_config_for_word_size (word_size : Word_size.t) =
  let module Alarm_precision = Timing_wheel.Alarm_precision in
  let alarm_precision, level_bits =
    match word_size with
    | W32 -> Alarm_precision.about_one_millisecond, [ 10; 10; 9 ]
    | W64 -> Alarm_precision.(div about_one_millisecond ~pow2:3), [ 14; 15; 9; 6 ]
  in
  Timing_wheel.Config.create
    ~alarm_precision
    ~level_bits:(Timing_wheel.Level_bits.create_exn level_bits)
    ()
;;

(* The default timing-wheel configuration for the word size of the running program. *)
let default_timing_wheel_config =
  default_timing_wheel_config_for_word_size Word_size.word_size
;;

(* The fully-specified default configuration: every field is [Some _].  [(!!)] and
   [field_descriptions] rely on that, since they call [Option.value_exn] on its
   fields. *)
let default =
  (* For [file_descr_watcher] and [max_num_open_file_descrs] we choose the default for
     the common case that [epoll] is available.  There is some additional code in
     [Async_unix.Config] that checks whether [epoll] is actually available, and if not,
     uses [select] and a smaller number of file descriptors. *)
  { abort_after_thread_pool_stuck_for = Some (sec 60.)
  ; check_invariants = Some false
  ; detect_invalid_access_from_thread = Some false
  ; dump_core_on_job_delay = Some Do_not_watch
  ; epoll_max_ready_events = Some (Epoll_max_ready_events.create_exn 256)
  ; io_uring_max_submission_entries =
      Some (Io_uring_max_submission_entries.create_exn 1024)
  ; io_uring_mode = Some Disabled
  ; file_descr_watcher = Some Epoll_if_timerfd
  ; max_inter_cycle_timeout = Some (Max_inter_cycle_timeout.create_exn (sec 0.05))
  ; max_num_open_file_descrs = Some Max_num_open_file_descrs.default
  ; max_num_threads = Some (Max_num_threads.create_exn 50)
  ; max_num_jobs_per_priority_per_cycle =
      Some (Max_num_jobs_per_priority_per_cycle.create_exn 500)
  ; min_inter_cycle_timeout = Some (Min_inter_cycle_timeout.create_exn (sec 0.))
  ; print_debug_messages_for = Some []
  ; record_backtraces = Some false
  ; report_thread_pool_stuck_for = Some (sec 1.)
  ; thread_pool_cpu_affinity = Some Inherit
  ; timing_wheel_config = Some default_timing_wheel_config
  }
;;

(* A sample configuration, printed by [help_message] to illustrate the sexp format.  It
   is [default] with two fields overridden. *)
let example =
  { default with
    print_debug_messages_for = Some Debug_tag.[ Fd; Scheduler ]
  ; thread_pool_cpu_affinity =
      Some
        (Cpuset
           ([ 0; 1; 2 ] |> Int.Set.of_list |> Thread_pool_cpu_affinity.Cpuset.create_exn))
  }
;;

(* [merge t1 t2] combines two configurations field by field: a field takes its value
   from [t1] when [t1] specifies it, and from [t2] otherwise. *)
let merge t1 t2 =
  let override field = Option.first_some (Field.get field t1) (Field.get field t2) in
  Fields.map
    ~abort_after_thread_pool_stuck_for:override
    ~check_invariants:override
    ~detect_invalid_access_from_thread:override
    ~dump_core_on_job_delay:override
    ~epoll_max_ready_events:override
    ~io_uring_max_submission_entries:override
    ~io_uring_mode:override
    ~file_descr_watcher:override
    ~max_inter_cycle_timeout:override
    ~max_num_open_file_descrs:override
    ~max_num_threads:override
    ~max_num_jobs_per_priority_per_cycle:override
    ~min_inter_cycle_timeout:override
    ~print_debug_messages_for:override
    ~record_backtraces:override
    ~report_thread_pool_stuck_for:override
    ~thread_pool_cpu_affinity:override
    ~timing_wheel_config:override
;;

(* [reduce ts] merges a list of configurations into one.  Because it folds from the
   right with [merge], a setting given by an earlier element of [ts] takes precedence
   over the same setting in a later element. *)
let reduce ts = List.fold_right ~init:empty ~f:merge ts

(* Names of the environment variables read by [parse_variable] below. *)
let environment_variable = "ASYNC_CONFIG"
let environment_variable_allow_extra_fields = "ASYNC_CONFIG_ALLOW_EXTRA_FIELDS"

(* Builds the per-field help text: for every field of [t], its name, its default value
   (rendered as a sexp) and a prose description, with the entries sorted by field
   name. *)
let field_descriptions () : string =
  (* Accumulator step for [Fields.fold]: prepends
     (field name, sexp of the field's default, description fragments). *)
  let field to_sexp description ac field =
    (Field.name field, to_sexp (Option.value_exn (Field.get field default)), description)
    :: ac
  in
  let fields =
    Fields.fold
      ~init:[]
      ~abort_after_thread_pool_stuck_for:
        (field
           [%sexp_of: Time_ns.Span.t]
           [ {|
By default, Async will send an exception to the toplevel monitor
if it detects that the thread pool is stuck for longer than this.
|} ])
      ~check_invariants:
        (field
           [%sexp_of: bool]
           [ {|
If true, causes Async to regularly check invariants of its internal
data structures. This can substantially slow down your program.
|} ])
      ~detect_invalid_access_from_thread:
        (field
           [%sexp_of: bool]
           [ {|
If true, causes Async routines to check if they are being accessed
from some thread other than the thread currently holding the Async
lock, which is not allowed and can lead to very confusing behavior.
|} ])
      ~dump_core_on_job_delay:
        (field
           [%sexp_of: Dump_core_on_job_delay.t]
           [ {|
Can be set to [Do_not_watch] or:
(Watch ((dump_if_delayed_by SPAN) (how_to_dump HOW)))
If set to [Watch], then on program start this will start a regular
Async job that increments a counter, and a C thread that will
detect if that job is delayed by [dump_if_delayed_by], and if so,
will core dump the program. If available, [/usr/bin/gcore] is
used by default to dump the core, which should allow the program
to continue running. Otherwise, [abort] will be called from C,
which will kill the program while causing a core dump. One can
force [abort] or [gcore] via [how_to_dump], which should be one of:
[Call_abort], [Call_gcore], or [Default].
|} ])
      ~epoll_max_ready_events:
        (field
           [%sexp_of: Epoll_max_ready_events.t]
           [ {|
The maximum number of ready events that Async's call to [Epoll.wait]
will handle.
|} ])
      ~io_uring_max_submission_entries:
        (field
           [%sexp_of: Io_uring_max_submission_entries.t]
           [ {|
The maximum number of submission queue entries that can be stored within
the [io_uring] submission queue at a given time. Will be rounded up to
the nearest power of two internally within [io_uring].
|} ])
      ~io_uring_mode:
        (field
           [%sexp_of: Io_uring_mode.t]
           [ {|
This determines the driver for using the io_uring. The default is not to use
[io_uring] at all.
Allowed values are:|}
           ; concat ~sep:", " (List.map Io_uring_mode.list ~f:Io_uring_mode.to_string)
           ; {|.
|} ])
      ~file_descr_watcher:
        (field
           [%sexp_of: File_descr_watcher.t]
           [ {|
This determines what OS subsystem Async uses to watch file descriptors for being ready.
The default is to use [epoll] if timerfd's are supported and if not, use [select].
Allowed values are:|}
           ; concat
               ~sep:", "
               (List.map File_descr_watcher.list ~f:File_descr_watcher.to_string)
           ; {|.
|} ])
      ~max_num_open_file_descrs:
        (field
           (* The default is shown as a description rather than as a bare number. *)
           (fun default ->
             [%message
               ""
                 ~_:
                   (concat
                      [ "min "
                      ; default |> Max_num_open_file_descrs.raw |> Int.to_string_hum
                      ; " [ulimit -n -H]"
                      ]
                    : string)])
           [ {|
The maximum number of open file descriptors allowed at any one time.|} ])
      ~max_num_threads:
        (field
           [%sexp_of: Max_num_threads.t]
           [ {|
The maximum number of threads that Async will create to do blocking
system calls and handle calls to [In_thread.run].
|} ])
      ~max_inter_cycle_timeout:
        (field
           [%sexp_of: Max_inter_cycle_timeout.t]
           [ {|
The maximum amount of time the scheduler will pause between cycles
when it has no jobs and is going to wait for I/O. In principle one
doesn't need this, and we could use an infinite timeout. We instead
use a small timeout (by default), to be more robust to bugs that
could prevent Async from waking up and servicing events. For
example, as of 2013-01, the OCaml runtime has a bug that causes it
to not necessarily run an OCaml signal handler in a timely manner.
This in turn can cause a simple Async program that is waiting on a
signal to hang, when in fact it should handle the signal.
We use 50ms as the default timeout, because it is infrequent enough
to have a negligible performance impact, and frequent enough that
the latency would typically be not noticeable. Also, 50ms is what
the OCaml ticker thread uses.
|} ])
      ~max_num_jobs_per_priority_per_cycle:
        (field
           [%sexp_of: Max_num_jobs_per_priority_per_cycle.t]
           [ {|
The maximum number of jobs that will be done at each priority within
each Async cycle. This limits how many jobs the scheduler will run
before pausing to check for I/O.
|} ])
      ~min_inter_cycle_timeout:
        (field
           [%sexp_of: Min_inter_cycle_timeout.t]
           [ {|
The minimum timeout the scheduler will pass to the OS when it checks
for I/O between cycles. This is zero by default. Setting it to a
nonzero value is used to increase thread fairness between the scheduler
and other threads. A plausible setting is 1us. This is also
configurable in OCaml via [Scheduler.set_min_inter_cycle_timeout].
|} ])
      ~print_debug_messages_for:
        (field
           [%sexp_of: Debug_tag.t list]
           [ {|
A list of tags specifying which Async functions should print debug
messages to stderr. Each tag identifies a group of related Async
functions. The tag [all] means to print debug messages for all
functions. Allowed values are:
|}
           ; concat
               (List.map Debug_tag.list ~f:(fun d ->
                  concat [ " "; Debug_tag.to_string d; "\n" ]))
           ; {|
Turning on debug messages will substantially slow down most programs.
|} ])
      ~record_backtraces:
        (field
           [%sexp_of: bool]
           [ {|
If true, this will cause Async to keep in the execution context the
history of stack backtraces (obtained via [Backtrace.get]) that led
to the current job. If an Async job has an unhandled exception,
this backtrace history will be recorded in the exception. In
particular the history will appear in an unhandled exception that
reaches the main monitor. This can have a substantial performance
impact, both in running time and space usage.
|} ])
      ~report_thread_pool_stuck_for:
        (field
           [%sexp_of: Time_ns.Span.t]
           [ {|
By default, Async will print a message to stderr every second if
the thread pool is stuck for longer than this.
|} ])
      ~thread_pool_cpu_affinity:
        (field
           [%sexp_of: Thread_pool_cpu_affinity.t]
           [ {|
Whether and how threads in the thread pool should be affinitized to CPUs.
|} ])
      ~timing_wheel_config:
        (field
           [%sexp_of: Timing_wheel.Config.t]
           [ {|
This is used to adjust the time/space tradeoff in the timing wheel
used to implement Async's clock. Time is split into intervals of
size [alarm_precision], and alarms with times in the same interval
fire in the same cycle. Level [i] in the timing wheel has an
array of size [2^b], where [b] is the [i]'th entry in [level_bits].
|} ])
  in
  (* Sort by field name, then render each entry as
     newline, name, default in parentheses, description. *)
  concat
    (List.map
       (List.sort fields ~compare:(fun (name1, _, _) (name2, _, _) ->
          String.compare name1 name2))
       ~f:(fun (name, default, description) ->
         concat
           ("\n" :: name :: " (default " :: Sexp.to_string default :: ")" :: description)))
;;

(* The full help text: an introduction naming [environment_variable], the sexp of
   [example], and then [field_descriptions ()]. *)
let help_message () =
  concat
    [ "The "
    ; environment_variable
    ; {| environment variable affects Async
in various ways. Its value should be a sexp of the following form,
where all fields are optional:
|}
    ; Sexp.to_string_hum (sexp_of_t example)
    ; {|
Here is an explanation of each field.
|}
    ; field_descriptions ()
    ]
;;

(* Prints [help_message ()] to stderr, flushes, and exits the program with status 1. *)
let usage () =
  eprintf "%s%!" (help_message ());
  exit 1
;;

(* [parse_variable env_var ~allow_extra_fields] reads the configuration from the
   environment variable [env_var].

   - If the variable is unset, the result is [empty].
   - If it is set to the empty string, [usage ()] is called (which exits).
   - Otherwise its value is parsed as a sequence of sexps, each converted with
     [t_of_sexp ~allow_extra_fields], and the results are combined with [reduce].  On
     a parse or conversion error, the error is printed to stderr and [usage ()] is
     called. *)
let parse_variable env_var ~allow_extra_fields =
  match Sys.getenv env_var with
  | None -> empty
  | Some "" -> usage ()
  | Some string ->
    (match Parsexp.Conv_many.parse_string string (t_of_sexp ~allow_extra_fields) with
     | Ok ts -> reduce ts
     | Error error ->
       eprintf
         "%s\n\n"
         (Sexp.to_string_hum
            [%sexp
              (sprintf "invalid value for %s environment variable" env_var : string)
              , (error : Parsexp.Conv_error.t)]);
       usage ())
;;

(* The configuration as specified by the environment, computed once at module
   initialization.  Settings from [environment_variable] take precedence over those
   from [environment_variable_allow_extra_fields] (see [reduce]). *)
let t =
  reduce
    [ parse_variable environment_variable ~allow_extra_fields:false
    ; parse_variable environment_variable_allow_extra_fields ~allow_extra_fields:true
    ]
;;

(* One boolean per debug tag, telling whether debug messages for that tag are enabled
   by the environment-provided configuration [t]. *)
module Print_debug_messages_for = struct
  (* [true] iff [tag] is listed in [t.print_debug_messages_for]. *)
  let print_debug_messages_for tag =
    match t.print_debug_messages_for with
    | None -> false
    | Some l -> List.mem l tag ~equal:Debug_tag.equal
  ;;

  let all = print_debug_messages_for All

  (* A tag is enabled if it is listed explicitly or if [All] is listed. *)
  let debug tag = all || print_debug_messages_for tag
  let clock = debug Clock
  let fd = debug Fd
  let file_descr_watcher = debug File_descr_watcher
  let finalizers = debug Finalizers
  let interruptor = debug Interruptor
  let monitor = debug Monitor
  let monitor_send_exn = debug Monitor_send_exn
  let parallel = debug Parallel
  let reader = debug Reader
  let scheduler = debug Scheduler
  let shutdown = debug Shutdown
  let thread_pool = debug Thread_pool
  let thread_safe = debug Thread_safe
  let writer = debug Writer
end

(* [!!field] is the effective value of [field]: the value from the environment-provided
   [t] if it is specified there, and the value from [default] otherwise.  Raises (via
   [Option.value_exn]) if [default] does not specify the field. *)
let ( !! ) field =
  Option.value (Field.get field t) ~default:(Option.value_exn (Field.get field default))
;;

(* Effective value of each setting. *)
let abort_after_thread_pool_stuck_for = !!Fields.abort_after_thread_pool_stuck_for
let check_invariants = !!Fields.check_invariants
let detect_invalid_access_from_thread = !!Fields.detect_invalid_access_from_thread
let epoll_max_ready_events = !!Fields.epoll_max_ready_events
let io_uring_max_submission_entries = !!Fields.io_uring_max_submission_entries
let io_uring_mode = !!Fields.io_uring_mode
let thread_pool_cpu_affinity = !!Fields.thread_pool_cpu_affinity
let file_descr_watcher = !!Fields.file_descr_watcher
let max_inter_cycle_timeout = !!Fields.max_inter_cycle_timeout
let max_num_open_file_descrs = !!Fields.max_num_open_file_descrs
let max_num_threads = !!Fields.max_num_threads
let max_num_jobs_per_priority_per_cycle = !!Fields.max_num_jobs_per_priority_per_cycle
let min_inter_cycle_timeout = !!Fields.min_inter_cycle_timeout
let record_backtraces = !!Fields.record_backtraces
let report_thread_pool_stuck_for = !!Fields.report_thread_pool_stuck_for
let timing_wheel_config = !!Fields.timing_wheel_config
let dump_core_on_job_delay = !!Fields.dump_core_on_job_delay

(* The effective configuration, rebuilt from the values above.  It shadows the
   environment-provided [t].  [print_debug_messages_for] is copied as is from the
   environment-provided [t], so it stays [None] when it was not specified. *)
let t =
  { abort_after_thread_pool_stuck_for = Some abort_after_thread_pool_stuck_for
  ; check_invariants = Some check_invariants
  ; detect_invalid_access_from_thread = Some detect_invalid_access_from_thread
  ; dump_core_on_job_delay = Some dump_core_on_job_delay
  ; thread_pool_cpu_affinity = Some thread_pool_cpu_affinity
  ; epoll_max_ready_events = Some epoll_max_ready_events
  ; io_uring_max_submission_entries = Some io_uring_max_submission_entries
  ; io_uring_mode = Some io_uring_mode
  ; file_descr_watcher = Some file_descr_watcher
  ; max_inter_cycle_timeout = Some max_inter_cycle_timeout
  ; max_num_open_file_descrs = Some max_num_open_file_descrs
  ; max_num_threads = Some max_num_threads
  ; max_num_jobs_per_priority_per_cycle = Some max_num_jobs_per_priority_per_cycle
  ; min_inter_cycle_timeout = Some min_inter_cycle_timeout
  ; print_debug_messages_for = t.print_debug_messages_for
  ; record_backtraces = Some record_backtraces
  ; report_thread_pool_stuck_for = Some report_thread_pool_stuck_for
  ; timing_wheel_config = Some timing_wheel_config
  }
;;

(* A mutable hook producing a sexp that identifies the current task.  The initial
   function always returns the atom [<no task id>]. *)
let task_id = ref (fun () -> Sexp.Atom "<no task id>")