(* Monitor operations built on top of [Monitor0] (included below): detaching a monitor
   and observing its errors, delivering exceptions to monitors ([send_exn]), running
   or scheduling work under a monitor ([Exported_for_scheduler]), and the [try_with]
   family of helpers. *)
open Core
open Import
open Deferred_std
module Deferred = Deferred1
module Scheduler = Scheduler1
module Stream = Tail.Stream
module Monitor = Monitor0
include Monitor

(* Alias for the monitor type [t] brought into scope by [include Monitor]. *)
type monitor = t [@@deriving sexp_of]

(* [invariant t] checks [t] field by field.  The only field with a non-trivial check
   is [next_error], whose ivar must be empty; every other field is ignored. *)
let invariant t =
  Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
    let check f = Invariant.check_field t f in
    Fields.iter
      ~name:ignore
      ~here:ignore
      ~id:ignore
      ~forwarding:ignore
      ~next_error:(check (fun next_error -> assert (Ivar.is_empty next_error)))
      ~handlers_for_all_errors:ignore
      ~tails_for_all_errors:ignore
      ~has_seen_error:ignore)
;;

(* The execution context the scheduler is currently running in. *)
let current_execution_context () = Scheduler.(current_execution_context (t ()))

(* The monitor of the current execution context. *)
let current () = Execution_context.monitor (current_execution_context ())

(* Shape shared by functions that accept the optional monitor-naming arguments
   [?here], [?info] and [?name]. *)
type 'a with_optional_monitor_name =
  ?here:Source_code_position.t -> ?info:Info.t -> ?name:string -> 'a

(* [detach t] marks [t] as [Detached].  In [send_exn] below, a detached monitor
   delivers errors to its own handlers and tails instead of forwarding them. *)
let detach t = t.forwarding <- Detached

(* Lifecycle of a handler installed by [detach_and_iter_errors]:
   - [Uninitialized]: the handler has not yet been added to the monitor's bag;
   - [Running elt]: the handler is installed, [elt] being its bag element;
   - [Terminated]: the handler raised and was removed; later errors are ignored. *)
type handler_state =
  | Uninitialized
  | Running of (Execution_context.t * (exn -> unit)) Bag.Elt.t
  | Terminated

(* [detach_and_iter_errors t ~f] detaches [t] and registers [f] to be called on
   errors delivered to [t].  The handler is stored together with the execution
   context that is current when this function is called.  If [f] raises, the handler
   marks itself [Terminated], removes itself from [t.handlers_for_all_errors] and
   re-raises the exception. *)
let detach_and_iter_errors t ~f =
  detach t;
  let scheduler = Scheduler.t () in
  let execution_context = Scheduler.current_execution_context scheduler in
  (* The ref lets [run_f] refer to its own bag element, which only exists after
     [Bag.add] has returned. *)
  let handler_state_ref = ref Uninitialized in
  let run_f exn =
    match !handler_state_ref with
    | Uninitialized -> assert false
    | Terminated -> ()
    | Running bag_elt ->
      (try f exn with
       | inner_exn ->
         handler_state_ref := Terminated;
         Bag.remove t.handlers_for_all_errors bag_elt;
         (* [run_f] always runs in [execution_context]. Hence, [raise inner_exn] sends
            [inner_exn] to [execution_context]'s monitor, i.e. the monitor in effect when
            [detach_and_iter_errors] was called. *)
         raise inner_exn)
  in
  handler_state_ref
  := Running (Bag.add t.handlers_for_all_errors (execution_context, run_f))
;;

(* [detach_and_get_error_stream t] detaches [t], adds a fresh tail to
   [t.tails_for_all_errors] and returns the stream collected from that tail.
   [send_exn] extends every such tail with each error delivered to [t]. *)
let detach_and_get_error_stream t =
  detach t;
  let tail = Tail.create () in
  t.tails_for_all_errors <- tail :: t.tails_for_all_errors;
  Tail.collect tail
;;

(* A deferred reading [t]'s current [next_error] ivar.  [send_exn] fills that ivar
   and then replaces it with a fresh one. *)
let get_next_error t = Ivar.read t.next_error

(* [detach t] followed by [get_next_error t]. *)
let detach_and_get_next_error t =
  detach t;
  get_next_error t
;;

(* [create ()] makes a new monitor whose parent is the current monitor. *)
let create ?here ?info ?name () =
  let parent = current () in
  create_with_parent ?here ?info ?name (Some parent)
