module A = Atomic_

(* Terminology:

   - bottom: the end where we push/pop normally. Only one thread can do that.
   - top: the end where work stealing happens (older values).
     This only ever grows.

   Elements are always added on the bottom end. *)

(** Circular array (size is [2 ^ log_size]) *)
module CA : sig
  type 'a t

  val create : dummy:'a -> unit -> 'a t
  (** [create ~dummy ()] allocates the array with every slot set to [dummy]. *)

  val size : 'a t -> int
  (** Number of slots. This is a constant, independent of the argument. *)

  val get : 'a t -> int -> 'a
  (** [get a i] reads the slot selected by the low [log_size] bits of [i].
      No bounds check is needed since the index is masked first. *)

  val set : 'a t -> int -> 'a -> unit
  (** [set a i x] writes [x] into the slot selected by the low [log_size]
      bits of [i]. *)
end = struct
  (** The array has size 256. *)
  let log_size = 8

  (* Number of slots, and the mask that wraps an index into [0, capacity). *)
  let capacity = 1 lsl log_size
  let mask = capacity - 1

  type 'a t = { arr: 'a array } [@@unboxed]

  let[@inline] size (_self : _ t) = capacity
  let create ~dummy () : _ t = { arr = Array.make capacity dummy }

  let[@inline] get (self : 'a t) (i : int) : 'a =
    Array.unsafe_get self.arr (i land mask)

  let[@inline] set (self : 'a t) (i : int) (x : 'a) : unit =
    Array.unsafe_set self.arr (i land mask) x
end

(** A deque with one owner end ([bottom]) and one stealing end ([top]). *)
type 'a t = {
  top: int A.t;  (** Where we steal *)
  bottom: int A.t;  (** Where we push/pop from the owning thread *)
  mutable top_cached: int;  (** Last read value of [top] *)
  arr: 'a CA.t;  (** The circular array *)
}

(** [create ~dummy ()] makes an empty deque. [dummy] is only used to fill
    the slots of the underlying circular array at allocation time. *)
let create ~dummy () : _ t =
  let top = A.make 0 in
  let arr = CA.create ~dummy () in
  (* allocate far from [top] to avoid false sharing *)
  let bottom = A.make 0 in
  { top; top_cached = 0; bottom; arr }

(** [size self] is [bottom - top], clamped to be non-negative. The two
    indices are read separately, so this is not an atomic snapshot. *)
let[@inline] size (self : _ t) : int =
  max 0 (A.get self.bottom - A.get self.top)

(** Raised and caught inside {!push} to signal that the array is full. *)
exception Full

(** [push self x] adds [x] at the bottom end. To be called from the owning
    thread only (see the terminology comment at the top of the file).
    @return [true] if [x] was stored, [false] if the deque was full. At most
    [CA.size - 1] elements are held at once. *)
let push (self : 'a t) (x : 'a) : bool =
  try
    let b = A.get self.bottom in
    let t_approx = self.top_cached in

    (* Section 2.3: over-approximation of size.
       Only if it seems too big do we actually read [t]. *)
    let size_approx = b - t_approx in
    if size_approx >= CA.size self.arr - 1 then (
      (* we need to read the actual value of [top], which might entail
         contention. *)
      let t = A.get self.top in
      self.top_cached <- t;
      let size = b - t in
      if size >= CA.size self.arr - 1 then (* full! *) raise_notrace Full
    );

    (* write the slot first, then publish it by bumping [bottom] *)
    CA.set self.arr b x;
    A.set self.bottom (b + 1);
    true
  with Full -> false

(** Raised by {!pop_exn} when no element could be popped. *)
exception Empty

(** [pop_exn self] removes and returns the most recently pushed element.
    To be called from the owning thread only.
    @raise Empty if the deque is empty, or if a stealer won the race for the
    last remaining element. *)
let pop_exn (self : 'a t) : 'a =
  (* reserve the bottom slot before looking at [top] *)
  let b = A.get self.bottom in
  let b = b - 1 in
  A.set self.bottom b;

  let t = A.get self.top in
  self.top_cached <- t;

  let size = b - t in
  if size < 0 then (
    (* reset to basic empty state *)
    A.set self.bottom t;
    raise_notrace Empty
  ) else if size > 0 then (
    (* can pop without modifying [top] *)
    let x = CA.get self.arr b in
    x
  ) else (
    assert (size = 0);
    (* there was exactly one slot, so we might be racing against stealers
       to update [self.top] *)
    if A.compare_and_set self.top t (t + 1) then (
      let x = CA.get self.arr b in
      A.set self.bottom (t + 1);
      x
    ) else (
      (* a stealer took the element; leave the deque empty at [t + 1] *)
      A.set self.bottom (t + 1);
      raise_notrace Empty
    )
  )

(** [pop self] is like {!pop_exn} but returns [None] instead of raising
    [Empty]. *)
let[@inline] pop self : _ option =
  match pop_exn self with
  | exception Empty -> None
  | t -> Some t

(** [steal self] tries to take the oldest element, from the top end.
    @return [Some x] on success; [None] if the deque looked empty or if the
    compare-and-set on [top] failed because it was changed concurrently. *)
let steal (self : 'a t) : 'a option =
  (* read [top], but do not update [top_cached]
     as we're in another thread *)
  let t = A.get self.top in
  let b = A.get self.bottom in

  let size = b - t in
  if size <= 0 then
    None
  else (
    (* read the candidate before the CAS; it only counts if the CAS wins *)
    let x = CA.get self.arr t in
    if A.compare_and_set self.top t (t + 1) then
      (* successfully increased top to consume [x] *)
      Some x
    else
      None
  )