(* Blocking queue layered over [Mpsc_bag].

   Producers add elements through [Mpsc_bag.add] without taking the mutex.
   The mutex/condition pair is only used to put the consumer to sleep when
   the bag is empty and to wake it up again.

   NOTE(review): the names [Mpsc_bag] and [consumer_waiting] suggest a
   multi-producer / single-consumer design; confirm against [Mpsc_bag]
   before calling [pop_all] from several threads. *)

module A = Trace_core.Internal_.Atomic_

type 'a t = {
  mutex: Mutex.t;  (** held while writing [closed] and around the wait on [cond] *)
  cond: Condition.t;  (** broadcast by [close] and by [push] to wake a sleeping consumer *)
  q: 'a Mpsc_bag.t;  (** storage for the queued elements *)
  mutable closed: bool;  (** set to [true] by [close]; never reset in this file *)
  consumer_waiting: bool A.t;
      (** [true] while [pop_all] is inside its locked section; read by [push]
          to decide whether a wake-up is needed *)
}

(** Raised by [push] and [pop_all] once the queue has been closed. *)
exception Closed

(** [create ()] returns a fresh, open queue with no consumer waiting. *)
let create () : _ t =
  {
    mutex = Mutex.create ();
    cond = Condition.create ();
    q = Mpsc_bag.create ();
    closed = false;
    consumer_waiting = A.make false;
  }

(** [close q] marks the queue as closed. Calling it again is a no-op apart
    from taking and releasing the mutex. *)
let close (self : _ t) =
  Mutex.lock self.mutex;
  if not self.closed then (
    self.closed <- true;
    (* wake up a sleeping consumer so that it observes [closed] and fails *)
    Condition.broadcast self.cond
  );
  Mutex.unlock self.mutex

(** [push q x] adds [x] to the queue.
    @raise Closed if the queue is seen closed either before or right after
    the element is added. In the second case [x] has already been handed to
    [Mpsc_bag.add]. *)
let push (self : _ t) x : unit =
  if self.closed then raise Closed;
  Mpsc_bag.add self.q x;
  if self.closed then raise Closed;
  (* the mutex is only taken when the consumer announced that it is (about
     to be) asleep, so the common path stays lock-free *)
  if A.get self.consumer_waiting then (
    Mutex.lock self.mutex;
    Condition.broadcast self.cond;
    Mutex.unlock self.mutex
  )

(** [pop_all q] returns the list produced by [Mpsc_bag.pop_all] as soon as it
    yields [Some _], blocking on the condition variable while the bag is
    empty.
    @raise Closed if the bag is empty and the queue has been closed. *)
let rec pop_all (self : 'a t) : 'a list =
  match Mpsc_bag.pop_all self.q with
  | Some l -> l
  | None ->
    if self.closed then raise Closed;
    Mutex.lock self.mutex;
    A.set self.consumer_waiting true;
    (* Check again: a producer might have pushed an element since the first
       check. If the bag is still empty, then because this check happens
       after [consumer_waiting] was set, any producer arriving later will
       see the flag and know it has to wake us up. *)
    (match Mpsc_bag.pop_all self.q with
    | Some l ->
      A.set self.consumer_waiting false;
      Mutex.unlock self.mutex;
      l
    | None ->
      if self.closed then (
        (* reset the flag on this exit too, so that every way out of the
           locked section leaves [consumer_waiting] at [false] *)
        A.set self.consumer_waiting false;
        Mutex.unlock self.mutex;
        raise Closed
      );
      Condition.wait self.cond self.mutex;
      A.set self.consumer_waiting false;
      Mutex.unlock self.mutex;
      (* woken by [push] or [close], or spuriously: start over *)
      pop_all self)