1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
open Printf
module Fn = Filename
(** Verbosity flag; [run] overwrites it with its [~verbose] argument. *)
let debug : bool ref = ref false

(** When [true], each worker forked by [run] calls [Cpu.setcore] with its
    own rank before starting to process jobs. Off by default. *)
let core_pinning : bool ref = ref false

(** Switch worker core pinning on for subsequent calls to [run]. *)
let enable_core_pinning () : unit = core_pinning := true

(** Switch worker core pinning off for subsequent calls to [run]. *)
let disable_core_pinning () : unit = core_pinning := false
(** End-of-stream signal. The [demux] functions built in [Parmap] raise it
    once their input list is exhausted, and [Shm.receive] raises it when it
    reads the ["EOF"] marker from a queue. *)
exception End_of_input
(** File-backed message passing between processes.

    A value is marshalled to a file and only the file name travels through a
    datagram socket; the receiver unmarshals the file and deletes it. The
    literal message ["EOF"] is reserved as the end-of-stream marker. *)
module Shm = struct

  (** [init ()] creates a connected pair of Unix-domain datagram sockets. *)
  let init () =
    Unix.(socketpair PF_UNIX SOCK_DGRAM 0)

  (** [unmarshal_from_file fn] reads back the value stored in [fn] by
      {!marshal_to_file}. The channel is closed even if unmarshalling
      raises; the exception is then re-raised unchanged. *)
  let unmarshal_from_file fn =
    let input = open_in_bin fn in
    match Marshal.from_channel input with
    | res ->
      close_in input;
      res
    | exception e ->
      (* Do not leak the descriptor when the file content is unusable. *)
      close_in_noerr input;
      raise e

  (** [marshal_to_file fn v] writes [v] to [fn] with the
      [Marshal.No_sharing] flag. The channel is closed even if marshalling
      raises; the exception is then re-raised unchanged. *)
  let marshal_to_file fn v =
    let out = open_out_bin fn in
    match Marshal.to_channel out v [Marshal.No_sharing] with
    | () -> close_out out
    | exception e ->
      (* Do not leak the descriptor when [v] cannot be marshalled. *)
      close_out_noerr out;
      raise e

  (** [send_loop sock buff n] sends the first [n] bytes of [buff] as one
      message on [sock]. When the send fails with [ENOBUFS] it pauses for
      one millisecond and tries again. Asserts that the whole message was
      sent. *)
  let rec send_loop sock buff n =
    match Unix.send sock buff 0 n [] with
    | sent -> assert (sent = n)
    | exception Unix.Unix_error (Unix.ENOBUFS, _, _) ->
      (* [select] with no descriptors is used as a sub-second sleep. *)
      ignore (Unix.select [] [] [] 0.001);
      send_loop sock buff n

  (** [raw_send sock str] sends [str] itself as one message on [sock]. *)
  let raw_send sock str =
    let n = String.length str in
    (* The bytes view is only read by [Unix.send], never written to. *)
    let buff = Bytes.unsafe_of_string str in
    send_loop sock buff n

  (** [send fn queue to_send] marshals [to_send] into file [fn], then sends
      the name [fn] on [queue]. *)
  let send fn queue to_send =
    marshal_to_file fn to_send;
    raw_send queue fn

  (** [raw_receive sock buff] receives one message from [sock] into [buff]
      and returns it as a string. Asserts that at least one byte arrived.
      NOTE(review): a message longer than [buff] is presumably truncated by
      the datagram receive; confirm that callers' buffers are large enough
      for the file names they exchange. *)
  let raw_receive sock buff =
    let n = Bytes.length buff in
    let received = Unix.recv sock buff 0 n [] in
    assert (received > 0);
    Bytes.sub_string buff 0 received

  (** [receive queue buff] reads one file name from [queue], unmarshals the
      value stored in that file, deletes the file and returns the value.
      @raise End_of_input if the message read is ["EOF"]. *)
  let receive queue buff =
    let fn = raw_receive queue buff in
    if fn = "EOF" then raise End_of_input
    else begin
      let res = unmarshal_from_file fn in
      Sys.remove fn;
      res
    end

