open Core_kernel
open Import
open Deferred_std
module Stream = Async_stream

(* When set, reads, writes, [close] and [close_read] print the pipe they operate on to
   stderr (via [eprints]). *)
let show_debug_messages = ref false

(* When set, pipe operations run [invariant] on the pipes they are given. *)
let check_invariant = ref false

module Flushed_result = struct
  type t =
    [ `Ok
    | `Reader_closed
    ]
  [@@deriving compare, sexp_of]

  let equal = [%compare.equal: t]

  (* [combine l] waits for every deferred in [l]. The result is [`Reader_closed] if any
     of them became [`Reader_closed], and [`Ok] otherwise. *)
  let combine (l : t Deferred.t list) =
    let%map l = Deferred.all l in
    match List.mem l `Reader_closed ~equal with
    | true -> `Reader_closed
    | false -> `Ok
  ;;
end

(* A [Consumer.t] acts as the monitor of some process that reads values from a pipe and
   processes them, allowing that process:
   - to communicate that it has taken responsibility for the values
   - to signal when it has finished with the values to interested parties (via
     [downstream_flushed])

   It is used in two steps:

   1. calling [Consumer.start] at the point where the consumer takes values out of the
      pipe via [read] or [read'].

   2. calling [Consumer.values_sent_downstream].

   By calling [values_sent_downstream] one asserts that the [downstream_flushed] function
   supplied to [create] will now wait for this value.

   If no [Consumer.t] is supplied when a value is read then the value is defined to be
   flushed at that time. *)
module Consumer : sig
  type t [@@deriving sexp_of]

  include Invariant.S with type t := t

  val create
    :  pipe_id:int
    -> downstream_flushed:(unit -> Flushed_result.t Deferred.t)
    -> t

  val pipe_id : t -> int
  val start : t -> unit
  val values_sent_downstream : t -> unit
  val values_sent_downstream_and_flushed : t -> Flushed_result.t Deferred.t
end = struct
  type t =
    { pipe_id : int
    ; (* [values_read] reflects whether values the consumer has read from the pipe have
         been sent downstream or if not, holds an ivar that is to be filled when they
         are. *)
      mutable values_read :
        [ `Have_been_sent_downstream | `Have_not_been_sent_downstream of unit Ivar.t ]
    ; (* [downstream_flushed ()] returns when all prior values that the consumer has
         passed downstream have been flushed all the way down the chain of pipes. *)
      downstream_flushed : unit -> Flushed_result.t Deferred.t
    }
  [@@deriving fields, sexp_of]

  let invariant t : unit =
    try
      let check f field = f (Field.get field t) in
      Fields.iter
        ~pipe_id:ignore
        ~values_read:
          (check (function
             | `Have_been_sent_downstream -> ()
             | `Have_not_been_sent_downstream ivar -> assert (Ivar.is_empty ivar)))
        ~downstream_flushed:ignore
    with
    | exn ->
      raise_s [%message "Pipe.Consumer.invariant failed" (exn : exn) ~pipe:(t : t)]
  ;;

  let create ~pipe_id ~downstream_flushed =
    { pipe_id; values_read = `Have_been_sent_downstream; downstream_flushed }
  ;;

  (* Marks that the consumer now holds values that have not yet been sent downstream.
     Calling it again before [values_sent_downstream] keeps the existing ivar. *)
  let start t =
    match t.values_read with
    | `Have_not_been_sent_downstream _ -> ()
    | `Have_been_sent_downstream ->
      t.values_read <- `Have_not_been_sent_downstream (Ivar.create ())
  ;;

  (* Wakes anyone waiting in [values_sent_downstream_and_flushed] and resets the
     consumer to the idle state. *)
  let values_sent_downstream t =
    match t.values_read with
    | `Have_been_sent_downstream -> ()
    | `Have_not_been_sent_downstream ivar ->
      Ivar.fill ivar ();
      t.values_read <- `Have_been_sent_downstream
  ;;

  (* Waits until the values currently held by the consumer (if any) have been sent
     downstream, then defers to [downstream_flushed]. *)
  let values_sent_downstream_and_flushed t =
    match t.values_read with
    | `Have_been_sent_downstream -> t.downstream_flushed ()
    | `Have_not_been_sent_downstream when_sent_downstream ->
      let%bind () = Ivar.read when_sent_downstream in
      t.downstream_flushed ()
  ;;
end

module Blocked_read = struct
  (* A [Blocked_read.t] represents a blocked read attempt.  If someone reads from an empty
     pipe, they enqueue a [Blocked_read.t] in the queue of [blocked_reads].  Later, when
     values are written to a pipe, that will cause some number of blocked reads to be
     filled, first come first serve.  The blocked-read constructor specifies how many
     values a read should consume from the pipe when it gets its turn.

     If a pipe is closed, then all blocked reads will be filled with [`Eof]. *)
  type 'a wants =
    | Zero of [ `Eof | `Ok ] Ivar.t
    | One of [ `Eof | `Ok of 'a ] Ivar.t
    | At_most of int * [ `Eof | `Ok of 'a Queue.t ] Ivar.t
  [@@deriving sexp_of]

  type 'a t =
    { wants : 'a wants
    ; consumer : Consumer.t option
    }
  [@@deriving fields, sexp_of]

  let invariant t : unit =
    try
      let check f field = f (Field.get field t) in
      Fields.iter
        ~wants:
          (check (function
             | Zero _ | One _ -> ()
             | At_most (i, _) -> assert (i > 0)))
        ~consumer:
          (check (function
             | None -> ()
             | Some consumer -> Consumer.invariant consumer))
    with
    | exn ->
      raise_s [%message "Pipe.Blocked_read.invariant failed" (exn : exn) ~pipe:(t : _ t)]
  ;;

  let create wants consumer = { wants; consumer }

  let is_empty t =
    match t.wants with
    | Zero i -> Ivar.is_empty i
    | One i -> Ivar.is_empty i
    | At_most (_, i) -> Ivar.is_empty i
  ;;

  let fill_with_eof t =
    match t.wants with
    | Zero i -> Ivar.fill i `Eof
    | One i -> Ivar.fill i `Eof
    | At_most (_, i) -> Ivar.fill i `Eof
  ;;
end

module Blocked_flush = struct
  (* A [Blocked_flush.t] represents a blocked flush operation, which can be enabled by a
     future read.  If someone does [flushed p] on a pipe, that blocks until everything
     that's currently in the pipe at that point has drained out of the pipe.  When we call
     [flushed], it records the total amount of data that has been written so far in
     [fill_when_num_values_read].  We fill the [Blocked_flush.t] with [`Ok] when this
     amount of data has been read from the pipe.

     A [Blocked_flush.t] can also be filled with [`Reader_closed], which happens when the
     reader end of the pipe is closed, and we are thus sure that the unread elements
     preceding the flush will never be read. *)
  type t =
    { fill_when_num_values_read : int
    ; ready : [ `Ok | `Reader_closed ] Ivar.t
    }
  [@@deriving fields, sexp_of]

  let fill t v = Ivar.fill t.ready v
end

type ('a, 'phantom) t =
  { (* [id] is an integer used to distinguish pipes when debugging. *)
    id : int Sexp_hidden_in_test.t
  ; (* [info] is user-provided arbitrary sexp, for debugging purposes. *)
    mutable info : Sexp.t option [@sexp.option]
  ; (* [buffer] holds values written to the pipe that have not yet been read. *)
    mutable buffer : 'a Queue.t
  ; (* [size_budget] governs pushback on writers to the pipe.

       There is *no* invariant that [Queue.length buffer <= size_budget].  There is no
       hard upper bound on the number of elements that can be stuffed into the [buffer].
       This is due to the way we handle writes.  When we do a write, all of the values
       written are immediately enqueued into [buffer].  After the write, if [Queue.length
       buffer <= t.size_budget], then the writer will be notified to continue writing.
       After the write, if [length t > t.size_budget], then the write will block until the
       pipe is under budget. *)
    mutable size_budget : int
  ; (* [pushback] is used to give feedback to writers about whether they should write to
       the pipe.  [pushback] is full iff [length t <= t.size_budget || is_closed t]. *)
    mutable pushback : unit Ivar.t
  ; (* [num_values_read] keeps track of the total number of values that have been read
       from the pipe.  We do not have to worry about overflow in [num_values_read].  You'd
       need to write 2^62 elements to the pipe, which would take about 146 years, at a
       flow rate of 1 value/nanosecond. *)
    mutable num_values_read : int
  ; (* [blocked_flushes] holds flushes whose preceding elements have not been completely
       read.  For each blocked flush, the number of elements that need to be read from the
       pipe in order to fill the flush is:

       fill_when_num_values_read - num_values_read

       Keeping the data in this form allows us to change a single field
       ([num_values_read]) when we consume values instead of having to iterate over the
       whole queue of flushes. *)
    blocked_flushes : Blocked_flush.t Queue.t
  ; (* [blocked_reads] holds reads that are waiting on data to be written to the pipe. *)
    blocked_reads : 'a Blocked_read.t Queue.t
  ; (* [closed] is filled when we close the write end of the pipe. *)
    closed : unit Ivar.t
  ; (* [read_closed] is filled when we close the read end of the pipe. *)
    read_closed : unit Ivar.t
  ; (* [consumers] is a list of all consumers that may be handling values read from the
       pipe. *)
    mutable consumers : Consumer.t list
  ; (* [upstream_flusheds] has a function for each pipe immediately upstream of this one.
       That function walks to the head(s) of the upstream pipe, and calls
       [downstream_flushed] on the head(s).  See the definition of [upstream_flushed]
       below. *)
    upstream_flusheds : (unit -> Flushed_result.t Deferred.t) Bag.t
  }
[@@deriving fields, sexp_of]

type ('a, 'phantom) pipe = ('a, 'phantom) t [@@deriving sexp_of]

let hash t = Hashtbl.hash t.id
let equal (t1 : (_, _) t) t2 = phys_equal t1 t2
let compare t1 t2 = Int.compare t1.id t2.id
let is_closed t = Ivar.is_full t.closed
let is_read_closed t = Ivar.is_full t.read_closed
let closed t = Ivar.read t.closed
let pushback t = Ivar.read t.pushback
let length t = Queue.length t.buffer
let is_empty t = length t = 0

let invariant t : unit =
  try
    let check f field = f (Field.get field t) in
    Fields.iter
      ~id:ignore
      ~info:ignore
      ~buffer:ignore
      ~size_budget:(check (fun size_budget -> assert (size_budget >= 0)))
      ~pushback:
        (check (fun pushback ->
           assert (
             Bool.equal
               (Ivar.is_full pushback)
               (length t <= t.size_budget || is_closed t))))
      ~num_values_read:ignore
      ~blocked_flushes:
        (check (fun blocked_flushes ->
           Queue.iter blocked_flushes ~f:(fun (f : Blocked_flush.t) ->
             assert (f.fill_when_num_values_read > t.num_values_read));
           assert (
             List.is_sorted
               ~compare:Int.compare
               (List.map
                  (Queue.to_list blocked_flushes)
                  ~f:Blocked_flush.fill_when_num_values_read));
           if is_empty t then assert (Queue.is_empty blocked_flushes)))
      ~blocked_reads:
        (check (fun blocked_reads ->
           (* If data is available, no one is waiting for it.  This would need to change if
              we ever implement [read_exactly] as an atomic operation. *)
           if not (is_empty t) then assert (Queue.is_empty blocked_reads);
           Queue.iter blocked_reads ~f:(fun read ->
             Blocked_read.invariant read;
             assert (Blocked_read.is_empty read));
           (* You never block trying to read a closed pipe. *)
           if is_closed t then assert (Queue.is_empty blocked_reads)))
      ~closed:ignore
      ~read_closed:ignore
      ~consumers:
        (check (fun l ->
           List.iter l ~f:(fun consumer ->
             Consumer.invariant consumer;
             assert (Consumer.pipe_id consumer = t.id))))
      ~upstream_flusheds:ignore
  with
  | exn -> raise_s [%message "Pipe.invariant failed" (exn : exn) ~pipe:(t : (_, _) t)]
;;

module Reader = struct
  type phantom [@@deriving sexp_of]
  type 'a t = ('a, phantom) pipe [@@deriving sexp_of]

  let invariant = invariant
end

module Writer = struct
  type phantom [@@deriving sexp_of]
  type 'a t = ('a, phantom) pipe [@@deriving sexp_of]

  let invariant = invariant
end

(* Source of fresh pipe ids. *)
let id_ref = ref 0

(* Allocates a pipe record with a fresh id and the given initial buffer.  The [pushback]
   ivar is left empty; callers are responsible for establishing the pushback
   invariant. *)
let create_internal ~info ~initial_buffer =
  incr id_ref;
  let t =
    { id = !id_ref
    ; info
    ; closed = Ivar.create ()
    ; read_closed = Ivar.create ()
    ; size_budget = 0
    ; pushback = Ivar.create ()
    ; buffer = initial_buffer
    ; num_values_read = 0
    ; blocked_flushes = Queue.create ()
    ; blocked_reads = Queue.create ()
    ; consumers = []
    ; upstream_flusheds = Bag.create ()
    }
  in
  t
;;

(* Returns the reader and writer ends of a new empty, open pipe.  Both ends are the same
   underlying record. *)
let create ?info () =
  let t = create_internal ~info ~initial_buffer:(Queue.create ()) in
  (* Initially, the pipe does not push back. *)
  Ivar.fill t.pushback ();
  if !check_invariant then invariant t;
  t, t
;;

(* Re-establishes the invariant that [t.pushback] is full iff
   [length t <= t.size_budget || is_closed t]: fills it when the pipe is under budget or
   closed, and otherwise replaces a full ivar with a fresh empty one. *)
let update_pushback t =
  if length t <= t.size_budget || is_closed t
  then Ivar.fill_if_empty t.pushback ()
  else if Ivar.is_full t.pushback
  then t.pushback <- Ivar.create ()
;;

(* Closes the write end of [t].  Buffered values stay readable; if the buffer is empty,
   every blocked read is woken with [`Eof].  Closing an already-closed pipe does
   nothing. *)
let close t =
  if !show_debug_messages then eprints "close" t [%sexp_of: (_, _) t];
  if !check_invariant then invariant t;
  if not (is_closed t)
  then (
    Ivar.fill t.closed ();
    if is_empty t
    then (
      Queue.iter t.blocked_reads ~f:Blocked_read.fill_with_eof;
      Queue.clear t.blocked_reads);
    update_pushback t)
;;

(* Closes the read end of [t]: blocked flushes are filled with [`Reader_closed], buffered
   values are discarded, and then the write end is closed as well. *)
let close_read t =
  if !show_debug_messages then eprints "close_read" t [%sexp_of: (_, _) t];
  if !check_invariant then invariant t;
  if not (is_read_closed t)
  then (
    Ivar.fill t.read_closed ();
    Queue.iter t.blocked_flushes ~f:(fun flush -> Blocked_flush.fill flush `Reader_closed);
    Queue.clear t.blocked_flushes;
    Queue.clear t.buffer;
    update_pushback t;
    (* we just cleared the buffer, so may need to fill [t.pushback] *)
    close t)
;;

let create_reader_not_close_on_exception f =
  let r, w = create () in
  upon (f w) (fun () -> close w);
  r
;;

(* Runs [f] on the writer end of a new pipe and closes the writer once [f] is done.  When
   [close_on_exception] is true, the writer is also closed if [f] raises. *)
let create_reader ~close_on_exception f =
  if not close_on_exception
  then create_reader_not_close_on_exception f
  else (
    let r, w = create () in
    don't_wait_for
      (Monitor.protect
         (fun () -> f w)
         ~finally:(fun () ->
           close w;
           return ()));
    r)
;;

(* Runs [f] on the reader end of a new pipe and closes the read end once [f] finishes or
   raises. *)
let create_writer f =
  let r, w = create () in
  don't_wait_for
    (Monitor.protect
       (fun () -> f r)
       ~finally:(fun () ->
         close_read r;
         return ()));
  w
;;

(* Called after [t.num_values_read] has been advanced.  Fills, in queue order, every
   blocked flush whose preceding values have now all been read: immediately with [`Ok]
   when there is no consumer, otherwise once the consumer reports that the values have
   been sent downstream and flushed. *)
let values_were_read t consumer =
  Option.iter consumer ~f:Consumer.start;
  let rec loop () =
    match Queue.peek t.blocked_flushes with
    | None -> ()
    | Some flush ->
      if t.num_values_read >= flush.fill_when_num_values_read
      then (
        ignore (Queue.dequeue_exn t.blocked_flushes : Blocked_flush.t);
        (match consumer with
         | None -> Blocked_flush.fill flush `Ok
         | Some consumer ->
           upon
             (Consumer.values_sent_downstream_and_flushed consumer)
             (fun flush_result -> Blocked_flush.fill flush flush_result));
        loop ())
  in
  loop ()
;;

(* [consume_all t] reads all the elements in [t]. *)
let consume_all t consumer =
  let result = t.buffer in
  t.buffer <- Queue.create ();
  t.num_values_read <- t.num_values_read + Queue.length result;
  values_were_read t consumer;
  update_pushback t;
  result
;;

(* [consume_one t] reads the first element of [t], which must be nonempty. *)
let consume_one t consumer =
  assert (length t >= 1);
  let result = Queue.dequeue_exn t.buffer in
  t.num_values_read <- t.num_values_read + 1;
  values_were_read t consumer;
  update_pushback t;
  result
;;

(* [consume t ~max_queue_length] reads up to [max_queue_length] elements from the front
   of [t]. *)
let consume t ~max_queue_length consumer =
  assert (max_queue_length >= 0);
  if max_queue_length >= length t
  then consume_all t consumer
  else (
    t.num_values_read <- t.num_values_read + max_queue_length;
    values_were_read t consumer;
    let result = Queue.create ~capacity:max_queue_length () in
    Queue.blit_transfer ~src:t.buffer ~dst:result ~len:max_queue_length ();
    update_pushback t;
    result)
;;

let set_size_budget t size_budget =
  if size_budget < 0 then raise_s [%message "negative size_budget" (size_budget : int)];
  t.size_budget <- size_budget;
  update_pushback t
;;

(* Hands buffered values to blocked readers, first come first serve, until either there
   are no more blocked reads or no more buffered values. *)
let fill_blocked_reads t =
  while (not (Queue.is_empty t.blocked_reads)) && not (is_empty t) do
    let blocked_read = Queue.dequeue_exn t.blocked_reads in
    let consumer = blocked_read.consumer in
    match blocked_read.wants with
    | Zero ivar -> Ivar.fill ivar `Ok
    | One ivar -> Ivar.fill ivar (`Ok (consume_one t consumer))
    | At_most (max_queue_length, ivar) ->
      Ivar.fill ivar (`Ok (consume t ~max_queue_length consumer))
  done
;;

(* [start_write] and [finish_write] bracket every write.  [start_write] prints debug
   output and checks invariants when enabled, and raises if the pipe is closed; the
   caller then enqueues its values into [t.buffer]; [finish_write] hands the new values
   to blocked reads and updates pushback. *)
let start_write t =
  if !show_debug_messages then eprints "write" t [%sexp_of: (_, _) t];
  if !check_invariant then invariant t;
  if is_closed t then raise_s [%message "write to closed pipe" ~pipe:(t : (_, _) t)]
;;

let finish_write t =
  fill_blocked_reads t;
  update_pushback t
;;

(* Moves every element of [from] into [t], leaving [from] empty. *)
let transfer_in_without_pushback t ~from =
  start_write t;
  Queue.blit_transfer ~src:from ~dst:t.buffer ();
  finish_write t
;;

let transfer_in t ~from =
  transfer_in_without_pushback t ~from;
  pushback t
;;

(* Like [transfer_in_without_pushback], but leaves [from] unchanged. *)
let copy_in_without_pushback t ~from =
  start_write t;
  Queue.iter from ~f:(fun x -> Queue.enqueue t.buffer x);
  finish_write t
;;

(* [write'] is used internally *)
let write' t q = transfer_in t ~from:q

let write_without_pushback t value =
  start_write t;
  Queue.enqueue t.buffer value;
  finish_write t
;;

let write t value =
  write_without_pushback t value;
  pushback t
;;

(* Waits for pushback, then, if the pipe is still open, calls [f] with a function that
   writes without pushback. *)
let write_when_ready t ~f =
  let%map () = pushback t in
  if is_closed t then `Closed else `Ok (f (fun x -> write_without_pushback t x))
;;

let write_if_open t x = if not (is_closed t) then write t x else return ()

let write_without_pushback_if_open t x =
  if not (is_closed t) then write_without_pushback t x
;;

(* Raises if [consumer] was created for a different pipe than [t]. *)
let ensure_consumer_matches ?consumer t =
  match consumer with
  | None -> ()
  | Some consumer ->
    if t.id <> Consumer.pipe_id consumer
    then
      raise_s
        [%message
          "Attempt to use consumer with wrong pipe"
            (consumer : Consumer.t)
            ~pipe:(t : _ Reader.t)]
;;

(* Common prologue of every read: optional debug output, optional invariant check, and
   consumer validation. *)
let start_read ?consumer t label =
  if !show_debug_messages then eprints label t [%sexp_of: (_, _) t];
  if !check_invariant then invariant t;
  ensure_consumer_matches t ?consumer
;;

(* Shared implementation of [read_now] and [read_now']: never blocks, and only returns
   [`Eof] when the pipe is both empty and closed. *)
