1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
(** Copyright (c) 2016-present, Facebook, Inc.
Modified work Copyright (c) 2018-2019 Rijnard van Tonder
This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree. *)
open Hack_parallel_intf.Std
module Daemon = Daemon
(* A scheduler: a pool of workers together with the two integers that
   [map_reduce] reads when deciding how many buckets to split work into. *)
type t = {
(* Workers produced by [Worker.make] in [create]; the empty list in [mock]. *)
workers: Worker.t list;
(* The value passed to [Worker.make] as [~nbr_procs] in [create]; 1 in [mock]. *)
number_of_workers: int;
(* Upper bound on the factor by which [map_reduce] multiplies
   [number_of_workers] when no positive [bucket_size] is supplied. *)
bucket_multiplier: int;
}
(* Entry point obtained from [Worker.register_entry_point] and passed to
   [Worker.make] by [create]. Its [restore] callback ignores its argument
   and returns unit. *)
let entry =
  let restore _saved_state = () in
  Worker.register_entry_point ~restore
(* [create ?number_of_workers ?bucket_multiplier ()] builds a scheduler.

   - [number_of_workers] (default 1) is forwarded to [Worker.make] as
     [~nbr_procs] and stored in the result.
   - [bucket_multiplier] (default 10) is only stored here; [map_reduce]
     reads it when computing the number of buckets.

   The heap handle comes from [Memory.get_heap_handle ()]; it is given to
   [Worker.make] and then to [Memory.connect]. *)
let create ?(number_of_workers = 1) ?(bucket_multiplier = 10) () =
  let handle = Memory.get_heap_handle () in
  let spawned =
    Hack_parallel_intf.Std.Worker.make
      ~saved_state:()
      ~entry
      ~nbr_procs:number_of_workers
      ~heap_handle:handle
      ~gc_control:Memory.worker_garbage_control
  in
  (* Keep this after [Worker.make]: the statement order is preserved as-is. *)
  Memory.connect handle;
  { workers = spawned; number_of_workers; bucket_multiplier }
(* [map_reduce scheduler ?bucket_size ~init ~map ~reduce work] passes [work]
   to [MultiWorker.call], using [map] as the job, [reduce] as the merge and
   [init] as the neutral value, and returns whatever that call returns.

   The count handed to [Bucket.make ~num_workers] is
   - [(List.length work / bucket_size) + 1] when [bucket_size] is given and
     strictly positive;
   - otherwise [number_of_workers * m], where [m] is the smaller of the
     scheduler's [bucket_multiplier] and [1 + (List.length work / 400)].

   NOTE(review): a scheduler built by [mock] carries an empty worker list,
   which reaches [MultiWorker.call] as [Some []] -- confirm that this is
   handled as intended. *)
let map_reduce
    { workers; number_of_workers; bucket_multiplier }
    ?bucket_size
    ~init
    ~map
    ~reduce
    work =
  let work_count = List.length work in
  let bucket_count =
    match bucket_size with
    | Some exact_size when exact_size > 0 -> (work_count / exact_size) + 1
    | Some _ | None ->
      (* Small inputs get fewer buckets: the multiplier is capped by the
         amount of work. *)
      let capped_multiplier =
        Core_kernel.Int.min bucket_multiplier (1 + (work_count / 400))
      in
      number_of_workers * capped_multiplier
  in
  MultiWorker.call
    (Some workers)
    ~job:map
    ~merge:reduce
    ~neutral:init
    ~next:(Bucket.make ~num_workers:bucket_count work)
(* [iter scheduler ~f work] runs [f] over [work] via [map_reduce] with the
   default bucket sizing. [f] is called on the second argument that
   [map_reduce] gives its [~map] callback; the accumulator is [()] throughout
   and the results of [f] are dropped by the merge step. *)
let iter scheduler ~f work =
  let apply _ bucket = f bucket in
  let discard _ _ = () in
  map_reduce scheduler ~init:() ~map:apply ~reduce:discard work
(* [single_job scheduler ~f work] submits [f work] to the first worker of the
   scheduler with [Worker.call], polls [Worker.select] until it reports a
   ready handle, and returns [Worker.get_result] of that handle.

   Raises [Failure "This service contains no workers"] when the scheduler's
   worker list is empty. *)
let single_job { workers; _ } ~f work =
  (* Retry [Worker.select] until its [readys] list is non-empty, then take
     the head. *)
  let rec await handle =
    match Worker.select [handle] with
    | { Worker.readys = []; _ } -> await handle
    | { Worker.readys = ready :: _; _ } -> ready
  in
  match workers with
  | [] -> failwith "This service contains no workers"
  | first_worker :: _ ->
    let pending = Worker.call first_worker f work in
    let finished = await pending in
    Worker.get_result finished
(* [mock ()] returns a scheduler with no workers, [number_of_workers = 1] and
   [bucket_multiplier = 1]. [Memory.get_heap_handle] is still called and its
   result discarded. *)
let mock () =
  ignore (Memory.get_heap_handle ());
  { workers = []; number_of_workers = 1; bucket_multiplier = 1 }
(* [destroy scheduler] calls [Worker.killall ()]. The scheduler argument is
   not inspected. *)
let destroy _scheduler = Worker.killall ()