Source file oUnitRunnerProcesses.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
(** Use processes to run several tests in parallel.
*
* Run processes that handle running tests. The processes read test, execute
* it, and communicate back to the master the log.
*
* This need to be done in another process because OCaml Threads are not truly
* running in parallel. Moreover we cannot use Unix.fork because it's not
* portable
*)
open Unix
open OUnitRunner.GenericWorker
let unix_fork = ref Unix.fork
let make_channel
shard_id
string_of_read_message
string_of_written_message
fd_read
fd_write =
let () =
set_nonblock fd_read;
set_close_on_exec fd_read;
set_close_on_exec fd_write
in
let chn_write = out_channel_of_descr fd_write in
let really_read fd str =
let off = ref 0 in
let read = ref 0 in
while !read < Bytes.length str do
try
let one_read =
Unix.read fd str !off (Bytes.length str - !off)
in
read := !read + one_read;
off := !off + one_read
with Unix_error(EAGAIN, _, _) ->
()
done;
str
in
let = Bytes.create Marshal.header_size in
let send_data msg =
Marshal.to_channel chn_write msg [];
Stdlib.flush chn_write
in
let receive_data () =
try
let data_size = Marshal.data_size (really_read fd_read header_str) 0 in
let data_str = really_read fd_read (Bytes.create data_size) in
let msg =
Marshal.from_string
(Bytes.unsafe_to_string (Bytes.cat header_str data_str))
0
in
msg
with Failure(msg) ->
OUnitUtils.failwithf "Communication error with worker processes: %s" msg
in
let close () =
close_out chn_write;
in
wrap_channel
shard_id
string_of_read_message
string_of_written_message
{
send_data = send_data;
receive_data = receive_data;
close = close
}
let processes_grace_period =
OUnitConf.make_float
"processes_grace_period"
5.0
"Delay to wait for a process to stop."
let processes_kill_period =
OUnitConf.make_float
"processes_kill_period"
5.0
"Delay to wait for a process to stop after killing it."
let rec select_no_interrupt read_descrs write_descrs except_descrs timeout =
if timeout < 0.0 then begin
[], [], []
end else begin
try
Unix.select read_descrs write_descrs except_descrs 0.1
with Unix.Unix_error (Unix.EINTR, "select", "") ->
select_no_interrupt
read_descrs write_descrs except_descrs (timeout -. 0.1)
end
let create_worker ~shard_id ~master_id ~worker_log_file conf map_test_cases =
let safe_close fd = try close fd with Unix_error _ -> () in
let pipe_read_from_worker, pipe_write_to_master = Unix.pipe () in
let pipe_read_from_master, pipe_write_to_worker = Unix.pipe () in
match !unix_fork () with
| 0 ->
let () =
safe_close pipe_read_from_worker;
safe_close pipe_write_to_worker;
()
in
let channel =
make_channel
shard_id
string_of_message_to_worker
string_of_message_from_worker
pipe_read_from_master
pipe_write_to_master
in
main_worker_loop
conf
~yield:ignore
channel
~shard_id
map_test_cases
~worker_log_file;
channel.close ();
safe_close pipe_read_from_master;
safe_close pipe_write_to_master;
exit 0
| pid ->
let channel =
make_channel
master_id
string_of_message_from_worker
string_of_message_to_worker
pipe_read_from_worker
pipe_write_to_worker
in
let rstatus = ref None in
let msg_of_process_status status =
if status = WEXITED 0 then
None
else
Some (OUnitUtils.string_of_process_status status)
in
let is_running () =
match !rstatus with
| None ->
let pid, status = waitpid [WNOHANG] pid in
if pid <> 0 then begin
rstatus := Some status;
false
end else begin
true
end
| Some _ ->
false
in
let close_worker () =
let rec wait_end timeout =
if timeout < 0.0 then begin
false, None
end else begin
if is_running () then
let _, _, _ = select_no_interrupt [] [] [] 0.1 in
wait_end (timeout -. 0.1)
else
match !rstatus with
| Some status -> true, msg_of_process_status status
| None -> true, None
end
in
let ended, msg_opt =
channel.close ();
safe_close pipe_read_from_worker;
safe_close pipe_write_to_worker;
List.fold_left
(fun (ended, msg_opt) signal ->
if ended then begin
ended, msg_opt
end else begin
kill pid signal;
wait_end (processes_kill_period conf)
end)
(wait_end (processes_grace_period conf))
[15 ; 9 ]
in
if ended then
msg_opt
else
Some (Printf.sprintf "unable to kill process %d" pid)
in
{
channel = channel;
close_worker = close_worker;
select_fd = pipe_read_from_worker;
shard_id = shard_id;
is_running = is_running;
}
let workers_waiting ~timeout workers =
let workers_fd_lst =
List.rev_map (fun worker -> worker.select_fd) workers
in
let workers_fd_waiting_lst, _, _ =
select_no_interrupt workers_fd_lst [] [] timeout
in
List.filter
(fun workers -> List.memq workers.select_fd workers_fd_waiting_lst)
workers
let init () =
if Sys.os_type = "Unix" then
match Sys.backend_type with
| Native | Bytecode ->
OUnitRunner.register "processes" 100
(runner create_worker workers_waiting)
| Other _ -> ()