(**************************************************************************)
(*                                                                        *)
(*                                 VSCoq                                  *)
(*                                                                        *)
(*                   Copyright INRIA and contributors                     *)
(*       (see version control and README file for authors & dates)        *)
(*                                                                        *)
(**************************************************************************)
(*                                                                        *)
(*   This file is distributed under the terms of the MIT License.         *)
(*   See LICENSE file.                                                    *)
(*                                                                        *)
(**************************************************************************)

open Types

let Log log = Log.mk_log "delegationManager"

type sentence_id = Stateid.t

(* A channel between the master and one worker; in this file both fields are
   always set to the same socket. *)
type link = {
  write_to : Unix.file_descr;
  read_from : Unix.file_descr;
}

(* [write_value link x] marshals [x] and writes it to [link.write_to].
   Marshalled values carry no type: the reader must decode them at the very
   type the writer used. *)
let write_value { write_to; _ } x =
  (* alert: calling log from write value causes a loop, since log (from the worker)
     writes the value to a channel. Hence we mask [log] *)
  let[@warning "-26"] log _ = () in
  let data = Marshal.to_bytes x [] in
  let datalength = Bytes.length data in
  let writeno = Unix.write write_to data 0 datalength in
  assert (writeno = datalength);
  flush_all ()

(* [abort_on_unix_error f x] evaluates [f x]. If that raises [Unix.Unix_error],
   the error is printed on stderr and the process exits with code 3. Only the
   evaluation of [f x] itself is protected. *)
let abort_on_unix_error f x =
  try f x
  with Unix.Unix_error (err, syscall, arg) ->
    Printf.eprintf "Error: %s: %s: %s\n%!" syscall arg (Unix.error_message err);
    exit 3

type job_handle = (Feedback.route_id * sentence_id) * int option ref

module type Job = sig
  type t
  val name : string
  val binary_name : string
  val initial_pool_size : int
  type update_request
  val appendFeedback :
    Feedback.route_id * sentence_id ->
    (Feedback.level * Loc.t option * Quickfix.t list * Pp.t) ->
    update_request
end

(* One typically creates a job id well before the worker is spawned, so we
   allocate a slot for the PID, but set it later. The sentence_id is used
   for error reporting (e.g. failure to spawn). *)
let mk_job_handle (rid, sid) : job_handle = (rid, sid), ref None

(* Kill the worker running the job, if one was spawned for it. *)
let cancel_job (_, id) =
  match !id with
  | None -> ()
  | Some pid ->
    (* [Sys.sigkill] rather than the raw number 9: on Windows [Unix.kill]
       only emulates [Sys.sigkill]. A worker that already terminated and was
       reaped makes kill fail with ESRCH, which is harmless here. *)
    (try Unix.kill pid Sys.sigkill
     with Unix.Unix_error (Unix.ESRCH, _, _) -> ())

(* TODO: this queue should not be here, it should be "per URI" but we want to
   keep here the conversion (STM) feedback -> (LSP) feedback *)
let install_feedback send =
  Log.feedback_add_feeder_on_Message (fun route span _ lvl loc qf msg ->
    send (route, span, (lvl, loc, qf, msg)))

module type Worker = sig
  type job_t
  type job_update_request
  val resize_pool : int -> unit

  (** Event for the main loop *)
  type delegation
  val pr_event : delegation -> Pp.t

  type events = delegation Sel.Event.t list

  (** handling an event may require an update to a sentence in the exec state,
      e.g. when a feedback is received *)
  val handle_event : delegation -> (job_update_request option * events)

  (* When a worker is available and the [jobs] queue can be popped the
     event becomes ready; in turn the event triggers the action:
     - if we can fork, job is passed to fork_action
     - otherwise Job.binary_name is spawned and the job sent to it *)
  val worker_available :
    jobs:((job_handle * Sel.Event.cancellation_handle * job_t) Queue.t) ->
    fork_action:(job_t -> send_back:(job_update_request -> unit) -> unit) ->
    feedback_cleanup:(unit -> unit) ->
    delegation Sel.Event.t

  (* for worker toplevels *)
  type options
[%%if coq = "8.18" || coq = "8.19" || coq = "8.20"]
  val parse_options : string list -> options * string list
[%%else]
  val parse_options : Coqargs.t -> string list -> options * string list