end
(** [feed_them_all csize ncores demux queue] drains [demux] and publishes its
    items on [queue] in chunks of [csize] elements (the last chunk may be
    shorter). Each chunk is a list holding the items in reverse order of
    production. Once [demux] raises [End_of_input], ["EOF"] is sent [ncores]
    times, the temporary prefix file is removed and [queue] is closed. *)
let feed_them_all csize ncores demux queue =
  (* The empty temp file only reserves a unique prefix for chunk files. *)
  let prefix = Filename.temp_file "iparany_" "" in
  let chunk_file rank = sprintf "%s_%d" prefix rank in
  (* Pull up to [missing] more items onto [acc]; the boolean tells whether
     the input ran dry while doing so. *)
  let rec gather acc missing =
    if missing <= 0 then (acc, false)
    else
      match demux () with
      | item -> gather (item :: acc) (missing - 1)
      | exception End_of_input -> (acc, true)
  in
  let rec feed rank =
    let chunk, exhausted = gather [] csize in
    if not exhausted then begin
      Shm.send (chunk_file rank) queue chunk;
      feed (rank + 1)
    end else begin
      (* Flush whatever was gathered before the input ended. *)
      (match chunk with
       | [] -> ()
       | _ :: _ -> Shm.send (chunk_file rank) queue chunk);
      (* One end-of-stream marker per worker. *)
      for _ = 1 to ncores do
        Shm.raw_send queue "EOF"
      done;
      Sys.remove prefix;
      Unix.close queue
    end
  in
  feed 0
(** [go_to_work jobs_queue work results_queue] is a worker's main loop: it
    receives a chunk (a list) from [jobs_queue], applies [work] to every
    element with [List.rev_map] and sends the resulting list on
    [results_queue]. When [End_of_input] is raised, it removes its temporary
    prefix file, sends ["EOF"] on [results_queue] and closes that socket. *)
let go_to_work jobs_queue work results_queue =
  (* The empty temp file only reserves a unique prefix for result files. *)
  let prefix = Filename.temp_file "oparany_" "" in
  (* Receive buffer for the file names travelling on [jobs_queue]. *)
  let buff = Bytes.create 80 in
  let rec serve rank =
    let jobs = Shm.receive jobs_queue buff in
    let results = List.rev_map work jobs in
    Shm.send (sprintf "%s_%d" prefix rank) results_queue results;
    serve (rank + 1)
  in
  (* The handler wraps the whole loop, so [End_of_input] stops the worker
     wherever inside the loop it is raised. *)
  try serve 0
  with End_of_input ->
    begin
      Sys.remove prefix;
      Shm.raw_send results_queue "EOF";
      Unix.close results_queue
    end
(** [fork_out f] forks the current process. The child runs [f ()] and then
    terminates with [exit 0]; the parent returns [()] right away and does not
    keep the child's pid.
    @raise Failure if [Unix.fork] returns [-1].
    NOTE(review): [Unix.fork] normally reports failure by raising
    [Unix.Unix_error], so the [-1] case is likely unreachable; confirm. *)
let fork_out f =
  let pid = Unix.fork () in
  if pid = -1 then failwith "Parany.fork_out: fork failed"
  else if pid = 0 then begin
    (* Child process: do the job, then leave without returning to the caller. *)
    let () = f () in
    exit 0
  end
(** [run ~verbose ~csize ~nprocs ~demux ~work ~mux] pushes every item produced
    by [demux] through [work] and hands each result to [mux].

    - [verbose] is stored in [debug].
    - With [nprocs <= 1] everything runs in the current process, one item at
      a time, until [End_of_input] is raised.
    - Otherwise one feeder process and [nprocs] worker processes are forked;
      the feeder sends chunks of [csize] items, the workers apply [work], and
      the calling process applies [mux] to the results as they come back.
      It asserts [csize >= 1] and [nprocs <= Cpu.numcores ()].

    The forked processes are not waited for here. *)
