123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081open!Coreopen!Async_kernelopen!Importopen!Busletsubscribe_and_maybe_write_to_pipe1there~maybe_write_fn=ifBus.is_closedtthenPipe.empty()else(letr,w=Pipe.create()inletsubscription=subscribe_exnthere~f:(maybe_write_fnw)~on_close:(fun()->Pipe.closew)inupon(Pipe.closedw)(fun()->unsubscribetsubscription);r);;letpipe1_exnthere=subscribe_and_maybe_write_to_pipe1there~maybe_write_fn:Pipe.write_without_pushback_if_open;;letpipe1_filter_map_exnthere~f=subscribe_and_maybe_write_to_pipe1there~maybe_write_fn:(funpipev->matchfvwith|None->()|Somev->Pipe.write_without_pushback_if_openpipev);;moduleFirst_arity=structtype(_,_,_)t=|Arity1:('a->unit,'a->'roption,'r)t|Arity2:('a->'b->unit,'a->'b->'roption,'r)t|Arity3:('a->'b->'c->unit,'a->'b->'c->'roption,'r)t|Arity4:('a->'b->'c->'d->unit,'a->'b->'c->'d->'roption,'r)t|Arity5:('a->'b->'c->'d->'e->unit,'a->'b->'c->'d->'e->'roption,'r)t[@@derivingsexp_of]endletfirst_exn(typecfr)?stopthere(first_arity:(c,f,r)First_arity.t)~(f:f)=Deferred.create(funivar->letsubscriber:cBus.Subscriber.toptionref=refNoneinletfinish:roption->unit=function|None->()|Somer->Ivar.fillivarr;Bus.unsubscribet(Option.value_exn!subscriber)in(* We define [can_finish] separately from [finish] because we must call [can_finish]
before we call [f], so that we do not call [f] if [stop] is determined. *)letcan_finish=matchstopwith|None->fun()->true|Somestop->uponstop(fun()->Bus.unsubscribet(Option.value_exn!subscriber));fun()->not(Deferred.is_determinedstop)inletcallback:c=matchfirst_aritywith|Arity1->funa->ifcan_finish()thenfinish(fa)|Arity2->funa1a2->ifcan_finish()thenfinish(fa1a2)|Arity3->funa1a2a3->ifcan_finish()thenfinish(fa1a2a3)|Arity4->funa1a2a3a4->ifcan_finish()thenfinish(fa1a2a3a4)|Arity5->funa1a2a3a4a5->ifcan_finish()thenfinish(fa1a2a3a4a5)insubscriber:=Some(Bus.subscribe_exnthere~on_callback_raise:(letmonitor=Monitor.current()infunerror->Monitor.send_exnmonitor(Error.to_exnerror))~f:callback));;