123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960type'at={buffer:'aoptionarray;mutablerd_pos:int;mutablewr_pos:int;mutableclosed:bool;mutablehalted:bool;lock:Miou.Mutex.t;non_empty:Miou.Condition.t;non_full:Miou.Condition.t}letcreatesize:'at=letlock=Miou.Mutex.create()inletnon_empty=Miou.Condition.create()inletnon_full=Miou.Condition.create()in{buffer=Array.makesizeNone;lock;rd_pos=0;wr_pos=0;closed=false;halted=false;non_empty;non_full}letcloset=Miou.Mutex.protectt.lock@@fun()->t.closed<-true;Miou.Condition.signalt.non_emptylethaltt=Miou.Mutex.protectt.lock@@fun()->t.halted<-true;Miou.Condition.signalt.non_emptyletputtdata=Miou.Mutex.protectt.lock@@fun()->ifnot(t.closed||t.halted)then(while(t.wr_pos+1)modArray.lengtht.buffer=t.rd_posdoMiou.Condition.waitt.non_fullt.lockdone;(* do nothing if stream was closed or halted while waiting *)ifnot(t.closed||t.halted)then(t.buffer.(t.wr_pos)<-Somedata;t.wr_pos<-(t.wr_pos+1)modArray.lengtht.buffer;Miou.Condition.signalt.non_empty))letgett=Miou.Mutex.protectt.lock@@fun()->whilet.wr_pos=t.rd_pos&¬(t.closed||t.halted)doMiou.Condition.waitt.non_emptyt.lockdone;ift.halted||(t.closed&&t.rd_pos=t.wr_pos)thenNoneelseletdata=t.buffer.(t.rd_pos)int.buffer.(t.rd_pos)<-None;t.rd_pos<-(t.rd_pos+1)modArray.lengtht.buffer;Miou.Condition.signalt.non_full;data