open Types_

(** Fibers are Picos fibers. *)
type fiber = Picos.Fiber.t

(** A unit of work handed to a worker, together with the fiber it runs as. *)
type task_full =
  | T_start of {
      fiber: fiber;
      f: unit -> unit;
    }
      (** A fresh computation [f]; the worker runs it under the effect
          handler installed by [with_handler]. *)
  | T_resume : {
      fiber: fiber;
      k: unit -> unit;
    }
      -> task_full
      (** A thunk [k] that resumes a previously suspended computation; the
          worker calls it directly, without installing a new handler. *)

(** A pair of hooks run around every task: the first is called with the runner
    before the task, and its result is passed (with the runner) to the second
    once the task is done. *)
type around_task =
  | AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task

(** Raised by [ops.get_next_task] to make [worker_loop] stop. *)
exception No_more_tasks

(** Operations that a scheduler provides to [worker_loop]; ['st] is the
    per-worker state. *)
type 'st ops = {
  schedule: 'st -> task_full -> unit;
      (** Submit a task; used by the effect handlers and by trigger
          callbacks. *)
  get_next_task: 'st -> task_full;  (** @raise No_more_tasks *)
  get_thread_state: unit -> 'st;
      (** Access current thread's worker state from any worker *)
  around_task: 'st -> around_task;
      (** Hooks to run before/after each task; queried once per worker. *)
  on_exn: 'st -> Exn_bt.t -> unit;
      (** Called with the exception (and backtrace) escaping a task. *)
  runner: 'st -> Runner.t;
      (** The runner this worker belongs to; stored in TLS when the loop
          starts. *)
  before_start: 'st -> unit;
      (** Called once, before the first task is fetched. *)
  cleanup: 'st -> unit;
      (** Called once when the loop terminates, normally or via an
          exception. *)
}

(** A dummy task. *)
let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber }

[@@@ifge 5.0]

(** Discontinue [k] with [exn], attaching the most recent raw backtrace. *)
let[@inline] discontinue k exn =
  let bt = Printexc.get_raw_backtrace () in
  Effect.Deep.discontinue_with_backtrace k exn bt

(** [with_handler ~ops self] builds an effect handler for the Picos effects
    ([Current], [Yield], [Spawn], [Await], [Cancel_after]) and returns a
    function that runs a computation [f] under that handler. Tasks created by
    the handler are submitted through [ops.schedule self]. *)
let with_handler (type st) ~(ops : st ops) (self : st) :
    (unit -> unit) -> unit =
  (* [Current]: answer with the current fiber, or forward the exception
     raised while looking it up. *)
  let current =
    Some
      (fun k ->
        match get_current_fiber_exn () with
        | fiber -> Effect.Deep.continue k fiber
        | exception exn -> discontinue k exn)
  (* [Yield]: schedule the rest of the computation as a [T_resume] task and
     return to the worker loop. *)
  and yield =
    Some
      (fun k ->
        let fiber = get_current_fiber_exn () in
        match
          let k () = Effect.Deep.continue k () in
          ops.schedule self @@ T_resume { fiber; k }
        with
        | () -> ()
        | exception exn -> discontinue k exn)
  (* Callback handed to [Picos.Fiber.try_suspend]: schedules the resumption
     of the suspended fiber. The boolean returned by [unsuspend] is
     deliberately ignored. *)
  and reschedule trigger fiber k : unit =
    ignore (Picos.Fiber.unsuspend fiber trigger : bool);
    let k () = Picos.Fiber.resume fiber k in
    let task = T_resume { fiber; k } in
    ops.schedule self task
  in
  let effc (type a) :
      a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
    | Picos.Fiber.Current -> current
    | Picos.Fiber.Yield -> yield
    | Picos.Fiber.Spawn r ->
      Some
        (fun k ->
          (* schedule the new fiber's main function as a fresh task, then
             continue the spawning computation *)
          match
            let f () = r.main r.fiber in
            let task = T_start { fiber = r.fiber; f } in
            ops.schedule self task
          with
          | () -> Effect.Deep.continue k ()
          | exception exn -> discontinue k exn)
    | Picos.Trigger.Await trigger ->
      Some
        (fun k ->
          let fiber = get_current_fiber_exn () in
          (* when the trigger is signaled, reschedule the task *)
          if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule)
          then
            (* trigger was already signaled, run task now *)
            Picos.Fiber.resume fiber k)
    | Picos.Computation.Cancel_after _r ->
      Some
        (fun k ->
          (* not implemented *)
          let exn = Failure "Moonpool: cancel_after is not supported." in
          discontinue k exn)
    | _ -> None
  in
  let handler = Effect.Deep.{ retc = Fun.id; exnc = raise; effc } in
  fun f -> Effect.Deep.match_with f () handler

[@@@else_]

(** Fallback used when the [ifge 5.0] branch above is not selected
    (presumably OCaml < 5.0, where effects are unavailable): run [f]
    directly, without any handler. *)
let with_handler ~ops:_ self f = f ()

[@@@endif]

(** [worker_loop ~ops self] is the main loop of a worker.

    It stores the runner in TLS, calls [ops.before_start], then repeatedly
    fetches tasks with [ops.get_next_task] and runs them until
    {!No_more_tasks} is raised. Exceptions escaping a task are reported via
    [ops.on_exn] and do not stop the loop.

    [ops.cleanup self] is called exactly once when the loop ends. If the loop
    itself fails with an exception, cleanup runs and the exception is then
    re-raised with its original backtrace. *)
let worker_loop (type st) ~(ops : st ops) (self : st) : unit =
  let runner = ops.runner self in
  TLS.set Runner.For_runner_implementors.k_cur_runner runner;

  let (AT_pair (before_task, after_task)) = ops.around_task self in

  let run_task (task : task_full) : unit =
    let fiber =
      match task with
      | T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
    in

    (* make the task's fiber the current one for the duration of the task *)
    TLS.set k_cur_fiber fiber;
    let _ctx = before_task runner in

    (* run the task now, catching errors, handling effects *)
    assert (task != _dummy_task);
    (try
       match task with
       | T_start { fiber = _; f } -> with_handler ~ops self f
       | T_resume { fiber = _; k } ->
         (* this is already in an effect handler *)
         k ()
     with e ->
       let ebt = Exn_bt.get e in
       ops.on_exn self ebt);

    after_task runner _ctx;

    TLS.set k_cur_fiber _dummy_fiber
  in

  ops.before_start self;

  let continue = ref true in
  (* Only the task loop is guarded: if [ops.cleanup] itself raises on the
     normal exit path, it must not be invoked a second time. *)
  (try
     while !continue do
       match ops.get_next_task self with
       | task -> run_task task
       | exception No_more_tasks -> continue := false
     done
   with exn ->
     let bt = Printexc.get_raw_backtrace () in
     ops.cleanup self;
     Printexc.raise_with_backtrace exn bt);
  ops.cleanup self