open Core_kernel
open Async_kernel

(** The outcome of an RPC step: either a value or an [Rpc_error.t]. *)
type 'a t = ('a, Rpc_error.t) Result.t [@@deriving bin_io]

(** An exception paired with a caller-supplied description of where it was
    observed.  Only used to render the exception as a sexp. *)
type located_error =
  { location : string
  ; exn : Exn.t
  }
[@@deriving sexp_of]

(** [uncaught_exn ~location exn] is an [Error] carrying
    [Rpc_error.Uncaught_exn], whose payload is the sexp of
    [{ location; exn }]. *)
let uncaught_exn ~location exn =
  Error (Rpc_error.Uncaught_exn (sexp_of_located_error { location; exn }))
;;

(** [bin_io_exn ~location exn] is an [Error] carrying [Rpc_error.Bin_io_exn],
    whose payload is the sexp of [{ location; exn }]. *)
let bin_io_exn ~location exn =
  Error (Rpc_error.Bin_io_exn (sexp_of_located_error { location; exn }))
;;

(** [try_with ?run ~location f] runs [f] under [Monitor.try_with] (forwarding
    [?run]) and flattens the outcome into a single deferred result:

    - if the monitor reports [Ok x], the result is [x] itself, so [f] is
      expected to produce a value of type ['a t];
    - if the monitor reports [Error exn], the result is
      [uncaught_exn ~location exn]. *)
let try_with ?run ~location f =
  let x = Monitor.try_with ?run f in
  let join = function
    | Ok x -> x
    | Error exn -> uncaught_exn ~location exn
  in
  (* When the deferred is already determined, build the answer with [return]
     rather than mapping over [x].  NOTE(review): presumably this avoids an
     extra scheduling hop for the synchronous case -- confirm. *)
  match Deferred.peek x with
  | None -> x >>| join
  | Some x -> return (join x)
;;

(* it would make sense to just take a [Connection.t], but we take its pieces instead to
   avoid a dependency cycle *)

(** [or_error ~rpc_tag ~rpc_version ~connection_description
    ~connection_close_started result] converts [result] to an [Or_error.t].

    [Ok x] is returned unchanged.  An [Error rpc_error] becomes an
    [Or_error.error_s] whose sexp is a record with the fields [rpc_error],
    [connection_description], [rpc_tag] and [rpc_version].  The [rpc_error]
    field is produced by [Rpc_error.sexp_of_t], which is given a callback
    that reports the current contents of [connection_close_started] (if
    any) as an [Info.t option] sexp. *)
let or_error ~rpc_tag ~rpc_version ~connection_description ~connection_close_started =
  function
  | Ok x -> Ok x
  | Error (rpc_error : Rpc_error.t) ->
    let rpc_error =
      Rpc_error.sexp_of_t
        rpc_error
        ~get_connection_close_reason:(fun () ->
          let close_reason =
            (* Usually (always?) here we will have the deferred already full
               because Connection_closed error means the connection is already
               closed *)
            Deferred.peek connection_close_started
          in
          [%sexp (close_reason : Info.t option)])
    in
    Or_error.error_s
      [%sexp
        { rpc_error = (rpc_error : Sexp.t)
        ; connection_description = (connection_description : Info.t)
        ; rpc_tag = (rpc_tag : Protocol.Rpc_tag.t)
        ; rpc_version = (rpc_version : int)
        }]
;;