123456789101112131415161718192021222324252627282930313233343536373839404142434445464748open!Coreopen!AsyncmoduleStream=structtypet={encoding:[`Chunked|`Fixedofint];reader:stringPipe.Reader.t;mutableread_started:bool}[@@derivingsexp_of]letof_pipeencodingreader={encoding;reader;read_started=false}letcloset=Pipe.close_readt.readerletencodingt=t.encodingletitert~f=ift.read_startedthenraise_s[%message"Only one consumer can read from a stream"];t.read_started<-true;Pipe.itert.reader~f;;letread_startedt=t.read_startedletdraint=ift.read_startedthenraise_s[%message"Cannot drain a body that's currently being read"];Pipe.draint.reader;;letclosedt=Pipe.closedt.readerendtypet=|Empty|Fixedofstring|StreamofStream.t[@@derivingsexp_of]letstringx=Fixedxletempty=Emptyletof_pipeencodingreader=Stream{Stream.encoding;reader;read_started=false}letstreamstream=Streamstreamletto_stream=function|Empty->Stream.of_pipe(`Fixed0)(Pipe.empty())|Fixedx->Stream.of_pipe(`Fixed(String.lengthx))(Pipe.singletonx)|Streamx->x;;