open! Core
open! Import
module Mutex = Error_checking_mutex
module Queue = Linked_queue

(** Synchronized queue type.

    - [ev_q] holds the queued elements.
    - [maxsize] is the length at which the blocking/dropping push operations
      consider the queue full.
    - [mutex] is locked by [wrap] around every synchronized operation.
    - [not_empty] / [not_full] are the condition variables the blocking
      operations wait on.
    - [finally] is the cleanup closure built by [create]; [wrap] passes it to
      [protect]. *)
type 'a t =
  { ev_q : 'a Queue.t
  ; maxsize : int
  ; mutex : (Mutex.t[@sexp.opaque])
  ; not_empty : (Condition.t[@sexp.opaque])
  ; not_full : (Condition.t[@sexp.opaque])
  ; finally : unit -> unit
  }
[@@deriving sexp_of]

(** [create maxsize] builds an empty synchronized queue whose "full" threshold
    is [maxsize]. *)
let create maxsize =
  let ev_q = Queue.create () in
  let mutex = Mutex.create () in
  let not_empty = Condition.create () in
  let not_full = Condition.create () in
  (* Cleanup run at the end of every [wrap]: signal waiters according to the
     queue's current length, then release the mutex. *)
  let signal_and_unlock () =
    let current_len = Queue.length ev_q in
    if current_len > 0 then Condition.signal not_empty;
    if current_len < maxsize then Condition.signal not_full;
    Mutex.unlock mutex
  in
  { ev_q; mutex; not_empty; not_full; maxsize; finally = signal_and_unlock }
;;

(** [wrap q run] locks [q.mutex], then evaluates [run ()] under [protect] with
    [q.finally] as the cleanup (which signals the condition variables and
    unlocks the mutex). Returns whatever [run ()] returns. *)
let wrap q run =
  Mutex.lock q.mutex;
  protect ~f:run ~finally:q.finally
;;

(** Removes every element from the queue. *)
let clear q = wrap q (fun () -> Queue.clear q.ev_q)

(** Waits on [q.not_full] until the queue holds fewer than [q.maxsize]
    elements. Does not lock by itself: within this file it is only called from
    inside [wrap], i.e. with [q.mutex] already held. *)
let wait_not_full q =
  let rec loop () =
    if Queue.length q.ev_q >= q.maxsize
    then (
      Condition.wait q.not_full q.mutex;
      loop ())
  in
  loop ()
;;

(** Waits on [q.not_empty] until the queue has at least one element. Does not
    lock by itself: within this file it is only called from inside [wrap]. *)
let wait_not_empty q =
  let rec loop () =
    if Queue.is_empty q.ev_q
    then (
      Condition.wait q.not_empty q.mutex;
      loop ())
  in
  loop ()
;;

(** Pushes an event on the queue, blocking until there is room. *)
let push q x =
  wrap q (fun () ->
    wait_not_full q;
    Queue.enqueue q.ev_q x)
;;

(** Pushes an event on the queue unconditionally; may grow the queue past
    [maxsize]. *)
let push_uncond q x = wrap q (fun () -> Queue.enqueue q.ev_q x)

(** Pushes an event on the queue if its length is below [maxsize], otherwise
    drops the event. Returns [true] iff the event was enqueued. *)
let push_or_drop q x =
  wrap q (fun () ->
    let has_room = Queue.length q.ev_q < q.maxsize in
    if has_room then Queue.enqueue q.ev_q x;
    has_room)
;;

(** Returns the current length of the queue. *)
let length q = wrap q (fun () -> Queue.length q.ev_q)

(** Pops an event off of the queue, blocking until something is available. *)
let pop q =
  wrap q (fun () ->
    wait_not_empty q;
    Queue.dequeue_exn q.ev_q)
;;

(** Pops an event off of the queue, blocking until something is available.
    Returns the element together with the number of elements left in the queue
    after the pop. *)
let lpop q =
  wrap q (fun () ->
    wait_not_empty q;
    (* Dequeue first so the reported length excludes the popped element. *)
    let popped = Queue.dequeue_exn q.ev_q in
    let remaining = Queue.length q.ev_q in
    popped, remaining)
;;

(** Moves every element of the plain queue [in_q] into [q] without waiting for
    room. Does nothing (and does not take the lock) when [in_q] is empty. *)
let transfer_queue_in_uncond q in_q =
  if Queue.is_empty in_q
  then ()
  else wrap q (fun () -> Queue.transfer ~src:in_q ~dst:q.ev_q)
;;

(** Moves every element of the plain queue [in_q] into [q], first blocking
    until [q] is below [maxsize]. The whole of [in_q] is transferred once the
    wait finishes. Does nothing (and does not take the lock) when [in_q] is
    empty. *)
let transfer_queue_in q in_q =
  if Queue.is_empty in_q
  then ()
  else
    wrap q (fun () ->
      wait_not_full q;
      Queue.transfer ~src:in_q ~dst:q.ev_q)
;;

(** Moves every element of the synchronized queue [sq] into the plain queue
    [q]. Takes no lock and does not wait. *)
let transfer_queue_nowait_nolock sq q = Queue.transfer ~src:sq.ev_q ~dst:q

(** Moves every element of [sq] into the plain queue [q] under [sq]'s lock,
    without waiting for [sq] to become non-empty. *)
let transfer_queue_nowait sq q =
  (* NOTE(review): this emptiness check runs before the lock is taken;
     presumably a deliberate fast path -- confirm. *)
  if Queue.is_empty sq.ev_q
  then ()
  else wrap sq (fun () -> transfer_queue_nowait_nolock sq q)
;;

(** Blocks until [sq] is non-empty, then moves every element of [sq] into the
    plain queue [q]. *)
let transfer_queue sq q =
  wrap sq (fun () ->
    wait_not_empty sq;
    transfer_queue_nowait_nolock sq q)
;;

(* The external version of wait_not_empty: takes the lock via [wrap] and then
   runs the internal (shadowed) [wait_not_empty] defined above. *)
let wait_not_empty sq = wrap sq (fun () -> wait_not_empty sq)