123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266openCore_kernelopenDeferred_stdmoduleDeferred=Deferred1includeTail.Streamletfirst_exnt=match%mapnexttwith|Nil->raise_s[%message"Stream.first of empty stream"]|Cons(x,_)->x;;letfold't~init~f=Deferred.create(funresult->letreclooptb=upon(nextt)(function|Nil->Ivar.fillresultb|Cons(v,t)->upon(fbv)(loopt))inlooptinit);;(* [fold] is implemented to avoid per-stream-element deferred overhead in the case when
multiple stream elements are available simultaneously. *)letfoldt~init~f=Deferred.create(funresult->letreclooptb=matchDeferred.peek(nextt)with|None->upon(nextt)(funnext->loop_nextnextb)|Somenext->loop_nextnextbandloop_nextnextb=matchnextwith|Nil->Ivar.fillresultb|Cons(v,t)->loopt(fbv)inlooptinit);;letlengtht=foldt~init:0~f:(funn_->n+1)letiter't~f=fold't~init:()~f:(fun()v->fv)letclosedt=matchDeferred.peek(nextt)with|SomeNil->return()|_->iter't~f:(fun_->return());;letitert~f=don't_wait_for(iter't~f:(funa->fa;return()));;letcreatef=lettail=Tail.create()in(* collect before calling [f], in case [f] immediately extends. *)lett=Tail.collecttailinftail;t;;letunfoldb~f=create(funtail->letrecloopb=upon(fb)(function|None->Tail.close_exntail|Some(a,b)->Tail.extendtaila;loopb)inloopb);;letof_listl=create(funtail->List.iterl~f:(funx->Tail.extendtailx);Tail.close_exntail);;letto_lists=fold's~init:[]~f:(funba->return(a::b))>>|List.revletcopy_to_tailttail=iter't~f:(funa->return(Tail.extendtaila))letappendt1t2=create(funtail->upon(copy_to_tailt1tail)(fun()->upon(copy_to_tailt2tail)(fun()->Tail.close_exntail)));;letconcatt=create(funtail->upon(iter't~f:(funt->copy_to_tailttail))(fun()->Tail.close_exntail));;letfilter't~f=create(funtail->upon(iter't~f:(funv->match%mapfvwith|false->()|true->Tail.extendtailv))(fun()->Tail.close_exntail));;letfilter_deprecatedt~f=filter't~f:(funa->return(fa))letfilter_map't~f=create(funtail->upon(iter't~f:(funv->match%mapfvwith|None->()|Somev->Tail.extendtailv))(fun()->Tail.close_exntail));;letfilter_map_deprecatedt~f=filter_map't~f:(funa->return(fa))letmap't~f=create(funtail->upon(iter't~f:(funv->fv>>|Tail.extendtail))(fun()->Tail.close_exntail));;letmapt~f=map't~f:(funa->return(fa))letfirst_nsn=create(funtail->letrecloopsn=ifn=0thenTail.close_exntailelseupon(nexts)(function|Nil->Tail.close_exntail|Cons(x,t)->Tail.extendtailx;loopt(n-1))inloopsn);;letavailable_nowt=letreclooptac=matchDeferred.peek(nextt)with|None|SomeNil->List.revac,t|Some(Cons(x,t))->loopt(x::ac)inloopt[];;letsplit?(stop=Deferred.never())?(f=fun_->`Continue)t=letreason_for_stopping=Ivar.create()inletprefix=Tail.create()inletfinishv=Tail.close_exnprefix;Ivar.fillreason_for_stoppingvinletrecloopt=choose[choicestop(fun()->`Stopped);choice(nextt)(funo->`Nexto)]>>>function|`Stopped->finish(`Stoppedt)|`Nexto->(matchowith|Nil->finish`End_of_stream|Cons(a,t)->(matchfawith|`Continue->Tail.extendprefixa;loopt|`Foundb->finish(`Found(b,t))))inloopt;Tail.collectprefix,Ivar.readreason_for_stopping;;letfindt~f=let_,found=splitt~f:(funa->iffathen`Foundaelse`Continue)inmatch%mapfoundwith|`Stopped_->assertfalse|(`End_of_stream|`Found_)asx->x;;letungroupt=create(funtail->upon(iter't~f:(funl->List.iterl~f:(funx->Tail.extendtailx);return()))(fun()->Tail.close_exntail));;letinterleavets=create(funtail->(* The interleaved stream should be closed when the outer stream and all of
the inner streams have been closed. Keep a count of the number of open
streams and close the interleaved stream when that count becomes
zero. *)letnum_open=ref1in(* 1 for the outer stream that is open *)letclose()=num_open:=!num_open-1;if!num_open=0thenTail.close_exntailinletouter_closed=iter'ts~f:(funt->num_open:=!num_open+1;upon(copy_to_tailttail)close;return())inuponouter_closedclose);;lettake_untiltd=create(funtail->letrecloopt=upon(choose[choiced(fun()->`Stop);choice(nextt)(funz->`Nextz)])(function|`Stop|`NextNil->Tail.close_exntail|`Next(Cons(x,t))->Tail.extendtailx;loopt)inloopt);;letiter_durably't~f=Deferred.create(funresult->letrecloopt=nextt>>>function|Nil->Ivar.fillresult()|Cons(x,t)->Monitor.try_with~rest:`Raise(fun()->fx)>>>funz->loopt;(matchzwith|Ok()->()|Errore->Monitor.send_exn(Monitor.current())e)inloopt);;letiter_durably_report_endt~f=Deferred.create(funresult->letrecloopt=nextt>>>function|Nil->Ivar.fillresult()|Cons(x,t)->(* We immediately call [loop], thus making the iter durable. Any exceptions
raised by [f] will not prevent the loop from continuing, and will go to the
monitor of whomever called [iter_durably_report_end]. *)loopt;fxinloopt);;letiter_durablyt~f=don't_wait_for(iter_durably_report_endt~f)letof_funf=unfold()~f:(fun()->let%mapa=f()inSome(a,()));;