open! Import

(* Life cycle of a detached computation.

   - [Pending]: initial state given by [detach]; no result yet and nobody has
     registered to wait for it.
   - [Finished r]: stored by [run] once the work function returned ([Ok]) or
     raised ([Error]).
   - [Waiting_for_result c]: stored by [wait] when it finds the state
     [Pending]; [run] broadcasts on [c] when it stores the result. *)
type 'a state =
  | Pending
  | Finished of ('a, exn) result
  | Waiting_for_result of Condition.t

(* A detached computation: the function to execute, its current state, and the
   mutex that [run] and [wait] take around accesses to [state]. *)
type 'a t =
  { work : unit -> 'a
  ; mutable state : 'a state
  ; mutex : Mutex.t
  }

(* Existential wrapper so that jobs of any result type can be sent through the
   same worker channel. *)
type packed = T : _ t -> packed

(* Execute [t.work ()], capturing either its value or the exception it raised,
   then publish the outcome in [t.state] under [t.mutex]. If a waiter had
   registered a condition variable, it is woken up with a broadcast. *)
let run t =
  let state =
    Finished
      (match t.work () with
       | x -> Ok x
       | exception e -> Error e)
  in
  Mutex.lock t.mutex;
  let old_state = t.state in
  t.state <- state;
  (match old_state with
   | Waiting_for_result cond -> Condition.broadcast cond
   | _ -> ());
  Mutex.unlock t.mutex

module Worker = struct
  (* A pooled worker, identified by the channel on which it receives jobs. *)
  type t = { next_job : packed Event.channel }

  (* Workers currently idle, i.e. that registered themselves in [loop] and
     have not been handed a job by [detach] since. *)
  let workers = Queue.create ()

  (* Number of pooled workers started so far (reset by [detach] when it
     detects a fork). *)
  let count = ref 0

  (* Protects [workers]; [detach] also holds it while reading and updating
     [count]. *)
  let mutex = Mutex.create ()

  (* Worker main loop: advertise this worker as idle, block until a job is
     sent on its channel, run the job, and start over. Never returns. *)
  let rec loop t =
    Mutex.lock mutex;
    Queue.push t workers;
    Mutex.unlock mutex;
    (* Wait for a job *)
    let (T job) = Event.sync (Event.receive t.next_job) in
    run job;
    loop t

  (* Entry point of a new pooled worker thread: run the job that triggered
     its creation, then serve further jobs on a fresh channel. *)
  let start job =
    run job;
    loop { next_job = Event.new_channel () }
end

(* Process id observed by the last call to [detach]; used to detect forks. *)
let pid = ref 0

(* [detach ~f] schedules [f ()] for execution on another thread and returns a
   handle whose result can be retrieved with [wait].

   Scheduling policy, decided while holding [Worker.mutex]:
   - if an idle worker is available, the job is sent to it;
   - otherwise, if 16 pooled workers have already been started, a one-shot
     thread is created that only runs this job;
   - otherwise a new pooled worker is started with this job as its first. *)
let detach ~f =
  let t = { work = f; state = Pending; mutex = Mutex.create () } in
  Mutex.lock Worker.mutex;
  (* Detect forks *)
  let current_pid = Unix.getpid () in
  if !pid <> current_pid then begin
    (* The pid changed since the last call: forget the recorded workers.
       NOTE(review): presumably because the worker threads of the parent
       process do not exist in a forked child; confirm. *)
    pid := current_pid;
    Queue.clear Worker.workers;
    Worker.count := 0
  end;
  if not (Queue.is_empty Worker.workers) then begin
    let worker = Queue.pop Worker.workers in
    (* Release the pool mutex before the synchronous send on the channel. *)
    Mutex.unlock Worker.mutex;
    Event.sync (Event.send worker.next_job (T t))
  end else begin
    let f =
      if !Worker.count = 16 then
        run
      else begin
        incr Worker.count;
        Worker.start
      end
    in
    Mutex.unlock Worker.mutex;
    ignore (Thread.create f t : Thread.t)
  end;
  t

(* Block on [cond] until [t.state] is [Finished] and return the stored result.
   Must be called with [t.mutex] held; the mutex is held again on return.

   The state is re-checked after every wakeup and the wait is repeated if the
   result is not available yet, because a condition variable wait may return
   without a matching signal (spurious wakeup). *)
let rec really_wait t cond =
  Condition.wait cond t.mutex;
  match t.state with
  | Finished res -> res
  | Pending | Waiting_for_result _ -> really_wait t cond

(* [wait t] blocks until the computation behind [t] has finished, then returns
   its value or re-raises the exception raised by the work function. *)
let wait t =
  Mutex.lock t.mutex;
  let res =
    match t.state with
    | Finished res -> res
    | Waiting_for_result cond -> really_wait t cond
    | Pending ->
      (* First waiter: register a condition variable for [run] to signal. *)
      let cond = Condition.create () in
      t.state <- Waiting_for_result cond;
      really_wait t cond
  in
  Mutex.unlock t.mutex;
  match res with
  | Ok x -> x
  | Error e -> raise e