123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156(*----------------------------------------------------------------------------
* Copyright (c) 2022, António Nuno Monteiro
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)openEio.Stdtype'akind=|Fromof(unit->'aoption)|Pushof{stream:'aEio.Stream.t;capacity:int}type'at={stream:'akind;is_closed:boolAtomic.t;closed:unitPromise.t*unitPromise.u}letunsafe_eio_stream{stream;_}=matchstreamwithFrom_->assertfalse|Push{stream;_}->streamletis_closed{is_closed;_}=Atomic.getis_closedletcloset=ifnot(is_closedt)then(let{closed=_,u;_}=tinAtomic.sett.is_closedtrue;Promise.resolveu())letpushtitem=letstream=unsafe_eio_streamtinmatchitemwithSomeitem->Eio.Stream.addstreamitem|None->closetletcreatecapacity=letstream=Eio.Stream.createcapacityinlett={stream=Push{stream;capacity};is_closed=Atomic.makefalse;closed=Promise.create()}int,pushtletempty()=lett,_=create0incloset;tletfrom~f={stream=Fromf;is_closed=Atomic.makefalse;closed=Promise.create()}letclosedt=let{closed=p,_;_}=tinpletwhen_closed~ft=Promise.await(closedt);f()letof_listxs=letstream,_push=create(List.lengthxs)inList.iter(Eio.Stream.add(unsafe_eio_streamstream))xs;(* TODO(anmonteiro): should this return a closed stream? *)streamlettaket=matcht.streamwith|Fromf->(matchf()with|Some_asitem->item|None->closet;None)|Push{capacity=0;_}->None|Push{stream;_}->Fiber.first(fun()->Some(Eio.Stream.takestream))(fun()->let{closed=p,_;_}=tinPromise.awaitp;None)lettake_nonblockingt=matcht.streamwith|From_f->None|Push{stream;_}->Eio.Stream.take_nonblockingstreamletmap~ft=from~f:(fun()->matchtaketwithSomeitem->Some(fitem)|None->None)letreciter~ft=matcht.streamwith|Push{capacity=0;_}whenis_closedt->()|Push_|From_->(matchtaketwith|Someitem->fitem;iter~ft|None->())letreciter_p~sw~ft=matcht.streamwith|Push{capacity=0;_}whenis_closedt->()|Push_|From_->(matchtaketwith|Someitem->letresult=Fiber.fork_promise~sw(fun()->fitem)andrest=Fiber.fork_promise~sw(fun()->iter_p~sw~ft)inPromise.await_exnresult;Promise.await_exnrest|None->())letfold~f~initt=letrecloop~f~acct=matchtaketwithSomeitem->loop~f~acc:(faccitem)t|None->accinloop~f~acc:inittletto_listt=letlst=fold~f:(funaccitem->item::acc)~init:[]tinList.revlstletdraint=iter~f:ignoretletrecdrain_availablet=matchtake_nonblockingtwithSome_->drain_availablet|None->()