123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119(* Import these directly because we copy this file for the dscheck tests. *)moduleFiber_context=Eio__core.Private.Fiber_contextmoduleSuspend=Eio__core.Private.SuspendmoduleCancel=Eio__core.Canceltypet=Broadcast.tletcreate()=Broadcast.create()letlock_protectedm=Cancel.protect(fun()->Eio_mutex.lockm)letawait_generic?mutext=matchSuspend.enter_unchecked"Condition.await"(functxenqueue->matchFiber_context.get_errorctxwith|Someex->Option.iterEio_mutex.unlockmutex;enqueue(Errorex)|None->matchBroadcast.suspendt(fun()->enqueue(Ok()))with|None->Option.iterEio_mutex.unlockmutex|Somerequest->Option.iterEio_mutex.unlockmutex;Fiber_context.set_cancel_fnctx(funex->ifBroadcast.cancelrequestthenenqueue(Errorex)(* else already succeeded *)))with|()->Option.iterlock_protectedmutex|exceptionex->letbt=Printexc.get_raw_backtrace()inOption.iterlock_protectedmutex;Printexc.raise_with_backtraceexbtletawaittmutex=await_generic~mutextletawait_no_mutext=await_generictletbroadcast=Broadcast.resume_alltyperequest=Broadcast.requestoptionletregister_immediate=Broadcast.suspendletcancel=function|Somerequest->Broadcast.cancelrequest|None->falseletensure_cancelledx=ignore(cancelx:bool)typestate=|Init|Waitingof((unit,exn)result->unit)|Done(* There main property want is that we don't suspend forever if a broadcast
happened after [fn] started, or if the fiber is cancelled.
1. We start in the Init state.
2. If a broadcast happens here we move to Done. If we later try to suspend, we'll resume immediately.
3. We run [fn]. If a broadcast happens during this we'll transition to Done as before.
4. If [fn] raises or wants to stop normally, we return without suspending at all.
5. Otherwise, we suspend the fiber.
6. We try to transition from Init to Waiting.
If a broadcast transitioned to Done before this, we resume immediately.
If a broadcast transitions afterwards, [wake] will see the [enqueue] function and wake us.
Therefore, we can only sleep forever if a broadcast never happens after starting [fn].
7. If the fiber is cancelled before suspending, we raise on suspend.
If cancelled after suspending and before the request succeeds, we cancel the request and raise.
If cancelled after the request succeeds, [wake] will resume us.
*)letrecloop_no_mutextfn=letstate=Atomic.makeInitinletwake()=matchAtomic.exchangestateDonewith|Init->()(* Broadcast happened before we suspended; suspend will notice *)|Waitingenqueue->enqueue(Ok())|Done->assertfalseinletrequest=Broadcast.suspendtwakein(* Note: to avoid memory leaks, make sure that [request] is finished in all cases. *)matchfn()with|exceptionex->letbt=Printexc.get_raw_backtrace()inensure_cancelledrequest;Printexc.raise_with_backtraceexbt|Somex->ensure_cancelledrequest;x|None->Suspend.enter_unchecked"Condition.loop_no_mutex"(functxenqueue->matchFiber_context.get_errorctxwith|Someex->ensure_cancelledrequest;(* If a broadcast already happened, we still cancel. *)enqueue(Errorex)|None->letwaiting=WaitingenqueueinifAtomic.compare_and_setstateInitwaitingthen((* We were in Init, so [wake] hasn't yet done anything.
When it runs, it will resume us.
We're also not currently cancelled, because we checked above
and cancellations only come from the same thread. *)Fiber_context.set_cancel_fnctx(funex->ifcancelrequestthen((* We could set the state to Done here, but there's no need;
we're not racing with anything now. [wake] never runs. *)enqueue(Errorex))(* else we already got resumed *)))else((* State is already Done, but [wake] couldn't wake us then
because we hadn't moved to [waiting]. Resume now. *)enqueue(Ok())));loop_no_mutextfn