open Core
open Core.O
open Stdune

(** Input side of a stream: a pull-based source of values where [None] marks
    the end of the stream. *)
module In = struct
  (* Invariant: once [read] has returned [None], it always returns [None] *)
  type nonrec 'a t =
    { mutable read : unit -> 'a option t
          (** Underlying producer; replaced by [create] once it has yielded
              [None]. *)
    ; mutable reading : bool
          (** Set by [lock] and cleared by [unlock]; guards against a second
              reader while one is active. *)
    }

  (** Wrap [read] as a stream without installing the wrapper that [create]
      uses to make [None] sticky. *)
  let create_unchecked read = { read; reading = false }

  (** Wrap [read] as a stream. The first time [read] yields [None], the
      stored reader is swapped for one that always returns [None], so the
      original function is not called again afterwards. *)
  let create read =
    let t = { read; reading = false } in
    let read () =
      let+ x = read () in
      if Option.is_none x then t.read <- (fun () -> return None);
      x
    in
    t.read <- read;
    t

  (** Mark [t] as being read. Raises a code error if it already is. *)
  let lock t =
    if t.reading then Code_error.raise "Fiber.Stream.In: already reading" [];
    t.reading <- true

  (** Clear the reading mark set by [lock]. *)
  let unlock t = t.reading <- false

  (** Read the next element of [t]. The stream is locked for the duration of
      the read and unlocked once the result is available. *)
  let read t =
    lock t;
    let+ x = t.read () in
    unlock t;
    x

  (** A stream whose reader always returns [None]. *)
  let empty () = create_unchecked (fun () -> return None)

  (** Concatenate streams: elements are read from the head of the list until
      it yields [None], then from the next stream, and so on. Yields [None]
      once the list is exhausted. *)
  let concat (type a) (xs : a t list) =
    let remains = ref xs in
    let rec go () =
      match !remains with
      | [] -> return None
      | x :: xs -> (
        let* v = read x in
        match v with
        | Some v -> return (Some v)
        | None ->
          (* Current stream is finished: drop it and try the next one. *)
          remains := xs;
          go ())
    in
    create go

  (** [append x y] is [concat [ x; y ]]. *)
  let append x y = concat [ x; y ]

  (** Stream yielding the elements of [xs] in order, then [None]. *)
  let of_list xs =
    let xs = ref xs in
    create_unchecked (fun () ->
        match !xs with
        | [] -> return None
        | x :: xs' ->
          xs := xs';
          return (Some x))

  (** Stream yielding [a] followed by the elements of [t]. *)
  let cons a t = concat [ of_list [ a ]; t ]

  (** Stream of the [Some] results of applying [f] to the elements of [t].

      [t] is locked as soon as [filter_map] is called and unlocked when it
      yields [None]; the returned stream therefore reads through [t.read]
      directly rather than through [read]. *)
  let filter_map t ~f =
    let rec read () =
      t.read () >>= function
      | None ->
        unlock t;
        return None
      | Some x -> (
        match f x with
        | None -> read () (* element rejected by [f]: pull the next one *)
        | Some y -> return (Some y))
    in
    lock t;
    create_unchecked read

  (** Apply [f] to every element of [t], one at a time: the next element is
      read only after [f] has completed on the previous one. [t] is locked
      for the whole iteration and unlocked when it yields [None]. *)
  let sequential_iter t ~f =
    let rec loop t ~f =
      t.read () >>= function
      | None ->
        unlock t;
        return ()
      | Some x ->
        let* () = f x in
        loop t ~f
    in
    lock t;
    loop t ~f

  (** Apply [f] to every element of [t], forking one computation per element,
      and call the continuation [k] once the stream is exhausted and every
      forked computation has called back.

      Written in continuation-passing style: [t.read ()] and [f x] are each
      applied directly to a continuation. [fork] and [end_of_fiber] come from
      the opened modules and are not defined here. NOTE(review): presumably
      [fork] runs both thunks concurrently; confirm against [Core]. *)
  let parallel_iter t ~f k =
    (* Number of pending completions: starts at 1 for the reading loop itself
       and is incremented for every forked element. *)
    let n = ref 1 in
    (* Shadows the caller's continuation: the [k] in the body is the outer
       one (this is a non-recursive [let]), invoked only by the last
       completion. *)
    let k () =
      decr n;
      if !n = 0 then (
        unlock t;
        k ())
      else end_of_fiber
    in
    let rec loop t =
      t.read () (function
        | None -> k ()
        | Some x ->
          incr n;
          fork (fun () -> f x k) (fun () -> loop t))
    in
    lock t;
    loop t
end

(** Output side of a stream: a sink that accepts [Some x] for a value and
    [None] to signal the end of the stream. *)
module Out = struct
  type nonrec 'a t =
    { mutable write : 'a option -> unit t
          (** Underlying consumer; replaced by [create] once [None] has been
              written. *)
    ; mutable writing : bool
          (** Set by [lock] and cleared by [unlock]; guards against a second
              writer while one is active. *)
    }

  (** Mark [t] as being written to. Raises a code error if it already is. *)
  let lock t =
    if t.writing then Code_error.raise "Fiber.Stream.Out: already writing" [];
    t.writing <- true

  (** Clear the writing mark set by [lock]. *)
  let unlock t = t.writing <- false

  (** Wrap [write] as an output stream. When [None] is written, the stored
      writer is first swapped for one that ignores further [None]s and raises
      a code error on [Some _]; the [None] is then forwarded to [write]. *)
  let create write =
    let t = { write; writing = false } in
    let write x =
      if Option.is_none x then
        t.write <-
          (function
          | None -> return ()
          | Some _ ->
            Code_error.raise "Fiber.Stream.Out: stream output closed" []);
      write x
    in
    t.write <- write;
    t

  (** Write [x] to [t]. The stream is locked for the duration of the write
      and unlocked once it has completed. *)
  let write t x =
    lock t;
    let+ () = t.write x in
    unlock t

  (** An output stream that discards everything written to it. *)
  let null () = create (fun _ -> return ())
end

(** Forward every value read from [i] to [o], including the final [None].
    Both streams stay locked until that [None] has been written. *)
let connect i o =
  In.lock i;
  Out.lock o;
  let rec go () =
    let* a = i.read () in
    let* () = o.write a in
    match a with
    | None ->
      In.unlock i;
      Out.unlock o;
      return ()
    | Some _ -> go ()
  in
  go ()

(** Forward every element read from [i] to [o], but unlike [connect] do not
    write the final [None] to [o]. Both streams stay locked until [i] yields
    [None]. *)
let supply i o =
  In.lock i;
  Out.lock o;
  let rec go () =
    let* a = i.read () in
    match a with
    | None ->
      In.unlock i;
      Out.unlock o;
      return ()
    | Some _ ->
      let* () = o.write a in
      go ()
  in
  go ()

(** Create an input stream and an output stream that share one [Mvar]: the
    output writes into it and the input reads from it. *)
let pipe () =
  let mvar = Mvar.create () in
  let i = In.create (fun () -> Mvar.read mvar) in
  let o = Out.create (fun x -> Mvar.write mvar x) in
  (i, o)