(* This file is free software, part of containers. See file "license" for more details. *)

(** {1 Blocking Queue} *)

(** A bounded FIFO queue protected by a mutex.

    - [q] holds the elements;
    - [lock] guards every access to [q] and [size];
    - [cond] is the single condition variable on which both producers
      (queue full) and consumers (queue empty) wait;
    - [capacity] is the maximum number of elements, fixed at creation;
    - [size] is the number of elements currently stored in [q]. *)
type 'a t = {
  q : 'a Queue.t;
  lock : Mutex.t;
  cond : Condition.t;
  capacity : int;
  mutable size : int;
}

(** [create n] returns an empty queue that holds at most [n] elements.
    @raise Invalid_argument if [n < 1]. *)
let create n =
  if n < 1 then invalid_arg "BloquingQueue.create";
  {
    q = Queue.create ();
    lock = Mutex.create ();
    cond = Condition.create ();
    capacity = n;
    size = 0;
  }

(* Record that one element was added. Asserts there was room for it.
   Must be called with [q.lock] held. *)
let incr_size_ q =
  assert (q.size < q.capacity);
  q.size <- q.size + 1

(* Record that one element was removed. Asserts the queue was not empty.
   Must be called with [q.lock] held. *)
let decr_size_ q =
  assert (q.size > 0);
  q.size <- q.size - 1

(* [finally_ f x ~h] computes [f x], then runs the cleanup [h ()] exactly once,
   whether [f x] returned or raised. An exception raised by [f x] is re-raised
   after the cleanup. If [h] itself raises, that exception propagates and [h]
   is NOT run a second time (a [try ... with] wrapping the cleanup call would
   re-enter it from the handler). *)
let finally_ f x ~h =
  match f x with
  | res ->
    ignore (h ());
    res
  | exception e ->
    ignore (h ());
    raise e

(* [with_lock_ q f] runs [f ()] while holding [q.lock]; the lock is released
   when [f] returns or raises. *)
let with_lock_ q f =
  Mutex.lock q.lock;
  finally_ f () ~h:(fun () -> Mutex.unlock q.lock)

(** [push q x] appends [x] to [q], blocking while the queue is full. *)
let push q x =
  with_lock_ q (fun () ->
      (* [Condition.wait] releases the lock while sleeping and re-acquires it
         before returning, so the predicate is re-checked under the lock. *)
      while q.size = q.capacity do
        Condition.wait q.cond q.lock
      done;
      assert (q.size < q.capacity);
      Queue.push x q.q;
      incr_size_ q;
      (* senders and receivers share [q.cond]: wake every waiter and let each
         one re-check its own predicate *)
      Condition.broadcast q.cond)

(** [take q] removes and returns the oldest element of [q], blocking while
    the queue is empty. *)
let take q =
  with_lock_ q (fun () ->
      while q.size = 0 do
        Condition.wait q.cond q.lock
      done;
      let x = Queue.take q.q in
      decr_size_ q;
      (* room was freed: wake every waiter so that blocked senders can retry *)
      Condition.broadcast q.cond;
      x)

(*$R
let q = create 1 in
let t1 = CCThread.spawn (fun () -> push q 1; push q 2) in
let t2 = CCThread.spawn (fun () -> push q 3; push q 4) in
let l = CCLock.create [] in
let t3 = CCThread.spawn (fun () -> for i = 1 to 4 do
let x = take q in
CCLock.update l (fun l -> x :: l)
done)
in
Thread.join t1; Thread.join t2; Thread.join t3;
assert_equal [1;2;3;4] (List.sort Stdlib.compare (CCLock.get l))
*)

(** [push_list q l] pushes every element of [l] into [q], in order, blocking
    whenever the queue is full. The lock is released between chunks, so
    elements pushed by other threads may be interleaved with those of [l]. *)