[%%endif]
  (* the sentence ids of the remote_mapping being delegated *)
  val setup_plumbing : options -> ((job_update_request -> unit) * job_t)

  (* CDebug aware print *)
  val log : ?force:bool -> string -> unit

end

module MakeWorker (Job : Job) = struct

type job_t = Job.t
type job_update_request = Job.update_request

(* Type of every value travelling from a worker to the master: the master
   decodes the worker socket at this type (see [worker_progress]). *)
type worker_message =
  | Job_update of Job.update_request
  | DebugMessage of Log.event

let Log log_worker = Log.mk_log ("worker." ^ Job.name)

(* Run [feedback_cleanup], then install a feedback feeder that forwards each
   message over [link]. Messages are wrapped in [Job_update]: a bare
   [Job.update_request] would be unmarshalled by the master as a
   [worker_message] of the wrong shape. *)
let install_feedback_worker ~feedback_cleanup link =
  feedback_cleanup ();
  ignore (install_feedback (fun (rid, id, fb) ->
    write_value link (Job_update (Job.appendFeedback (rid, id) fb))))

type feedback_cleanup = unit -> unit

(* This is the lifetime of a delegation: there is one start event, many
   progress events, then one ending event. *)
type delegation =
  | WorkerStart : feedback_cleanup * job_handle * 'job * ('job -> send_back:(Job.update_request -> unit) -> unit) * string -> delegation
  | WorkerProgress of { link : link; update_request : worker_message }
    (* TODO: use a recurring event (+cancel) and remove link *)
  | WorkerEnd of (int * Unix.process_status)
  | WorkerIOError of exn

let pr_event = function
  | WorkerEnd _ -> Pp.str "WorkerEnd"
  | WorkerIOError _ -> Pp.str "WorkerIOError"
  | WorkerProgress _ -> Pp.str "WorkerProgress"
  | WorkerStart _ -> Pp.str "WorkerStart"

(* Give [Log] a forwarder that sends debug events over [link], wrapped in
   [DebugMessage]. *)
let install_debug_worker link =
  Log.worker_initialization_done
    ~fwd_event:(fun e -> write_value link (DebugMessage e))

type events = delegation Sel.Event.t list

type role = Master | Worker of link

(* The pool is just a queue of tokens, one per worker allowed to run at the
   same time. *)
let pool = Queue.create ()
let () =
  assert (Job.initial_pool_size >= 1);
  (* Exactly [initial_pool_size] tokens, in agreement with [current_pool_size]. *)
  for _i = 1 to Job.initial_pool_size do Queue.push () pool done
let current_pool_size = ref Job.initial_pool_size

(* Put one token back in the pool, never growing it past the current size. *)
let release_token () =
  if Queue.length pool < !current_pool_size then Queue.push () pool

let resize_pool new_pool_size =
  assert (new_pool_size >= 1);
  let delta = !current_pool_size - new_pool_size in
  current_pool_size := new_pool_size;
  (* We add tokens if needed *)
  if delta < 0 then
    for _i = 1 to abs delta do Queue.push () pool done;
  (* We remove tokens if needed, the ones currently in use are not added back.
     See handling of WorkerEnd and WorkerIOError *)
  if delta > 0 then
    for _i = 1 to abs delta do ignore (Queue.take_opt pool) done

(* In order to create a job we enqueue this event *)
let worker_available ~jobs ~fork_action ~feedback_cleanup : delegation Sel.Event.t =
  Sel.On.queues jobs pool (fun (job_handle, _, job) () ->
    WorkerStart (feedback_cleanup, job_handle, job, fork_action, Job.binary_name))

(* When a worker is spawned, we enqueue this event, since eventually it will die *)
let worker_ends pid : delegation Sel.Event.t =
  Sel.On.death_of ~pid (fun reason -> WorkerEnd (pid, reason))

(* When a worker is spawned, we enqueue this event, since it will eventually
   make progress. Values read from [link] are decoded as [worker_message]. *)
let worker_progress link : delegation Sel.Event.t =
  Sel.On.ocaml_value link.read_from (function
    | Error e -> WorkerIOError e
    | Ok update_request -> WorkerProgress { link; update_request; })

(* ************ spawning *************************************************** *)

(* Wait at most [timeout] seconds for a connection on the listening socket
   [sr]; [None] if nobody connected in time. *)
let accept_timeout ?(timeout=2.0) sr =
  let ready, _, _ = Unix.select [sr] [] [] timeout in
  if ready = [] then None
  else Some (Unix.accept sr)

