123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2020-2022 Nomadic Labs <contact@nomadic-labs.com> *)(* Copyright (c) 2020 Metastate AG <hello@metastate.dev> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)openBasetypet={queue:stringrefQueue.t;mutablelwt_channel:Lwt_io.input_channeloption;mutableclosed:bool;mutablepending:intlist;}letwake_upecho=letpending=echo.pendinginecho.pending<-[];List.iter(funpending->Lwt_unix.send_notificationpending)pendingletpushechostring=(* Maintain the invariant that strings in the queue are never empty. *)ifString.lengthstring>0then(Queue.push(refstring)echo.queue;wake_upecho)letcloseecho=ifnotecho.closedthen(echo.closed<-true;wake_upecho)letcreate()=letecho={queue=Queue.create();lwt_channel=None;closed=false;pending=[]}inletrecreadbytesofslen=matchQueue.peek_optecho.queuewith|None->ifecho.closedthenreturn0else(* Nothing to read, for now. *)letpromise,resolver=Lwt.task()inletnote=Lwt_unix.make_notification~once:true(fun()->Lwt.wakeup_laterresolver())inecho.pending<-note::echo.pending;let*()=promiseinreadbytesofslen|Somestr_ref->(* Note: we rely on the invariant that strings in the queue are never empty. *)letstr_len=String.length!str_refinifstr_len<=lenthen((* Caller requested more bytes than available in this item of the queue:
return the item in full and remove it from the queue. *)(* use [Lwt_bytes.blit_from_string] once available *)Lwt_bytes.blit_from_bytes(Bytes.of_string!str_ref)0bytesofsstr_len;let(_:stringrefoption)=Queue.take_optecho.queueinreturnstr_len)else((* Caller requested strictly less bytes than available in this item of the queue:
return what caller requested, and only keep the remainder. *)(* use [Lwt_bytes.blit_from_string] once available *)Lwt_bytes.blit_from_bytes(Bytes.of_string!str_ref)0bytesofslen;str_ref:=String.sub!str_reflen(str_len-len);returnlen)inletlwt_channel=Lwt_io.(make~mode:input)readinecho.lwt_channel<-Somelwt_channel;echoletget_lwt_channelecho=matchecho.lwt_channelwith|None->(* Impossible: [lwt_channel] is filled by [Some ...] immediately after the [echo]
is created by [create_echo]. *)assertfalse|Somelwt_channel->lwt_channel