(*
* Copyright (c) 2011-2016 Anil Madhavapeddy <anil@recoil.org>
* Copyright (c) 2015 Mindy Preston
* Copyright (c) 2015 Thomas Gazagnaire <thomas@gazagnaire.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

(** Buffered reading and writing over the Flow API *)

open Lwt.Infix

module type S = Mirage_channel.S
  with type 'a io = 'a Lwt.t
   and type buffer = Cstruct.t

let src =
  Logs.Src.create "channel"
    ~doc:"Buffered reading and writing over the Flow API"

module Log = (val Logs.src_log src : Logs.LOG)

module Make (Flow : Mirage_flow_lwt.S) = struct

  type flow = Flow.flow
  type buffer = Cstruct.t
  type +'a io = 'a Lwt.t
  type error = [ `Read_zero | `Flow of Flow.error ]
  type write_error = Flow.write_error

  (* Pretty-print a read error: flow errors are delegated to Flow.pp_error,
     `Read_zero gets a fixed message. *)
  let pp_error ppf = function
    | `Flow e -> Flow.pp_error ppf e
    | `Read_zero ->
      Fmt.string ppf
        "FLOW.read returned 0 bytes in violation of the specification"

  let pp_write_error = Flow.pp_write_error

  type t = {
    flow : flow;
    mutable ibuf : Cstruct.t option;  (* Unconsumed input from the last read, if any *)
    mutable obufq : Cstruct.t list;   (* Completed write buffers, newest first *)
    mutable obuf : Cstruct.t option;  (* Active write buffer *)
    mutable opos : int;               (* Position in active write buffer *)
  }

  (* Wrap [flow] in a channel with no pending input and no pending output. *)
  let create flow =
    let ibuf = None in
    let obufq = [] in
    let obuf = None in
    let opos = 0 in
    { ibuf; obuf; flow; obufq; opos }

  (* The flow this channel was created from. *)
  let to_flow { flow; _ } = flow

  (* Read once from the flow and install the result as the input buffer.
     A zero-length read is logged and reported as [Error `Read_zero];
     `Eof is passed through; flow errors are wrapped in `Flow. *)
  let ibuf_refill t =
    Flow.read t.flow >|= function
    | Ok (`Data buf) when Cstruct.len buf = 0 ->
      Log.err (fun l -> l "%a" pp_error `Read_zero);
      Error `Read_zero
    | Ok (`Data buf) ->
      t.ibuf <- Some buf;
      Ok (`Data buf)
    | Ok `Eof -> Ok `Eof
    | Error e -> Error (`Flow e)

  (* Bind that continues with [fn] only on [Ok (`Data _)]; [Ok `Eof] and
     [Error _] short-circuit unchanged. *)
  let bind v fn =
    v >>= function
    | Ok (`Data buf) -> fn buf
    | Ok `Eof -> Lwt.return (Ok `Eof)
    | Error e -> Lwt.return (Error e)

  let ( >>=~ ) = bind

  (* Return the current input buffer, refilling from the flow for as long as
     it is absent or empty. A successful result is therefore never empty. *)
  let rec get_ibuf t =
    match t.ibuf with
    | None -> ibuf_refill t >>=~ fun _ -> get_ibuf t
    | Some buf when Cstruct.len buf = 0 ->
      ibuf_refill t >>=~ fun _ -> get_ibuf t
    | Some buf -> Lwt.return (Ok (`Data buf))

  (* Read one character from the input channel *)
  let read_char t =
    get_ibuf t (* the fact that we returned means we have at least 1 char *)
    >>=~ fun buf ->
    let c = Cstruct.get_char buf 0 in
    (* advance the read buffer, possibly leaving it empty *)
    t.ibuf <- Some (Cstruct.shift buf 1);
    Lwt.return (Ok (`Data c))

  (* Read up to len characters from the input channel
     and at most a full view. If not specified, read all *)
  let read_some ?len t =
    (* get_ibuf reports end of input as [Ok `Eof], which >>=~ propagates *)
    get_ibuf t >>=~ fun buf ->
    let avail = Cstruct.len buf in
    let len = match len with Some len -> len | None -> avail in
    if len < avail then begin
      let hd, tl = Cstruct.split buf len in
      (* leave some in the buffer; next time, we won't do a blocking read *)
      t.ibuf <- Some tl;
      Lwt.return (Ok (`Data hd))
    end else begin
      t.ibuf <- None;
      Lwt.return (Ok (`Data buf))
    end

  (* Read exactly [len] bytes, returned as a list of views in stream order.
     `Eof or an error from any underlying read_some is returned as is. *)
  let read_exactly ~len t =
    let rec loop acc = function
      | 0 -> Lwt.return (Ok (`Data (List.rev acc)))
      | len ->
        read_some ~len t >>=~ fun buffer ->
        loop (buffer :: acc) (len - Cstruct.len buffer)
    in
    loop [] len

  (* Read until a character is found. Only the current input buffer is
     scanned: `Found carries the bytes before [ch] (which is consumed),
     `Not_found carries the whole buffer. *)
  let read_until t ch =
    get_ibuf t >>=~ fun buf ->
    let len = Cstruct.len buf in
    let rec scan off =
      if off = len then None
      else if Cstruct.get_char buf off = ch then Some off
      else scan (off + 1)
    in
    match scan 0 with
    | None ->
      (* not found: hand back everything we have; the buffer is now consumed,
         so the next read goes back to the flow *)
      t.ibuf <- None;
      Lwt.return (Ok (`Not_found buf))
    | Some off ->
      (* found, so split the buffer and skip the delimiter itself *)
      let hd = Cstruct.sub buf 0 off in
      t.ibuf <- Some (Cstruct.shift buf (off + 1));
      Lwt.return (Ok (`Found hd))

  (* This reads a line of input, which is terminated either by a LF or CRLF
     sequence, or the end of the channel (which counts as a line).
     The terminator is not included in the result.
     @return the line as a list of views, in stream order. *)
  let read_line t =
    (* Drop one trailing CR from a view, if present. *)
    let chop_cr buf =
      let n = Cstruct.len buf in
      if n > 0 && Cstruct.get_char buf (n - 1) = '\r' then
        Cstruct.sub buf 0 (n - 1)
      else buf
    in
    let rec get acc =
      read_until t '\n' >>= function
      | Error e -> Lwt.return (Error e)
      | Ok `Eof -> Lwt.return (Ok (`Data acc))
      | Ok (`Not_found buf) when Cstruct.len buf = 0 ->
        Lwt.return (Ok (`Data acc))
      | Ok (`Not_found buf) -> get (buf :: acc)
      | Ok (`Found buf) ->
        let bits =
          match acc with
          | prev :: rest when Cstruct.len buf = 0 ->
            (* The LF was the first byte of this chunk, so a CR belonging to
               the same CRLF pair is the last byte of the previous chunk:
               chop it there instead. *)
            buf :: chop_cr prev :: rest
          | _ -> chop_cr buf :: acc
        in
        Lwt.return (Ok (`Data bits))
    in
    get [] >>=~ fun bits ->
    Lwt.return (Ok (`Data (List.rev bits)))

  (* Output functions *)

  (* Install a fresh buffer obtained from Io_page.get 1 as the active write
     buffer, reset the write position and return the buffer. *)
  let alloc_obuf t =
    let buf = Io_page.to_cstruct (Io_page.get 1) in
    t.obuf <- Some buf;
    t.opos <- 0;
    buf

  (* Queue the active write buffer onto the write queue, resizing the
   * view if necessary to the correct size. *)
  let queue_obuf t =
    match t.obuf with
    | None -> ()
    | Some buf when Cstruct.len buf = t.opos ->
      (* obuf is full *)
      t.obufq <- buf :: t.obufq;
      t.obuf <- None
    | Some _ when t.opos = 0 ->
      (* obuf was never used, so discard *)
      t.obuf <- None
    | Some buf ->
      (* partially filled obuf, so resize *)
      let buf = Cstruct.sub buf 0 t.opos in
      t.obufq <- buf :: t.obufq;
      t.obuf <- None

  (* Get an active output buffer, which will allocate it if needed.
   * The position to write into is stored in t.opos *)
  let get_obuf t =
    match t.obuf with
    | None -> alloc_obuf t
    | Some buf when Cstruct.len buf = t.opos ->
      queue_obuf t;
      alloc_obuf t
    | Some buf -> buf

  (* Non-blocking character write, since Io page allocation never blocks.
   * That may change in the future... *)
  let write_char t ch =
    let buf = get_obuf t in
    Cstruct.set_char buf t.opos ch;
    t.opos <- t.opos + 1

  (* This is zero copy; flush current IO page and queue up the incoming
   * buffer directly. *)
  let write_buffer t buf =
    queue_obuf t;
    t.obufq <- buf :: t.obufq

  (* Copy [len] bytes of [s] starting at [off] into the output buffers,
     moving on to a fresh active buffer whenever the current one fills up. *)
  let rec write_string t s off len =
    let buf = get_obuf t in
    let avail = Cstruct.len buf - t.opos in
    if avail < len then begin
      (* fill what is left of this buffer, then recurse for the remainder *)
      Cstruct.blit_from_string s off buf t.opos avail;
      t.opos <- t.opos + avail;
      write_string t s (off + avail) (len - avail)
    end else begin
      Cstruct.blit_from_string s off buf t.opos len;
      t.opos <- t.opos + len
    end

  (* Write the whole of [buf] followed by a single LF. *)
  let write_line t buf =
    write_string t buf 0 (String.length buf);
    write_char t '\n'

  (* Queue the active buffer, then pass every queued buffer to Flow.writev.
     obufq is kept newest-first, hence the reversal; the queue is emptied
     before the write is issued. *)
  let flush t =
    queue_obuf t;
    let l = List.rev t.obufq in
    t.obufq <- [];
    Flow.writev t.flow l

  (* Flush pending output, then close the flow. Lwt.finalize runs the close
     once the flush has completed, whether it succeeded or not. *)
  let close t =
    Lwt.finalize (fun () -> flush t) (fun () -> Flow.close t.flow)

end