open Common

(*****************************************************************************)
(* Prelude *)
(*****************************************************************************)
(*
 * Some simple helpers around harrop's invoke() to do some
 * map-reduce like parallel programming using fork.
 *
 * related work:
 *  - https://gitorious.org/parmap, very similar, but also contains
 *    a master/workers model which helps do some form of load balancing
 *
 *  - https://github.com/MyLifeLabs/nproc, but it uses lwt
 *
 *  - distribution.ml which use MPI and so can leverage multiple
 *    machines (but MPI turned out to be quite unstable in my experience)
 *
 *  - julien's MultiWorker but works only in native code
 *
 * less: could be useful to autodetect a good number based on the
 * number of cores and available memory, like we do in our libphutil
 * 'Future' library.
 *)

(*****************************************************************************)
(* Globals *)
(*****************************************************************************)

(* When true, a forked worker prints the backtrace of any exception raised
 * by the invoked function before marshalling the exception back to the
 * parent (see invoke2 below). *)
let backtrace_when_exn = ref true

(*****************************************************************************)
(* Building block *)
(*****************************************************************************)

(* src: harrop article on fork-based parallelism
 * returns a future: a thunk that, when applied, blocks until the forked
 * child is done and yields its result (or re-raises its exception).
 *)
(* [invoke2 f x] computes [f x] in a forked child process and returns a
 * future for the result.
 *
 * Protocol: a pipe is created before the fork; the child closes the read
 * end, marshals either [`Res (f x)] or [`Exn e] to the write end, then
 * exits.  The parent closes the write end and returns a closure that
 * unmarshals the value, reaps the child with waitpid (avoiding zombies),
 * closes the pipe, and returns or re-raises.
 *
 * NOTE(review): the result goes through [Marshal], so [f x] must return a
 * marshallable value; exceptions are marshalled too, and an unmarshalled
 * exception may not compare equal to the original constructor — callers
 * matching on specific exceptions should keep that in mind.
 *)
let invoke2 f x =
  let input, output = Unix.pipe () in
  match Unix.fork () with
  (* error, could not create process, well compute now then *)
  | -1 -> let v = f x in (fun () -> v)
  (* child *)
  | 0 ->
      Unix.close input;
      let output = Unix.out_channel_of_descr output in
      Marshal.to_channel output
        (try `Res (f x)
         with e ->
           if !backtrace_when_exn then begin
             let backtrace = Printexc.get_backtrace () in
             pr2 (spf "Exception in invoked func: %s" (Common.exn_to_s e));
             pr2 backtrace;
           end;
           `Exn e
        ) [];
      close_out output;
      exit 0
  (* parent *)
  | pid ->
      Unix.close output;
      let input = Unix.in_channel_of_descr input in
      fun () ->
        (* read the child's answer before reaping it *)
        let v = Marshal.from_channel input in
        ignore (Unix.waitpid [] pid);
        close_in input;
        match v with
        | `Res x -> x
        | `Exn e -> raise e

(* Profiled wrapper around invoke2. *)
let invoke a b =
  Common.profile_code "Parallel.invoke" (fun () -> invoke2 a b)

(* Fork one child per element of [xs], then force all the futures.
 * All forks are created before any result is awaited. *)
let parallel_map f xs =
  (* create all the fork *)
  let futures = List.map (invoke f) xs in
  (* sync, get all parents to waitpid *)
  List.map (fun futur -> futur ()) futures

(*****************************************************************************)
(* Poor's man job scheduler *)
(*****************************************************************************)

(* A job is a delayed computation. *)
type 'a job = unit -> 'a
type 'a jobs = ('a job) list

(*
* This is a very naive job scheduler. One limitation is that before
* launching another round we must wait for the slowest process. A
* set of workers and a master model would be more efficient by always
* feeding processors. A partial fix is to give a tasks number that
* is quite superior to the actual number of processors.
*
* This will create (List.length xs) forks, but n at a time, in multiple
* rounds, where n=tasks.
*
* I use it for now to //ize the code coverage computation for PHP.
*)letmap_jobs~tasksxs=iftasks=1thenList.map(funjob->job())xselseletxxs=Common2.pack_safetasksxsinxxs|>List.map(funxs->(* do in parallel a batch of job *)parallel_map(funjob->job())xs)|>List.flatten(*
* For some computation, it doesn't help to process every item in a
* separate process because the cost of fork is higher than the
* computation cost. But it can still makes sense to group the files
* into batches and process them in parallel.
*
* This will create (tasks) forks.
*
* I use it for now to //ize the abstract-interpreter-based callgraph
* generation.
*
* Thx to Michal burger for the initial idea.
*)letmap_batch_jobs~tasksxs=iftasks=1thenList.map(funjob->job())xselse(* todo? a double pack ? because the initial pack/chunks can
* be computationaly "inbalanced".
*)letxxs=Common2.chunkstasksxsinletjobs=xxs|>List.map(funxs->(fun()->xs|>List.map(funjob->job())))inparallel_map(funjob->job())jobs|>List.flatten