(* Client side of the RPC protocol.

   This file re-exports the building blocks of the library ([Conv],
   [Versioned], [Menu], ...), exposes the public procedure declarations under
   [Public], and provides [Client.Make], a functor that builds a client on top
   of a user supplied fiber implementation and a channel of s-expressions. *)

open Import
module Conv = Conv
module Versioned = Versioned
module Menu = Menu
module Procedures = Procedures
module Where = Where
module Registry = Registry
include Types
include Exported_types
module Version_error = Versioned.Version_error
module Decl = Decl
module Sub = Sub

module type Fiber = Fiber_intf.S

(* Witnesses for the procedures declared in [Procedures.Public] and the
   subscriptions built from [Procedures.Poll]. *)
module Public = struct
  module Request = struct
    type ('a, 'b) t = ('a, 'b) Decl.Request.witness

    let ping = Procedures.Public.ping.decl
    let diagnostics = Procedures.Public.diagnostics.decl
    let format_dune_file = Procedures.Public.format_dune_file.decl
    let promote = Procedures.Public.promote.decl
    let promote_many = Procedures.Public.promote_many.decl
    let build_dir = Procedures.Public.build_dir.decl
  end

  module Notification = struct
    type 'a t = 'a Decl.Notification.witness

    let shutdown = Procedures.Public.shutdown.decl
  end

  module Sub = struct
    type 'a t = 'a Sub.t

    let diagnostic = Sub.of_procedure Procedures.Poll.diagnostic
    let progress = Sub.of_procedure Procedures.Poll.progress
    let running_jobs = Sub.of_procedure Procedures.Poll.running_jobs
  end
end

(* Witnesses for the notifications declared in [Procedures.Server_side]. *)
module Server_notifications = struct
  let abort = Procedures.Server_side.abort.decl
  let log = Procedures.Server_side.log.decl
end

