12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091(**
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the "hack" directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*)openHack_coretype'anextlist='alistHack_bucket.nexttype'abucket='aHack_bucket.bucket=|Jobof'a|Wait|Doneletsingle_threaded_calljobmergeneutralnext=letx=ref(next())inletacc=refneutralin(* This is a just a sanity check that the job is serializable and so
* that the same code will work both in single threaded and parallel
* mode.
*)let_=Marshal.to_stringjob[Marshal.Closures]inwhile!x<>Donedomatch!xwith|Wait->(* this state should never be reached in single threaded mode, since
there is no hope for ever getting out of this state *)failwith"stuck!"|Jobl->letres=jobneutrallinacc:=mergeres!acc;x:=next()|Done->()done;!accletmulti_threaded_call(typea)(typeb)(typec)workers(job:c->a->b)(merge:b->c->c)(neutral:c)(next:aHack_bucket.next)=letrecdispatchworkershandlesacc=(* 'worker' represents available workers. *)(* 'handles' represents pendings jobs. *)(* 'acc' are the accumulated results. *)matchworkerswith|[]whenhandles=[]->acc|[]->(* No worker available: wait for some workers to finish. *)collect[]handlesacc|worker::workers->(* At least one worker is available... *)matchnext()with|Wait->collect(worker::workers)handlesacc|Done->(* ... but no more job to be distributed, let's collect results. *)dispatch[]handlesacc|Jobbucket->(* ... send a job to the worker.*)lethandle=Worker.callworker(funxl->jobneutralxl)bucketindispatchworkers(handle::handles)accandcollectworkershandlesacc=let{Worker.readys;waiters}=Worker.selecthandlesinletworkers=List.map~f:Worker.get_workerreadys@workersin(* Collect the results. *)letacc=List.fold_left~f:(funacch->merge(Worker.get_resulth)acc)~init:accreadysin(* And continue.. *)dispatchworkerswaitersaccindispatchworkers[]neutralletcallworkers~job~merge~neutral~next=matchworkerswith|None->single_threaded_calljobmergeneutralnext|Someworkers->multi_threaded_callworkersjobmergeneutralnextletnextworkers=Hack_bucket.make~num_workers:(matchworkerswithSomew->List.lengthw|None->1)