let run ~verbose ~csize ~nprocs ~demux ~work ~mux =
  debug := verbose;
  if nprocs > 1 then begin
    assert (csize >= 1);
    let max_cores = Cpu.numcores () in
    assert (nprocs <= max_cores);
    let jobs_in, jobs_out = Shm.init () in
    let res_in, res_out = Shm.init () in
    (* Compact the heap before forking. *)
    Gc.compact ();
    (* One feeder process... *)
    fork_out (fun () -> feed_them_all csize nprocs demux jobs_in);
    (* ...and [nprocs] workers, optionally pinned to the core of their rank. *)
    for rank = 0 to nprocs - 1 do
      fork_out (fun () ->
          if !core_pinning then Cpu.setcore rank;
          go_to_work jobs_out work res_in)
    done;
    (* Receive buffer for the file names travelling on the results queue. *)
    let buff = Bytes.create 80 in
    (* Consume results until [nprocs] end-of-stream signals were seen. *)
    let rec collect finished =
      if finished < nprocs then begin
        match List.iter mux (Shm.receive res_out buff) with
        | () -> collect finished
        | exception End_of_input -> collect (finished + 1)
      end
    in
    collect 0;
    List.iter Unix.close [jobs_in; jobs_out; res_in; res_out]
  end else begin
    (* Sequential fallback: no fork, no chunking. *)
    let rec pump () =
      mux (work (demux ()));
      pump ()
    in
    try pump () with End_of_input -> ()
  end
(** List-oriented wrappers around [run]. With [ncores <= 1] each function
    falls back to a purely sequential implementation. *)
module Parmap = struct

  (** [tail_rec_map f l] maps [f] over [l], keeping the order of [l], by
      composing [List.rev_map] with [List.rev]. *)
  let tail_rec_map f l =
    List.rev (List.rev_map f l)

  (** [demux_of_list l] is a producer that hands out the elements of [l] one
      call at a time, front to back, and raises [End_of_input] once the list
      is exhausted. Shared by the three entry points below. *)
  let demux_of_list l =
    let remaining = ref l in
    fun () ->
      match !remaining with
      | [] -> raise End_of_input
      | x :: xs ->
        remaining := xs;
        x

  (** [parmap ~ncores ?csize f l] applies [f] to every element of [l] using
      [ncores] worker processes fed with chunks of [csize] elements
      (default 1). Results are consed onto the output as they come back, so
      in the parallel case their order is not tied to the order of [l]. *)
  let parmap ~ncores ?(csize = 1) f l =
    if ncores <= 1 then tail_rec_map f l
    else begin
      let demux = demux_of_list l in
      let output = ref [] in
      let mux x = output := x :: !output in
      run ~verbose:false ~csize ~nprocs:ncores ~demux ~work:f ~mux;
      !output
    end

  (** [pariter ~ncores ?csize f l] applies [f] to every element of [l] using
      [ncores] worker processes fed with chunks of [csize] elements
      (default 1); results are discarded. *)
  let pariter ~ncores ?(csize = 1) f l =
    if ncores <= 1 then List.iter f l
    else begin
      let demux = demux_of_list l in
      run ~verbose:false ~csize ~nprocs:ncores ~demux ~work:f ~mux:ignore
    end

  (** [parfold ~ncores ?csize f g init l] applies [f] to every element of [l]
      using [ncores] worker processes fed with chunks of [csize] elements
      (default 1), and folds the results into an accumulator with [g],
      starting from [init]. In the parallel case results are folded in the
      order they come back. *)
  let parfold ~ncores ?(csize = 1) f g init l =
    if ncores <= 1 then List.fold_left g init (tail_rec_map f l)
    else begin
      let demux = demux_of_list l in
      let output = ref init in
      let mux x = output := g !output x in
      run ~verbose:false ~csize ~nprocs:ncores ~demux ~work:f ~mux;
      !output
    end

end