open! Core_kernel
open! Async_kernel
open! Async_kernel_require_explicit_time_source
include Persistent_connection_kernel_intf

(* [Make (Conn)] builds a connection that keeps reconnecting to the address returned by
   [get_address]. [create] starts one background loop that performs every connection
   attempt; the other functions read the ivars that loop fills, and [close] tells it to
   stop by filling [close_started]. *)
module Make (Conn : T) = struct
  type address = Conn.Address.t [@@deriving sexp_of]
  type conn = Conn.t

  module Event = struct
    (* Lifecycle notifications, delivered to the [on_event] callback given to [create]. *)
    type t =
      | Attempting_to_connect
      | Obtained_address of address
      | Failed_to_connect of Error.t
      | Connected of (conn[@sexp.opaque])
      | Disconnected
    [@@deriving sexp_of]

    type event = t

    module Handler = struct
      type t =
        { server_name : string
        ; on_event : event -> unit Deferred.t
        }
    end

    (* [Failed_to_connect] maps to [`Error]; every other event maps to [`Info]. *)
    let log_level = function
      | Attempting_to_connect | Connected _ | Disconnected | Obtained_address _ -> `Info
      | Failed_to_connect _ -> `Error
    ;;

    (* Runs the handler's [on_event] callback; [server_name] is not used here. *)
    let handle t { Handler.server_name = _; on_event } = on_event t
  end

  type t =
    { get_address : unit -> address Or_error.t Deferred.t
    ; connect : address -> Conn.t Or_error.t Deferred.t
    (* Each call starts a new (jittered) wait; see [create]. *)
    ; retry_delay : unit -> unit Deferred.t
    (* Filled with [`Ok conn] once a connection is obtained, or with [`Close_started]
       when the background loop stops. Replaced by a fresh ivar after each
       disconnection. *)
    ; mutable conn : [ `Ok of Conn.t | `Close_started ] Ivar.t
    (* Filled with the result of the next connection attempt, then replaced by a fresh
       ivar for the attempt after that. *)
    ; mutable next_connect_result : Conn.t Or_error.t Ivar.t
    ; event_handler : Event.Handler.t
    ; close_started : unit Ivar.t
    ; close_finished : unit Ivar.t
    }
  [@@deriving fields]

  let handle_event t event = Event.handle event t.event_handler

  (* This function focuses on the error itself, discarding information about which
     monitor caught the error, if any.

     If we don't do this, we sometimes end up with noisy logs which report the same error
     again and again, differing only as to what monitor caught them. *)
  let same_error e1 e2 =
    let to_sexp e = Exn.sexp_of_t (Monitor.extract_exn (Error.to_exn e)) in
    Sexp.equal (to_sexp e1) (to_sexp e2)
  ;;

  (* Repeatedly tries to connect until an attempt succeeds ([`Ok conn]) or [close] has
     been called ([`Close_started]).

     [close_started] is only checked before each attempt, so an attempt already in flight
     when [close] is called is still awaited, and its connection (if any) is returned.
     Every attempt's result fills the current [t.next_connect_result], which is then
     replaced with a fresh ivar. After a failed attempt, waits for [t.retry_delay ()] or
     [close], whichever comes first. *)
  let try_connecting_until_successful t =
    (* We take care not to spam logs with the same message over and over by comparing
       each log message to the previous one of the same type. *)
    let previous_address = ref None in
    let previous_error = ref None in
    let connect () =
      t.get_address ()
      >>= function
      | Error e -> return (Error e)
      | Ok addr ->
        let same_as_previous_address =
          match !previous_address with
          | None -> false
          | Some previous_address -> Conn.Address.equal addr previous_address
        in
        previous_address := Some addr;
        (if same_as_previous_address
         then Deferred.unit
         else handle_event t (Obtained_address addr))
        >>= fun () -> t.connect addr
    in
    let rec loop () =
      if Ivar.is_full t.close_started
      then return `Close_started
      else (
        let%bind connect_result = connect () in
        Ivar.fill t.next_connect_result connect_result;
        t.next_connect_result <- Ivar.create ();
        match connect_result with
        | Ok conn -> return (`Ok conn)
        | Error err ->
          let same_as_previous_error =
            match !previous_error with
            | None -> false
            | Some previous_err -> same_error err previous_err
          in
          previous_error := Some err;
          (if same_as_previous_error
           then Deferred.unit
           else handle_event t (Failed_to_connect err))
          >>= fun () ->
          Deferred.any [ t.retry_delay (); Ivar.read t.close_started ]
          >>= fun () -> loop ())
    in
    loop ()
  ;;

  (* Builds [t] and starts the background reconnection loop.

     [retry_delay], if given, supplies the base delay between attempts. The default is
     0.1s when [am_running_test] and 10s otherwise. Each actual wait is the base delay
     moved up or down by a random amount of at most 30% of itself, drawn from
     [random_state], and is measured on [time_source]. *)
  let create
        ~server_name
        ?(on_event = fun _ -> Deferred.unit)
        ?retry_delay
        ?(random_state = Random.State.default)
        ?(time_source = Time_source.wall_clock ())
        ~connect
        get_address
    =
    let event_handler = { Event.Handler.server_name; on_event } in
    (* The caller's base delay, or the default. Resolved once here rather than on every
       call to the jittered [retry_delay] below. *)
    let base_retry_delay =
      Option.value retry_delay ~default:(fun () ->
        if am_running_test then Time_ns.Span.of_sec 0.1 else Time_ns.Span.of_sec 10.)
    in
    let max_jitter_fraction = 0.3 in
    let retry_delay () =
      let span = Time_ns.Span.to_sec (base_retry_delay ()) in
      let distance = Random.State.float random_state (span *. max_jitter_fraction) in
      let wait =
        if Random.State.bool random_state then span +. distance else span -. distance
      in
      Time_source.after time_source (Time_ns.Span.of_sec wait)
    in
    let t =
      { event_handler
      ; get_address
      ; connect
      ; next_connect_result = Ivar.create ()
      ; retry_delay
      ; conn = Ivar.create ()
      ; close_started = Ivar.create ()
      ; close_finished = Ivar.create ()
      }
    in
    (* This loop finishes once [close t] has been called, in which case it makes sure to
       leave [t.conn] filled with [`Close_started]. *)
    don't_wait_for
    @@ Deferred.repeat_until_finished () (fun () ->
      handle_event t Attempting_to_connect
      >>= fun () ->
      let ready_to_retry_connecting = t.retry_delay () in
      try_connecting_until_successful t
      >>= fun maybe_conn ->
      Ivar.fill t.conn maybe_conn;
      match maybe_conn with
      | `Close_started -> return (`Finished ())
      | `Ok conn ->
        handle_event t (Connected conn)
        >>= fun () ->
        Conn.close_finished conn
        >>= fun () ->
        t.conn <- Ivar.create ();
        handle_event t Disconnected
        >>= fun () ->
        (* Waits until [retry_delay ()] time has passed since the time just before we
           last tried to connect, rather than since the time we noticed being
           disconnected, so that if a long-lived connection dies, we will attempt to
           reconnect immediately. *)
        Deferred.choose
          [ Deferred.choice ready_to_retry_connecting (fun () -> `Repeat ())
          ; Deferred.choice (Ivar.read t.close_started) (fun () ->
              Ivar.fill t.conn `Close_started;
              `Finished ())
          ]);
    t
  ;;

  (* Returns the current connection, waiting for one if there is none yet. If the
     background loop has stopped ([`Close_started]), the result is never determined. *)
  let connected t =
    (* Take care not to return a connection that is known to be closed at the time
       [connected] was called. This could happen in client code that behaves like
       {[
         Persistent_connection.Rpc.connected t
         >>= fun c1 ->
         ...
           Rpc.Connection.close_finished c1
         (* at this point we are in a race with the same call inside
            persistent_client.ml *)
         >>= fun () ->
         Persistent_connection.Rpc.connected t
         (* depending on how the race turns out, we don't want to get a closed connection
            here *)
         >>= fun c2 ->
         ...
       ]}
       This doesn't remove the race condition, but it makes it less likely to happen.
    *)
    let rec loop () =
      let d = Ivar.read t.conn in
      match Deferred.peek d with
      | None ->
        d
        >>= (function
          | `Close_started -> Deferred.never ()
          | `Ok conn -> return conn)
      | Some `Close_started -> Deferred.never ()
      | Some (`Ok conn) ->
        if Conn.is_closed conn
        then
          (* give the reconnection loop a chance to overwrite the ivar *)
          Conn.close_finished conn >>= loop
        else return conn
    in
    loop ()
  ;;

  (* The connection currently stored in [t.conn], without waiting. Unlike [connected],
     this does not check [Conn.is_closed]. *)
  let current_connection t =
    match Deferred.peek (Ivar.read t.conn) with
    | None | Some `Close_started -> None
    | Some (`Ok conn) -> Some conn
  ;;

  let close_finished t = Ivar.read t.close_finished
  let is_closed t = Ivar.is_full t.close_started

  (* Fills [close_started] (which stops the background loop), then closes whatever
     connection [t.conn] ends up holding, and finally fills [close_finished]. A second
     call just waits for the first one to finish. *)
  let close t =
    if Ivar.is_full t.close_started
    then
      (* Another call to close is already in progress. Wait for it to finish. *)
      close_finished t
    else (
      Ivar.fill t.close_started ();
      Ivar.read t.conn
      >>= fun conn_opt ->
      (match conn_opt with
       | `Close_started -> Deferred.unit
       | `Ok conn -> Conn.close conn)
      >>| fun () -> Ivar.fill t.close_finished ())
  ;;

  let connected_or_failed_to_connect_connection_closed =
    Or_error.error_s [%message "Persistent connection closed"]
  ;;

  (* If [t] is already closed, returns an error immediately. If [connected t] is already
     determined, returns that connection. Otherwise waits for whichever comes first: the
     result (success or failure) of the next connection attempt, or [close]. *)
  let connected_or_failed_to_connect t =
    if is_closed t
    then return connected_or_failed_to_connect_connection_closed
    else (
      match Deferred.peek (connected t) with
      | Some x -> return (Ok x)
      | None ->
        Deferred.choose
          [ choice (Ivar.read t.close_started) (fun () ->
              connected_or_failed_to_connect_connection_closed)
          ; choice (Ivar.read t.next_connect_result) Fn.id
          ])
  ;;
end