1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
open Printf
module A = Array
module Fn = Filename
module Ht = Hashtbl
(** Raised by [Shm.receive] when the ["EOF"] sentinel message is read.
    The rest of the file also uses it as the "input exhausted" signal that
    [demux] functions raise. *)
exception End_of_input

(** Inter-process message passing: values are marshalled to files and only
    the file names travel over a Unix datagram socket pair. *)
module Shm = struct

  (** Create a connected pair of Unix-domain datagram sockets. *)
  let init () =
    Unix.(socketpair PF_UNIX SOCK_DGRAM 0)

  (** [unmarshal_from_file fn] reads back one marshalled value from file
      [fn]. The result type is chosen by the caller and is not checked.
      The channel is closed on both the success and the failure path, so a
      corrupt or empty file does not leak a file descriptor.
      @raise Sys_error if [fn] cannot be opened
      @raise End_of_file or [Failure] if the content cannot be unmarshalled *)
  let unmarshal_from_file fn =
    let input = open_in_bin fn in
    match Marshal.from_channel input with
    | res ->
      close_in input;
      res
    | exception e ->
      close_in_noerr input;
      raise e

  (** [marshal_to_file fn v] writes [v] to file [fn] with the [No_sharing]
      and [Closures] marshalling flags. The channel is closed even if
      marshalling fails.
      @raise Sys_error if [fn] cannot be opened or written *)
  let marshal_to_file fn v =
    let out = open_out_bin fn in
    match Marshal.to_channel out v Marshal.[No_sharing; Closures] with
    | () -> close_out out
    | exception e ->
      close_out_noerr out;
      raise e

  (** [send_loop sock buff n] sends the first [n] bytes of [buff] as one
      message. When the kernel reports [ENOBUFS] it pauses for one
      millisecond and tries again, for as long as needed. *)
  let rec send_loop sock buff n =
    try
      let sent = Unix.send sock buff 0 n [] in
      assert(sent = n)
    with Unix.Unix_error(Unix.ENOBUFS, _, _) ->
      (* [select] with no descriptors is used only as a 1ms sleep *)
      ignore (Unix.select [] [] [] 0.001);
      send_loop sock buff n

  (** [raw_send sock str] sends [str] itself as one message. *)
  let raw_send sock str =
    let n = String.length str in
    (* no copy: the bytes are only read by [Unix.send] *)
    let buff = Bytes.unsafe_of_string str in
    send_loop sock buff n

  (** [send fn queue to_send] marshals [to_send] into file [fn], then sends
      the file name [fn] over [queue]. *)
  let send fn queue to_send =
    marshal_to_file fn to_send;
    raw_send queue fn

  (** [raw_receive sock buff] receives one message into [buff] and returns
      it as a string. [buff] must be large enough for the longest expected
      message. *)
  let raw_receive sock buff =
    let n = Bytes.length buff in
    let received = Unix.recv sock buff 0 n [] in
    assert(received > 0);
    Bytes.sub_string buff 0 received

  (** [receive queue buff] receives one file name from [queue], unmarshals
      the value stored in that file, deletes the file and returns the value.
      @raise End_of_input if the message is the ["EOF"] sentinel *)
  let receive queue buff =
    let fn = raw_receive queue buff in
    if fn = "EOF" then
      raise End_of_input
    else
      let res = unmarshal_from_file fn in
      Sys.remove fn;
      res
end
(** [feed_them_all tmp csize ncores demux queue] pulls items from [demux]
    and publishes them on [queue] in chunks of [csize] items, one marshalled
    file per chunk. Chunk files are named after a fresh temporary file
    created in directory [tmp], with an increasing numeric suffix.

    Items are consed onto the chunk, so each chunk is sent in reverse
    [demux] order.

    Once [demux] raises [End_of_input], any partially filled chunk is sent,
    followed by [ncores] ["EOF"] messages. The temporary prefix file is then
    removed and [queue] is closed. *)
