open Stdune
open Core

(* Whether the pool still accepts new work. *)
type status =
  | Open (* new tasks may be submitted *)
  | Closed (* new tasks are rejected *)

(* State of the loop started by [run]. *)
type runner =
  | Running (* Currently firing the fibers held in the queue. *)
  | Awaiting_resume of unit k
  (* Ran out of work. Holds the continuation to resume once a task is added
     or the pool is closed. *)
  | Awaiting_run (* Freshly created. [run] has not been called yet. *)

(* A pool consumes tasks from a queue in the context where [run] was executed.

   It is implemented as a plain queue of thunks plus a continuation used to
   resume [run] whenever it runs out of work.

   To optimize this further, the operation could be baked into [effect] in
   [Core]. *)
type nonrec t =
  { tasks : (unit -> unit t) Queue.t (* pending tasks *)
  ; mutable runner : runner
    (* state of the runner; holds the continuation stored by [run] while it
       is suspended *)
  ; mutable status : status
  }

(* Passes [true] to [k] while the pool is [Open], [false] once it is
   [Closed]. *)
let running t k =
  k
    (match t.status with
     | Open -> true
     | Closed -> false)
;;

(* A new pool: empty queue, open for submissions, [run] not yet called. *)
let create () = { tasks = Queue.create (); runner = Awaiting_run; status = Open }

(* Shared tail of [task] and [stop]. If the runner is parked in
   [Awaiting_resume], mark it [Running] and resume its stored continuation
   before continuing with [k]. Otherwise there is nothing to wake, so continue
   with [k] directly. *)
let wake_runner t k =
  match t.runner with
  | Awaiting_resume parked ->
    t.runner <- Running;
    resume parked () k
  | Running | Awaiting_run -> k ()
;;

(* Enqueues [f] and wakes the runner if it is suspended.
   Raises a [Code_error] when the pool is already closed. *)
let task t ~f k =
  match t.status with
  | Closed -> Code_error.raise "pool is closed. new tasks may not be submitted" []
  | Open ->
    Queue.push t.tasks f;
    wake_runner t k
;;

(* Closes the pool. Closing an already closed pool only continues with [k].
   On the first close, a suspended runner is woken so that it can observe the
   [Closed] status. *)
let stop t k =
  match t.status with
  | Closed -> k ()
  | Open ->
    t.status <- Closed;
    wake_runner t k
;;

(* Runs the queued tasks. May be called only while the runner is
   [Awaiting_run]; any other state raises a [Code_error]. *)
let run t k =
  match t.runner with
  | Awaiting_resume _ | Running ->
    Code_error.raise "Fiber.Pool.run: concurent calls to run aren't allowed" []
  | Awaiting_run ->
    t.runner <- Running;
    (* Count of fibers still running in the pool. The final continuation [k]
       may be called only once this drops to zero. It starts at 1; the
       draining loop below performs one decrement itself when it finds the
       queue empty and the pool closed. *)
    let live = ref 1 in
    let fiber_finished () =
      decr live;
      if !live = 0 then k () else end_of_fiber
    in
    let park resumption =
      (* We only ever suspend because there is no task left to run. *)
      assert (Queue.is_empty t.tasks);
      t.runner <- Awaiting_resume resumption
    in
    (* Forks one fiber per queued task. When the queue is empty, either
       finishes (pool closed) or suspends until resumed (pool open). *)
    let rec drain () =
      match Queue.pop t.tasks with
      | Some thunk ->
        incr live;
        fork (fun () -> thunk () fiber_finished) drain
      | None ->
        (match t.status with
         | Closed -> fiber_finished ()
         | Open -> suspend park drain)
    in
    drain ()
;;