(* $Id$
* ----------------------------------------------------------------------
*
*)

open Netchannels;;

(* An [in_obj_stream] is an [in_obj_channel] that additionally exposes a
 * look-ahead window: a buffer holding the bytes that have been read from
 * the data source but have not yet been consumed.
 *
 * - [block_size]: the unit in which the window is enlarged
 * - [window]: the look-ahead buffer itself
 * - [want n]: enlarge the window until it has at least [n] bytes or EOF
 *   is reached
 * - [want_another_block]: try to enlarge the window by one block
 * - [window_length]: number of valid bytes in the window
 * - [window_at_eof]: whether the end of the window is the end of the stream
 * - [skip n]: discard up to [n] bytes from the beginning of the stream
 *)
class type in_obj_stream =
object
  inherit Netchannels.in_obj_channel
  method block_size : int
  method window : Netbuffer.t
  method want : int -> unit
  method want_another_block : unit -> unit
  method window_length : int
  method window_at_eof : bool
  method skip : int -> unit
end


(* Shared implementation of the derived input methods. Subclasses provide
 * [input], [want], [want_another_block] and [window_length]; everything
 * else here is expressed in terms of these.
 *
 * [init_s_netbuf] is the buffer used as look-ahead window. Note that the
 * buffer may contain more bytes than [window_length] reports (this is the
 * case for [sub_stream], which shares the buffer of the underlying stream).
 *)