module Client = struct
  (* Interface of a client instantiated by [Make]. *)
  module type S = sig
    type t
    type 'a fiber
    type chan

    (* Turning a declaration witness into a request/notification staged for
       the version negotiated with the other side. *)
    module Versioned : sig
      type ('a, 'b) request = ('a, 'b) Versioned.Staged.request
      type 'a notification = 'a Versioned.Staged.notification

      val prepare_request
        :  t
        -> ('a, 'b) Decl.Request.witness
        -> (('a, 'b) request, Version_error.t) result fiber

      val prepare_notification
        :  t
        -> 'a Decl.Notification.witness
        -> ('a notification, Version_error.t) result fiber
    end

    (* Send a prepared request. When [id] is omitted, an id is generated. *)
    val request
      :  ?id:Id.t
      -> t
      -> ('a, 'b) Versioned.request
      -> 'a
      -> ('b, Response.Error.t) result fiber

    (* Send a prepared notification. *)
    val notification : t -> 'a Versioned.notification -> 'a -> unit fiber

    (* Fiber reading the channel's [disconnected] ivar. *)
    val disconnected : t -> unit fiber

    (* Long polling streams created by [poll]. *)
    module Stream : sig
      type 'a t

      val cancel : _ t -> unit fiber
      val next : 'a t -> 'a option fiber
    end

    val poll : ?id:Id.t -> t -> 'a Sub.t -> ('a Stream.t, Version_error.t) result fiber

    (* Accumulate requests and notifications and send them together with
       [submit]. *)
    module Batch : sig
      type client := t
      type t

      val create : client -> t

      val request
        :  ?id:Id.t
        -> t
        -> ('a, 'b) Versioned.request
        -> 'a
        -> ('b, Response.Error.t) result fiber

      val notification : t -> 'a Versioned.notification -> 'a -> unit
      val submit : t -> unit fiber
    end

    (* Callbacks for the [log] and [abort] notifications coming from the
       server. *)
    module Handler : sig
      type t

      val create
        :  ?log:(Message.t -> unit fiber)
        -> ?abort:(Message.t -> unit fiber)
        -> unit
        -> t
    end

    (* Extra procedures to register in addition to the built-in ones, see
       [connect_with_menu]. *)
    type proc =
      | Request : ('a, 'b) Decl.request -> proc
      | Notification : 'a Decl.notification -> proc
      | Poll : 'a Procedures.Poll.t -> proc
      | Handle_request : ('a, 'b) Decl.request * ('a -> 'b fiber) -> proc

    val connect_with_menu
      :  ?handler:Handler.t
      -> private_menu:proc list
      -> chan
      -> Initialize.Request.t
      -> f:(t -> 'a fiber)
      -> 'a fiber

    (* [connect_with_menu] with an empty [private_menu]. *)
    val connect
      :  ?handler:Handler.t
      -> chan
      -> Initialize.Request.t
      -> f:(t -> 'a fiber)
      -> 'a fiber
  end

  module Make
      (Fiber : Fiber_intf.S)
      (Chan : sig
         type t

         val write : t -> Sexp.t list option -> unit Fiber.t
         val read : t -> Sexp.t option Fiber.t
       end) =
  struct
    open Fiber.O
    module V = Versioned.Make (Fiber)

    (* Wrapper around the user supplied channel. It remembers whether the
       write side was closed and exposes an ivar that is filled once [read]
       returns [None]. *)
    module Chan = struct
      type t =
        { read : unit -> Sexp.t option Fiber.t
        ; write : Sexp.t list option -> unit Fiber.t
        ; closed_read : bool
        ; mutable closed_write : bool
        ; disconnected : unit Fiber.Ivar.t
        }

      (* Wrap a raw channel [c]. The wrapped [read] fills [disconnected] when
         the underlying channel yields [None], before returning that [None]. *)
      let of_chan c =
        let disconnected = Fiber.Ivar.create () in
        let read () =
          let* result = Chan.read c in
          match result with
          | None ->
            let+ () = Fiber.Ivar.fill disconnected () in
            None
          | _ -> Fiber.return result
        in
        { read
        ; write = (fun s -> Chan.write c s)
        ; closed_read = false
        ; closed_write = false
        ; disconnected
        }
      ;;

      (* [write t (Some _)] is forwarded as is. [write t None] is forwarded
         only the first time; later calls are no-ops thanks to
         [closed_write].

         The leading [Fiber.return ()] bind is presumably there so that no
         effect happens before the returned fiber is run; the same pattern is
         used throughout this file. *)
      let write t s =
        let* () = Fiber.return () in
        match s with
        | Some _ -> t.write s
        | None ->
          if t.closed_write
          then Fiber.return ()
          else (
            t.closed_write <- true;
            t.write None)
      ;;

      (* NOTE(review): [closed_read] is immutable and initialised to [false]
         in [of_chan], so within this file the guard below never triggers. *)
      let read t =
        let* () = Fiber.return () in
        if t.closed_read then Fiber.return None else t.read ()
      ;;
    end

    (* Reasons carried by the [Abort] exception. *)
    type abort =
      | Invalid_session of Conv.error
      | Server_aborted of Message.t

    exception Abort of abort

    (* Pretty printer for [Abort], rendered through [Dyn]. *)
    let () =
      Printexc.register_printer (function
        | Abort error ->
          let dyn =
            match error with
            | Invalid_session e -> Dyn.variant "Invalid_session" [ Conv.dyn_of_error e ]
            | Server_aborted e ->
              Dyn.variant
                "Server_aborted"
                [ Sexp.to_dyn (Message.to_sexp_unversioned e) ]
          in
          Some (Dyn.to_string dyn)
        | _ -> None)
    ;;

    (* State of a client session. [requests] maps the id of every in-flight
       request to the ivar that will receive its outcome. *)
    type t =
      { chan : Chan.t
      ; requests :
          ( Id.t
            , [ `Cancelled
              | `Pending of
                [ `Completed of Response.t | `Connection_dead | `Cancelled ] Fiber.Ivar.t
              ] )
            Table.t
      ; initialize : Initialize.Request.t
      ; mutable next_id : int
      ; mutable running : bool
      ; mutable handler_initialized : bool
      ; (* This field is a fiber rather than a plain value ([connect_raw]
           passes the read of an ivar here) to ensure that any typed
           communications are correctly versioned. The contract of the
           [Fiber] interface ensures that this will be filled before any user
           code is run. *)
        handler : unit V.Handler.t Fiber.t
      ; on_preemptive_abort : Message.t -> unit Fiber.t
      }

    (* When the client is terminated via this function, the session is
       considered to be dead without a way to recover.

       Only the first call does anything ([running] is flipped to [false]).
       It empties [requests], writes [None] to the channel and fills every
       pending ivar with [`Connection_dead]. *)
    let terminate t =
      let* () = Fiber.return () in
      match t.running with
      | false -> Fiber.return ()
      | true ->
        t.running <- false;
        let ivars = ref [] in
        (* Returning [false] for every binding drains the table while
           collecting its values. *)
        Table.filteri_inplace t.requests ~f:(fun ~key:_ ~data:ivar ->
          ivars := ivar :: !ivars;
          false);
        (* Generator popping the collected entries one at a time. It shadows
           the reference above, which is still the one captured in its body. *)
        let ivars () =
          Fiber.return
            (match !ivars with
             | [] -> None
             | x :: xs ->
               ivars := xs;
               Some x)
        in
        Fiber.fork_and_join_unit
          (fun () -> Chan.write t.chan None)
          (fun () ->
            Fiber.parallel_iter ivars ~f:(fun status ->
              match status with
              | `Cancelled -> Fiber.return ()
              | `Pending ivar -> Fiber.Ivar.fill ivar `Connection_dead))
    ;;

    (* Terminate the session and raise a [Code_error] built from [message]
       and [info]. *)
    let terminate_with_error t message info =
      Fiber.fork_and_join_unit
        (fun () -> terminate t)
        (fun () ->
          (* TODO stop using code error here. If [terminate_with_error] is
             called, it's because the other side is doing something
             unexpected, not because we have a bug *)
          Code_error.raise message info)
    ;;

    (* Serialize [packet] and write it to the channel. [None] goes through
       [Chan.write] as [None]. *)
    let send conn (packet : Packet.t list option) =
      let sexps = Option.map packet ~f:(List.map ~f:(Conv.to_sexp Packet.sexp)) in
      Chan.write conn.chan sexps
    ;;

    (* Fresh session state: running, no pending request, handler not yet
       initialized. *)
    let create ~chan ~initialize ~handler ~on_preemptive_abort =
      let requests = Table.create (module Id) 16 in
      { chan
      ; requests
      ; next_id = 0
      ; initialize
      ; running = true
      ; handler_initialized = false
      ; handler
      ; on_preemptive_abort
      }
    ;;

    (* Register [id] as pending and return the ivar that will receive the
       outcome. Returns [Error] with kind [Connection_dead] when the session
       is no longer running; raises a [Code_error] when [id] is already in
       the table. Nothing is sent. *)
    let prepare_request' conn (id, req) =
      match conn.running with
      | false ->
        let err =
          let payload =
            Sexp.record
              [ "id", Id.to_sexp id; "req", Conv.to_sexp (Conv.record Call.fields) req ]
          in
          Response.Error.create
            ~payload
            ~message:"request sent while connection is dead"
            ~kind:Connection_dead
            ()
        in
        Error err
      | true ->
        let ivar = Fiber.Ivar.create () in
        (match Table.add conn.requests id (`Pending ivar) with
         | Ok () -> ()
         | Error _ -> Code_error.raise "duplicate id" [ "id", Id.to_dyn id ]);
        Ok ivar
    ;;

    (* Register and send the request [(id, req)], then wait for its outcome.
       A dead connection is reported as [`Completed (Error _)] without
       sending anything. *)
    let request_untyped conn (id, req) =
      let* () = Fiber.return () in
      match prepare_request' conn (id, req) with
      | Error e -> Fiber.return (`Completed (Error e))
      | Ok ivar ->
        let* () = send conn (Some [ Request (id, req) ]) in
        Fiber.Ivar.read ivar
    ;;

    (* Decode a successful response with [decode]. A response that [decode]
       rejects terminates the session with an error; an [Error] response is
       passed through. *)
    let parse_response t decode = function
      | Error e -> Fiber.return (Error e)
      | Ok res ->
        (match decode res with
         | Ok s -> Fiber.return (Ok s)
         | Error e ->
           terminate_with_error
             t
             "response not matched by decl"
             [ "e", Response.Error.to_dyn e ])
    ;;

    (* Use the caller's id when given, otherwise make one of the form
       [(auto <n>)] from the session counter. *)
    let gen_id t = function
      | Some id -> id
      | None ->
        let id = Sexp.List [ Atom "auto"; Atom (Int.to_string t.next_id) ] in
        t.next_id <- t.next_id + 1;
        Id.make id
    ;;

    (* Both functions wait for the session handler and ask it to stage the
       declaration. *)
    module Versioned = struct
      type ('a, 'b) request = ('a, 'b) Versioned.Staged.request
      type 'a notification = 'a Versioned.Staged.notification

      let prepare_request t (decl : _ Decl.Request.witness) =
        let+ handler = t.handler in
        V.Handler.prepare_request handler decl
      ;;

      let prepare_notification (type a) t (decl : a Decl.Notification.witness) =
        let+ handler = t.handler in
        V.Handler.prepare_notification handler decl
      ;;
    end

    (* Internal typed request with an explicit [id]. The three possible
       outcomes are kept apart; a completed response is decoded with
       [decode_resp]. Shadowed further down by the public [request]. *)
    let request t id ({ encode_req; decode_resp } : _ Versioned.request) req =
      let req = encode_req req in
      let* res = request_untyped t (id, req) in
      match res with
      | `Connection_dead -> Fiber.return `Connection_dead
      | `Cancelled -> Fiber.return `Cancelled
      | `Completed res ->
        let+ res = parse_response t decode_resp res in
        `Completed res
    ;;

    (* Resolve the pending request [id], if any, with [`Cancelled].

       NOTE(review): the entry is removed rather than replaced by
       [`Cancelled], and nothing else in this file stores [`Cancelled] in the
       table. A response arriving later for [id] would therefore take the
       unexpected response branch of [read_packets] -- confirm this is
       intended. *)
    let cancel t id =
      match Table.find t.requests id with
      | None | Some `Cancelled -> Fiber.return ()
      | Some (`Pending ivar) ->
        Table.remove t.requests id;
        Fiber.Ivar.fill ivar `Cancelled
    ;;

    (* Encode [n] and hand the resulting call to [k]. Raises
       [Response.Error.E] when the session is no longer running. *)
    let make_notification
          (type a)
          t
          ({ encode } : a Versioned.notification)
          (n : a)
          (k : Call.t -> 'a)
      : 'a
      =
      let call = encode n in
      match t.running with
      | true -> k call
      | false ->
        let err =
          let payload = Conv.to_sexp (Conv.record Call.fields) call in
          Response.Error.create
            ~payload
            ~message:"notification sent while connection is dead"
            ~kind:Code_error
            ()
        in
        raise (Response.Error.E err)
    ;;

    (* Send the notification immediately as a single packet. *)
    let notification (type a) t (stg : a Versioned.notification) (n : a) =
      let* () = Fiber.return () in
      make_notification t stg n (fun call -> send t (Some [ Notification call ]))
    ;;

    let disconnected t = Fiber.Ivar.read t.chan.disconnected

    module Stream = struct
      (* [id] identifies the stream and is the payload of every poll and
         cancel message. [pending_request_id] is the id of the poll request
         currently in flight, if any. *)
      type nonrec 'a t =
        { poll : (Id.t, 'a option) Versioned.request
        ; cancel : Id.t Versioned.notification
        ; client : t
        ; id : Id.t
        ; mutable pending_request_id : Id.t option
        ; counter : int
        ; mutable active : bool
        }

      (* Stage the poll request and the cancel notification of [sub]. Fails
         with the error of whichever one cannot be staged. *)
      let create sub client id =
        let+ handler = client.handler in
        let open Result.O in
        let+ poll = V.Handler.prepare_request handler (Sub.poll sub)
        and+ cancel = V.Handler.prepare_notification handler (Sub.poll_cancel sub) in
        { poll; cancel; client; id; pending_request_id = None; counter = 0; active = true }
      ;;

      (* Raise a [Code_error] once the stream has been cancelled. *)
      let check_active t =
        if not t.active
        then Code_error.raise "polling is inactive" [ "id", Id.to_dyn t.id ]
      ;;

      (* Ask for the next value. Raises a [Code_error] when the stream was
         cancelled or when a previous [next] is still in flight. Returns
         [None] when the request is cancelled or the connection dies, and
         re-raises an error response as [Response.Error.E].

         NOTE(review): [counter] is immutable and set to [0] in [create], so
         every request of a given stream is sent with the same id. *)
      let next t =
        let* () = Fiber.return () in
        check_active t;
        (match t.pending_request_id with
         | Some _ ->
           Code_error.raise "Poll.next: previous Poll.next did not terminate yet" []
         | None -> ());
        let id =
          Sexp.record
            [ "poll", Id.to_sexp t.id; "i", Sexp.Atom (string_of_int t.counter) ]
          |> Id.make
        in
        t.pending_request_id <- Some id;
        let+ res = request t.client id t.poll t.id in
        t.pending_request_id <- None;
        match res with
        | `Connection_dead | `Cancelled -> None
        | `Completed (Ok res) -> res
        | `Completed (Error e) ->
          (* cwong: Should this really be a raise? *)
          raise (Response.Error.E e)
      ;;

      (* Deactivate the stream and notify the server. A poll request that is
         still in flight is cancelled locally as well. *)
      let cancel t =
        let* () = Fiber.return () in
        check_active t;
        t.active <- false;
        (* XXX should we add a pool to stop waiting for the notification to
           reach the server? *)
        let notify () = notification t.client t.cancel t.id in
        match t.pending_request_id with
        | None -> notify ()
        | Some id -> Fiber.fork_and_join_unit (fun () -> cancel t.client id) notify
      ;;
    end

    (* Collapse a request outcome into a [result] for requests that cannot be
       cancelled. Despite its name this does not raise on a dead connection:
       it returns an [Error] of kind [Connection_dead]. [`Cancelled] is
       asserted to be impossible. *)
    let no_cancel_raise_connection_dead id = function
      | `Cancelled -> assert false
      | `Completed s -> s
      | `Connection_dead ->
        let payload = Sexp.record [ "id", Id.to_sexp id ] in
        let error =
          Response.Error.create
            ~kind:Connection_dead
            ~payload
            ~message:
              "connection terminated. this request will never receive a response"
            ()
        in
        Error error
    ;;

    (* Public request: generates an id when none is given and reports a dead
       connection as an [Error]. *)
    let request ?id t spec req =
      let id = gen_id t id in
      let+ res = request t id spec req in
      no_cancel_raise_connection_dead id res
    ;;

    (* Create a stream for [sub]; [id] (generated when omitted) becomes the
       id of the stream. *)
    let poll ?id client sub =
      let* () = Fiber.return () in
      let id = gen_id client id in
      Stream.create sub client id
    ;;

    module Batch = struct
      (* [pending] holds the queued packets, most recent first. *)
      type nonrec t =
        { client : t
        ; mutable pending : Packet.t list
        }

      let create client = { client; pending = [] }

      (* Queue a notification. Raises like [make_notification] when the
         session is dead. *)
      let notification t n a =
        make_notification t.client n a (fun call ->
          t.pending <- Notification call :: t.pending)
      ;;

      (* Register and queue a request. The returned fiber waits on the
         request's ivar, which [read_packets] fills when a response with this
         id arrives and [terminate] fills when the session ends. *)
      let request
            (type a b)
            ?id
            t
            ({ encode_req; decode_resp } : (a, b) Versioned.request)
            (req : a)
        : (b, _) result Fiber.t
        =
        let* () = Fiber.return () in
        let id = gen_id t.client id in
        let call = encode_req req in
        let ivar = prepare_request' t.client (id, call) in
        match ivar with
        | Error e -> Fiber.return (Error e)
        | Ok ivar ->
          t.pending <- Packet.Request (id, call) :: t.pending;
          let* res = Fiber.Ivar.read ivar in
          (* currently impossible because there's no batching for polling and
             cancellation is only available for polled requests *)
          let res = no_cancel_raise_connection_dead id res in
          parse_response t.client decode_resp res
      ;;

      (* Send everything queued so far, in the order it was queued, and reset
         the queue. *)
      let submit t =
        let* () = Fiber.return () in
        let pending = List.rev t.pending in
        t.pending <- [];
        send t.client (Some pending)
      ;;
    end

    (* Process the packets produced by the generator [packets] and terminate
       the session once the iteration is over.

       - An abort notification received before the handler is initialized is
         decoded here and passed to [on_preemptive_abort]; any other
         notification goes to the versioned handler.
       - A request is answered through the versioned handler.
       - A response resolves the matching pending request; a response with an
         unknown id terminates the session with an error. *)
    let read_packets t packets =
      let* () =
        Fiber.parallel_iter packets ~f:(function
          | Packet.Notification n ->
            if
              String.equal n.method_ Procedures.Server_side.abort.decl.method_
              && not t.handler_initialized
            then (
              match
                Conv.of_sexp ~version:t.initialize.dune_version Message.sexp n.params
              with
              | Ok msg -> t.on_preemptive_abort msg
              | Error error ->
                terminate_with_error
                  t
                  "fatal: server aborted connection, but couldn't parse reason"
                  [ "reason", Sexp.to_dyn n.params; "error", Conv.dyn_of_error error ])
            else
              let* handler = t.handler in
              let* result = V.Handler.handle_notification handler () n in
              (match result with
               | Error e ->
                 terminate_with_error
                   t
                   "received bad notification from server"
                   [ "error", Response.Error.to_dyn e; "notification", Call.to_dyn n ]
               | Ok () -> Fiber.return ())
          | Request (id, req) ->
            let* handler = t.handler in
            let* result = V.Handler.handle_request handler () (id, req) in
            send t (Some [ Response (id, result) ])
          | Response (id, response) ->
            (match Table.find t.requests id with
             | Some status ->
               Table.remove t.requests id;
               (match status with
                | `Pending ivar -> Fiber.Ivar.fill ivar (`Completed response)
                | `Cancelled -> Fiber.return ())
             | None ->
               terminate_with_error
                 t
                 "unexpected response"
                 [ "id", Id.to_dyn id; "response", Response.to_dyn response ]))
      in
      terminate t
    ;;

    module Handler = struct
      type nonrec t =
        { log : Message.t -> unit Fiber.t
        ; abort : Message.t -> unit Fiber.t
        }

      (* Default [log]: print the message, followed by its payload when there
         is one, on standard error. *)
      let log { Message.payload; message } =
        let+ () = Fiber.return () in
        match payload with
        | None -> Format.eprintf "%s@." message
        | Some payload -> Format.eprintf "%s: %s@." message (Sexp.to_string payload)
      ;;

      (* Default [abort]: raise [Abort (Server_aborted m)]. *)
      let abort m = raise (Abort (Server_aborted m))
      let default = { log; abort }

      (* [default] with the provided callbacks substituted. *)
      let create ?log ?abort () =
        let t =
          let t = default in
          match log with
          | None -> t
          | Some log -> { t with log }
        in
        let t =
          match abort with
          | None -> t
          | Some abort -> { t with abort }
        in
        t
      ;;
    end

    type proc =
      | Request : ('a, 'b) Decl.request -> proc
      | Notification : 'a Decl.notification -> proc
      | Poll : 'a Procedures.Poll.t -> proc
      | Handle_request : ('a, 'b) Decl.request * ('a -> 'b Fiber.t) -> proc

    (* Build the versioning builder: declare the built-in procedures,
       implement the server side [abort] and [log] notifications with
       [handler], then register every entry of [private_menu]. *)
    let setup_versioning ~private_menu ~(handler : Handler.t) =
      let module Builder = V.Builder in
      let t : unit Builder.t = Builder.create () in
      (* CR-soon cwong: It is a *huge* footgun that you have to remember to
         declare a request here, or via [private_menu], and there is no
         mechanism to warn you if you forget. The closest thing is either
         seeing that [dune rpc status] does not report the new procedure, or
         having to deal with the [Notification_error.t], which contains some
         good context, but very little to indicate this specific problem. *)
      Builder.declare_request t Procedures.Public.ping;
      Builder.declare_request t Procedures.Public.diagnostics;
      Builder.declare_request t Procedures.Poll.(poll running_jobs);
      Builder.declare_notification t Procedures.Public.shutdown;
      Builder.declare_request t Procedures.Public.format_dune_file;
      Builder.declare_request t Procedures.Public.promote;
      Builder.declare_request t Procedures.Public.promote_many;
      Builder.declare_request t Procedures.Public.build_dir;
      Builder.implement_notification t Procedures.Server_side.abort (fun () ->
        handler.abort);
      Builder.implement_notification t Procedures.Server_side.log (fun () -> handler.log);
      Builder.declare_request t Procedures.Poll.(poll diagnostic);
      Builder.declare_request t Procedures.Poll.(poll progress);
      Builder.declare_notification t Procedures.Poll.(cancel running_jobs);
      Builder.declare_notification t Procedures.Poll.(cancel diagnostic);
      Builder.declare_notification t Procedures.Poll.(cancel progress);
      List.iter private_menu ~f:(function
        | Handle_request (r, h) -> Builder.implement_request t r (fun () -> h)
        | Request r -> Builder.declare_request t r
        | Notification n -> Builder.declare_notification t n
        | Poll p ->
          (* A poll is a request plus its cancel notification. *)
          Builder.declare_request t (Procedures.Poll.poll p);
          Builder.declare_notification t (Procedures.Poll.cancel p));
      t
    ;;

    (* Run a session over the wrapped channel [chan]:

       1. send the initialize request and decode the response;
       2. send the version negotiation request and build the menu from the
          selected methods;
       3. build the handler, fill [handler_var] so that everything waiting on
          the client's [handler] can proceed, and run [f];
       4. write [None] to the channel when [f] is done.

       Incoming packets are processed by [read_packets], started together
       with the steps above. Packets and responses that cannot be decoded
       raise [Abort (Invalid_session _)]; error responses raise
       [Response.Error.E]. *)
    let connect_raw
          chan
          (initialize : Initialize.Request.t)
          ~(private_menu : proc list)
          ~(handler : Handler.t)
          ~f
      =
      let packets () =
        let+ read = Chan.read chan in
        Option.map read ~f:(fun sexp ->
          match Conv.of_sexp Packet.sexp ~version:initialize.dune_version sexp with
          | Error e -> raise (Abort (Invalid_session e))
          | Ok message -> message)
      in
      let builder = setup_versioning ~handler ~private_menu in
      let handler_var = Fiber.Ivar.create () in
      let client =
        let on_preemptive_abort = handler.abort in
        let handler = Fiber.Ivar.read handler_var in
        create ~initialize ~chan ~handler ~on_preemptive_abort
      in
      let run () =
        let* init =
          let id = Id.make (List [ Atom "initialize" ]) in
          let initialize = Initialize.Request.to_call initialize in
          let+ res = request_untyped client (id, initialize) in
          no_cancel_raise_connection_dead id res
        in
        match init with
        | Error e -> raise (Response.Error.E e)
        | Ok csexp ->
          let* menu =
            match
              Conv.of_sexp
                ~version:initialize.dune_version
                Initialize.Response.sexp
                csexp
            with
            | Error e -> raise (Abort (Invalid_session e))
            | Ok _resp ->
              let id = Id.make (List [ Atom "version menu" ]) in
              let supported_versions =
                let request =
                  Version_negotiation.Request.create
                    (V.Builder.registered_procedures builder)
                in
                Version_negotiation.Request.to_call request
              in
              let* resp = request_untyped client (id, supported_versions) in
              (* we don't allow cancelling negotiation *)
              (match no_cancel_raise_connection_dead id resp with
               | Error e -> raise (Response.Error.E e)
               | Ok sexp ->
                 (match
                    Conv.of_sexp
                      ~version:initialize.dune_version
                      Version_negotiation.Response.sexp
                      sexp
                  with
                  | Error e -> raise (Abort (Invalid_session e))
                  | Ok (Selected methods) ->
                    (match Menu.of_list methods with
                     | Ok m -> Fiber.return m
                     | Error (method_, a, b) ->
                       terminate_with_error
                         client
                         "server responded with invalid version menu"
                         [ ( "duplicated"
                           , Dyn.Tuple [ Dyn.String method_; Dyn.Int a; Dyn.Int b ] )
                         ])))
          in
          let handler =
            V.Builder.to_handler builder ~menu ~session_version:(fun () ->
              client.initialize.dune_version)
          in
          (* From here on abort notifications go through the versioned
             handler instead of [on_preemptive_abort]. *)
          client.handler_initialized <- true;
          let* () = Fiber.Ivar.fill handler_var handler in
          Fiber.finalize (fun () -> f client) ~finally:(fun () -> Chan.write chan None)
      in
      Fiber.fork_and_join_unit (fun () -> read_packets client packets) run
    ;;

    (* Wrap the raw channel and run a session with the given extra
       procedures. [handler] defaults to [Handler.default]. *)
    let connect_with_menu ?(handler = Handler.default) ~private_menu chan init ~f =
      connect_raw (Chan.of_chan chan) init ~handler ~private_menu ~f
    ;;

    let connect = connect_with_menu ~private_menu:[]
  end
end