let feed_them_all tmp csize ncores demux queue =
  let prfx = Filename.temp_file ~temp_dir:tmp "iparany_" "" in
  let chunk_file rank = sprintf "%s_%d" prfx rank in
  let sent_chunks = ref 0 in
  let pending = ref [] in
  (* pull [remaining] more items into the pending chunk *)
  let rec fill remaining =
    if remaining > 0 then begin
      pending := (demux ()) :: !pending;
      fill (remaining - 1)
    end in
  (* never returns normally: only [End_of_input] from [demux] stops it *)
  let rec pump () =
    fill csize;
    Shm.send (chunk_file !sent_chunks) queue !pending;
    pending := [];
    incr sent_chunks;
    pump () in
  try pump ()
  with End_of_input ->
    begin
      (match !pending with
       | [] -> ()
       | leftover -> Shm.send (chunk_file !sent_chunks) queue leftover);
      (* one end-of-stream marker per consumer *)
      for _ = 1 to ncores do
        Shm.raw_send queue "EOF"
      done;
      Sys.remove prfx;
      Unix.close queue
    end
(** [go_to_work prfx jobs_queue work results_queue] is the worker loop.
    It repeatedly receives a chunk (a list) from [jobs_queue], applies
    [work] to every element with [List.rev_map], and sends the resulting
    list on [results_queue] through a file named [prfx] plus an increasing
    numeric suffix.

    The loop ends, and the function returns [()], when [End_of_input] is
    raised inside it. [Shm.receive] raises it on the ["EOF"] message; since
    the handler wraps the whole loop, an [End_of_input] raised by [work]
    stops the worker as well. *)
let go_to_work prfx jobs_queue work results_queue =
  try
    let out_count = ref 0 in
    (* Each message is a file path. A datagram longer than the receive
       buffer is cut short, which would make the file impossible to open.
       The buffer is therefore sized for a full path rather than the
       80 bytes used previously. *)
    let buff = Bytes.create 4096 in
    while true do
      let xs = Shm.receive jobs_queue buff in
      let ys = List.rev_map work xs in
      let fn = sprintf "%s_%d" prfx !out_count in
      Shm.send fn results_queue ys;
      incr out_count
    done
  with End_of_input ->
    ()
(** [fork_out f] forks a child process that runs [f ()] and then exits.
    The parent returns [()] immediately; the child's pid is not kept.

    The child never returns from this function. If [f] raises, the
    exception is printed on stderr and the child exits with status 2, the
    status the OCaml runtime uses for an uncaught exception. Without this,
    the exception would unwind into the copy of the parent's call stack
    and the child could run the caller's exception handlers and
    continuation.

    @raise Unix.Unix_error if the fork itself fails ([Unix.fork] raises
    rather than returning [-1]) *)
let fork_out f =
  match Unix.fork () with
  | 0 ->
    (* child process *)
    (match f () with
     | () -> exit 0
     | exception e ->
       prerr_endline
         ("Parany.fork_out: uncaught exception in child: "
          ^ Printexc.to_string e);
       exit 2)
  | _pid -> ()
(** [idemux demux] wraps [demux] so that each produced item is paired with
    its rank: [(0, x0)], [(1, x1)], and so on. The rank only advances when
    [demux] returns; an exception raised by [demux] passes through and
    leaves the counter untouched. *)
