123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343(**************************************************************************)(* *)(* Copyright 2012-2015 OCamlPro *)(* Copyright 2012 INRIA *)(* *)(* All rights reserved. This file is distributed under the terms of the *)(* GNU Lesser General Public License version 2.1, with the special *)(* exception on linking described in the file LICENSE. *)(* *)(**************************************************************************)openOpamStd.OpopenOpamProcess.Job.Opletlogfmt=OpamConsole.log"PARALLEL"fmtletslog=OpamConsole.slogexceptionAbortedmoduletypeVERTEX=sigincludeOpamStd.OrderedTypeincludeGraph.Sig.COMPARABLEwithtypet:=tendmoduletypeG=sigincludeGraph.Sig.ImoduleVertex:VERTEXwithtypet=V.tmoduleTopological:sigvalfold:(V.t->'a->'a)->t->'a->'aendvalhas_cycle:t->boolvalscc_list:t->V.tlistlistendmoduletypeSIG=sigmoduleG:Gvaliter:jobs:int->command:(pred:(G.V.t*'a)list->G.V.t->'aOpamProcess.job)->?dry_run:bool->?mutually_exclusive:(G.V.tlistlist)->G.t->unitvalmap:jobs:int->command:(pred:(G.V.t*'a)list->G.V.t->'aOpamProcess.job)->?dry_run:bool->?mutually_exclusive:(G.V.tlistlist)->G.t->(G.V.t*'a)listexceptionErrorsofG.V.tlist*(G.V.t*exn)list*G.V.tlistexceptionCyclicofG.V.tlistlistendmoduleMake(G:G)=structmoduleG=GmoduleV=G.VertexmoduleM=OpamStd.Map.Make(V)moduleS=OpamStd.Set.Make(V)letmap_keysm=M.fold(funk_s->S.addks)mS.emptyexceptionErrorsofG.V.tlist*(G.V.t*exn)list*G.V.tlistexceptionCyclicofV.tlistlistopenS.Op(* Returns a map (node -> return value) *)letaux_map~jobs~command?(dry_run=false)?(mutually_exclusive=[])g=log"Iterate over %a task(s) with %d process(es)"(slog@@G.nb_vertex@>string_of_int)gjobs;letmutually_exclusive=List.mapS.of_listmutually_exclusiveinifG.has_cyclegthen(letsccs=G.scc_listginletsccs=List.filter(function_::_::_->true|_->false)sccsinraise(Cyclicsccs));letnjobs=G.nb_vertexginletprint_status(finished:int)(running:(OpamProcess.t*'a*stringoption)M.t)=lettexts=OpamStd.List.filter_map(fun(_,_,t)->t)(M.valuesrunning)inletreclimit_widthaccrem_cols=function|[]->List.revacc|t::ts->letlen=OpamStd.Format.visual_lengthtinifts=[]&&len<rem_colsthenList.rev(t::acc)elseiflen>rem_cols-5thenList.rev(Printf.sprintf"%s+%2d"(String.make(rem_cols-4)' ')(List.lengthts+1)::acc)elselimit_width(t::acc)(rem_cols-len-1)tsinlettitle=Printf.sprintf"Processing %2d/%d:"(finished+M.cardinalrunning)njobsinlettexts=ifOpamConsole.disp_status_line()thenlimit_width[](OpamStd.Sys.terminal_columns())(title::texts)elseifOpamConsole.verbose()thentitle::textselse[]iniftexts<>[]thenOpamConsole.status_line"%s"(String.concat" "texts)in(* nslots is the number of free slots *)letrecloop(nslots:int)(* number of free slots *)(results:'bM.t)(running:(OpamProcess.t*'a*stringoption)M.t)(ready:S.t)=letmutual_exclusion_setn=List.fold_left(funaccs->ifS.memnsthenacc++selseacc)S.emptymutually_exclusiveinletrun_seq_commandnslotsreadyn=function|Doner->log"Job %a finished"(slog(string_of_int@*V.hash))n;letresults=M.addnrresultsinletrunning=M.removenrunninginifnot(M.is_emptyrunning)thenprint_status(M.cardinalresults)running;letnew_ready=S.filter(funn->List.for_all(funn->M.memnresults)(G.predgn)&¬(M.memnresults)&&S.is_empty(mutual_exclusion_setn%%map_keysrunning))(S.of_list(G.succgn)++mutual_exclusion_setn)inloop(nslots+1)resultsrunning(ready++new_ready)|Run(cmd,cont)->log"Next task in job %a: %a"(slog(string_of_int@*V.hash))n(slogOpamProcess.string_of_command)cmd;letp=ifdry_runthenOpamProcess.dry_run_backgroundcmdelseOpamProcess.run_backgroundcmdinletrunning=M.addn(p,cont,OpamProcess.text_of_commandcmd)runninginprint_status(M.cardinalresults)running;loopnslotsresultsrunningreadyinletfailnodeerror=log"Exception while computing job %a: %a"(slog(string_of_int@*V.hash))node(slogV.to_string)node;iferror=Sys.BreakthenOpamConsole.error"User interruption";letrunning=M.removenoderunningin(* Cleanup *)leterrors,pend=ifdry_runthen[node,error],[]elseM.fold(funn(p,cont,_text)(errors,pend)->trymatchOpamProcess.dontwaitpwith|None->(* process still running *)OpamProcess.interruptp;(n,Aborted)::errors,p::pend|Someresult->matchcontresultwith|Done_->errors,pend|Run_->(n,Aborted)::errors,pendwith|Unix.Unix_error_->errors,pend|e->(n,e)::errors,pend)running([node,error],[])in(tryList.iter(fun_->ignore(OpamProcess.wait_onepend))pendwithe->log"%a in sub-process cleanup"(slogPrintexc.to_string)e);(* Generate the remaining nodes in topological order *)letremaining=G.Topological.fold(funnremaining->ifM.memnresults||List.mem_assocnerrorsthenremainingelsen::remaining)g[]inraise(Errors(M.keysresults,List.reverrors,List.revremaining))inifM.is_emptyrunning&&S.is_emptyreadythenresultselseifnslots>0&¬(S.is_emptyready)then(* Start a new process *)letn=S.choosereadyinlog"Starting job %a (worker %d/%d): %a"(slog(string_of_int@*V.hash))n(jobs-nslots+1)jobs(slogV.to_string)n;letpred=G.predgninletpred=List.map(funn->n,M.findnresults)predinletcmd=trycommand~prednwithe->failneinletready=S.removenready--mutual_exclusion_setninrun_seq_command(nslots-1)readyncmdelse(* Wait for a process to end *)letprocesses=M.fold(funn(p,x,_)acc->(p,(n,x))::acc)running[]inletprocess,result=ifdry_runthenOpamProcess.dry_wait_one(List.mapfstprocesses)elsetrymatchprocesseswith|[p,_]->p,OpamProcess.waitp|_->OpamProcess.wait_one(List.mapfstprocesses)withe->fail(fst(snd(List.hdprocesses)))einletn,cont=List.assocprocessprocessesinlog"Collected task for job %a (ret:%d)"(slog(string_of_int@*V.hash))nresult.OpamProcess.r_code;letnext=trycontresultwithe->OpamProcess.cleanupresult;failneinOpamProcess.cleanupresult;run_seq_commandnslotsreadynnextinletroots=G.fold_vertex(funnroots->ifG.in_degreegn=0thenS.addnrootselseroots)gS.emptyinletr=loopjobsM.emptyM.emptyrootsinOpamConsole.clear_status();rletiter~jobs~command?dry_run?mutually_exclusiveg=ignore(aux_map~jobs~command?dry_run?mutually_exclusiveg)letmap~jobs~command?dry_run?mutually_exclusiveg=M.bindings(aux_map~jobs~command?dry_run?mutually_exclusiveg)(* Only print the originally raised exception, which should come first. Ignore
Aborted exceptions due to other commands termination, and simultaneous
exceptions in other command's continuations (unlikely as that would require
both commands to have terminated simultaneously) *)leterror_printer=function|Errors(_,(_,exc)::_,_)->Some(Printexc.to_stringexc)|_->Nonelet()=Printexc.register_printererror_printerendmoduletypeGRAPH=sigincludeGraph.Sig.IincludeGraph.Oper.Swithtypeg=tmoduleTopological:sigvalfold:(V.t->'a->'a)->t->'a->'avaliter:(V.t->unit)->t->unitendmoduleParallel:SIGwithtypeG.t=tandtypeG.V.t=vertexmoduleDot:sigvaloutput_graph:out_channel->t->unitendvaltransitive_closure:?reflexive:bool->t->unitendmoduleMakeGraph(X:VERTEX)=structmoduleVertex=XmodulePG=Graph.Imperative.Digraph.ConcreteBidirectional(Vertex)moduleTopological=Graph.Topological.Make(PG)moduleTraverse=Graph.Traverse.Dfs(PG)moduleComponents=Graph.Components.Make(PG)moduleParallel=Make(structincludePGmoduleVertex=VertexmoduleTopological=TopologicalincludeTraverseincludeComponentsend)moduleDot=Graph.Graphviz.Dot(structletedge_attributes_=[]letdefault_edge_attributes_=[]letget_subgraph_=Noneletvertex_attributes_=[]letvertex_namev=Printf.sprintf"\"%s\""(Vertex.to_stringv)letdefault_vertex_attributes_=[]letgraph_attributes_=[]includePGend)includePGincludeGraph.Oper.I(PG)lettransitive_closure?reflexiveg=ignore(add_transitive_closure?reflexiveg)end(* Simple polymorphic implem on lists when we don't need full graphs.
We piggy-back on the advanced implem using an array and an int-graph *)moduleIntGraph=MakeGraph(structtypet=intletcomparexy=x-ylethashx=xletequalxy=x=yletto_string=string_of_intletto_jsonx=`Float(float_of_intx)end)letflat_graph_of_arraya=letg=IntGraph.create()inArray.iteri(funi_->IntGraph.add_vertexgi)a;gexceptionErrors=IntGraph.Parallel.Errorsletiter~jobs~command?dry_runl=leta=Array.of_listlinletg=flat_graph_of_arrayainletcommand~pred:_i=commanda.(i)inignore(IntGraph.Parallel.iter~jobs~command?dry_rung)letmap~jobs~command?dry_runl=leta=Array.of_listlinletg=flat_graph_of_arrayainletcommand~pred:_i=commanda.(i)inletr=IntGraph.Parallel.aux_map~jobs~command?dry_runginletrecmklistaccn=ifn<0thenaccelsemklist(IntGraph.Parallel.M.findnr::acc)(n-1)inmklist[](Array.lengtha-1)letreduce~jobs~command~merge~nil?dry_runl=leta=Array.of_listlinletg=flat_graph_of_arrayainletcommand~pred:_i=commanda.(i)inletr=IntGraph.Parallel.aux_map~jobs~command?dry_runginIntGraph.Parallel.M.fold(fun_->merge)rnil