(* Fork a worker that connects back to a loopback socket opened beforehand.
   - In the child: [Ok (Worker link, [])].
   - In the parent: [Ok (Master, events)], [events] watching the worker's
     messages and death, or [Error (msg, cleanup_events)].
   The child's pid is stored in [cancellation_handle]. *)
let fork_worker : feedback_cleanup:feedback_cleanup -> int option ref -> (role * events, string * events) result = fun ~feedback_cleanup cancellation_handle ->
  let open Unix in
  try
    let chan = socket PF_INET SOCK_STREAM 0 in
    bind chan (ADDR_INET (Unix.inet_addr_loopback, 0));
    listen chan 1;
    let address = getsockname chan in
    log @@ "forking...";
    flush_all ();
    let null = openfile "/dev/null" [O_RDWR] 0o640 in
    let pid =
      try fork ()
      with Unix_error _ as exn -> close null; close chan; raise exn in
    if pid = 0 then
      (* Child process. A Unix error must terminate the child right here:
         reaching the [with] handler below would make the child return
         [Error] and carry on running the master's event loop. *)
      abort_on_unix_error (fun () ->
        dup2 null stdin;
        dup2 null stdout;
        if null <> stdin && null <> stdout then close null;
        close chan;
        Log.worker_initialization_begins ();
        let chan = socket PF_INET SOCK_STREAM 0 in
        connect chan address;
        let link = { write_to = chan; read_from = chan } in
        install_feedback_worker ~feedback_cleanup link;
        install_debug_worker link;
        log_worker @@ "borning...";
        Ok (Worker link, [])) ()
    else begin
      (* Parent process. Only the child needs [null]: close our copy, or
         every spawn leaks a descriptor in the long-running master. *)
      close null;
      cancellation_handle := Some pid;
      match accept_timeout chan with
      | None ->
        close chan;
        log @@ Printf.sprintf "forked pid %d did not connect back" pid;
        Unix.kill pid Sys.sigkill;
        Error ("worker did not connect back", [worker_ends pid])
      | Some (worker, _worker_addr) ->
        close chan;
        log @@ Printf.sprintf "forked pid %d called back" pid;
        let link = { write_to = worker; read_from = worker } in
        Ok (Master, [worker_progress link; worker_ends pid])
    end
  with Unix_error (e, f, p) ->
    Error (f ^ ": " ^ p ^ ": " ^ error_message e, [])

let option_name = "-" ^ Str.global_replace (Str.regexp_string " ") "." Job.name ^ "_master_address"

(* Spawn [procname] as a separate process (what [handle_event] does when
   [Sys.os_type] is not "Unix"), passing it [option_name] and the port of a
   loopback socket it must connect back to; then send it [job]. The pid is
   stored in [cancellation_handle]. Returns the events watching the worker,
   or [Error (msg, cleanup_events)]. *)
let create_process_worker procname cancellation_handle job =
  let open Unix in
  try
    let chan = socket PF_INET SOCK_STREAM 0 in
    bind chan (ADDR_INET (Unix.inet_addr_loopback, 0));
    listen chan 1;
    let port = match getsockname chan with
      | ADDR_INET (_, port) -> port
      | _ -> assert false in
    (* On native Windows the null device is "NUL"; "/dev/null" does not exist. *)
    let null = openfile (if Sys.win32 then "NUL" else "/dev/null") [O_RDWR] 0o640 in
    let extra_flags = if CDebug.get_flags () = "all" then [|"-debug"|] else [||] in
    let args = Array.append [|procname; option_name; string_of_int port|] extra_flags in
    let pid = create_process procname args null stdout stderr in
    close null;
    cancellation_handle := Some pid;
    log @@ Printf.sprintf "created worker %d, waiting on port %d" pid port;
    match accept_timeout chan with
    | Some (worker, _worker_addr) ->
      close chan;
      let link = { write_to = worker; read_from = worker } in
      (* NOTE(review): these forwarders are installed in the master process,
         not in the spawned worker; confirm this is intended. *)
      install_feedback_worker ~feedback_cleanup:(fun _ -> ()) link;
      install_debug_worker link;
      log @@ "sending job";
      write_value link job;
      flush_all ();
      log @@ "sent";
      Ok [worker_progress link; worker_ends pid]
    | None ->
      (* The listening socket must be released on timeout too. *)
      close chan;
      log @@ Printf.sprintf "child process %d did not connect back" pid;
      Unix.kill pid Sys.sigkill;
      Error ("worker did not connect back", [worker_ends pid])
  with Unix_error (e, f, p) ->
    Error (f ^ ": " ^ p ^ ": " ^ error_message e, [])

(* **************** /spawning ********************************************** *)

(* Shared handling of a failed spawn: log [msg], turn it into an error
   feedback for [feedback_route], and give back the token consumed by
   [worker_available] when no cleanup event (i.e. no [worker_ends]) will. *)
let spawn_failed feedback_route msg cleanup_events =
  log @@ "worker did not spawn: " ^ msg;
  (match cleanup_events with [] -> release_token () | _ :: _ -> ());
  (Some (Job.appendFeedback feedback_route (Feedback.Error, None, [], Pp.str msg)), cleanup_events)

let handle_event = function
  | WorkerIOError e ->
    log @@ "worker IO Error: " ^ Printexc.to_string e;
    release_token ();
    (None, [])
  | WorkerEnd (pid, _status) ->
    log @@ Printf.sprintf "worker %d went on holidays" pid;
    release_token ();
    (None, [])
  | WorkerProgress { link; update_request = DebugMessage d } ->
    Log.handle_event d;
    (None, [worker_progress link])
  | WorkerProgress { link; update_request = Job_update u } ->
    log "worker progress";
    (Some u, [worker_progress link])
  | WorkerStart (feedback_cleanup, (feedback_route, cancellation_handle), job, action, procname) ->
    log "worker starts";
    if Sys.os_type = "Unix" then begin
      match fork_worker ~feedback_cleanup cancellation_handle with
      | Ok (Master, events) ->
        log "worker spawned (fork)";
        (None, events)
      | Ok (Worker link, _) ->
        (* We are the forked child: run the job, send results back, exit.
           [(write_value link)] is the protected function so that the write
           itself runs inside [abort_on_unix_error]; passing [write_value] and
           [link] separately would only protect the partial application. *)
        action job ~send_back:(fun j -> abort_on_unix_error (write_value link) (Job_update j));
        exit 0
      | Error (msg, cleanup_events) -> spawn_failed feedback_route msg cleanup_events
    end else begin
      match create_process_worker procname cancellation_handle job with
      | Ok events ->
        log "worker spawned (create_process)";
        (None, events)
      | Error (msg, cleanup_events) -> spawn_failed feedback_route msg cleanup_events
    end

(* the only option is the socket port *)
type options = int

(* Presumably run by the worker executable spawned by [create_process_worker]:
   connect to the master on [port], receive the job, and return it with the
   function used to send updates back. Exits with code 1 on failure. *)
let setup_plumbing port =
  try
    let open Unix in
    let chan = socket PF_INET SOCK_STREAM 0 in
    let address = ADDR_INET (inet_addr_loopback, port) in
    log_worker @@ "connecting to " ^ string_of_int port;
    connect chan address;
    let read_from = chan in
    let write_to = chan in
    let link = { read_from; write_to } in
    (* Unix.read_value does not exist, we use Sel *)
    match Sel.(pop Todo.(add empty [Sel.On.ocaml_value read_from (fun x -> x)])) with
    | Ok (job : Job.t), _ ->
      (* Updates are wrapped in [Job_update]: the master decodes every value
         coming from a worker as a [worker_message]. *)
      ((fun u -> write_value link (Job_update u)), job)
    | Error exn, _ ->
      log_worker @@ "error receiving job: " ^ Printexc.to_string exn;
      exit 1
  with Unix.Unix_error (code, syscall, param) ->
    log_worker @@ Printf.sprintf "error starting: %s: %s: %s" syscall param (Unix.error_message code);
    exit 1

(* Accept exactly [option_name; port] with a numeric port; otherwise report
   the arguments on stderr and exit with code 2. *)
let parse_options extra_args =
  let unknown_arguments () =
    Printf.eprintf "unknown arguments: %s\n" (String.concat " " extra_args);
    exit 2 in
  match extra_args with
  | [o; port] when o = option_name ->
    begin match int_of_string_opt port with
    | Some port -> port, []
    | None -> unknown_arguments ()
    end
  | _ -> unknown_arguments ()

[%%if coq = "8.18" || coq = "8.19" || coq = "8.20"]
let parse_options a = parse_options a
[%%else]
let parse_options _ a = parse_options a
[%%endif]

let log = log_worker

end