class virtual input_methods init_s_netbuf =
object (self)
  val mutable s_pos = 0            (* number of bytes consumed so far *)
  val mutable s_at_eof = false     (* whether the window ends at EOF *)
  val s_netbuf = init_s_netbuf     (* the look-ahead buffer *)
  val mutable s_closed = false     (* set by [close_in] of the subclasses *)

  method virtual want : int -> unit
  method virtual want_another_block : unit -> unit
  method virtual window_length : int

  method virtual input : Bytes.t -> int -> int -> int

  (* The following input methods base all on [input] *)

  (* Reads exactly [len] bytes into [buf] at [pos].
   * Raises [Closed_channel] if the stream is closed, [End_of_file]
   * (coming from [input]) if the stream ends too early, and
   * [Sys_blocked_io] if [input] returns 0 before [len] bytes are read.
   *)
  method really_input buf pos len =
    if s_closed then raise Netchannels.Closed_channel;
    let rec read p =
      let l = self # input buf (pos+p) (len-p) in
      let p' = p + l in
      if p' = len then
        ()
      else begin
        if l = 0 then raise Sys_blocked_io;
        read p'
      end
    in
    self # want len;  (* may raise Buffer_underrun *)
    read 0

  (* Like [really_input], but returns the [len] bytes as a fresh string *)
  method really_input_string len =
    let buf = Bytes.create len in
    self # really_input buf 0 len;
    Bytes.unsafe_to_string buf   (* safe: [buf] is not used afterwards *)

  (* Reads exactly one character *)
  method input_char () =
    let s = Bytes.create 1 in
    self # really_input s 0 1;
    Bytes.get s 0

  (* Reads exactly one character and returns its code *)
  method input_byte () =
    let s = Bytes.create 1 in
    self # really_input s 0 1;
    int_of_char (Bytes.get s 0)

  (* Reads up to the next '\n' (which is consumed but not returned), or up
   * to the end of the stream if the last line is not terminated.
   * Raises [End_of_file] if there are no more bytes at all, and
   * [Closed_channel] if the stream is closed.
   *)
  method input_line () =
    (* CHECK: Are the different end of line conventions important here? *)
    let rec find_eol () =
      try
        Netbuffer.index_from s_netbuf 0 '\n'    (* or Not_found *)
      with
        Not_found ->
          if not s_at_eof then begin
            self # want_another_block();  (* may raise Buffer_underrun *)
            find_eol ()
          end
          else self # window_length
    in
    if s_closed then raise Netchannels.Closed_channel;
    (* [find_eol] may enlarge the window, so it must run before the window
     * length is taken.
     *)
    let eol = find_eol () in
    let wlen = self # window_length in
    (* [s_netbuf] may be longer than the window (sub streams share the
     * buffer of the underlying stream), so the '\n' found may lie beyond
     * the end of this stream. Such a '\n' does not belong to the stream:
     * the line ends where the window ends. Without the clamp,
     * [really_input_string] would be asked for more bytes than the stream
     * has, and the last line would be consumed but lost in an
     * [End_of_file].
     *)
    let n = min eol wlen in
    if n >= wlen then begin
      (* Unterminated last line *)
      if n = 0 then raise End_of_file;
      self # really_input_string n
    end
    else begin
      let s = self # really_input_string n in
      ignore (self # input_char());    (* '\n' *)
      s
    end

  (* Number of bytes consumed so far *)
  method pos_in =
    if s_closed then raise Netchannels.Closed_channel;
    s_pos
end


(* Makes an [in_obj_stream] out of the [in_obj_channel] [in_ch].
 * - [len]: if passed, the stream ends after at most [len] bytes (an
 *   artificial EOF is reported then)
 * - [block_size]: the number of bytes [want_another_block] tries to add
 *   to the window, and the minimum window length that is maintained
 *   unless EOF is reached (default: 4096)
 *)
class input_stream ?len ?(block_size = 4096) in_ch : in_obj_stream =
object (self)
  val s_channel = (in_ch : in_obj_channel)
  val s_maxlength = len
  val s_blocksize = block_size
  val mutable s_underrun = false

  inherit input_methods (Netbuffer.create block_size)

  (* Note: This implementation must even work if [in_ch] is a pipe,
   * and raises Buffer_underrun from time to time. This may happen
   * at inconvenient situations. In this case the flag s_underrun stores
   * whether an underrun happened, and should be reported later.
   *)

  initializer
    try
      self # want_minimum()  (* may raise Buffer_underrun *)
    with
      Buffer_underrun ->
        s_underrun <- true

  (* Debugging aid: prints the internal state to stderr *)
  method private debug msg =
    prerr_endline (msg ^ ": s_pos=" ^ string_of_int s_pos ^
                   " s_at_eof=" ^ string_of_bool s_at_eof ^
                   " buflen=" ^ string_of_int (Netbuffer.length s_netbuf) ^
                   " s_closed=" ^ string_of_bool s_closed)

  method block_size = s_blocksize
    (* The block size is a static property, so never raise Closed_channel *)

  method window =
    if s_closed then raise Netchannels.Closed_channel;
    s_netbuf

  method window_length =
    if s_closed then raise Netchannels.Closed_channel;
    Netbuffer.length s_netbuf

  method window_at_eof =
    if s_closed then raise Netchannels.Closed_channel;
    s_at_eof

  (* Appends up to one block read from [s_channel] to the window. Sets
   * [s_at_eof] when the channel signals [End_of_file], or when the maximum
   * length is reached. Raises [Sys_blocked_io] if the channel returns 0
   * bytes, and passes [Buffer_underrun] through.
   *)
  method want_another_block () =
    if s_closed then raise Netchannels.Closed_channel;
    if not s_at_eof then begin
      (* How much are we allowed to read? *)
      let m =
        match s_maxlength with
          None -> s_blocksize
        | Some l -> min (l - s_pos - Netbuffer.length s_netbuf) s_blocksize
      in
      assert (m >= 0);
      (* Try to read m bytes: *)
      let rec read_block k =
        if k < m then begin
          let n =
            Netbuffer.add_inplace ~len:(m-k) s_netbuf (s_channel # input)
              (* may raise End_of_file, Buffer_underrun *)
          in
          if n > 0 then
            read_block (k+n)
          else
            raise Sys_blocked_io
        end
      in
      try
        if m = 0 then
          (* Artificial EOF because len is reached *)
          s_at_eof <- true
        else
          read_block 0
      with
        End_of_file ->
          s_at_eof <- true
    end
    (* self # debug "after stream#want_another_block"; *)
    (* Unix.sleep 1; *)

  (* Enlarges the window until it has at least [n] bytes, or EOF *)
  method want n =
    if s_closed then raise Netchannels.Closed_channel;
    while not s_at_eof && Netbuffer.length s_netbuf < n do
      self # want_another_block()
    done

  (* Ensures that the window has at least one block, or is at EOF *)
  method private want_minimum () =
    self # want s_blocksize

  (* Discards up to [len] bytes; stops early when the stream ends *)
  method skip len =
    if s_closed then raise Netchannels.Closed_channel;
    let rec read len =
      if len > 0 then begin
        let k = min (Netbuffer.length s_netbuf) len in
        Netbuffer.delete s_netbuf 0 k;
        s_pos <- s_pos + k;
        self # want_minimum();     (* may raise Buffer_underrun *)
        (* k = 0 means that the window is empty even after refilling,
         * i.e. EOF
         *)
        if k > 0 then read (len - k)
      end
    in
    read len

  (* Copies up to [len] bytes from the beginning of the window to [buf] at
   * [pos], removes them from the window, and returns their number.
   * Raises [End_of_file] if no byte is available but [len > 0].
   *)
  method input buf pos len =
    if s_closed then raise Netchannels.Closed_channel;
    if s_underrun then begin
      (* Report the underrun that was deferred by an earlier call *)
      self # want_minimum();  (* may raise Buffer_underrun *)
      s_underrun <- false
    end;
    (* Assertion: Either window length >= minimum, or eof *)
    let len' = min len (Netbuffer.length s_netbuf) in
    Netbuffer.blit s_netbuf 0 buf pos len';
    Netbuffer.delete s_netbuf 0 len';
    s_pos <- s_pos + len';
    ( try
        self # want_minimum()  (* may raise Buffer_underrun *)
      with
        Buffer_underrun ->
          (* The bytes are already delivered, so remember the underrun
           * and report it in the next call
           *)
          s_underrun <- true
    );
    if len' = 0 && len > 0 then raise End_of_file;
    len'

  (* Closes the stream and the underlying channel; further calls are
   * ignored
   *)
  method close_in () =
    if not s_closed then begin
      s_channel # close_in();
      s_closed <- true
    end
end

(*
let find_prefix s1 pos len s2 =
(* Checks where a non-empty prefix of [s2] occurs at the end of the substring
* of [s1] beginning at [pos] with length [len]. The function returns
* the position [p] of the prefix in [s1].
* The function raises Not_found if it does not find a prefix.
* POSTCONDITION:
* - s1[p..p+n-1] = s2[0..n-1] for some biggest n, n <= String.length s2
* "The string s1 contains the prefix of s2 at position p, and the
* prefix has the maximum length n."
* - n < String.length s2 ==> p+n = String.length s1
* "If the prefix is a proper prefix, it occurs at the end of s1"
*)
assert(String.length s2 > 0);
let l1 = min (String.length s1) (pos+len) in
let l2 = String.length s2 in
let s2c0 = s2.[0] in
let rec check_rec p k =
k >= l2 || p+k >= l1 || (s1.[p+k] = s2.[k] && check_rec p (k+1)) in
let rec search_rec p =
if p >= l1 then raise Not_found;
let p' = String.index_from s1 p s2c0 in (* or Not_found *)
if p' >= l1 then raise Not_found;
if check_rec p' 0 then
p'
else
search_rec (p'+1)
in
search_rec pos
;;
*)

(* A sub stream is the part of [in_stream] that begins at the current
 * position of [in_stream] and ends
 * - after [len] bytes, if [len] is passed, and/or
 * - immediately before the first occurrence of [delimiter], if
 *   [delimiter] is passed,
 * whatever comes first, or at the end of [in_stream].
 *
 * The sub stream does not have a buffer of its own; its window is a
 * prefix of the window of [in_stream] ([s_winlen] is the length of this
 * prefix).
 *
 * Raises [Invalid_argument] if [delimiter] is the empty string or [len]
 * is negative.
 *)
class sub_stream ?len ?delimiter in_stream : in_obj_stream =
object (self)
  val s = (in_stream : in_obj_stream)
  val mutable s_winlen = 0
  val mutable s_del = None    (* initialized below *)
  val s_len = len
  val mutable s_underrun = false

  inherit input_methods (in_stream # window)

  initializer
    (match delimiter with
       Some "" -> invalid_arg "new Netstream.sub_stream"
     | Some d -> s_del <- Some (d, Netaux.KMP.make_pattern d)
     | None -> s_del <- None
    );
    (match s_len with
       Some l ->
         if l < 0 then invalid_arg "new Netstream.sub_stream"
     | None -> ()
    );
    try
      self # want_minimum()
    with
      Buffer_underrun ->
        (* Deferred; reported by the next call of [input] *)
        s_underrun <- true

  method block_size = s # block_size

  method window =
    if s_closed then raise Netchannels.Closed_channel;
    s_netbuf

  method window_length =
    if s_closed then raise Netchannels.Closed_channel;
    s_winlen

  method window_at_eof =
    if s_closed then raise Netchannels.Closed_channel;
    s_at_eof

  method private compute_winlen () =
    (* sets [s_winlen], [s_at_eof], and returns whether the current window
     * is "ambigous" (it is not clear if the stream does end or does not
     * end)
     *)
    let ambigous = ref false in
    let w = s # window in
    let wlen = s # window_length in
    let weof = s # window_at_eof in
    begin match s_del with
      None ->
        s_winlen <- wlen;
        s_at_eof <- weof
    | Some (d, pat) ->
        let p =
          Netaux.KMP.find_pattern pat ~len:wlen (Netbuffer.unsafe_buffer w) in
        if p >= wlen then begin
          (* Delimiter d does not occur in the buffer *)
          s_winlen <- wlen;
          s_at_eof <- weof
        end
        else if (p + String.length d) > wlen then begin
          (* Case: prefix is a proper prefix *)
          ambigous := not weof;
          s_winlen <- wlen;
          s_at_eof <- weof
        end
        else begin
          (* Case: [d] occurs in the window *)
          s_winlen <- p;
          s_at_eof <- true
        end
    end;
    begin match s_len with
      None -> ()
    | Some l ->
        (* The length limit is reached within the window: this overrides
         * whatever was found out about the delimiter
         *)
        if l - s_pos < s_winlen then begin
          ambigous := false;
          s_winlen <- l - s_pos;
          s_at_eof <- true
        end
    end;
    !ambigous

  (* Enlarges the window of the underlying stream by one block, and by
   * further blocks as long as the end of the sub stream is ambigous
   *)
  method want_another_block () =
    if s_closed then raise Netchannels.Closed_channel;
    s # want_another_block();  (* may raise Buffer_underrun *)
    while self # compute_winlen() do
      s # want_another_block()  (* may raise Buffer_underrun *)
    done

  (* Enlarges the window until it has at least [n] bytes, or EOF *)
  method want n =
    if s_closed then raise Netchannels.Closed_channel;
    while not s_at_eof && s_winlen < n do
      self # want_another_block()   (* may raise Buffer_underrun *)
    done

  (* Recomputes the window, and ensures that it has at least one block, or
   * is at EOF
   *)
  method private want_minimum () =
    if self # compute_winlen() then
      self # want_another_block();  (* may raise Buffer_underrun *)
    self # want (s # block_size)

  (* Discards up to [len] bytes; stops early when the sub stream ends *)
  method skip len =
    if s_closed then raise Netchannels.Closed_channel;
    let rec read len =
      if len > 0 then begin
        let k = min s_winlen len in
        s # skip k;                (* may raise Buffer_underrun *)
        s_pos <- s_pos + k;
        self # want_minimum();     (* may raise Buffer_underrun *)
        if k > 0 then read (len - k)
      end
    in
    read len

  (* Copies up to [len] bytes from the beginning of the window to [buf] at
   * [pos], skips them in the underlying stream, and returns their number.
   * Raises [End_of_file] if no byte is available but [len > 0].
   *)
  method input buf pos len =
    if s_closed then raise Netchannels.Closed_channel;
    if s_underrun then begin
      (* Report the underrun that was deferred by an earlier call *)
      self # want_minimum();  (* may raise Buffer_underrun *)
      s_underrun <- false
    end;
    (* Assertion: Either window length >= minimum, or eof *)
    let len' = min len s_winlen in
    Netbuffer.blit s_netbuf 0 buf pos len';
    s # skip len';    (* never raises Buffer_underrun *)
    (* NOTE(review): [input_stream#skip] is annotated as possibly raising
     * Buffer_underrun (from its own want_minimum), which seems to
     * contradict the remark above - confirm.
     *)
    s_pos <- s_pos + len';
    ( try
        self # want_minimum()
      with
        Buffer_underrun ->
          (* The bytes are already delivered, so remember the underrun
           * and report it in the next call
           *)
          s_underrun <- true
    );
    if len' = 0 && len > 0 then raise End_of_file;
    len'

  (* Closes the sub stream and the underlying stream; further calls are
   * ignored
   *)
  method close_in () =
    if not s_closed then begin
      s # close_in();
      s_closed <- true
    end
end


(* Printer for [in_obj_stream] values (usable with a Format formatter):
 * outputs the position, the window length, and the EOF flag of [s].
 *
 * NOTE(review): the format string contains a line break after
 * [window_length:%d ]. It is written as an escape here so that it yields
 * the same bytes as before; it is possibly unintended - confirm.
 *)
let print_in_obj_stream fmt s =
  Format.fprintf fmt
    "<NETSTREAM pos_in:%d window_length:%d \neof=%b>"
    s # pos_in
    s # window_length
    s # window_at_eof
;;