(************************************************************************)
(*         *   The Coq Proof Assistant / The Coq Development Team       *)
(*  v      *         Copyright INRIA, CNRS and contributors             *)
(* <O___,, * (see version control and CREDITS file for authors & dates) *)
(*   \VV/  **************************************************************)
(*    //   *    This file is distributed under the terms of the         *)
(*         *     GNU Lesser General Public License Version 2.1          *)
(*         *     (see LICENSE file for the text of the license)         *)
(************************************************************************)

open CErrors
open Pp
open Util

(* Print [pp] on stderr, prefixed by the identifier returned by
   [Spawned.process_id ()], and flush. *)
let stm_pr_err pp =
  Format.eprintf "%s] @[%a@]\n%!" (Spawned.process_id ()) Pp.pp_with pp

(* Same as [stm_pr_err] on a plain string, but only when the [misc] debug
   flag is set; otherwise does nothing. *)
let stm_prerr_endline s =
  if CDebug.(get_flag misc) then begin stm_pr_err (str s) end else ()

(* Mutable flag attached to every queued task.  Workers never pick a task
   whose switch is set, and the cleaner thread pops such tasks off the
   queue. *)
type cancel_switch = bool ref

(* Extra command line flags appended to the arguments of every spawned
   worker (see [set_slave_opt] below). *)
let async_proofs_flags_for_workers = ref []

(* What a client must provide to get a task queue served by a pool of
   worker processes. *)
module type Task = sig

  type task
  type competence

  type worker_status = Fresh | Old of competence

  (* Marshallable *)
  type request
  type response

  val name : string (* UID of the task kind, for -toploop *)
  val extra_env : unit -> string array

  (* run by the master, on a thread *)
  val request_of_task : worker_status -> task -> request option
  val task_match : worker_status -> task -> bool
  val use_response :
    worker_status -> task -> response ->
      [ `Stay of competence * task list | `End ]
  val on_marshal_error : string -> task -> unit
  val on_task_cancellation_or_expiration_or_slave_death : task option -> unit
  val forward_feedback : Feedback.feedback -> unit

  (* run by the worker *)
  val perform : request -> response

  (* debugging *)
  val name_of_task : task -> string
  val name_of_request : request -> string

end

(* Builds both sides of the protocol for a given kind of task: the master
   side (queue, pool of managers) and the worker side ([main_loop]). *)
module Make(T : Task) () = struct

  (* Raised inside a manager to terminate it and its worker. *)
  exception Die

  (* Messages sent by a worker to its manager: either the final answer to
     a request or a piece of feedback to be forwarded. *)
  type response =
    | Response of T.response
    | RespFeedback of Feedback.feedback

  (* Messages sent by a manager to its worker. *)
  type request = Request of T.request

  (* Worker side: run the task-specific computation and wrap its result. *)
  let slave_respond (Request r) =
    let res = T.perform r in
    Response res

  (* Any failure while (un)marshalling is reported through this exception;
     the payload says which operation failed and why. *)
  exception MarshalError of string

  (* Marshal [data] on [oc] with no flags, then flush the channel. *)
  let marshal_to_channel oc data =
    Marshal.to_channel oc data [];
    flush oc

  let marshal_err s = raise (MarshalError s)

  (* The four functions below convert [Failure], [Invalid_argument] and
     [Sys_error] into [MarshalError], tagging the message with the name of
     the operation.  Other exceptions (e.g. [End_of_file]) are not caught
     here. *)
  let marshal_request oc (req : request) =
    try marshal_to_channel oc req
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("marshal_request: " ^ s)

  let unmarshal_request ic =
    try (CThread.thread_friendly_input_value ic : request)
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("unmarshal_request: " ^ s)

  let marshal_response oc (res : response) =
    try marshal_to_channel oc res
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("marshal_response: " ^ s)

  let unmarshal_response ic =
    try (CThread.thread_friendly_input_value ic : response)
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("unmarshal_response: " ^ s)

  (* Emit a [WorkerStatus] feedback carrying the status string [s] for
     worker [id]; [id] defaults to the current value of
     [Flags.async_proofs_worker_id]. *)
  let report_status ?(id = !Flags.async_proofs_worker_id) s =
    let open Feedback in
    feedback ~id:Stateid.initial (WorkerStatus (id, s))

  module Worker = Spawn.Sync ()

  (* Model handed to [WorkerPool.Make]: how to spawn a worker process and
     how to drive it from a manager. *)
  module Model = struct

    type process = Worker.process
    type extra = (T.task * cancel_switch) TQueue.t

    (* Spawn the [coqworker] executable for worker number [id].  Its
       command line is derived from [Sys.argv] (minus the program name) by
       [set_slave_opt]; its environment is [T.extra_env ()] followed by the
       current environment.  Returns the worker name, the process handle
       and the two channels. *)
    let spawn id priority =
      let name = Printf.sprintf "%s:%d" T.name id in
      let proc, ic, oc =
        (* Filter arguments for slaves. *)
        let rec set_slave_opt = function
          | [] -> !async_proofs_flags_for_workers @
                  ["-worker-id"; name;
                   "-async-proofs-worker-priority";
                   CoqworkmgrApi.(string_of_priority priority)]
          (* Options to discard: 0 arguments *)
          | ("-emacs"|"--xml_format=Ppcmds"|"-batch"|"-vok"|"-vos")::tl ->
            set_slave_opt tl
          (* Options to discard: 1 argument *)
          | ("-async-proofs"|"-vio2vo"|"-o"
            |"-load-vernac-source"|"-l"|"-load-vernac-source-verbose"|"-lv"
            |"-require-import"|"-require-export"|"-ri"|"-re"
            |"-load-vernac-object"
            |"-set"|"-unset"|"-compat"|"-mangle-names"|"-diffs"
            |"-async-proofs-cache"|"-async-proofs-j"|"-async-proofs-tac-j"
            |"-async-proofs-private-flags"
            |"-async-proofs-tactic-error-resilience"
            |"-async-proofs-command-error-resilience"
            |"-async-proofs-delegation-threshold"
            |"-async-proofs-worker-priority"|"-worker-id")::_::tl ->
            set_slave_opt tl
          (* Options to discard: 2 arguments *)
          | ("-rifrom"|"-refrom"|"-rfrom"
            |"-require-import-from"|"-require-export-from")::_::_::tl ->
            set_slave_opt tl
          (* We need to pass some options with one argument *)
          | ("-I"|"-nI"|"-include"|"-top"|"-topfile"|"-coqlib"|"-exclude-dir"
            |"-color"|"-init-file"
            |"-profile-ltac-cutoff"|"-main-channel"|"-control-channel"
            |"-native-output-dir"
            |"-w"|"-dump-glob"|"-bytecode-compiler"|"-native-compiler" as x)
            ::a::tl ->
            x :: a :: set_slave_opt tl
          (* We need to pass some options with two arguments *)
          | ("-R"|"-Q" as x)::a1::a2::tl ->
            x :: a1 :: a2 :: set_slave_opt tl
          (* Finally we pass all options starting in '-'; check this is safe
             w.r.t the weird vio* option set.  The length test protects the
             character access: an empty argument would otherwise raise
             Invalid_argument; it is now handled by the last case. *)
          | x :: tl when String.length x > 0 && x.[0] = '-' ->
            x :: set_slave_opt tl
          (* We assume this is a file, filter out *)
          | _ :: tl -> set_slave_opt tl
        in
        let args =
          let wselect = "--kind=" ^ T.name in
          Array.of_list
            (wselect :: set_slave_opt (List.tl (Array.to_list Sys.argv))) in
        let env = Array.append (T.extra_env ()) (Unix.environment ()) in
        let worker_name = System.get_toplevel_path ("coqworker") in
        Worker.spawn ~env worker_name args in
      name, proc, CThread.prepare_in_channel_for_thread_friendly_io ic, oc

    (* Body of the manager attached to one worker.  It repeatedly pops a
       matching task from the shared queue, sends the corresponding request
       to the worker and consumes its responses until the final one.  It
       stops, killing the worker, when the task asks for it, when the queue
       is being destroyed, or when communication with the worker fails. *)
    let manager cpanel (id, proc, ic, oc) =
      let { WorkerPool.extra = queue; exit; cancelled } = cpanel in
      (* Report the worker as dead before running the pool exit hook. *)
      let exit () = report_status ~id "Dead"; exit () in
      (* Task currently assigned to the worker, if any. *)
      let last_task = ref None in
      let worker_age = ref T.Fresh in
      (* Whether an execution token obtained from CoqworkmgrApi is held. *)
      let got_token = ref false in
      let giveback_exec_token () =
        if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
      (* Passed to [TQueue.pop] as [~destroy]; set by the watchdog thread
         just before it wakes up the queue. *)
      let stop_waiting = ref false in
      (* Cancel switch of the task being processed. *)
      let expiration_date = ref (ref false) in
      (* Block until a non-cancelled task accepted by [T.task_match] for
         the current worker age is available, and record it. *)
      let pick_task () =
        stm_prerr_endline "waiting for a task";
        let pick age (t, c) = not !c && T.task_match age t in
        let task, task_expiration =
          TQueue.pop ~picky:(pick !worker_age) ~destroy:stop_waiting queue in
        expiration_date := task_expiration;
        last_task := Some task;
        stm_prerr_endline ("got task: " ^ T.name_of_task task);
        task in
      (* Tasks generated by a response share the cancel switch of the task
         that produced them. *)
      let add_tasks l =
        List.iter (fun t -> TQueue.push queue (t, !expiration_date)) l in
      let get_exec_token () =
        ignore (CoqworkmgrApi.get 1);
        got_token := true;
        stm_prerr_endline ("got execution token") in
      (* Kill the worker, then log the status returned by [Worker.wait]. *)
      let kill proc =
        Worker.kill proc;
        stm_prerr_endline ("Worker exited: " ^
          match Worker.wait proc with
          | Unix.WEXITED 0x400 -> "exit code unavailable"
          | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
          | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
          | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in
      (* Watchdog: while the worker is alive, check once per second whether
         the worker was cancelled or its current task expired; if so, wake
         up the queue and kill the worker. *)
      let rec kill_if () =
        if not (Worker.is_alive proc) then ()
        else if cancelled () || !(!expiration_date) then
          let () = stop_waiting := true in
          let () = TQueue.broadcast queue in
          Worker.kill proc
        else
          let () = Unix.sleep 1 in
          kill_if ()
      in
      (* An interrupt received by the watchdog is handled like a
         cancellation. *)
      let kill_if () =
        try kill_if ()
        with Sys.Break ->
          let () = stop_waiting := true in
          let () = TQueue.broadcast queue in
          Worker.kill proc
      in
      let _ = CThread.create kill_if () in

      try while true do
        report_status ~id "Idle";
        let task = pick_task () in
        match T.request_of_task !worker_age task with
        | None -> stm_prerr_endline ("Task expired: " ^ T.name_of_task task)
        | Some req ->
          try
            get_exec_token ();
            marshal_request oc (Request req);
            (* Forward feedback messages until the final response comes. *)
            let rec continue () =
              match unmarshal_response ic with
              | RespFeedback fbk -> T.forward_feedback fbk; continue ()
              | Response resp ->
                match T.use_response !worker_age task resp with
                | `End -> raise Die
                | `Stay (competence, new_tasks) ->
                  last_task := None;
                  giveback_exec_token ();
                  worker_age := T.Old competence;
                  add_tasks new_tasks
            in
            continue ()
          with
          | (Sys_error _ | Invalid_argument _ | End_of_file | Die) as e ->
            raise e (* we pass the exception to the external handler *)
          | MarshalError s -> T.on_marshal_error s task; raise Die
          | e ->
            stm_pr_err
              Pp.(seq [str "Uncaught exception in worker manager: "; print e]);
            flush_all (); raise Die
      done with
      | (Die | TQueue.BeingDestroyed) ->
        giveback_exec_token (); kill proc; exit ()
      | Sys_error _ | Invalid_argument _ | End_of_file ->
        (* Communication with the worker failed: notify the client about
           the task that was in flight, if any. *)
        T.on_task_cancellation_or_expiration_or_slave_death !last_task;
        giveback_exec_token (); kill proc; exit ()

  end

  module Pool = WorkerPool.Make(Model)

  (* A task queue together with the pool of workers serving it and, when
     the pool is not empty, the thread discarding cancelled tasks. *)
  type queue = {
    active : Pool.pool;
    queue : (T.task * cancel_switch) TQueue.t;
    cleaner : Thread.t option;
  }

  (* Create a queue served by [size] workers.  A cleaner thread that pops
     tasks whose cancel switch is set is started only if [size > 0]; it
     terminates when the queue is destroyed. *)
  let create size priority =
    let cleaner queue =
      while true do
        try ignore (TQueue.pop ~picky:(fun (_, cancelled) -> !cancelled) queue)
        with TQueue.BeingDestroyed -> (Thread.exit [@warning "-3"]) ()
      done in
    let queue = TQueue.create () in
    {
      active = Pool.create queue ~size priority;
      queue;
      cleaner = if size > 0 then Some (CThread.create cleaner queue) else None;
    }

  (* Destroy the pool first, then the underlying queue. *)
  let destroy { active; queue } =
    Pool.destroy active;
    TQueue.destroy queue

  let broadcast { queue } = TQueue.broadcast queue

  (* Push task [t] on the queue, guarded by [cancel_switch]. *)
  let enqueue_task { queue; active } t ~cancel_switch =
    stm_prerr_endline ("Enqueue task " ^ T.name_of_task t);
    TQueue.push queue (t, cancel_switch)

  let cancel_worker { active } n = Pool.cancel n active

  let name_of_request (Request r) = T.name_of_request r

  (* Order queued entries by comparing their tasks with [cmp]; the cancel
     switches are ignored. *)
  let set_order { queue } cmp =
    TQueue.set_order queue (fun (t1, _) (t2, _) -> cmp t1 t2)

  (* If the pool has workers, wait until the queue is empty and every
     consumer (the managers plus the cleaner thread) is waiting on it. *)
  let join { queue; active } =
    if not (Pool.is_empty active) then
      TQueue.wait_until_n_are_waiting_and_queue_empty
        (Pool.n_workers active + 1(*cleaner*))
        queue

  (* Drop all pending tasks, then cancel every worker. *)
  let cancel_all { queue; active } =
    TQueue.clear queue;
    Pool.cancel_all active

  (* Worker side: channels to the master, set by [init_stdout]. *)
  let slave_ic = ref None
  let slave_oc = ref None

  let init_stdout () =
    let ic, oc = Spawned.get_channels () in
    slave_oc := Some oc; slave_ic := Some ic

  (* Requires [init_stdout] to have been called: [Option.get] fails on
     unset channels. *)
  let slave_handshake () =
    Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc)

  let pp_pid pp = Pp.(str (Spawned.process_id () ^ " ") ++ pp)

  (* Prefix debug messages with the process identifier; any other feedback
     is returned unchanged. *)
  let debug_with_pid = Feedback.(function
    | { contents = Message (Debug, loc, pp) } as fb ->
      { fb with contents = Message (Debug, loc, pp_pid pp) }
    | x -> x)

  (* Worker side entry point: after the handshake, serve requests forever.
     The process exits with code 2 on a marshalling error or when the
     connection is lost, and with code 1 on any other exception. *)
  let main_loop () =
    (* We pass feedback to master *)
    let slave_feeder oc fb =
      Control.protect_sigalrm (fun () ->
        Marshal.to_channel oc (RespFeedback (debug_with_pid fb)) [];
        flush oc) () in
    ignore (Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x));
    let working = ref false in
    slave_handshake ();
    while true do
      try
        working := false;
        let request = unmarshal_request (Option.get !slave_ic) in
        working := true;
        report_status (name_of_request request);
        let response = slave_respond request in
        report_status "Idle";
        marshal_response (Option.get !slave_oc) response;
        CEphemeron.clean ()
      with
      | MarshalError s ->
        stm_pr_err Pp.(prlist str ["Fatal marshal error: "; s]);
        flush_all (); exit 2
      | End_of_file ->
        stm_prerr_endline "connection lost"; flush_all (); exit 2
      | e ->
        stm_pr_err Pp.(seq [str "Slave: critical exception: "; print e]);
        flush_all (); exit 1
    done

  (* Drop all pending tasks; the pool must be empty. *)
  let clear { queue; active } =
    assert (Pool.is_empty active); (* We allow that only if no slaves *)
    TQueue.clear queue

  (* Wait until all the workers are waiting on the queue, then return the
     tasks it contains (without their cancel switches). *)
  let snapshot { queue; active } =
    List.map fst
      (TQueue.wait_until_n_are_waiting_then_snapshot
         (Pool.n_workers active) queue)

  (* Run [f] on a fresh queue with [n] workers; the queue is destroyed
     before returning the result or re-raising the exception. *)
  let with_n_workers n priority f =
    let q = create n priority in
    try let rc = f q in destroy q; rc
    with e -> let e = Exninfo.capture e in destroy q; Exninfo.iraise e

  let n_workers { active } = Pool.n_workers active

end

module MakeQueue(T : Task) () = struct include Make(T) () end
module MakeWorker(T : Task) () = struct include Make(T) () end

(* Exception carrying an already pretty-printed error message; the handler
   registered below prints the payload as is. *)
exception RemoteException of Pp.t
let _ = CErrors.register_handler (function
  | RemoteException ppcmd -> Some ppcmd
  | _ -> None)