open! Core
open! Import

(** A write-once cell that can be filled from one thread and read, with blocking, from
    others.  All blocking and filling is coordinated through [mutex] and [full]. *)
type 'a t =
  { mutable value : 'a option
  (** [None] until [fill] stores a value; [Some v] afterwards. *)
  ; mutable num_waiting : int
  (** Number of threads currently blocked inside [read]: incremented just before
      [Condition.wait] and decremented right after it returns. *)
  ; mutex : Mutex.t sexp_opaque
  (** Held (via [critical_section]) while [fill] writes [value] and while [read] waits. *)
  ; (* Threads that do [read t] when [is_none t.value] block using
       [Condition.wait t.full].  When [fill] sets [t.value], it uses
       [Condition.broadcast] to wake up all the blocked threads. *)
    full : Condition.t sexp_opaque
  }
[@@deriving sexp_of]

(** [create ()] returns an empty ivar with no waiters and a fresh mutex and condition
    variable. *)
let create () =
  { value = None; num_waiting = 0; mutex = Mutex.create (); full = Condition.create () }
;;

(** [critical_section t ~f] runs [f] via [Mutex.critical_section] on [t.mutex]. *)
let critical_section t ~f = Mutex.critical_section t.mutex ~f

(** [fill t v] stores [v] in [t] and broadcasts on [t.full] to wake every thread blocked
    in [read].  The check and the write happen inside [critical_section].

    Raises (via [raise_s]) if [t] already holds a value. *)
let fill t v =
  critical_section t ~f:(fun () ->
    if is_some t.value then raise_s [%message "Thread_safe_ivar.fill of full ivar"];
    t.value <- Some v;
    Condition.broadcast t.full)
;;

(** [read t] returns the value of [t], blocking the calling thread until [fill] has been
    called if [t] is still empty.

    The first look at [t.value] is done without taking the mutex, so a read of an
    already-full ivar does not touch the lock.  Otherwise the value is re-checked under
    the mutex and the thread waits on [t.full]. *)
let read t =
  match t.value with
  | Some v -> v
  | None ->
    critical_section t ~f:(fun () ->
      (* Wait in a loop: a condition variable may wake a waiter even though nobody
         filled the ivar (spurious wakeup), so the predicate must be re-checked after
         every [Condition.wait] rather than assumed to hold. *)
      let rec wait_for_value () =
        match t.value with
        | Some v -> v
        | None ->
          t.num_waiting <- t.num_waiting + 1;
          Condition.wait t.full t.mutex;
          t.num_waiting <- t.num_waiting - 1;
          wait_for_value ()
      in
      wait_for_value ())
;;