123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293(** Copyright (c) 2016-present, Facebook, Inc.
Modified work Copyright (c) 2018-2019 Rijnard van Tonder
This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree. *)openHack_parallel_intf.StdmoduleDaemon=Daemontypet={workers:Worker.tlist;number_of_workers:int;bucket_multiplier:int;}letentry=Worker.register_entry_point~restore:(fun_->())letcreate?(number_of_workers=1)?(bucket_multiplier=10)()=letheap_handle=Memory.get_heap_handle()inletworkers=Hack_parallel_intf.Std.Worker.make~saved_state:()~entry~nbr_procs:number_of_workers~heap_handle~gc_control:Memory.worker_garbage_controlinMemory.connectheap_handle;{workers;number_of_workers;bucket_multiplier}letmap_reduce{workers;number_of_workers;bucket_multiplier}?bucket_size~init~map~reducework=letnumber_of_workers=matchbucket_sizewith|Someexact_sizewhenexact_size>0->(List.lengthwork/exact_size)+1|_->letbucket_multiplier=Core_kernel.Int.minbucket_multiplier(1+(List.lengthwork/400))innumber_of_workers*bucket_multiplierinMultiWorker.call(Someworkers)~job:map~merge:reduce~neutral:init~next:(Bucket.make~num_workers:number_of_workerswork)letiterscheduler~fwork=map_reducescheduler~init:()~map:(fun_work->fwork)~reduce:(fun__->())workletsingle_job{workers;_}~fwork=letrecwait_until_readyhandle=let{Worker.readys;_}=Worker.select[handle]inmatchreadyswith|[]->wait_until_readyhandle|ready::_->readyinmatchworkerswith|worker::_->Worker.callworkerfwork|>wait_until_ready|>Worker.get_result|[]->failwith"This service contains no workers"letmock()=Memory.get_heap_handle()|>ignore;{workers=[];number_of_workers=1;bucket_multiplier=1}letdestroy_=Worker.killall()