;;

(* The payload of the [Monitor_exn] exception: the original exception, an optional
   backtrace, the backtrace history of the execution context in which it was sent,
   and the monitor it was sent to (see [send_exn]). *)
module Monitor_exn = struct
  type t =
    { exn : exn
    ; backtrace : Backtrace.t option
    ; backtrace_history : Backtrace.t list
    ; monitor : Monitor.t
    }

  (* Field accessors. *)
  let backtrace t = t.backtrace
  let extract_exn t = t.exn

  (* Trims frames from both ends of a backtrace given as a list of lines.
     - Front: if the first line is a raise in import0.ml, it is dropped, together with
       up to two immediately following lines that are calls from error.ml.
     - Back: if the last line is a call from job_queue.ml, it is dropped, together with
       one more job_queue.ml line before it and then one line from deferred0.ml,
       deferred1.ml or monitor.ml before that, each only if present.
     If the last line is not from job_queue.ml, only the front trimming applies.
     The prefix strings are computed once, outside the returned function. *)
  let backtrace_truncation_heuristics =
    let job_queue = "Called from file \"job_queue.ml\"" in
    let deferred0 = "Called from file \"deferred0.ml\"" in
    let deferred1 = "Called from file \"deferred1.ml\"" in
    let monitor = "Called from file \"monitor.ml\"" in
    let import0 = "Raised at file \"import0.ml\"" in
    let error = "Called from file \"error.ml\"" in
    fun traces ->
      (* ../test/test_try_with_error_display.ml makes sure this stays up-to-date. *)
      let traces =
        match traces with
        | t1 :: rest when String.is_prefix t1 ~prefix:import0 ->
          (match rest with
           | t2 :: rest when String.is_prefix t2 ~prefix:error ->
             (match rest with
              | t3 :: rest when String.is_prefix t3 ~prefix:error -> rest
              | _ -> rest)
           | _ -> rest)
        | _ -> traces
      in
      (* The list is reversed so that the trailing frames can be matched from the
         head; each result is reversed back before being returned. *)
      match List.rev traces with
      | t1 :: rest when String.is_prefix t1 ~prefix:job_queue ->
        (match rest with
         | t2 :: rest when String.is_prefix t2 ~prefix:job_queue ->
           (match rest with
            | t2 :: rest
              when String.is_prefix t2 ~prefix:deferred0 (* bind *)
                   || String.is_prefix t2 ~prefix:deferred1 (* map *)
                   || String.is_prefix t2 ~prefix:monitor (* try_with *) ->
              List.rev rest
            | _ -> List.rev rest)
         | _ -> List.rev rest)
      | _ -> traces
  ;;

  (* Renders the record as a sexp made of: the exception; the truncated backtrace
     lines followed by a line describing the catching monitor (its name and/or source
     position, when available); and the backtrace history.  The backtrace component
     and the history are omitted when their lists are empty. *)
  let sexp_of_t { exn; backtrace; backtrace_history; monitor } =
    (* [monitor] is rebound to a list holding zero or one description line. *)
    let monitor =
      let name =
        match Info.to_string_hum monitor.name with
        | "" -> None
        | s -> Some s
      in
      let pos =
        match monitor.here with
        | None -> None
        | Some here ->
          (* We display the full filename, whereas backtraces only have basenames, but
             perhaps that's what should change. *)
          let column = here.pos_cnum - here.pos_bol in
          Some
            (* We hide line and column numbers when [am_running_test] to make test output
               more robust. This saves people manually hiding the numbers or even worse,
               leaving them in test output. Hiding in test is a different choice for
               behavior than our codebase makes for [Backtrace.elide], which has default
               [false], and thus shows backtraces in test. There are a couple reasons for
               this different choice. First, expect-test machinery has a check to prevent
               backtraces from appearing in test output. It has no such checks for line
               and column numbers. Second, when there is a real error and you want to see
               the backtrace, throwing away the whole backtrace loses a lot of potentially
               useful information that may be hard to recover. Whereas we're just
               throwing away a line number and column, which are a minor convenience
               given that the filename has most of the information. *)
            (if am_running_test
             then sprintf "file %S, line LINE, characters C1-C2" here.pos_fname
             else
               (* The same column value is used for both ends of the character range. *)
               sprintf
                 "file %S, line %d, characters %d-%d"
                 here.pos_fname
                 here.pos_lnum
                 column
                 column)
      in
      match pos, name with
      | None, None -> []
      | Some pos, None -> [ sprintf "Caught by monitor at %s" pos ]
      | None, Some name -> [ sprintf "Caught by monitor %s" name ]
      | Some pos, Some name -> [ sprintf "Caught by monitor %s at %s" name pos ]
    in
    let backtrace =
      let backtrace =
        match backtrace with
        | None -> []
        | Some backtrace -> Backtrace.to_string_list backtrace
      in
      backtrace_truncation_heuristics backtrace @ monitor
    in
    (* Maps the empty list to [None] so that [@sexp.option] drops it from the output. *)
    let list_if_not_empty = function
      | [] -> None
      | _ :: _ as l -> Some l
    in
    [%sexp
      (exn : exn)
      , (list_if_not_empty backtrace : (string list option[@sexp.option]))
      , `backtrace_history
          (list_if_not_empty backtrace_history : (Backtrace.t list option[@sexp.option]))]
  ;;
end

(* The exception in which [send_exn] wraps every exception it delivers. *)
exception Monitor_exn of Monitor_exn.t

(* Registers a sexp converter for [Monitor_exn] that prepends the atom
   monitor.ml.Error to the sexp of the payload. *)
let () =
  Sexplib.Conv.Exn_converter.add [%extension_constructor Monitor_exn] (function
    | Monitor_exn t -> [%sexp "monitor.ml.Error" :: (t : Monitor_exn.t)]
    | _ ->
      (* Reaching this branch indicates a bug in sexplib. *)
      assert false)
;;

(* [extract_exn exn] returns the exception wrapped by a [Monitor_exn], and returns any
   other exception unchanged. *)
let extract_exn exn =
  match exn with
  | Monitor_exn error -> error.exn
  | exn -> exn
;;

(* [send_exn t ?backtrace exn] delivers [exn] to [t].

   Unless [exn] is already a [Monitor_exn], it is wrapped in one that records [t], the
   current execution context's backtrace history, and a backtrace: with [`Get] (the
   default) the result of [Backtrace.Exn.most_recent_for_exn exn], with [`This b] the
   backtrace [b].

   [has_seen_error] is set on [t] only.  Delivery then walks from [t] through [Parent]
   links; at every monitor visited, the current [next_error] ivar is filled and
   replaced with a fresh one.  The walk ends at:
   - a [Detached] monitor, where each registered handler is enqueued on the scheduler
     in the execution context it was registered with, and every tail is extended;
   - a [Report_uncaught_exn] monitor, where the scheduler is told about the uncaught
     exception. *)
let send_exn t ?(backtrace = `Get) exn =
  let exn =
    match exn with
    | Monitor_exn _ -> exn
    | _ ->
      let backtrace =
        match backtrace with
        | `Get -> Backtrace.Exn.most_recent_for_exn exn
        | `This b -> Some b
      in
      let backtrace_history = (current_execution_context ()).backtrace_history in
      Monitor_exn { Monitor_exn.exn; backtrace; backtrace_history; monitor = t }
  in
  if Debug.monitor_send_exn
  then Debug.log "Monitor.send_exn" (t, exn) [%sexp_of: t * exn];
  t.has_seen_error <- true;
  let scheduler = Scheduler.t () in
  let rec loop t =
    Ivar.fill t.next_error exn;
    t.next_error <- Ivar.create ();
    match t.forwarding with
    | Detached ->
      if Debug.monitor_send_exn
      then
        Debug.log "Monitor.send_exn found listening monitor" (t, exn) [%sexp_of: t * exn];
      Bag.iter t.handlers_for_all_errors ~f:(fun (execution_context, f) ->
        Scheduler.enqueue scheduler execution_context f exn);
      List.iter t.tails_for_all_errors ~f:(fun tail -> Tail.extend tail exn)
    | Parent parent -> loop parent
    | Report_uncaught_exn ->
      (* Do not change this branch to print the exception or to exit. Having the
         scheduler raise an uncaught exception is the necessary behavior for programs
         that call [Scheduler.go] and want to handle it. *)
      Scheduler.(got_uncaught_exn (t ())) exn (!Async_kernel_config.task_id ())
  in
  loop t
;;

(* Functions for running or scheduling work in a chosen execution context.  The
   module is opened right after its definition, so its contents are also available
   unqualified in the rest of this file. *)
module Exported_for_scheduler = struct
  (* [within_context context f] runs [f ()] with [context] as the execution context.
     An exception raised by the call itself is caught, sent to [context]'s monitor,
     and reported to the caller as [Error ()]; otherwise the result is [Ok]. *)
  let within_context context f =
    Scheduler.(with_execution_context (t ())) context ~f:(fun () ->
      match Result.try_with f with
      | Ok x -> Ok x
      | Error exn ->
        send_exn (Execution_context.monitor context) exn ~backtrace:`Get;
        Error ())
  ;;

  (* Shape of functions that accept optional [?monitor] and [?priority] overrides. *)
  type 'a with_options = ?monitor:t -> ?priority:Priority.t -> 'a

  (* [within_gen ?monitor ?priority f] runs [f] via [within_context] in a context
     created like the current one, with the given overrides. *)
  let within_gen ?monitor ?priority f =
    let tmp_context =
      Execution_context.create_like (current_execution_context ()) ?monitor ?priority
    in
    within_context tmp_context f
  ;;

  (* Like [within_gen] for an [f] returning a deferred; if [f] raised, the result is
     [Deferred.never ()]. *)
  let within' ?monitor ?priority f =
    match within_gen ?monitor ?priority f with
    | Error () -> Deferred.never ()
    | Ok d -> d
  ;;

  (* Like [within_gen], returning [Some] of [f]'s result, or [None] if [f] raised. *)
  let within_v ?monitor ?priority f =
    match within_gen ?monitor ?priority f with
    | Error () -> None
    | Ok x -> Some x
  ;;

  (* Like [within_gen] for an [f] returning unit; returns unit whether or not [f]
     raised (the exception has already been sent to the monitor). *)
  let within ?monitor ?priority f =
    match within_gen ?monitor ?priority f with
    | Error () -> ()
    | Ok () -> ()
  ;;

  (* [schedule_with_data ?monitor ?priority work x] enqueues [work x] on the scheduler
     in a context created like the current one, with the given overrides. *)
  let schedule_with_data ?monitor ?priority work x =
    let scheduler = Scheduler.t () in
    Scheduler.enqueue
      scheduler
      (Execution_context.create_like
         (Scheduler.current_execution_context scheduler)
         ?monitor
         ?priority)
      work
      x
  ;;

  (* [schedule_with_data] specialized to work that takes unit. *)
  let schedule ?monitor ?priority work = schedule_with_data ?monitor ?priority work ()

  (* [schedule' ?monitor ?priority work] enqueues [work] and returns a deferred that
     is filled with the value of the deferred [work ()] returns, once it is
     determined. *)
  let schedule' =
    (* For performance, we use [schedule_with_data] with a closed function, and inline
       [Deferred.create]. *)
    let upon_work_fill_i (work, i) = upon (work ()) (fun a -> Ivar.fill i a) in
    fun ?monitor ?priority work ->
      let i = Ivar.create () in
      schedule_with_data ?monitor ?priority upon_work_fill_i (work, i);
      Ivar.read i
  ;;

  (* [preserve_execution_context f] captures the execution context current at the
     time of the call and returns a staged function that, applied to [a], enqueues
     [f a] in that captured context. *)
  let preserve_execution_context f =
    let scheduler = Scheduler.t () in
    let execution_context = Scheduler.current_execution_context scheduler in
    stage (fun a -> Scheduler.enqueue scheduler execution_context f a)
  ;;

  (* Like [preserve_execution_context] for an [f] returning a deferred: the staged
     function returns a deferred that is filled with the value of [f a]. *)
  let preserve_execution_context' f =
    let scheduler = Scheduler.t () in
    let execution_context = Scheduler.current_execution_context scheduler in
    let call_and_fill (f, a, i) = upon (f a) (fun r -> Ivar.fill i r) in
    stage (fun a ->
      Deferred.create (fun i ->
        Scheduler.enqueue scheduler execution_context call_and_fill (f, a, i)))
  ;;
end

open Exported_for_scheduler

(* [stream_iter stream ~f] calls [f] on each element of [stream] as it becomes
   available.  For each element, the wait for the remainder of the stream is set up
   before [f] is called on that element. *)
let stream_iter stream ~f =
  let rec loop stream =
    Stream.next stream
    >>> function
    | Nil -> ()
    | Cons (v, stream) ->
      (* NOTE(review): [loop] runs first, presumably so that iteration continues even
         if [f v] raises -- confirm before reordering. *)
      loop stream;
      f v
  in
  loop stream
;;

(* An ['a Ok_and_exns.t] represents the output of a computation running in a detached
   monitor. *)
module Ok_and_exns = struct
  (* [ok] is the deferred returned by the computation; [exns] is the stream of errors
     delivered to the computation's monitor. *)
  type 'a t =
    { ok : 'a Deferred.t
    ; exns : exn Stream.t
    }
  [@@deriving fields, sexp_of]

  (* [create ?here ?info ?name ~run f] runs [f] under a fresh parentless, detached
     monitor: immediately when [run] is [`Now], or as a scheduled job when it is
     [`Schedule]. *)
  let create ?here ?info ?name ~run f =
    (* We call [create_with_parent None] because [monitor] does not need a parent. It
       does not because we call [detach_and_get_error_stream monitor] and deal with the
       errors explicitly, thus [send_exn] would never propagate an exn past [monitor]. *)
    let monitor = create_with_parent ?here ?info ?name None in
    let exns = detach_and_get_error_stream monitor in
    let ok =
      match run with
      | `Now -> within' ~monitor f
      | `Schedule -> schedule' ~monitor f
    in
    { ok; exns }
  ;;
end

(* If [result_filler] is still empty, fills it with [result] and then hands the
   remaining error stream [exns] to [handle_exns_after_result].  If it has already
   been filled, does nothing. *)
let fill_result_and_handle_background_errors
      result_filler
      result
      exns
      handle_exns_after_result
  =
  if Ivar_filler.is_empty result_filler
  then (
    Ivar_filler.fill result_filler result;
    handle_exns_after_result exns)
;;

module Expert = struct
  (* The function used by [try_with] to handle late errors when [rest] is [`Log].  It
     is a ref so that it can be assigned elsewhere; the initial value raises a message
     saying that it has not been set. *)
  let try_with_log_exn : (exn -> unit) ref =
    ref (fun exn ->
      raise_s [%message "failed to set [Monitor.Expert.try_with_log_exn]" (exn : Exn.t)])
  ;;
