123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117open!Core_kernelopen!Importopen!Deferred_stdtype('a,'phantom)t={current_value:'aMoption.t;taken:(unit,read_write)Bvar.t;mutablevalue_available:unitIvar.t}[@@derivingfields,sexp_of]letvalue_availablet=Ivar.readt.value_availableletis_emptyt=Moption.is_nonet.current_valueletinvariantinvariant_a_(t:_t)=Invariant.invariant[%here]t[%sexp_of:(_,_)t](fun()->letcheckf=Invariant.check_fieldtfinFields.iter~current_value:(check(Moption.invariantinvariant_a))~taken:(check(Bvar.invariantUnit.invariantignore))~value_available:(check(funvalue_available->[%test_result:bool](Ivar.is_fullvalue_available)~expect:(Moption.is_somet.current_value))));;letpeekt=Moption.gett.current_valueletpeek_exnt=ifis_emptytthenraise_s[%message"Mvar.peek_exn called on empty mvar"];Moption.get_some_exnt.current_value;;letsexp_of_tsexp_of_a_t=[%sexp(peekt:aoption)]moduleRead_write=structtypenonrec'at=('a,read_write)t[@@derivingsexp_of]letinvariantinvariant_at=invariantinvariant_aignoretendmoduleRead_only=structtypenonrec'at=('a,read)t[@@derivingsexp_of]letinvariantinvariant_at=invariantinvariant_aignoretendletread_only(t:('a,[>read])t)=(t:>('a,read)t)letwrite_only(t:('a,[>write])t)=(t:>('a,write)t)letcreate()={current_value=Moption.create();taken=Bvar.create();value_available=Ivar.create()};;lettake_nonemptyt=assert(not(is_emptyt));letr=Moption.get_some_exnt.current_valueinMoption.set_nonet.current_value;Bvar.broadcastt.taken();t.value_available<-Ivar.create();r;;lettake_now_exnt=ifis_emptytthenraise_s[%message"Mvar.take_exn called on empty mvar"];take_nonemptyt;;lettake_nowt=ifnot(is_emptyt)thenSome(take_nonemptyt)elseNoneletrectaket=ifnot(is_emptyt)thenreturn(take_nonemptyt)else(let%bind()=value_availabletintaket);;letsettv=Moption.set_somet.current_valuev;Ivar.fill_if_emptyt.value_available();;letupdatet~f=sett(f(peekt))letupdate_exnt~f=sett(f(peek_exnt))lettakent=Bvar.waitt.takenletrecputtv=ifis_emptytthen(settv;return())else(let%bind()=takentinputtv);;letpipe_when_readyt=letr,w=Pipe.create()inletrecloop()=let%bind()=value_availabletinifnot(Pipe.is_closedw)then(matchtake_nowtwith|None->loop()|Somex->let%bind()=Pipe.writewxinloop())elsereturn()indon't_wait_for(loop());r;;