let idemux (demux: unit -> 'a) =
  let next_rank = ref 0 in
  fun () ->
    let item = demux () in
    let rank = !next_rank in
    next_rank := rank + 1;
    (rank, item)
(** [iwork work (i, x)] applies [work] to the payload [x] and keeps the
    rank [i] attached to the result. *)
let iwork (work: 'a -> 'b) ((i, x): int * 'a): int * 'b =
  let y = work x in
  (i, y)
(** [imux mux] returns a consumer of ranked results [(i, res)] that calls
    [mux] on the payloads in increasing rank order, starting from rank 0.
    A result that arrives ahead of its turn is parked in a hash table
    until every lower rank has been delivered.

    Exceptions raised by [mux] always propagate to the caller. This
    includes [Not_found], which the previous implementation silently
    swallowed while draining parked results. *)
let imux (mux: 'b -> unit) =
  let mux_count = ref 0 in
  let wait_list = Ht.create 11 in
  (* Deliver parked results for as long as the next expected rank is
     available. Only the table lookup is guarded against [Not_found];
     [mux] runs outside of the handler. *)
  let rec flush_ready () =
    match Ht.find wait_list !mux_count with
    | exception Not_found -> ()
    | next ->
      Ht.remove wait_list !mux_count;
      mux next;
      incr mux_count;
      flush_ready ()
  in
  function (i, res) ->
    if !mux_count = i then
      begin
        (* the expected rank: deliver it now, then whatever it unblocks *)
        mux res;
        incr mux_count;
        if Ht.length wait_list > 0 then
          flush_ready ()
      end
    else
      (* arrived early: park it until its turn comes *)
      Ht.add wait_list i res
(** Rank of the current process. It starts at [-1]; [run] sets it to the
    worker rank just before forking each worker. *)
let my_rank : int ref = ref (-1)

(** [get_rank ()] returns the current value of [my_rank]. *)
let get_rank () : int =
  my_rank.contents
(** [run ?init ?finalize ?preserve ?core_pin ?csize nprocs ~demux ~work ~mux]
    drives a demux / work / mux pipeline.

    - [demux ()] produces the next input and raises [End_of_input] when
      there is none left.
    - [work] transforms one input into one output.
    - [mux] consumes one output.

    If [nprocs <= 1], everything runs sequentially in the current process
    and the optional arguments are not used.

    Otherwise one feeder process and [nprocs] worker processes are forked.
    The feeder sends chunks of [csize] inputs (default 1). Each worker
    calls [init rank] first and registers [finalize] with [at_exit]. The
    calling process runs [mux] on the outputs as they arrive and returns
    once every worker has sent its ["EOF"] marker.

    @param preserve when [true], [mux] sees the outputs in [demux] order
    (default [false])
    @param core_pin when [true], each worker calls [Cpu.setcore] with its
    rank (default [false])
    @raise Assert_failure in the parallel case, if [csize < 1] or if
    [nprocs] exceeds [Cpu.numcores ()] *)
let run ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
    ?(preserve = false) ?(core_pin = false) ?(csize = 1) nprocs
    ~demux ~work ~mux =
  let demux_work_mux (type a b)
      ~(demux: unit -> a) ~(work: a -> b) ~(mux: b -> unit): unit =
    (* one socket pair for jobs (feeder -> workers), one for results
       (workers -> this process) *)
    let jobs_in, jobs_out = Shm.init () in
    let res_in, res_out = Shm.init () in
    (* NOTE(review): presumably flushed so that pending buffered output is
       not duplicated into every forked child -- confirm *)
    flush_all ();
    Gc.compact ();
    let tmp = Filename.get_temp_dir_name () in
    fork_out (fun () -> feed_them_all tmp csize nprocs demux jobs_in);
    for worker_rank = 0 to nprocs - 1 do
      (* set before forking, so the child starts with its own rank *)
      my_rank := worker_rank;
      fork_out (fun () ->
          init worker_rank;
          at_exit finalize;
          if core_pin then Cpu.setcore worker_rank;
          let prfx = Filename.temp_file ~temp_dir:tmp "oparany_" "" in
          (* on exit, tell the collector that this worker is done and
             remove the prefix file *)
          at_exit (fun () ->
              Shm.raw_send res_in "EOF";
              Unix.close res_in;
              Sys.remove prfx
            );
          go_to_work prfx jobs_out work res_in
        )
    done;
    let finished = ref 0 in
    (* Each message is a file path. A datagram longer than the receive
       buffer is cut short, which would make the file impossible to open.
       The buffer is therefore sized for a full path rather than the
       80 bytes used previously. *)
    let buff = Bytes.create 4096 in
    (* collect results until every worker has reported ["EOF"] *)
    while !finished < nprocs do
      try
        while true do
          let xs = Shm.receive res_out buff in
          List.iter mux xs
        done
      with End_of_input -> incr finished
    done;
    List.iter Unix.close [jobs_in; jobs_out; res_in; res_out]
  in
  if nprocs <= 1 then
    (* sequential fallback, in the current process *)
    try
      while true do
        mux (work (demux ()))
      done
    with End_of_input -> ()
  else
    begin
      assert(csize >= 1);
      let max_cores = Cpu.numcores () in
      assert(nprocs <= max_cores);
      (if preserve then
         (* tag every item with its rank so the outputs can be reordered *)
         demux_work_mux
           ~demux:(idemux demux) ~work:(iwork work) ~mux:(imux mux)
       else
         demux_work_mux ~demux ~work ~mux
      );
    end
(** List and array wrappers built on top of [run]. Every function takes the
    same optional arguments as [run] and falls back to a plain sequential
    traversal when [ncores <= 1]. *)
module Parmap = struct

  (** Stack-safe map: reverse-map, then reverse back. *)
  let tail_rec_map f l =
    l |> List.rev_map f |> List.rev

  (** Stack-safe mapi: [f] receives the 0-based position and the element. *)
  let tail_rec_mapi f l =
    let _, rev_mapped =
      List.fold_left
        (fun (idx, acc) x -> (idx + 1, f idx x :: acc))
        (0, []) l in
    List.rev rev_mapped

  (** [parmap ncores f l] maps [f] over [l] using [ncores] workers.
      With [~preserve:true] the result is in the order of [l]; otherwise,
      in the parallel case, it is returned in the order it was accumulated
      (each arriving result is consed at the front). *)
  let parmap ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores > 1 then begin
      let remaining = ref l in
      let demux () =
        match !remaining with
        | x :: rest -> remaining := rest; x
        | [] -> raise End_of_input in
      let gathered = ref [] in
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux ~work:f ~mux:(fun y -> gathered := y :: !gathered);
      (* results were consed, so ordered output has to be flipped back *)
      if preserve then List.rev !gathered else !gathered
    end else
      tail_rec_map f l

  (** [parmapi ncores f l] is like [parmap], but [f] also receives the
      0-based position of each element in [l]. *)
  let parmapi ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores > 1 then begin
      let remaining = ref l in
      let next_idx = ref 0 in
      let demux () =
        match !remaining with
        | [] -> raise End_of_input
        | x :: rest ->
          let idx = !next_idx in
          remaining := rest;
          incr next_idx;
          (idx, x) in
      let gathered = ref [] in
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux
        ~work:(fun (idx, x) -> f idx x)
        ~mux:(fun y -> gathered := y :: !gathered);
      if preserve then List.rev !gathered else !gathered
    end else
      tail_rec_mapi f l

  (** [pariter ncores f l] applies [f] to every element of [l] using
      [ncores] workers; the unit results are discarded. *)
  let pariter ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores > 1 then begin
      let remaining = ref l in
      let demux () =
        match !remaining with
        | x :: rest -> remaining := rest; x
        | [] -> raise End_of_input in
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux ~work:f ~mux:ignore
    end else
      List.iter f l

  (** [parfold ncores f g init_acc l] applies [f] to every element of [l]
      in the workers, and folds the results into an accumulator with [g],
      starting from [init_acc], in the calling process. *)
  let parfold ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores
      f g init_acc l =
    if ncores > 1 then begin
      let remaining = ref l in
      let demux () =
        match !remaining with
        | x :: rest -> remaining := rest; x
        | [] -> raise End_of_input in
      let acc = ref init_acc in
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux ~work:f ~mux:(fun y -> acc := g !acc y);
      !acc
    end else
      List.fold_left (fun acc x -> g acc (f x)) init_acc l

  (** [array_parmap ncores f init_acc a] returns a fresh array holding
      [f a.(i)] at every index [i]. Only indices are sent to the workers;
      [init_acc] is the value used to create the result array before it
      is filled in. *)
  let array_parmap
      ?(init = fun (_rank: int) -> ())
      ?(finalize = fun () -> ())
      ?(core_pin = false) ncores f init_acc a =
    let n = A.length a in
    let res = A.make n init_acc in
    let next = ref 0 in
    let demux () =
      let i = !next in
      if i = n then raise End_of_input
      else begin
        next := i + 1;
        i
      end in
    run ~init ~finalize
      ~preserve:false
      ~core_pin ~csize:1 ncores
      ~demux
      ~work:(fun i -> (i, f (A.unsafe_get a i)))
      ~mux:(fun (i, y) -> A.unsafe_set res i y);
    res
end