(* A pool is a sequence of cells containing either available slots or consumers waiting for them.
   A slot may or may not contain an actual resource.

   To use a resource:

   1. Get the next "suspend" cell. If it contains a resource slot, use it.
   2. If no slot is ready and we're below capacity, create a new slot and add it (to the next resume cell).
   3. Either way, wait for the cell to be resumed with a slot.
   4. Once you have a slot, ensure it contains a resource, creating one if not.
   5. When done, add the slot back (in the next resume cell).
*)

(* These modules are bound explicitly because this file is copied for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend

(* A slot is a mutable holder that is either empty ([None]) or contains a resource. *)
type 'a slot = 'a option ref

module Cell = struct
  (* State machine of a single cell:

     1. Suspender : In_transition -> Request      Suspender waits for a resource
        1.1. Resumer   : Request -> Finished      Resumer then provides a resource
        1.2. Suspender : Request -> Finished      Suspender cancels
     2. Resumer : In_transition -> Resource       Resumer provides a spare resource
        2.1. Suspender : Resource -> Finished     Suspender doesn't need to wait
  *)
  type 'a t =
    | In_transition
    | Request of ('a slot -> unit)
    | Resource of 'a slot
    | Finished

  let init = In_transition

  let segment_order = 2

  (* Print the constructor name of [state] on [f]; payloads are not shown. *)
  let dump f state =
    let label =
      match state with
      | In_transition -> "In_transition"
      | Request _ -> "Request"
      | Resource _ -> "Resource"
      | Finished -> "Finished"
    in
    Fmt.string f label
end

module Q = Cells.Make(Cell)

type 'a t = {
  slots : int Atomic.t;         (* Total resources, available and in use *)
  max_slots : int;              (* Upper bound checked before adding a slot *)
  alloc : unit -> 'a;           (* Creates a new resource *)
  validate : 'a -> bool;        (* A resource for which this is [false] is disposed and replaced *)
  dispose : 'a -> unit;         (* Releases a resource *)
  q : 'a Q.t;                   (* Cells holding free slots or waiting consumers *)
}

(* [create max_size alloc] makes an empty pool (slot counter at 0).
   By default [validate] accepts every resource and [dispose] does nothing.
   Raises [Invalid_argument] if [max_size <= 0]. *)
let create ?(validate = Fun.const true) ?(dispose = ignore) max_size alloc =
  if max_size > 0 then begin
    let slots = Atomic.make 0 in
    let q = Q.make () in
    { slots; max_slots = max_size; alloc; validate; dispose; q }
  end else
    invalid_arg "Pool.create: max_size is <= 0"

(* [add t x] hands slot [x] to the next resume cell of [t.q].
   If that cell turns out to be [Finished], or a waiting consumer cancels before
   we can claim its request, a further resume cell is taken and the hand-off is retried. *)
let rec add t x =
  let cell = Q.next_resume t.q in
  let rec publish () =
    match Atomic.get cell with
    | Resource _ ->
      (* Can't happen; only a resumer can set this, and we're the resumer. *)
      assert false
    | Finished ->
      (* The consumer cancelled. Get another cell and retry. *)
      add t x
    | In_transition ->
      (* Nobody is waiting yet: leave the slot in the cell.
         If the state changed under us, re-read it and decide again. *)
      if Atomic.compare_and_set cell In_transition (Resource x) then ()
      else publish ()
    | Request wake as seen ->
      (* A consumer is waiting. Claim the cell first, then pass the slot on. *)
      if Atomic.compare_and_set cell seen Finished then wake x
      else add t x      (* Consumer cancelled; retry with another cell. *)
  in
  publish ()

(* Try to cancel by transitioning from [Request] to [Finished].
This can only be called after previously transitioning to [Request]. *)
(* Returns [true] if this call replaced a [Request] (in which case [Q.cancel_cell]
   is also called on [segment]), and [false] if the cell was already [Finished]. *)
let cancel segment cell =
  match Atomic.exchange cell Cell.Finished with
  | In_transition | Resource _ ->
    (* Can't get here from [Request]. *)
    assert false
  | Finished ->
    (* Already resumed; reject cancellation *)
    false
  | Request _ ->
    Q.cancel_cell segment;
    true

(* [maybe_add_slot t current] adds one empty slot to [t] if the slot count,
   last observed as [current], is below [t.max_slots]. Does nothing at capacity. *)
let rec maybe_add_slot t current =
  if current >= t.max_slots then ()
  else if Atomic.compare_and_set t.slots current (current + 1) then
    (* We reserved the extra slot; publish it (still without a resource). *)
    add t (ref None)
  else
    (* Concurrent update; try again with the fresh count. *)
    maybe_add_slot t (Atomic.get t.slots)

(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it.
Afterwards, the slot is returned to [t]. *)
(* The slot is handed back with [add] whether the work returns or raises
   (including a failure in [t.validate], [t.dispose] or [t.alloc]);
   exceptions are re-raised with their original backtrace. *)
let run_with t f slot =
  (* Allocate a resource and remember it in the slot. *)
  let fresh () =
    let x = t.alloc () in
    slot := Some x;
    x
  in
  (* Pick the resource to use, replacing one that fails validation. *)
  let resource () =
    match !slot with
    | Some x when t.validate x -> x
    | Some stale ->
      (* Empty the slot before disposing, so it does not keep the stale resource. *)
      slot := None;
      t.dispose stale;
      fresh ()
    | None -> fresh ()
  in
  match f (resource ()) with
  | exception ex ->
    let bt = Printexc.get_raw_backtrace () in
    add t slot;
    Printexc.raise_with_backtrace ex bt
  | result ->
    add t slot;
    result

(* [run_new_and_dispose t f] allocates a fresh resource, runs [f] on it and then
   disposes of the resource, whether [f] returns or raises. The pool's queue is not touched. *)
let run_new_and_dispose t f =
  let resource = t.alloc () in
  match f resource with
  | exception ex ->
    let bt = Printexc.get_raw_backtrace () in
    t.dispose resource;
    Printexc.raise_with_backtrace ex bt
  | result ->
    t.dispose resource;
    result

(* [use t f] runs [f] with a resource taken from [t] and returns its result.

   If the next suspend cell already holds a slot, it is used directly.
   Otherwise a new slot is added when the pool is under capacity, and the caller
   suspends until a slot is delivered to its cell.

   With [~never_block:true], when the pool is at capacity and no slot is ready,
   [f] runs on a throw-away resource (see [run_new_and_dispose]) instead of waiting. *)
let use t ?(never_block=false) f =
  let segment, cell = Q.next_suspend t.q in
  match Atomic.get cell with
  | Request _ | Finished -> assert false
  | Resource slot ->
    Atomic.set cell Finished;     (* Allow value to be GC'd *)
    run_with t f slot
  | In_transition ->
    let current = Atomic.get t.slots in
    let can_add = current < t.max_slots in
    if never_block && not can_add then begin
      (* We are at capacity, but cannot block.
         Give up the cell; if a slot arrived in the meantime use it, otherwise
         create a new resource to run [f] but don't add it to the pool. *)
      match Atomic.exchange cell Finished with
      | Resource slot -> run_with t f slot
      | _ -> run_new_and_dispose t f
    end else begin
      (* Create a slot if not at capacity. *)
      if can_add then maybe_add_slot t current;
      (* No item is available right now. Start waiting. *)
      let wait ctx enqueue =
        let resume slot = enqueue (Ok slot) in
        if Atomic.compare_and_set cell In_transition (Request resume) then begin
          (* Our request is registered. Abort the wait only if we win the race
             against the resumer; otherwise we are being resumed. *)
          let on_cancel ex =
            if cancel segment cell then enqueue (Error ex)
          in
          match Fiber_context.get_error ctx with
          | Some ex -> on_cancel ex
          | None -> Fiber_context.set_cancel_fn ctx on_cancel
        end else begin
          (* The cell changed before we could register: a slot was provided. *)
          match Atomic.exchange cell Finished with
          | Resource slot -> enqueue (Ok slot)
          | _ -> assert false
        end
      in
      let slot = Suspend.enter_unchecked "Pool.acquire" wait in
      (* assert (Atomic.get cell = Finished); *)
      run_with t f slot
    end