let push_list q l =
  (* Push elements of [l] while there is room and return the elements that
     could not be pushed. Assumes the lock is acquired. *)
  let rec push_ q l =
    match l with
    | [] -> l
    | _ :: _ when q.size = q.capacity -> l (* no room remaining *)
    | x :: tl ->
      Queue.push x q.q;
      incr_size_ q;
      push_ q tl
  in
  (* push chunks of [l] in [q] until [l] is empty *)
  let rec aux q l =
    match l with
    | [] -> ()
    | _ :: _ ->
      let remaining =
        with_lock_ q (fun () ->
            while q.size = q.capacity do
              Condition.wait q.cond q.lock
            done;
            let remaining = push_ q l in
            (* elements were added: wake every waiter *)
            Condition.broadcast q.cond;
            remaining)
      in
      aux q remaining
  in
  aux q l

(** [take_list q n] removes [n] elements from [q], blocking whenever the queue
    is empty, and returns them in the order in which they were taken. The lock
    is released between chunks, so other consumers may take elements in
    between. Returns [[]] without blocking if [n <= 0]. *)
let take_list q n =
  (* Take at most [n] elements of [q] and prepend them to [acc]; returns the
     new accumulator and the number of elements still wanted.
     Assumes the lock is acquired. *)
  let rec pop_ acc q n =
    if n <= 0 || Queue.is_empty q.q then (acc, n)
    else begin
      (* take next element *)
      let x = Queue.take q.q in
      decr_size_ q;
      pop_ (x :: acc) q (n - 1)
    end
  in
  (* Call [pop_] until [n] elements have been gathered. The test is [n <= 0]
     rather than [n = 0] so that a negative request terminates immediately
     instead of draining the queue and blocking forever. *)
  let rec aux acc q n =
    if n <= 0 then List.rev acc
    else
      let acc, n =
        with_lock_ q (fun () ->
            while q.size = 0 do
              Condition.wait q.cond q.lock
            done;
            let acc, n = pop_ acc q n in
            (* room was freed: wake every waiter *)
            Condition.broadcast q.cond;
            (acc, n))
      in
      aux acc q n
  in
  aux [] q n

(*$R
let n = 1000 in
let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in
let q = create 2 in
let senders = CCThread.Arr.spawn 3
(fun i ->
if i=1
then push_list q lists.(i) (* test push_list *)
else List.iter (push q) lists.(i)
)
in
let res = CCLock.create [] in
let receivers = CCThread.Arr.spawn 3
(fun i ->
if i=1 then
let l = take_list q n in
CCLock.update res (fun acc -> l @ acc)
else
for _j = 1 to n do
let x = take q in
CCLock.update res (fun acc -> x::acc)
done
)
in
CCThread.Arr.join senders; CCThread.Arr.join receivers;
let l = CCLock.get res |> List.sort Stdlib.compare in
assert_equal CCList.(1 -- 3*n) l
*)

(** [try_take q] removes and returns the oldest element of [q] as [Some x],
    or returns [None] immediately if the queue is empty. Never blocks on the
    queue's contents. *)
let try_take q =
  with_lock_ q (fun () ->
      if q.size = 0 then None
      else begin
        decr_size_ q;
        let x = Queue.take q.q in
        (* room was freed: wake the waiters, otherwise a [push] blocked on a
           full queue would never learn that it can proceed *)
        Condition.broadcast q.cond;
        Some x
      end)

(** [try_push q x] appends [x] to [q] and returns [true] if there is room;
    returns [false] immediately, leaving [q] unchanged, if the queue is full. *)
let try_push q x =
  with_lock_ q (fun () ->
      if q.size = q.capacity then false
      else begin
        incr_size_ q;
        Queue.push x q.q;
        (* senders and receivers wait on the same condition variable, so wake
           every waiter (as [push] does) rather than a single arbitrary one *)
        Condition.broadcast q.cond;
        true
      end)

(** [peek q] returns the oldest element of [q] without removing it, or [None]
    if the queue is empty. *)
let peek q =
  with_lock_ q (fun () ->
      match Queue.peek q.q with
      | x -> Some x
      | exception Queue.Empty -> None)

(** [size q] is the number of elements currently in [q], read under the lock. *)
let size q = with_lock_ q (fun () -> q.size)

(** [capacity q] is the maximum number of elements [q] can hold. It is fixed
    at creation, so no locking is needed. *)
let capacity q = q.capacity