(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
   This makes a good data structure for a scheduler's run queue.

   Based on Vesa Karvonen's example at:
   https://github.com/ocaml-multicore/picos/blob/07d6c2d391e076b490098c0379d01208b3a9cc96/test/lib/foundation/mpsc_queue.ml
*)

(* Raised when an operation finds the queue closed. *)
exception Closed

(* Raised by the [_exn] operations when the queue holds no element. *)
exception Empty

(* A list where the end indicates whether the queue is closed. *)
type 'a clist = ( :: ) of 'a * 'a clist | Open | Closed

(* [rev_append l1 l2] is like [rev l1 @ l2].
   [l1] must end in [Open]; reaching [Closed] is treated as a broken invariant. *)
let rec rev_append l1 l2 =
  match l1 with
  | a :: l -> rev_append l (a :: l2)
  | Open -> l2
  | Closed -> assert false

(* [l1 @ l2] is the elements of [l1] followed by [l2].
   [l1] must end in [Open]; reaching [Closed] is treated as a broken invariant. *)
let[@tail_mod_cons] rec ( @ ) l1 l2 =
  match l1 with
  | h1 :: tl -> h1 :: (tl @ l2)
  | Open -> l2
  | Closed -> assert false

(* The queue contains [head @ rev tail].
   If [tail] is non-empty then it ends in [Open].

   [head] is a plain mutable field; [tail] is an atomic that [push] and
   [push_all] update with compare-and-set, newest element first. *)
type 'a t = { mutable head : 'a clist; tail : 'a clist Atomic.t }

(* [create ()] is a new, empty, open queue. *)
let create () = { head = Open; tail = Atomic.make_contended Open }

(* [append_list_to_clist l l'] is the elements of the ordinary list [l],
   in order, followed by the clist [l']. *)
let[@tail_mod_cons] rec append_list_to_clist l l' =
  match l with
  | [] -> l'
  | List.(x :: xs) -> x :: append_list_to_clist xs l'

(* [of_list l] is a new open queue whose elements are those of [l];
   the first element of [l] is the first to be popped. *)
let of_list l =
  { head = append_list_to_clist l Open; tail = Atomic.make_contended Open }

(* [is_empty t] is [true] if neither [head] nor [tail] holds an element.
   Raises [Closed] if [head] is exactly [Closed]. *)
let is_empty t =
  match t.head with
  | _ :: _ -> false
  | Closed -> raise Closed
  | Open -> ( match Atomic.get t.tail with _ :: _ -> false | _ -> true)

(* [close t] marks the tail as [Closed], so later [push]/[push_all] calls
   raise [Closed], and moves the elements that were in the tail onto the end
   of [head], followed by the [Closed] marker. Elements already queued can
   therefore still be popped.
   Raises [Closed] if the tail was already [Closed]. *)
let close t =
  match Atomic.exchange t.tail Closed with
  | Closed -> raise Closed
  | xs -> t.head <- t.head @ rev_append xs Closed

(* [push t x] adds [x] at the back of the queue, retrying the
   compare-and-set until it succeeds.
   Raises [Closed] if the tail is [Closed]. *)
let rec push t x =
  match Atomic.get t.tail with
  | Closed -> raise Closed
  | before ->
      let after = x :: before in
      if not (Atomic.compare_and_set t.tail before after) then push t x

(* [push_head t x] adds [x] at the front of the queue, so it is the next
   element popped. It writes [t.head] without any atomic operation.
   Raises [Closed] if [head] is exactly [Closed]. *)
let push_head t x =
  match t.head with
  | Closed -> raise Closed
  | before -> t.head <- x :: before

(* [push_all t values] adds all of [values] at the back of the queue in a
   single atomic update; the first element of [values] is popped first.
   Raises [Closed] if the tail is [Closed]. *)
let push_all t values =
  (* The tail is stored newest-first, so the batch is prepended reversed.
     The reversal does not depend on the tail's contents, so do it once
     rather than on every compare-and-set retry. *)
  let rev_values = List.rev values in
  let rec attempt () =
    match Atomic.get t.tail with
    | Closed -> raise Closed
    | before ->
        let after = append_list_to_clist rev_values before in
        if not (Atomic.compare_and_set t.tail before after) then attempt ()
  in
  attempt ()

(* Selects how [pop_as] reports its result:
   [Option] gives [Some x] / [None], [Value] gives [x] or raises [Empty],
   [Unit] discards the element or raises [Empty]. *)
type ('a, _) poly =
  | Option : ('a, 'a option) poly
  | Value : ('a, 'a) poly
  | Unit : ('a, unit) poly

(* [pop_as t poly] removes the element at the front of the queue and returns
   it in the shape selected by [poly].
   Raises [Closed] if [head] is exactly [Closed], and [Empty] if the queue
   has no element and [poly] is [Value] or [Unit]. *)
let rec pop_as : type a r. a t -> (a, r) poly -> r =
 fun t poly ->
  match t.head with
  | x :: xs -> begin
      t.head <- xs;
      match poly with Option -> Some x | Value -> x | Unit -> ()
    end
  | Closed -> raise Closed
  | Open -> (
      (* We know the tail is open because we just saw the head was open
         and we don't run concurrently with [close]. *)
      match Atomic.exchange t.tail Open with
      | Closed -> assert false
      | Open -> (
          match poly with
          | Option -> None
          | Value | Unit -> raise Empty (* Optimise the common case *))
      | tail ->
          (* Take everything pushed so far, restore insertion order, and
             retry against the refilled head. *)
          t.head <- rev_append tail Open;
          pop_as t poly)

(* Selects how [peek_as] reports its result:
   [Option] gives [Some x] / [None], [Value] gives [x] or raises [Empty]. *)
type ('a, _) poly2 = Option : ('a, 'a option) poly2 | Value : ('a, 'a) poly2

(* [peek_as t poly] returns the element at the front of the queue without
   removing it, in the shape selected by [poly]. It may move elements from
   the tail into [head] as a side effect.
   Raises [Closed] if [head] is exactly [Closed], and [Empty] if the queue
   has no element and [poly] is [Value]. *)
let rec peek_as : type a r. a t -> (a, r) poly2 -> r =
 fun t poly ->
  match t.head with
  | x :: _ -> ( match poly with Option -> Some x | Value -> x)
  | Closed -> raise Closed
  | Open -> (
      (* We know the tail is open because we just saw the head was open
         and we don't run concurrently with [close]. *)
      match Atomic.exchange t.tail Open with
      | Closed -> assert false
      | Open -> ( match poly with Option -> None | Value -> raise Empty)
      | tail ->
          t.head <- rev_append tail Open;
          peek_as t poly)

(* [pop_opt t] removes and returns the front element, or [None] if there is none. *)
let pop_opt t = pop_as t Option

(* [pop_exn t] removes and returns the front element, or raises [Empty]. *)
let pop_exn t = pop_as t Value

(* [drop_exn t] removes the front element and discards it, or raises [Empty]. *)
let drop_exn t = pop_as t Unit

(* [peek_exn t] returns the front element without removing it, or raises [Empty]. *)
let peek_exn t = peek_as t Value

(* [peek_opt t] returns the front element without removing it, or [None]. *)
let peek_opt t = peek_as t Option