end

(* [make_handle_exn rest] builds the function that [try_with] applies to errors
   arriving after its result has been determined:
   - [`Log]: the function stored in [Expert.try_with_log_exn] at the time of this call;
   - [`Raise]: send the exception to the monitor that is current at the time of this
     call;
   - [`Call f]: run [f exn] within that same monitor. *)
let make_handle_exn rest =
  match rest with
  | `Log ->
    (* We are careful to not close over current context, which is not needed. *)
    !Expert.try_with_log_exn
  | `Raise ->
    let parent = current () in
    fun exn -> send_exn parent exn ?backtrace:None
  | `Call f ->
    let parent = current () in
    fun exn -> within ~monitor:parent (fun () -> f exn)
;;

(* [try_with f] runs [f] under a fresh detached monitor and returns a deferred that
   becomes [Ok v] if [f]'s deferred is determined with [v] before any error reaches
   that monitor, or [Error exn] for the first such error otherwise.  Errors arriving
   after the result has been decided are passed to the handler selected by [rest].

   Defaults: [name] is the empty string, [extract_exn] is [false], [run] is [`Now] and
   [rest] is [`Raise].  When [extract_exn] is true, the exception placed in [Error]
   is first unwrapped with [extract_exn]. *)
let try_with
      ?here
      ?info
      ?(name = "")
      ?extract_exn:(do_extract_exn = false)
      ?(run = `Now)
      ?(rest = `Raise)
      f
  =
  let { Ok_and_exns.ok; exns } = Ok_and_exns.create ?here ?info ~name ~run f in
  let handle_exn = make_handle_exn rest in
  let handle_exns_after_result exns = stream_iter exns ~f:handle_exn in
  (* We run [within' ~monitor:main] to avoid holding on to references to the evaluation
     context in which [try_with] was called. This avoids a space leak when a chain of
     [try_with]'s are run each nested within the previous one. Without the [within'], the
     error handling for the innermost [try_with] would keep alive the entire chain. *)
  within' ~monitor:main (fun () ->
    if Deferred.is_determined ok
    then (
      (* [ok] is already determined: return its value directly and route every error
         in the stream to [handle_exn]. *)
      handle_exns_after_result exns;
      return (Ok (Deferred.value_exn ok)))
    else (
      (* Otherwise, whichever of [ok] and the first error comes first fills the
         result; [fill_result_and_handle_background_errors] ignores the later one. *)
      let result_filler, result = Ivar_filler.create () in
      upon ok (fun res ->
        fill_result_and_handle_background_errors
          result_filler
          (Ok res)
          exns
          handle_exns_after_result);
      upon (Stream.next exns) (function
        | Nil -> assert false
        | Cons (exn, exns) ->
          let exn = if do_extract_exn then extract_exn exn else exn in
          fill_result_and_handle_background_errors
            result_filler
            (Error exn)
            exns
            handle_exns_after_result);
      result))
;;

(* Like [try_with] with [~run:`Now], converting the result with
   [Or_error.of_exn_result]. *)
let try_with_or_error ?here ?info ?(name = "try_with_or_error") ?extract_exn ?rest f =
  try_with f ?here ?info ~name ?extract_exn ~run:`Now ?rest >>| Or_error.of_exn_result
;;

(* Like [try_with_or_error] for an [f] that itself returns an [Or_error.t]; the two
   layers are merged with [Or_error.join]. *)
let try_with_join_or_error
      ?here
      ?info
      ?(name = "try_with_join_or_error")
      ?extract_exn
      ?rest
      f
  =
  try_with_or_error f ?here ?info ~name ?extract_exn ?rest >>| Or_error.join
;;

(* [protect f ~finally] runs [f] under [try_with] and, once its result is known, runs
   [finally] under a second [try_with] (scheduled, and without exception extraction).
   - If both fail, raises a message containing both exceptions.
   - If exactly one fails, raises that exception.
   - Otherwise returns [f]'s result. *)
let protect ?here ?info ?(name = "Monitor.protect") ?extract_exn ?run ?rest f ~finally =
  let%bind r = try_with ?extract_exn ?here ?info ?run ?rest ~name f in
  let%map fr =
    try_with
      ~extract_exn:false
      ?here
      ?info
      ~run:`Schedule (* consider [~run:`Now] *)
      ?rest
      ~name:"finally"
      finally
  in
  match r, fr with
  | Error exn, Error finally_exn ->
    raise_s [%message "Async finally" (exn : exn) (finally_exn : exn)]
  | Error e, Ok () | Ok _, Error e -> raise e
  | Ok r, Ok () -> r
;;

(* [handle_errors f handler] runs [f] immediately under a fresh detached monitor,
   applies [handler] to every error delivered to that monitor, and returns [f]'s
   deferred. *)
let handle_errors ?here ?info ?name f handler =
  let { Ok_and_exns.ok; exns } = Ok_and_exns.create ?here ?info ?name ~run:`Now f in
  stream_iter exns ~f:handler;
  ok
;;

(* [catch_stream f] runs the unit-returning [f] immediately under a fresh detached
   monitor and returns the stream of errors delivered to that monitor. *)
let catch_stream ?here ?info ?name f =
  let { Ok_and_exns.exns; _ } =
    Ok_and_exns.create ?here ?info ?name ~run:`Now (fun () ->
      f ();
      return ())
  in
  exns
;;

(* [catch f] is determined with the first error in [catch_stream f].  Raises if the
   stream turns out to be empty. *)
let catch ?here ?info ?name f =
  match%map Stream.next (catch_stream ?here ?info ?name f) with
  | Cons (x, _) -> x
  | Nil -> raise_s [%message "Monitor.catch got unexpected empty stream"]
;;

(* Like [catch], converting the exception with [Error.of_exn]. *)
let catch_error ?here ?info ?name f = catch ?here ?info ?name f >>| Error.of_exn

module For_tests = struct
  (* [parent t] is [Some p] when [t] forwards to a parent [p], and [None] when [t] is
     detached or reports uncaught exceptions. *)
  let parent t =
    match t.forwarding with
    | Report_uncaught_exn -> None
    | Parent parent -> Some parent
    | Detached -> None
  ;;

  (* [depth t] counts the [Parent] links followed from [t] until [parent] returns
     [None]. *)
  let depth t =
    let rec loop t n =
      match parent t with
      | None -> n
      | Some t -> loop t (n + 1)
    in
    loop t 0
  ;;
end