(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
   This makes a good data structure for a scheduler's run queue.

   See: "Implementing lock-free queues"
   https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf

   It is simplified slightly because we don't need multiple consumers.
   Therefore [head] is not atomic. *)

exception Closed

module Node : sig
  type 'a t = {
    next : 'a opt Atomic.t;
    mutable value : 'a;
  }
  and +'a opt

  val make : next:'a opt -> 'a -> 'a t

  val none : 'a opt
  (** [t.next = none] means that [t] is currently the last node. *)

  val closed : 'a opt
  (** [t.next = closed] means that [t] will always be the last node. *)

  val some : 'a t -> 'a opt
  val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b
end = struct
  (* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *)

  type +'a opt  (* special | 'a t *)

  type 'a t = {
    next : 'a opt Atomic.t;
    mutable value : 'a;
  }

  type special =
    | Nothing
    | Closed

  let none : 'a. 'a opt = Obj.magic Nothing
  let closed : 'a. 'a opt = Obj.magic Closed
  let some (t : 'a t) : 'a opt = Obj.magic t

  let fold (opt : 'a opt) ~none:n ~some =
    if opt == none then n ()
    else if opt == closed then raise Closed
    else some (Obj.magic opt : 'a t)

  let make ~next value = { value; next = Atomic.make next }
end

type 'a t = {
  tail : 'a Node.t Atomic.t;
  mutable head : 'a Node.t;
}
(* [head] is the last node dequeued (or a dummy node, initially).
   [head.next] gives the real first node, if not [Node.none].
   If [tail.next] is [none] then it is the last node in the queue.
   Otherwise, [tail.next] is a node that is closer to the tail. *)

let push t x =
  let node = Node.(make ~next:none) x in
  let rec aux () =
    let p = Atomic.get t.tail in
    (* While [p.next == none], [p] is the last node in the queue. *)
    if Atomic.compare_and_set p.next Node.none (Node.some node) then (
      (* [node] has now been added to the queue (and possibly even consumed).
         Update [tail], unless someone else already did it for us. *)
      ignore (Atomic.compare_and_set t.tail p node : bool)
    ) else (
      (* Someone else added a different node first ([p.next] is not [none]).
         Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *)
      Node.fold (Atomic.get p.next)
        ~none:(fun () -> assert false)
        ~some:(fun p_next ->
            ignore (Atomic.compare_and_set t.tail p p_next : bool);
            aux ()
          )
    )
  in
  aux ()

let rec push_head t x =
  let p = t.head in
  let next = Atomic.get p.next in
  if next == Node.closed then raise Closed;
  let node = Node.make ~next x in
  if Atomic.compare_and_set p.next next (Node.some node) then (
    (* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *)
    if next == Node.none then (
      ignore (Atomic.compare_and_set t.tail p node : bool);
    ) else (
      (* If the queue wasn't empty, there's nothing to do.
         Either tail isn't at head or there is some [push] thread working to update it.
         Either [push] will update it directly to the new tail, or will update it to [node]
         and then retry. Either way, it ends up at the real tail. *)
    )
  ) else (
    (* Someone else changed it first. This can only happen if the queue was empty. *)
    assert (next == Node.none);
    push_head t x
  )

let rec close (t : 'a t) =
  (* Mark the tail node as final. *)
  let p = Atomic.get t.tail in
  if not (Atomic.compare_and_set p.next Node.none Node.closed) then (
    (* CAS failed because [p] is no longer the tail (or is already closed). *)
    Node.fold (Atomic.get p.next)
      ~none:(fun () -> assert false)   (* Can't switch from another state to [none] *)
      ~some:(fun p_next ->
          (* Make [tail] more up-to-date if it hasn't changed already *)
          ignore (Atomic.compare_and_set t.tail p p_next : bool);
          (* Retry *)
          close t
        )
  )

let pop t =
  let p = t.head in
  (* [p] is the previously-popped item. *)
  let node = Atomic.get p.next in
  Node.fold node
    ~none:(fun () -> None)
    ~some:(fun node ->
        t.head <- node;
        let v = node.value in
        node.value <- Obj.magic ();   (* So it can be GC'd *)
        Some v
      )

let is_empty t =
  Node.fold (Atomic.get t.head.next)
    ~none:(fun () -> true)
    ~some:(fun _ -> false)

let create () =
  let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in
  { tail = Atomic.make dummy; head = dummy }
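
(* Usage sketch. This is NOT part of the original file: [example_usage] is a
   hypothetical name added purely for illustration, and it only exercises the
   queue from a single domain, so it shows ordering and closing behaviour but
   not the concurrent paths.

   It assumes the definitions above: [push] appends at the tail, [push_head]
   inserts in front of the first pending item, [pop] returns [None] when the
   queue is open and empty, and operations on a closed queue raise [Closed]
   once they reach the [closed] marker. *)
let example_usage () =
  let q = create () in
  assert (is_empty q);
  push q 1;
  push q 2;
  (* [push_head] jumps ahead of the items already queued. *)
  push_head q 0;
  assert (pop q = Some 0);
  assert (pop q = Some 1);
  assert (pop q = Some 2);
  (* Open and empty: [pop] reports [None] rather than raising. *)
  assert (pop q = None);
  close q;
  (* After [close], [push] finds the [closed] marker and raises [Closed]. *)
  (match push q 3 with
   | () -> assert false
   | exception Closed -> ());
  (* A closed, empty queue also raises on [pop]. *)
  (match pop q with
   | _ -> assert false
   | exception Closed -> ())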