let gen_read_now ?consumer t consume =
  start_read t "read_now" ?consumer;
  if is_empty t
  then (if is_closed t then `Eof else `Nothing_available)
  else (
    (* From [invariant] and [not (is_empty t)]. *)
    assert (Queue.is_empty t.blocked_reads);
    `Ok (consume t consumer))
;;

(* Validates the optional [max_queue_length] argument; [None] means no limit. *)
let get_max_queue_length ~max_queue_length =
  match max_queue_length with
  | None -> Int.max_value
  | Some max_queue_length ->
    if max_queue_length <= 0
    then raise_s [%message "max_queue_length <= 0" (max_queue_length : int)];
    max_queue_length
;;

let read_now' ?consumer ?max_queue_length t =
  let max_queue_length = get_max_queue_length ~max_queue_length in
  gen_read_now t ?consumer (fun t consumer -> consume t ~max_queue_length consumer)
;;

let read_now ?consumer t = gen_read_now t ?consumer consume_one
let peek t = Queue.peek t.buffer

(* Discards every value currently in the buffer. *)
let clear t =
  match read_now' t with
  | `Eof | `Nothing_available | `Ok _ -> ()
;;

(* Reads up to [max_queue_length] values, blocking until at least one is available or the
   pipe is closed. *)
let read' ?consumer ?max_queue_length t =
  let max_queue_length = get_max_queue_length ~max_queue_length in
  start_read t "read'" ?consumer;
  match read_now' t ?consumer ~max_queue_length with
  | (`Ok _ | `Eof) as r -> return r
  | `Nothing_available ->
    Deferred.create (fun ivar ->
      Queue.enqueue
        t.blocked_reads
        (Blocked_read.create (At_most (max_queue_length, ivar)) consumer))
;;

(* Reads one value, blocking until one is available or the pipe is closed. *)
let read ?consumer t =
  start_read t "read" ?consumer;
  if is_empty t
  then
    if is_closed t
    then return `Eof
    else
      Deferred.create (fun ivar ->
        Queue.enqueue t.blocked_reads (Blocked_read.(create (One ivar)) consumer))
  else (
    assert (Queue.is_empty t.blocked_reads);
    return (`Ok (consume_one t consumer)))
;;

(* Becomes determined when the pipe has a value to read ([`Ok]) or is closed and empty
   ([`Eof]), without consuming anything. *)
let values_available t =
  start_read t "values_available";
  if not (is_empty t)
  then return `Ok
  else if is_closed t
  then return `Eof
  else (
    match Queue.last t.blocked_reads with
    | Some { consumer = None; wants = Zero ivar } ->
      (* This case is an optimization for multiple calls to [values_available] in
         sequence.  It causes them to all share the same ivar, rather than allocate
         an ivar per call. *)
      Ivar.read ivar
    | _ ->
      Deferred.create (fun ivar ->
        Queue.enqueue t.blocked_reads (Blocked_read.(create (Zero ivar)) None)))
;;

let read_choice t = choice (values_available t) (fun (_ : [ `Ok | `Eof ]) -> read_now t)

let read_choice_single_consumer_exn t here =
  Deferred.Choice.map (read_choice t) ~f:(function
    | (`Ok _ | `Eof) as x -> x
    | `Nothing_available ->
      raise_s
        [%message
          "Pipe.read_choice_single_consumer_exn: choice was enabled but pipe is \
           empty; this is likely due to a race condition with one or more other \
           consumers"
            (here : Source_code_position.t)])
;;

(* [read_exactly t ~num_values] loops, getting you all [num_values] items, up
   to EOF. *)
let read_exactly ?consumer t ~num_values =
  start_read t "read_exactly" ?consumer;
  if num_values <= 0
  then raise_s [%message "Pipe.read_exactly got num_values <= 0" (num_values : int)];
  Deferred.create (fun finish ->
    let result = Queue.create () in
    let rec loop () =
      let already_read = Queue.length result in
      assert (already_read <= num_values);
      if already_read = num_values
      then Ivar.fill finish (`Exactly result)
      else
        read' ?consumer t ~max_queue_length:(num_values - already_read)
        >>> function
        | `Eof -> Ivar.fill finish (if already_read = 0 then `Eof else `Fewer result)
        | `Ok q ->
          Queue.blit_transfer ~src:q ~dst:result ();
          loop ()
    in
    loop ())
;;

(* Becomes determined once every value currently in [t] has been read and, for values
   taken by a consumer, flushed further downstream. *)
let downstream_flushed t =
  if is_empty t
  then
    if List.is_empty t.consumers
    then return `Ok
    else
      Flushed_result.combine
        (List.map t.consumers ~f:Consumer.values_sent_downstream_and_flushed)
  else
    (* [t] might be closed.  But the read end can't be closed, because if it were, then
       [t] would be empty.  If the write end is closed but not the read end, then we want
       to enqueue a blocked flush because the enqueued values may get read. *)
    Deferred.create (fun ready ->
      Queue.enqueue
        t.blocked_flushes
        { fill_when_num_values_read = t.num_values_read + length t; ready })
;;

(* In practice, along with [Link.create] and [add_upstream_flushed], [upstream_flushed]
   traverses the graph of linked pipes up to the heads and then calls [downstream_flushed]
   on them. *)
let upstream_flushed t =
  if Bag.is_empty t.upstream_flusheds
  then downstream_flushed t
  else
    Bag.to_list t.upstream_flusheds
    |> List.map ~f:(fun f -> f ())
    |> Flushed_result.combine
;;

let add_upstream_flushed t upstream_flushed = Bag.add t.upstream_flusheds upstream_flushed

(* Registers a new consumer on [t] whose notion of flushed is [downstream_flushed]. *)
let add_consumer t ~downstream_flushed =
  let consumer = Consumer.create ~pipe_id:t.id ~downstream_flushed in
  t.consumers <- consumer :: t.consumers;
  consumer
;;

(* A [Link.t] links flushing of two pipes together. *)
module Link : sig
  type t

  val create : upstream:(_, _) pipe -> downstream:(_, _) pipe -> t
  val consumer : t -> Consumer.t

  (* [unlink_upstream] removes downstream's reference to upstream. *)
  val unlink_upstream : t -> unit
end = struct
  type ('a, 'b) unpacked =
    { downstream : ('a, 'b) t
    ; consumer : Consumer.t
    ; upstream_flusheds_bag_elt : (unit -> Flushed_result.t Deferred.t) Bag.Elt.t
    }

  type t = T : (_, _) unpacked -> t

  let consumer (T t) = t.consumer

  let create ~upstream ~downstream =
    T
      { downstream
      ; consumer =
          add_consumer upstream ~downstream_flushed:(fun () ->
            downstream_flushed downstream)
      ; upstream_flusheds_bag_elt =
          add_upstream_flushed downstream (fun () -> upstream_flushed upstream)
      }
  ;;

  let unlink_upstream (T t) =
    Bag.remove t.downstream.upstream_flusheds t.upstream_flusheds_bag_elt
  ;;
end

module Flushed = struct
  type t =
    | Consumer of Consumer.t
    | When_value_processed
    | When_value_read
  [@@deriving sexp_of]
end

(* Generic fold over a pipe.  [f b v continue] must eventually call [continue] with the
   new accumulator; the result is filled with the final accumulator at [`Eof]. *)
let fold_gen
      (read_now : ?consumer:Consumer.t -> _ Reader.t -> _)
      ?(flushed = Flushed.When_value_read)
      t
      ~init
      ~f
  =
  let consumer =
    match flushed with
    | When_value_read -> None
    | Consumer consumer -> Some consumer
    | When_value_processed ->
      (* The fact that "no consumer" behaves different from "trivial consumer" is weird,
         but that's how the consumer machinery works. *)
      Some (add_consumer t ~downstream_flushed:(fun () -> return `Ok))
  in
  if !check_invariant then invariant t;
  ensure_consumer_matches t ?consumer;
  Deferred.create (fun finished ->
    (* We do [return () >>>] to ensure that [f] is only called asynchronously. *)
    return ()
    >>> fun () ->
    let rec loop b =
      match read_now t ?consumer with
      | `Eof -> Ivar.fill finished b
      | `Ok v -> f b v continue
      | `Nothing_available -> values_available t >>> fun _ -> loop b
    and continue b =
      Option.iter consumer ~f:Consumer.values_sent_downstream;
      loop b
    in
    loop init)
;;

let fold' ?flushed ?max_queue_length t ~init ~f =
  fold_gen (read_now' ?max_queue_length) ?flushed t ~init ~f:(fun b q loop ->
    f b q >>> loop)
;;

let fold ?flushed t ~init ~f =
  fold_gen read_now ?flushed t ~init ~f:(fun b a loop -> f b a >>> loop)
;;

let fold_without_pushback ?consumer t ~init ~f =
  fold_gen
    read_now
    t
    ~init
    ~f:(fun b a loop -> loop (f b a))
    ?flushed:
      (match consumer with
       | None -> None
       | Some c -> Some (Consumer c))
;;

(* When [continue_on_error] is true, exceptions raised by [f a] are sent to the current
   monitor instead of propagating. *)
let with_error_to_current_monitor ?(continue_on_error = false) f a =
  if not continue_on_error
  then f a
  else (
    match%map Monitor.try_with (fun () -> f a) with
    | Ok () -> ()
    | Error exn -> Monitor.send_exn (Monitor.current ()) (Monitor.extract_exn exn))
;;

let iter' ?continue_on_error ?flushed ?max_queue_length t ~f =
  fold' ?max_queue_length ?flushed t ~init:() ~f:(fun () q ->
    with_error_to_current_monitor ?continue_on_error f q)
;;

let iter ?continue_on_error ?flushed t ~f =
  fold_gen read_now ?flushed t ~init:() ~f:(fun () a loop ->
    with_error_to_current_monitor ?continue_on_error f a >>> fun () -> loop ())
;;

(* [iter_without_pushback] is a common case, so we implement it in an optimized manner,
   rather than via [iter].  The implementation reads only one element at a time, so that
   if [f] closes [t] or raises, no more elements will be read. *)
let iter_without_pushback
      ?consumer
      ?(continue_on_error = false)
      ?max_iterations_per_job
      t
      ~f
  =
  ensure_consumer_matches t ?consumer;
  let max_iterations_per_job =
    match max_iterations_per_job with
    | None -> Int.max_value
    | Some max_iterations_per_job ->
      if max_iterations_per_job <= 0
      then
        raise_s
          [%message
            "iter_without_pushback got non-positive max_iterations_per_job"
              (max_iterations_per_job : int)];
      max_iterations_per_job
  in
  let f =
    if not continue_on_error
    then f
    else (fun a ->
      try f a with
      | exn -> Monitor.send_exn (Monitor.current ()) exn)
  in
  Deferred.create (fun finished ->
    (* We do [return () >>>] to ensure that [f] is only called asynchronously. *)
    return ()
    >>> fun () ->
    (* After [max_iterations_per_job] values, yield to the scheduler before continuing. *)
    let rec start () = loop ~remaining:max_iterations_per_job
    and loop ~remaining =
      if remaining = 0
      then return () >>> fun () -> start ()
      else (
        match read_now t ?consumer with
        | `Eof -> Ivar.fill finished ()
        | `Ok a ->
          f a;
          loop ~remaining:(remaining - 1)
        | `Nothing_available -> values_available t >>> fun _ -> start ())
    in
    start ())
;;

let drain t = iter' t ~f:(fun _ -> return ())
let drain_and_count t = fold' t ~init:0 ~f:(fun sum q -> return (sum + Queue.length q))

let read_all input =
  let result = Queue.create () in
  let%map () =
    iter' input ~f:(fun q ->
      Queue.blit_transfer ~src:q ~dst:result ();
      return ())
  in
  result
;;

let to_list r = read_all r >>| Queue.to_list

let to_stream_deprecated t =
  Stream.create (fun tail ->
    iter_without_pushback t ~f:(fun x -> Tail.extend tail x)
    >>> fun () -> Tail.close_exn tail)
;;

(* The implementation of [of_stream_deprecated] does as much batching as possible.  It
   grabs as many items as are available into an internal queue.  Once it has grabbed
   everything, it writes it to the pipe and then blocks waiting for the next element from
   the stream.

   There's no possibility that we'll starve the pipe reading an endless stream, just
   accumulating the elements into our private queue forever without ever writing them
   downstream to the pipe.  Why? because while we're running, the stream-producer *isn't*
   running -- there are no Async block points in the queue-accumulator loop.  So the
   queue-accumulator loop will eventually catch up to the current stream tail, at which
   point we'll do the pipe-write and then block on the stream... thus giving the
   stream-producer a chance to make more elements.

   One can't implement [of_stream] using [Stream.iter] or [Stream.iter'] because you
   need to be able to stop early when the consumer closes the pipe.  Also, using either
   of those would entail significantly more deferred overhead, whereas the below
   implementation uses a deferred only when it needs to wait for data from the stream. *)
let of_stream_deprecated s =
  let r, w = create () in
  let q = Queue.create () in
  let transfer () =
    if not (Queue.is_empty q)
    then
      (* Cannot push back on the stream, so ignore the pushback on the pipe. *)
      don't_wait_for (write' w q)
  in
  let rec loop s =
    assert (not (is_closed w));
    let next_deferred = Stream.next s in
    match Deferred.peek next_deferred with
    | Some next -> loop_next next
    | None ->
      transfer ();
      upon next_deferred check_closed_loop_next
  and check_closed_loop_next next = if not (is_closed w) then loop_next next
  and loop_next = function
    | Nil ->
      transfer ();
      close w
    | Cons (x, s) ->
      Queue.enqueue q x;
      loop s
  in
  loop s;
  r
;;

(* Generic transfer from [input] to [output].  [f x continue] must eventually call
   [continue] with the value to write.  Stops at [`Eof] on [input], or closes the read end
   of [input] once [output] is closed.  Flushing of [output] is linked to [input]. *)
let transfer_gen
      (read_now : ?consumer:Consumer.t -> _ Reader.t -> _)
      write
      input
      output
      ~f
  =
  if !check_invariant
  then (
    invariant input;
    invariant output);
  let link = Link.create ~upstream:input ~downstream:output in
  let consumer = Link.consumer link in
  (* When we're done with [input], we unlink to remove pointers from [output] to [input],
     which would cause a space leak if we had a single long-lived output into which we
     transfer lots of short-lived inputs. *)
  let unlink () = Link.unlink_upstream link in
  Deferred.create (fun result ->
    (* We do [return () >>>] to ensure that [f] is only called asynchronously. *)
    return ()
    >>> fun () ->
    let output_closed () =
      close_read input;
      unlink ();
      Ivar.fill result ()
    in
    let rec loop () =
      if is_closed output
      then output_closed ()
      else (
        match read_now input ~consumer with
        | `Eof ->
          unlink ();
          Ivar.fill result ()
        | `Ok x -> f x continue
        | `Nothing_available ->
          choose
            [ choice (values_available input) ignore; choice (closed output) ignore ]
          >>> fun () -> loop ())
    and continue y =
      if is_closed output
      then output_closed ()
      else (
        let pushback = write output y in
        Consumer.values_sent_downstream consumer;
        pushback >>> fun () -> loop ())
    in
    loop ())
;;

let transfer' ?max_queue_length input output ~f =
  transfer_gen (read_now' ?max_queue_length) write' input output ~f:(fun q k ->
    f q >>> k)
;;

let transfer input output ~f =
  transfer_gen read_now write input output ~f:(fun a k -> k (f a))
;;

let transfer_id ?max_queue_length input output =
  transfer_gen (read_now' ?max_queue_length) write' input output ~f:(fun q k -> k q)
;;

(* Creates a new pipe fed by transferring from [input] through [f], and closes it when the
   transfer finishes. *)
let map_gen read write input ~f =
  let info = Option.map input.info ~f:(fun info -> [%sexp Mapped (info : Sexp.t)]) in
  let result, output = create ?info () in
  upon (transfer_gen read write input output ~f) (fun () -> close output);
  result
;;

let map' ?max_queue_length input ~f =
  map_gen (read_now' ?max_queue_length) write' input ~f:(fun q k -> f q >>> k)
;;

let map input ~f = map_gen read_now write input ~f:(fun a k -> k (f a))

let filter_map' ?max_queue_length input ~f =
  map' ?max_queue_length input ~f:(fun q -> Deferred.Queue.filter_map q ~f)
;;

(* Once the read end of [input] is closed, remaining values in the batch are dropped
   without calling [f]. *)
let filter_map ?max_queue_length input ~f =
  map_gen (read_now' ?max_queue_length) write' input ~f:(fun q k ->
    k (Queue.filter_map q ~f:(fun x -> if is_read_closed input then None else f x)))
;;

let folding_filter_map' ?max_queue_length input ~init ~f =
  let accum = ref init in
  filter_map' ?max_queue_length input ~f:(fun x ->
    let%map a, x = f !accum x in
    accum := a;
    x)
;;

let folding_filter_map ?max_queue_length input ~init ~f =
  let accum = ref init in
  filter_map ?max_queue_length input ~f:(fun x ->
    let a, x = f !accum x in
    accum := a;
    x)
;;

let folding_map ?max_queue_length input ~init ~f =
  folding_filter_map ?max_queue_length input ~init ~f:(fun accum a ->
    let accum, b = f accum a in
    accum, Some b)
;;

let filter input ~f = filter_map input ~f:(fun x -> if f x then Some x else None)

(* Returns a pipe whose buffer holds the elements of [l] and whose write end is already
   closed. *)
let of_list l =
  let t = create_internal ~info:None ~initial_buffer:(Queue.of_list l) in
  Ivar.fill t.closed ();
  update_pushback t;
  t
;;

let empty () = of_list []

let singleton x =
  let reader, writer = create () in
  write_without_pushback writer x;
  close writer;
  reader
;;

let unfold ~init:s ~f =
  (* To get some batching, we run the continuation immediately if the deferred is
     determined.  However, we always check for pushback.  Because size budget can't be
     infinite, the below loop is guaranteed to eventually yield to the scheduler. *)
  let ( >>=~ ) d f =
    match Deferred.peek d with
    | None -> d >>= f
    | Some x -> f x
  in
  create_reader ~close_on_exception:false (fun writer ->
    let rec loop s =
      f s
      >>=~ function
      | None -> return ()
      | Some (a, s) ->
        if is_closed writer then return () else write writer a >>=~ fun () -> loop s
    in
    loop s)
;;

(* Writes [sequence] in batches: each batch fills the buffer to one element beyond the
   size budget (when enough elements remain), then waits for pushback. *)
let of_sequence sequence =
  create_reader ~close_on_exception:false (fun writer ->
    let rec enqueue_n sequence i =
      if i <= 0
      then sequence
      else (
        match Sequence.next sequence with
        | None -> sequence
        | Some (a, sequence) ->
          Queue.enqueue writer.buffer a;
          enqueue_n sequence (i - 1))
    in
    let rec loop sequence =
      if is_closed writer || Sequence.is_empty sequence
      then return ()
      else (
        start_write writer;
        let sequence = enqueue_n sequence (1 + writer.size_budget - length writer) in
        finish_write writer;
        let%bind () = pushback writer in
        loop sequence)
    in
    loop sequence)
;;

type 'a to_sequence_elt =
  | Value of 'a
  | Wait_for : _ Deferred.t -> _ to_sequence_elt

(* Each element is either [Value a] for a value read from [t], or [Wait_for d] when
   nothing is available yet, where [d] is [values_available t].  The sequence ends at
   [`Eof]. *)
let to_sequence t =
  Sequence.unfold ~init:() ~f:(fun () ->
    match read_now t with
    | `Eof -> None
    | `Ok a -> Some (Value a, ())
    | `Nothing_available -> Some (Wait_for (values_available t), ()))
;;

let interleave_pipe inputs =
  let output, output_writer = create ~info:[%sexp "Pipe.interleave"] () in
  (* We keep a reference count of all the pipes that [interleave_pipe] is managing;
     [inputs] counts as one.  When the reference count drops to zero, we know that all
     pipes are closed and we can close [output_writer]. *)
  let num_pipes_remaining = ref 1 in
  let decr_num_pipes_remaining () =
    decr num_pipes_remaining;
    if !num_pipes_remaining = 0 then close output_writer
  in
  don't_wait_for
    (let%map () =
       iter_without_pushback inputs ~f:(fun input ->
         incr num_pipes_remaining;
         don't_wait_for
           (let%map () = transfer_id input output_writer in
            decr_num_pipes_remaining ()))
     in
     (* Drop the reference held by [inputs] itself. *)
     decr_num_pipes_remaining ());
  output
;;

let interleave inputs =
  if !check_invariant then List.iter inputs ~f:invariant;
  interleave_pipe (of_list inputs)
;;

let merge inputs ~compare =
  match inputs with
  | [] -> empty ()
  | [ input ] -> input
  | inputs ->
    let module Heap = Pairing_heap in
    let r, w = create () in
    upon (closed w) (fun () -> List.iter inputs ~f:close_read);
    let heap = Heap.create ~cmp:(fun (a1, _) (a2, _) -> compare a1 a2) () in
    let handle_read input eof_or_ok =
      match eof_or_ok with
      | `Eof -> ()
      | `Ok v -> Heap.add heap (v, input)
    in
    let rec pop_heap_and_loop () =
      (* At this point, all inputs not at Eof occur in [heap] exactly once, so we know
         what the next output element is.  [pop_heap_and_loop] repeatedly takes elements
         from the inputs as long as it has one from each input.  This is done
         synchronously to avoid the cost of a deferred for each element of the output --
         there's no need to pushback since that is only moving elements from one pipe to
         another.  As soon as [pop_heap_and_loop] can't get an element from some input, it
         waits on pushback from the output, since it has to wait on the input anyway.
         This also prevents [merge] from consuming inputs at a rate faster than its output
         is consumed. *)
      match Heap.pop heap with
      | None -> close w
      | Some (v, input) ->
        if not (is_closed w)
        then (
          write_without_pushback w v;
          if Heap.length heap = 0
          then upon (transfer_id input w) (fun () -> close w)
          else (
            match read_now input with
            | (`Eof | `Ok _) as x ->
              handle_read input x;
              pop_heap_and_loop ()
            | `Nothing_available ->
              pushback w
              >>> fun () ->
              read input
              >>> fun x ->
              handle_read input x;
              pop_heap_and_loop ()))
    in
    let initial_push =
      Deferred.List.iter inputs ~f:(fun input ->
        let%map x = read input in
        handle_read input x)
    in
    upon initial_push pop_heap_and_loop;
    r
;;

let concat_pipe inputs =
  let r =
    create_reader_not_close_on_exception (fun w ->
      let link = Link.create ~upstream:inputs ~downstream:w in
      let consumer = Link.consumer link in
      iter ~flushed:(Consumer consumer) inputs ~f:(fun input -> transfer_id input w))
  in
  upon (closed r) (fun () -> close inputs);
  r
;;

let concat inputs =
  create_reader_not_close_on_exception (fun w ->
    Deferred.List.iter inputs ~f:(fun input -> transfer_id input w))
;;

(* Copies every batch read from [t] into two new pipes.  [pushback_uses] selects whether
   reading waits on pushback from any still-open output or from all of them.  When both
   outputs are closed, [t] is closed. *)
let fork t ~pushback_uses =
  let reader0, writer0 = create () in
  let reader1, writer1 = create () in
  let some_reader_was_closed = ref false in
  let consumer =
    add_consumer t ~downstream_flushed:(fun () ->
      let some_reader_was_closed = !some_reader_was_closed in
      match%map
        Flushed_result.combine [ downstream_flushed writer0; downstream_flushed writer1 ]
      with
      | `Reader_closed -> `Reader_closed
      | `Ok ->
        (* In this case, there could have been no pending items in [writer0] nor in
           [writer1], in which case we could have had a closed pipe that missed some
           writes, but [Flushed_result.combine] would still have returned [`Ok] *)
        if some_reader_was_closed then `Reader_closed else `Ok)
  in
  don't_wait_for
    (let still_open = [ writer0; writer1 ] in
     let filter_open still_open =
       (* Only call [filter] and reallocate list if something will get filtered *)
       if not (List.exists still_open ~f:is_closed)
       then still_open
       else (
         some_reader_was_closed := true;
         let still_open = List.filter still_open ~f:(fun w -> not (is_closed w)) in
         if List.is_empty still_open then close t;
         still_open)
     in
     let%bind still_open =
       fold' t ~flushed:(Consumer consumer) ~init:still_open ~f:(fun still_open queue ->
         let still_open = filter_open still_open in
         if List.is_empty still_open
         then return []
         else (
           let%map () =
             match pushback_uses with
             | `Fast_consumer_only -> Deferred.any (List.map still_open ~f:pushback)
             | `Both_consumers -> Deferred.all_unit (List.map still_open ~f:pushback)
           in
           let still_open = filter_open still_open in
           List.iter still_open ~f:(fun w -> copy_in_without_pushback w ~from:queue);
           still_open))
     in
     List.iter still_open ~f:close;
     return ());
  reader0, reader1
;;

let set_info t info = set_info t (Some info)