123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)openLwt.SyntaxopenLwt.Infixtype+'anode=Nil|Consof'a*'atand'at=unit->'anodeLwt.tletreturn_nil=Lwt.returnNilletempty:'at=fun()->return_nilletreturn(x:'a):'at=fun()->Lwt.return(Cons(x,empty))letreturn_lwt(x:'aLwt.t):'at=fun()->let+x=xinCons(x,empty)letconsxt()=Lwt.return(Cons(x,t))letcons_lwtxt()=let+x=xinCons(x,t)(* A note on recursing through the seqs:
When traversing a seq, the first time we evaluate a suspended node we are
on the left of the first bind (>>=). In that case, we use apply to capture
exceptions into promise rejection.
This is only needed on the first iteration because we are within a callback
passed to Lwt on the right-hand side of a bind after that.
Throughout this file we use the same code pattern to achieve this: we
shadow the recursive traversal function with an identical-but-for-the-apply
non-recursive copy. *)letrecappendseq1seq2()=seq1()>>=function|Nil->seq2()|Cons(x,next)->Lwt.return(Cons(x,appendnextseq2))letappendseq1seq2()=Lwt.applyseq1()>>=function|Nil->seq2()|Cons(x,next)->Lwt.return(Cons(x,appendnextseq2))letrecmapfseq()=seq()>|=function|Nil->Nil|Cons(x,next)->letx=fxinCons(x,mapfnext)letmapfseq()=Lwt.applyseq()>|=function|Nil->Nil|Cons(x,next)->letx=fxinCons(x,mapfnext)letrecmap_sfseq()=seq()>>=function|Nil->return_nil|Cons(x,next)->let+x=fxinCons(x,map_sfnext)letmap_sfseq()=Lwt.applyseq()>>=function|Nil->return_nil|Cons(x,next)->let+x=fxinCons(x,map_sfnext)letrecfilter_mapfseq()=seq()>>=function|Nil->return_nil|Cons(x,next)->(letx=fxinmatchxwith|None->filter_mapfnext()|Somey->Lwt.return(Cons(y,filter_mapfnext)))letfilter_mapfseq()=Lwt.applyseq()>>=function|Nil->return_nil|Cons(x,next)->(letx=fxinmatchxwith|None->filter_mapfnext()|Somey->Lwt.return(Cons(y,filter_mapfnext)))letrecfilter_map_sfseq()=seq()>>=function|Nil->return_nil|Cons(x,next)->(let*x=fxinmatchxwith|None->filter_map_sfnext()|Somey->Lwt.return(Cons(y,filter_map_sfnext)))letfilter_map_sfseq()=Lwt.applyseq()>>=function|Nil->return_nil|Cons(x,next)->(let*x=fxinmatchxwith|None->filter_map_sfnext()|Somey->Lwt.return(Cons(y,filter_map_sfnext)))letrecfilterfseq()=seq()>>=function|Nil->return_nil|Cons(x,next)->letok=fxinifokthenLwt.return(Cons(x,filterfnext))elsefilterfnext()letfilterfseq()=Lwt.applyseq()>>=function|Nil->return_nil|Cons(x,next)->letok=fxinifokthenLwt.return(Cons(x,filterfnext))elsefilterfnext()letrecfilter_sfseq()=seq()>>=function|Nil->return_nil|Cons(x,next)->let*ok=fxinifokthenLwt.return(Cons(x,filter_sfnext))elsefilter_sfnext()letfilter_sfseq()=Lwt.applyseq()>>=function|Nil->return_nil|Cons(x,next)->let*ok=fxinifokthenLwt.return(Cons(x,filter_sfnext))elsefilter_sfnext()letrecflat_mapfseq()=seq()>>=function|Nil->return_nil|Cons(x,next)->flat_map_appf(fx)next()(* this is [append seq (flat_map f tail)] *)andflat_map_appfseqtail()=seq()>>=function|Nil->flat_mapftail()|Cons(x,next)->Lwt.return(Cons(x,flat_map_appfnexttail))letflat_mapfseq()=Lwt.applyseq()>>=function|Nil->return_nil|Cons(x,next)->flat_map_appf(fx)next()letfold_leftfaccseq=letrecauxfaccseq=seq()>>=function|Nil->Lwt.returnacc|Cons(x,next)->letacc=faccxinauxfaccnextinletauxfaccseq=Lwt.applyseq()>>=function|Nil->Lwt.returnacc|Cons(x,next)->letacc=faccxinauxfaccnextinauxfaccseqletfold_left_sfaccseq=letrecauxfaccseq=seq()>>=function|Nil->Lwt.returnacc|Cons(x,next)->let*acc=faccxinauxfaccnextinletauxfaccseq=Lwt.applyseq()>>=function|Nil->Lwt.returnacc|Cons(x,next)->let*acc=faccxinauxfaccnextinauxfaccseqletiterfseq=letrecauxseq=seq()>>=function|Nil->Lwt.return_unit|Cons(x,next)->fx;auxnextinletauxseq=Lwt.applyseq()>>=function|Nil->Lwt.return_unit|Cons(x,next)->fx;auxnextinauxseqletiter_sfseq=letrecauxseq=seq()>>=function|Nil->Lwt.return_unit|Cons(x,next)->let*()=fxinauxnextinletauxseq=Lwt.applyseq()>>=function|Nil->Lwt.return_unit|Cons(x,next)->let*()=fxinauxnextinauxseqletiter_pfseq=letrecauxaccseq=seq()>>=function|Nil->Lwt.joinacc|Cons(x,next)->letp=fxinaux(p::acc)nextinletauxaccseq=Lwt.applyseq()>>=function|Nil->Lwt.joinacc|Cons(x,next)->letp=fxinaux(p::acc)nextinaux[]seqletiter_n?(max_concurrency=1)fseq=beginifmax_concurrency<=0thenletmessage=Printf.sprintf"Lwt_seq.iter_n: max_concurrency must be > 0, %d given"max_concurrencyininvalid_argmessageend;letreclooprunningavailableseq=beginifavailable>0then(Lwt.return(running,available))else(Lwt.nchoose_splitrunning>>=fun(complete,running)->Lwt.return(running,available+List.lengthcomplete))end>>=fun(running,available)->seq()>>=function|Nil->Lwt.joinrunning|Cons(elt,seq)->loop(felt::running)(predavailable)seqin(* because the recursion is more complicated here, we apply the seq directly at
the call-site instead *)loop[]max_concurrency(fun()->Lwt.applyseq())letrecunfoldfu()=matchfuwith|None->return_nil|Some(x,u')->Lwt.return(Cons(x,unfoldfu'))|exceptionexcwhenLwt.Exception_filter.runexc->Lwt.failexcletrecunfold_lwtfu()=let*x=fuinmatchxwith|None->return_nil|Some(x,u')->Lwt.return(Cons(x,unfold_lwtfu'))letunfold_lwtfu()=let*x=Lwt.applyfuinmatchxwith|None->return_nil|Some(x,u')->Lwt.return(Cons(x,unfold_lwtfu'))letrecof_list=function|[]->empty|h::t->consh(of_listt)letrecto_listseq=seq()>>=function|Nil->Lwt.return_nil|Cons(x,next)->let+l=to_listnextinx::lletto_listseq=Lwt.applyseq()>>=function|Nil->Lwt.return_nil|Cons(x,next)->let+l=to_listnextinx::lletrecof_seqseq()=matchseq()with|Seq.Nil->return_nil|Seq.Cons(x,next)->Lwt.return(Cons(x,(of_seqnext)))|exceptionexnwhenLwt.Exception_filter.runexn->Lwt.failexnletrecof_seq_lwt(seq:'aLwt.tSeq.t):'at=fun()->matchseq()with|Seq.Nil->return_nil|Seq.Cons(x,next)->let+x=xinletnext=of_seq_lwtnextinCons(x,next)letof_seq_lwt(seq:'aLwt.tSeq.t):'at=fun()->matchseq()with|Seq.Nil->return_nil|Seq.Cons(x,next)->let+x=xinletnext=of_seq_lwtnextinCons(x,next)|exceptionexcwhenLwt.Exception_filter.runexc->Lwt.failexc