Source file Multiprocess.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
open Printf
module Client = struct
(* A worker process as seen from the master process. *)
type worker = {
(* Index of this worker's slot in the array of workers. *)
id : int;
(* Human-readable label used in log and error messages. *)
name : string;
(* The pair of channels returned by [Unix.open_process_args]; kept so that
the process can later be killed and reaped via [Unix.process_pid] and
[Unix.close_process]. *)
process : in_channel * out_channel;
(* First component of [process]: the channel the master reads from. *)
std_in_ch : in_channel;
(* File descriptor underlying [std_in_ch], used with [Unix.select]. *)
std_in_fd : Unix.file_descr;
(* Second component of [process]: the channel the master writes to. *)
std_out_ch : out_channel;
(* Bytes received from the worker that don't yet form a complete line. *)
std_input_buffer : Buffer.t;
(* Set to [false] once the worker has been closed. *)
mutable running : bool;
}
(* A pool of workers. *)
type t = {
(* The workers; a worker's [id] is its index in this array. *)
array : worker array;
(* Return the next message received from a worker together with that
worker, or [None] when there is nothing left to read. *)
read : t -> (worker * Msg_from_worker.t) option;
(* Start a new worker process for the given worker ID. Used to replace
a worker that was closed. *)
create_worker : int -> worker;
}
(** [iter_workers workers func] applies [func] to each worker of the pool,
    in array order. *)
let iter_workers { array = all_workers; _ } func =
  Array.iter func all_workers
(** Return the input file descriptors of the workers that are still marked
    as running, in the same order as the workers array. *)
let fds_of_running_workers (workers : t) =
  workers.array
  |> Array.to_list
  |> List.filter_map (fun w ->
       if w.running then Some w.std_in_fd else None)
(** Render a [Unix.process_status] as the constructor name followed by its
    integer argument, e.g. ["WEXITED 0"]. Used in debug logs.

    Every constructor is named in the output so that a stopped process
    can be told apart from the other cases. *)
let show_process_status (x : Unix.process_status) =
  match x with
  | WEXITED n -> sprintf "WEXITED %i" n
  | WSIGNALED n -> sprintf "WSIGNALED %i" n
  | WSTOPPED n -> sprintf "WSTOPPED %i" n
(** Terminate a worker if it is still marked as running; do nothing
    otherwise.

    The worker is first marked as not running, then its process is sent
    SIGKILL and reaped with [Unix.close_process]. The exit status and the
    time taken are reported via [Debug.log]. *)
let close_worker (x : worker) =
  match x.running with
  | false -> ()
  | true ->
    x.running <- false;
    Debug.log (fun () -> sprintf "close worker %s" x.name);
    let started_at = Unix.gettimeofday () in
    let pid = Unix.process_pid x.process in
    Unix.kill pid Sys.sigkill;
    let exit_status = Unix.close_process x.process in
    let finished_at = Unix.gettimeofday () in
    Debug.log (fun () ->
      sprintf "closed worker %s: %s (took %.6fs)"
        x.name
        (show_process_status exit_status)
        (finished_at -. started_at))
(** Close every worker of the pool. Workers that were already closed are
    left alone by [close_worker]. *)
let close (workers : t) =
  iter_workers workers close_worker
(** Close [worker], start a fresh worker with the same ID, store it in the
    same slot of the workers array, and return it. *)
let replace_worker workers worker =
  let slot = worker.id in
  close_worker worker;
  let fresh = workers.create_worker slot in
  workers.array.(slot) <- fresh;
  fresh
(** Remove and return the complete (newline-terminated) lines held in
    [buf], without their trailing newline. Whatever follows the last
    newline is an incomplete line and is left in the buffer.

    If the buffer holds no newline, the result is [[]] and the buffer
    content is unchanged. *)
let take_lines_from_buffer buf =
  let pending = Buffer.contents buf in
  let complete_lines, leftover =
    match String.rindex_opt pending '\n' with
    | None -> ([], pending)
    | Some last_nl ->
      let terminated = String.sub pending 0 last_nl in
      let tail_len = String.length pending - last_nl - 1 in
      let tail = String.sub pending (last_nl + 1) tail_len in
      (String.split_on_char '\n' terminated, tail)
  in
  Buffer.clear buf;
  Buffer.add_string buf leftover;
  complete_lines
(** Extract the complete lines from [buf], skip the empty ones, parse each
    remaining line with [Msg_from_worker.of_string], and append the result
    to [queue] paired with [worker]. *)
let flush_input_buffer worker buf queue =
  let enqueue_if_nonempty line =
    match line with
    | "" -> ()
    | _ -> Queue.add (worker, Msg_from_worker.of_string line) queue
  in
  take_lines_from_buffer buf |> List.iter enqueue_if_nonempty
(** Read the bytes currently available from worker [x] and append the
    complete messages they contain to [incoming_message_queue].

    Reading 0 bytes means end of file: the worker is then closed.

    The read is done directly on the file descriptor with [Unix.read]
    rather than through the buffered [in_channel]. The caller decides when
    to read by calling [Unix.select] on the descriptor; a buffered [input]
    may pull more bytes from the descriptor into the channel's internal
    buffer than it returns, and [Unix.select] cannot see those bytes. They
    would remain unread until the worker writes again.

    @raise Unix.Unix_error if the underlying read fails for a reason
    other than an interrupted system call. *)
let read_from_worker (x : worker)
    (incoming_message_queue : (worker * Msg_from_worker.t) Queue.t) =
  let buf_len = 1024 in
  let buf = Bytes.create buf_len in
  (* Retry when the system call is interrupted by a signal. *)
  let rec read_chunk () =
    try Unix.read x.std_in_fd buf 0 buf_len
    with Unix.Unix_error (Unix.EINTR, _, _) -> read_chunk ()
  in
  let bytes_read = read_chunk () in
  if bytes_read = 0 then (
    Debug.log (fun () ->
      sprintf "received 0 bytes from worker %s, closing" x.name);
    close_worker x)
  else (
    Debug.log (fun () ->
      sprintf "received %i bytes from worker %s" bytes_read x.name);
    Buffer.add_subbytes x.std_input_buffer buf 0 bytes_read;
    flush_input_buffer x x.std_input_buffer incoming_message_queue)
(** For each worker reported by [get_timed_out_workers ()]: replace it with
    a fresh worker, run the termination callback that came with it, then
    hand the new worker to [feed_or_terminate_worker]. *)
let kill_and_replace_timed_out_workers
    ~feed_or_terminate_worker
    ~get_timed_out_workers workers =
  let handle_timeout
      ((stale : worker), (notify_terminated : unit -> unit)) =
    let replacement = replace_worker workers stale in
    notify_terminated ();
    feed_or_terminate_worker replacement
  in
  get_timed_out_workers () |> List.iter handle_timeout
(** Create an empty hash table, used to look up a worker from the file
    descriptor it is read from. *)
let create_fd_worker_table () =
  let initial_capacity = 100 in
  Hashtbl.create initial_capacity
(* Build the function that returns the next message received from any
worker.

The returned function [poll workers]:
- returns a queued message if one is available;
- otherwise replaces the workers that timed out, then waits with
[Unix.select] for one of the running workers to become readable, reads
from it, and starts over;
- returns [None] when no worker is running anymore.

[max_poll_interval_secs] is the timeout passed to [Unix.select]. *)
let make_poller
?(max_poll_interval_secs = 0.1)
~feed_or_terminate_worker
~fd_worker_tbl ~get_timed_out_workers () =
(* Messages that were parsed but not yet returned to the caller. A single
read may produce several messages. *)
let incoming_message_queue = Queue.create () in
let rec poll (workers : t) =
match Queue.take_opt incoming_message_queue with
| Some msg -> Some msg
| None ->
(* Timeouts are checked on each iteration that finds the queue empty. *)
kill_and_replace_timed_out_workers
~feed_or_terminate_worker
~get_timed_out_workers workers;
match fds_of_running_workers workers with
| [] -> None
| in_fds -> (
Debug.log (fun () ->
sprintf "wait for a message from one of %i worker(s)"
(List.length in_fds));
match Unix.select in_fds [] [] max_poll_interval_secs with
| in_fd :: _, _, _ ->
(* Only the first readable descriptor is handled here; any other one
is picked up by a later call to 'select'. *)
let worker =
match Hashtbl.find_opt fd_worker_tbl in_fd with
| None -> assert false
| Some x -> x
in
Debug.log (fun () ->
sprintf "receiving data from worker %s" worker.name);
read_from_worker worker incoming_message_queue;
poll workers
| [], _, _ ->
(* With a non-negative timeout, an empty result is treated as the
timeout expiring: poll again, which also re-checks for timed-out
workers. Otherwise it is treated as an anomaly and all workers are
closed. *)
if max_poll_interval_secs >= 0. then
poll workers
else (
Debug.log (fun () ->
"'select' returned nothing. Did a worker exit \
prematurely?");
close workers;
None)
)
in
poll
(** Start a worker process and register it in [fd_worker_tbl] under the
    file descriptor it will be read from.

    The program run is [Sys.argv.(0)]. Its argument vector is
    [original_argv] followed by the [--worker] flag and the
    [--test-list-checksum] option. The worker's name is ["i/n"] where [i]
    is [worker_id + 1] and [n] is [num_workers]. *)
let create_worker
    ~fd_worker_tbl
    ~num_workers
    ~original_argv
    ~test_list_checksum
    worker_id =
  let name = sprintf "%d/%d" (worker_id + 1) num_workers in
  let program = Sys.argv.(0) in
  let worker_options =
    [| "--worker"; "--test-list-checksum"; test_list_checksum |]
  in
  let argv = Array.append original_argv worker_options in
  Debug.log (fun () ->
    let command = String.concat " " (Array.to_list argv) in
    sprintf "create worker %s with command: %s" name command);
  let process = Unix.open_process_args program argv in
  let std_in_ch, std_out_ch = process in
  let std_in_fd = Unix.descr_of_in_channel std_in_ch in
  let worker =
    {
      id = worker_id;
      name;
      process;
      std_in_ch;
      std_in_fd;
      std_out_ch;
      std_input_buffer = Buffer.create 200;
      running = true;
    }
  in
  Hashtbl.replace fd_worker_tbl std_in_fd worker;
  worker
(** Create a pool of [num_workers] worker processes, together with the
    polling function used to read their messages and the function used to
    start replacement workers. *)
let create
    ~feed_or_terminate_worker
    ~get_timed_out_workers
    ~num_workers
    ~original_argv
    ~test_list_checksum
    () =
  Debug.log (fun () -> sprintf "create %i worker(s)" num_workers);
  let fd_worker_tbl = create_fd_worker_table () in
  (* Shared by the initial creation of workers and by later replacements. *)
  let spawn worker_id =
    create_worker
      ~fd_worker_tbl ~num_workers ~original_argv ~test_list_checksum
      worker_id
  in
  let array = Array.init num_workers spawn in
  let read =
    make_poller
      ~feed_or_terminate_worker ~fd_worker_tbl ~get_timed_out_workers ()
  in
  { array; read; create_worker = spawn }
(** Return the next message received from a worker, using the polling
    function stored in the pool. *)
let read (x : t) =
  let poll = x.read in
  poll x
(** Send a message to a worker: the message is serialized with
    [Msg_from_master.to_string], written as one line, and the channel is
    flushed. *)
let write worker msg =
  let line = Msg_from_master.to_string msg in
  fprintf worker.std_out_ch "%s\n%!" line
(* Notify the caller through [on_start_test] that [test] is about to run on
[worker], then send the worker a [Start_test] message carrying
[test_id]. *)
let start_test ~on_start_test worker test_id test =
on_start_test (Some worker) test;
write worker (Start_test test_id)
(** Give [worker] the next test from [test_queue], or close the worker if
    the queue is empty. *)
let feed_worker ~get_test_id ~on_start_test test_queue worker =
  if Queue.is_empty test_queue then close_worker worker
  else
    let test = Queue.take test_queue in
    start_test ~on_start_test worker (get_test_id test) test
(** Run [tests] in [num_workers] worker processes.

    Each worker is given one test at a time. When a worker reports the end
    of a test, [on_end_test] is called and the worker is given the next
    test, or closed if none is left. Lines from a worker that are not
    protocol messages are echoed on standard output, prefixed with the
    worker's name.

    Returns [Ok ()] once there is nothing left to read from the workers,
    or [Error msg] as soon as a worker reports an error, in which case all
    workers are closed.

    @raise Failure if a worker reports a test ID that is not the ID of one
    of [tests]. *)
let run_tests_in_workers
    ~argv
    ~get_test_id
    ~get_timed_out_workers
    ~num_workers
    ~on_end_test
    ~on_start_test
    ~test_list_checksum (tests : 'test list) =
  (* Tests that haven't been handed to a worker yet, in list order. *)
  let pending = Queue.create () in
  List.iter (fun test -> Queue.add test pending) tests;
  let feed_or_terminate_worker worker =
    feed_worker ~get_test_id ~on_start_test pending worker
  in
  let workers =
    create
      ~feed_or_terminate_worker
      ~get_timed_out_workers
      ~num_workers
      ~original_argv:argv
      ~test_list_checksum ()
  in
  (* Index the tests by ID so that the IDs sent back by the workers can be
     resolved. *)
  let tests_by_id = Hashtbl.create (2 * List.length tests) in
  List.iter
    (fun test -> Hashtbl.add tests_by_id (get_test_id test) test)
    tests;
  let get_test test_id =
    match Hashtbl.find_opt tests_by_id test_id with
    | Some test -> test
    | None ->
      failwith
        (sprintf "Internal error: received invalid test ID from worker: %S"
           test_id)
  in
  (* Give each worker its first test. *)
  iter_workers workers feed_or_terminate_worker;
  let rec loop () =
    match read workers with
    | None -> Ok ()
    | Some (worker, msg) ->
      (match msg with
       | End_test test_id ->
         let finished = get_test test_id in
         on_end_test finished;
         feed_or_terminate_worker worker;
         loop ()
       | Error msg ->
         let report =
           sprintf "error in worker %s: %s" worker.name msg
         in
         close workers;
         Error report
       | Junk line ->
         printf "[worker %s] %s\n%!" worker.name line;
         loop ())
  in
  loop ()
end
(** The worker's side of the protocol: messages are read from standard
    input and written to standard output. *)
module Server = struct
  (** Read one line from standard input and parse it with
      [Msg_from_master.of_string]. Returns [None] at end of input. *)
  let read () =
    match input_line stdin with
    | line -> Some (Msg_from_master.of_string line)
    | exception End_of_file -> None

  (** Print a message on its own line on standard output and flush. A
      newline is also printed before the message. *)
  let write msg =
    let payload = Msg_from_worker.to_string msg in
    printf "\n%s\n%!" payload

  (** Report an error with [write], then exit with status 1. *)
  let fatal_error str =
    write (Error str);
    exit 1
end