123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2021 Nomadic Labs, <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)typeerror+=|Invalid_read_requestof{expected:string;pos:int64;length_to_copy:int64;buflen:int64;}let()=register_error_kind`Permanent~id:"p2p_io_scheduler.invalid_read_request"~title:"Read request is erroneously specified"~description:"When about to read incoming data, read parameters are erroneous"~pp:(funfmt(expected,pos,length_to_copy,buflen)->Format.fprintffmt"%s should hold, but pos=%Ld length_to_copy=%Ld buflen=%Ld"expectedposlength_to_copybuflen)Data_encoding.(obj4(req"expected"string)(req"pos"int64)(req"length_to_copy"int64)(req"buflen"int64))(function|Invalid_read_request{expected;pos;length_to_copy;buflen}->Some(expected,pos,length_to_copy,buflen)|_->None)(fun(expected,pos,length_to_copy,buflen)->Invalid_read_request{expected;pos;length_to_copy;buflen})typereadable={read_buffer:Circular_buffer.t;read_queue:Circular_buffer.datatzresultLwt_pipe.Maybe_bounded.t;mutablepartial_read:Circular_buffer.dataoption;}letmk_readable~read_buffer~read_queue={read_buffer;read_queue;partial_read=None}(** Container to write bytes to when reading [length_to_copy] bytes from a readable.
Bytes read are written to [buf] starting at offset [pos].
Values of this type guarantee the invariants:
- [0 <= length_to_copy]
- [0 <= pos]
- [pos + length_to_copy <= (Bytes.length buf)]
This is ensured by the smart constructor [mk_buffer] and
the type being abstract. *)typebuffer={length_to_copy:int;pos:int;buf:Bytes.t}letmk_buffer?pos?length_to_copybuf:buffertzresult=letopenResult_syntaxinletbuflen=Bytes.lengthbufinletpos=Option.value~default:0posinletlength_to_copy=Option.value~default:(buflen-pos)length_to_copyinletcheckcond~expected=ifcondthenreturn_unitelsetzfail(Invalid_read_request{expected;pos=Int64.of_intpos;length_to_copy=Int64.of_intpos;buflen=Int64.of_intbuflen;})inlet*()=check(0<=length_to_copy)~expected:"0 <= length_to_copy"inlet*()=check(0<=pos)~expected:"0 <= pos"inlet*()=check(pos+length_to_copy<=buflen)~expected:"pos + length_to_copy <= buflen"inOk{length_to_copy;pos;buf}letmk_buffer_safebuf:buffer={length_to_copy=Bytes.lengthbuf;pos=0;buf}(** [shift amount buffer] returns a variant of [buffer] with its [pos] shifted
[amount] cells to the right and [length_to_copy] adapted accordingly. *)letshiftamount{pos;length_to_copy;buf}=mk_buffer~pos:(pos+amount)~length_to_copy:(length_to_copy-amount)buf(** Copy [length_to_copy] bytes from [data] starting at [pos] into [buf].
If not all [data] is read, the remainder is put back in
[readable.partial_read] *)letread_fromreadable{pos=offset;length_to_copy;buf}data=matchdatawith|Okdata->letread_len=minlength_to_copy(Circular_buffer.lengthdata)inOption.iter(fundata->readable.partial_read<-Somedata)(Circular_buffer.readdatareadable.read_buffer~len:read_len~into:buf~offset);Okread_len|Error_->Result_syntax.tzfailP2p_errors.Connection_closedletread?cancelerreadablebuffer=letopenLwt_syntaxinmatchreadable.partial_readwith|Somemsg->readable.partial_read<-None;Lwt.return(read_fromreadablebuffer(Okmsg))|None->let+m=protect?canceler(fun()->Lwt_pipe.Maybe_bounded.popreadable.read_queue)inread_fromreadablebuffermletread_full?cancelerreadablebuffer=letopenLwt_result_syntaxinletrecloop({length_to_copy;_}asbuffer)=iflength_to_copy=0thenreturn_unitelselet*read_len=read?cancelerreadablebufferin(* This is safe - even if the initial [length_to_copy] is not a multiple of the
readable's pending bytes - because [read] reads *at most*
the requested number of bytes: it doesn't try to read more
than available. *)let*?buffer=shiftread_lenbufferinloopbufferinloopbuffermoduleInternal_for_tests=structletdestruct_buffer{pos;length_to_copy;buf}=(pos,length_to_copy,buf)end