Source file picos_io_select.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
open Picos
open Picos_std_event
(* Bit flags combined into [config.bits]; every flag occupies its own bit. *)

(** Set when a [SIGCHLD] handler should be installed. This flag must stay at
    bit 0 because [try_configure] derives it with [Bool.to_int]. *)
let handle_sigchld_bit = 1 lsl 0

(** Set by the select thread when it finds itself on the main domain. *)
let select_thread_running_on_main_domain_bit = 1 lsl 1

(** Set when [SIGPIPE] should be ignored. *)
let ignore_sigpipe_bit = 1 lsl 2
(** Process-wide configuration record. Its fields are mutated in place by
    [try_configure]; [bits] is additionally updated by [select_thread]. *)
type config = {
mutable bits : int;
(* Bitwise or of [handle_sigchld_bit],
   [select_thread_running_on_main_domain_bit] and [ignore_sigpipe_bit]. *)
mutable intr_sig : int;
(* Signal number that the select thread sends to the process while interrupts
   are pending. *)
mutable intr_sigs : int list;
(* Signals blocked or unblocked around interruptible sections. The list is
   empty until the library has been configured. *)
}
(* The single configuration instance. An empty [intr_sigs] doubles as the
   not-yet-configured marker tested by [try_configure] and
   [check_configured]. *)
let config = { bits = 0; intr_sig = 0; intr_sigs = [] }
(** Existentially typed entry pairing a [computation] with the [value] that is
    returned to it once a [SIGCHLD] is handled. [alive] is set to [false] when
    the entry has been completed or removed. *)
type return =
| Return : {
value : 'a;
computation : 'a Computation.t;
mutable alive : bool;
}
-> return
(** Hashed type for the table of awaiters. The keys are random integers, so
    the key itself serves as its own hash. *)
module RandomInt = struct
  type t = int

  let equal (lhs : t) (rhs : t) = Int.equal lhs rhs
  let hash (key : t) = key
end
(* Short alias for the hash table implementation used for the awaiters. *)
module Htbl = Picos_aux_htbl
(* Table of entries waiting for [SIGCHLD], keyed by random integers. Entries
   are added by [insert], drained by [handle_signal], and removed individually
   by the trigger created in [return_on_sigchld]. *)
let chld_awaiters = Htbl.create ~hashed_type:(module RandomInt) ()
(** A pending timeout. Once [time], a span that [process_timeouts] compares
    against [Mtime_clock.elapsed ()], has been reached, [computation] is
    canceled with [exn] and the backtrace [bt]. *)
type cancel_at =
| Cancel_at : {
time : Mtime.span;
exn : exn;
bt : Printexc.raw_backtrace;
computation : 'a Computation.t;
}
-> cancel_at
(** Priority search queue of pending timeouts, keyed by integer ids and
    prioritized by the [time] of the [cancel_at] entries. *)
module Q =
  Psq.Make
    (Int)
    (struct
      type t = cancel_at

      (* Only the deadlines take part in the ordering. *)
      let compare (Cancel_at lhs) (Cancel_at rhs) =
        Mtime.Span.compare lhs.time rhs.time
    end)
(** A pending wait on a file descriptor. When [file_descr] is reported as
    ready by [Unix.select], [process_fds] returns [value] to [computation] and
    sets [alive] to [false]. *)
type return_on =
| Return_on : {
file_descr : Picos_io_fd.t;
value : 'a;
computation : 'a Computation.t;
mutable alive : bool;
}
-> return_on
(** Phase of the select thread, used by [wakeup] to decide how to wake it up:
    - [Continue]: the thread is between calls to [Unix.select].
    - [Select]: the thread has committed to calling [Unix.select].
    - [Waking_up]: a waker has claimed responsibility for writing a byte to
      the wakeup pipe; the select thread reads that byte afterwards.
    - [Process]: a wakeup was requested before the thread committed to
      [Unix.select], so the thread skips the call. *)
type phase = Continue | Select | Waking_up | Process
(** Per-domain state of the select mechanism; instances are created through
    the domain-local [key]. *)
type state = {
phase : phase Atomic.t;
(* Coordination between the select thread and callers of [wakeup]. *)
mutable state : [ `Initial | `Starting | `Alive | `Stopping | `Stopped ];
(* Lifecycle of the select thread; updated with [try_transition] and
   [transition]. *)
mutable exn_bt : exn * Printexc.raw_backtrace;
(* Exception that terminated the select thread, if any. It stays physically
   equal to [exit_bt] when there was none and is re-raised by the at-exit
   action registered in [init]. *)
mutable pipe_inn : Unix.file_descr;
(* Read end of the wakeup pipe; always included in the read set passed to
   [Unix.select]. *)
mutable pipe_out : Unix.file_descr;
(* Write end of the wakeup pipe; written to by [wakeup]. *)
byte : Bytes.t;
(* One byte buffer used for both writing to and reading from the pipe. *)
timeouts : Q.t Atomic.t;
(* Pending timeouts, updated with compare-and-set. *)
mutable next_id : int;
(* Source of ids for timeout entries; see [next_id]. *)
new_rd : return_on list ref;
(* Newly registered read waits, not yet picked up by the select thread. *)
new_wr : return_on list ref;
(* Newly registered write waits, not yet picked up by the select thread. *)
new_ex : return_on list ref;
(* Newly registered exceptional condition waits, not yet picked up by the
   select thread. *)
}
(** Outcome of an interrupt request: [Cleared] when the request was cleared
    with [Intr.clr] before the signal was delivered, [Signaled] when
    [handle_signal] ran for it. *)
type intr_status = Cleared | Signaled
(* Interrupt request representation. [Nothing] stands for no request. A [Req]
   records the per-domain [state], whether the request is still [unused]
   (cleared by [Intr.use]), and the [computation] that receives the
   [intr_status]. The polymorphic variant index lets functions demand a [Req]
   statically. *)
type _ tdt =
| Nothing : [> `Nothing ] tdt
| Req : {
state : state;
mutable unused : bool;
mutable computation : intr_status Computation.t;
}
-> [> `Req ] tdt
(* Existential wrapper over either kind of [tdt]. It is unboxed, so wrapping
   does not allocate. *)
type req = R : [< `Nothing | `Req ] tdt -> req [@@unboxed]
(* Immutable snapshot stored in [intr_pending]: the number of pending
   interrupts and the request recorded by the latest update. *)
type counter_state = { value : int; req : req }
(** Global count of pending interrupts. It is incremented by [Intr.incr_once]
    and decremented by [Intr.decr]; while it is nonzero the select thread
    keeps sending [config.intr_sig] to the process. *)
let intr_pending = Atomic.make { value = 0; req = R Nothing }
(* Empty backtrace used where no meaningful backtrace exists. *)
let empty_bt = Printexc.get_callstack 0
(* Sentinel stored in [state.exn_bt]; [init] compares against it physically to
   detect whether the select thread recorded an exception. *)
let exit_bt = (Exit, empty_bt)
(* A computation that has already returned [Cleared]. [Intr.req] uses it as a
   placeholder until the real computation of a request has been created. *)
let cleared =
let computation = Computation.create () in
Computation.return computation Cleared;
computation
(* Thread-local slot holding the request of the current thread. It is set by
   [Intr.req] and read by [handle_signal]. *)
let intr_key : [ `Req ] tdt Picos_thread.TLS.t = Picos_thread.TLS.create ()
(** Domain-local key for the per-domain [state]. The initializer builds a
    state in the [`Initial] lifecycle state with no timeouts and no pending
    operations. [Unix.stdin] is a placeholder for both pipe ends until
    [select_thread] creates the actual pipe. *)
let key =
Picos_domain.DLS.new_key @@ fun () ->
{
phase = Atomic.make Continue;
state = `Initial;
exn_bt = exit_bt;
pipe_inn = Unix.stdin;
pipe_out = Unix.stdin;
byte = Bytes.create 1;
timeouts = Atomic.make Q.empty;
next_id = 0;
new_rd = ref [];
new_wr = ref [];
new_ex = ref [];
}
(** [try_transition s from into] sets [s.state] to [into] and returns [true]
    when [s.state] is physically equal to [from]; otherwise it leaves [s]
    unchanged and returns [false].

    The [poll error] attribute asks the compiler to reject the function if it
    could contain a poll point. NOTE(review): presumably this is what makes
    the check-and-set safe against other threads of the same domain; confirm
    against the OCaml manual. *)
let[@poll error] [@inline never] try_transition s from into =
s.state == from
&& begin
s.state <- into;
true
end
(** [transition s into] unconditionally sets [s.state] to [into] and returns
    the previous lifecycle state. Like [try_transition], it carries the
    [poll error] attribute so that the read and the write contain no poll
    point. *)
let[@poll error] [@inline never] transition s into =
let from = s.state in
s.state <- into;
from
(** [wakeup s from] makes sure the select thread of [s] runs another
    processing round. [from] is the lifecycle state the caller expects; a byte
    is written to the pipe only while [s.state] is still [from]. *)
let rec wakeup s from =
match Atomic.get s.phase with
| Process | Waking_up ->
(* A wakeup has already been requested; nothing more to do. *)
()
| Continue ->
if Atomic.compare_and_set s.phase Continue Process then
(* The thread has not committed to [Unix.select] yet. It will see [Process]
   and skip the call, so no write to the pipe is needed. *)
()
else
(* The phase changed concurrently; look at it again. *)
wakeup s from
| Select ->
(* The thread is in, or about to enter, [Unix.select]. Whoever wins this
   compare-and-set is responsible for writing to the pipe. When it fails,
   another party changed the phase and nothing further is done here. *)
if Atomic.compare_and_set s.phase Select Waking_up then
if s.state == from then
let n = Unix.write s.pipe_out s.byte 0 1 in
assert (n = 1)
(** Pending operations of one kind (read, write, or exceptional condition):
    [ops] are the operations, [unique_fds] the distinct descriptors they refer
    to, and [n] the initial size given to the hash table built by
    [process_fds]. *)
type fos = { n : int; unique_fds : Unix.file_descr list; ops : return_on list }
(* Shared value used when there are no pending operations. *)
let fos_empty = { n = 1; unique_fds = []; ops = [] }
(* Hash table keyed by file descriptors, used by [process_fds]. *)
module Ht = Hashtbl.Make (Picos_io_fd.Resource)
(** [process_fds ht unique_fds ops todo] walks the operations in [todo] and
    accumulates those that remain pending into [ops], together with their
    distinct descriptors in [unique_fds].

    [ht] maps a descriptor to [`Return] when it was reported as ready, or to
    [`Alive] when it has already been recorded in [unique_fds] during this
    pass. The result is a [fos] of the remaining operations. *)
let rec process_fds ht unique_fds ops = function
| [] ->
(* Reuse the shared empty value when nothing remains. *)
if unique_fds == [] && ops == [] then fos_empty
else { n = Ht.length ht; unique_fds; ops }
| (Return_on r as op) :: ops_todo ->
if Computation.is_running r.computation then begin
let file_descr = Picos_io_fd.unsafe_get r.file_descr in
match Ht.find ht file_descr with
| `Return ->
(* The descriptor is ready: release the reference on it, mark the operation
   as done, and complete the computation. *)
Picos_io_fd.decr r.file_descr;
r.alive <- false;
Computation.return r.computation r.value;
process_fds ht unique_fds ops ops_todo
| `Alive -> process_fds ht unique_fds (op :: ops) ops_todo
| exception Not_found ->
(* First pending operation seen for this descriptor in this pass. *)
Ht.add ht file_descr `Alive;
process_fds ht (file_descr :: unique_fds) (op :: ops) ops_todo
end
else begin
(* The computation is no longer running: drop the operation and release the
   reference on its descriptor. *)
Picos_io_fd.decr r.file_descr;
process_fds ht unique_fds ops ops_todo
end
(** [process_fds ready fos new_ops] completes the operations in [fos] and
    [new_ops] whose descriptor is listed in [ready] and returns the [fos] of
    the operations that remain pending. The previously pending operations are
    processed before the newly registered ones. *)
let process_fds unique_fds fos new_ops =
  match (fos.ops, new_ops) with
  | [], [] -> fos_empty
  | _, _ -> (
      let ht = Ht.create fos.n in
      (* Seed the table with the descriptors that were reported as ready. *)
      List.iter (fun fd -> Ht.add ht fd `Return) unique_fds;
      let remaining = process_fds ht [] [] fos.ops in
      match new_ops with
      | [] -> remaining
      | _ :: _ -> process_fds ht remaining.unique_fds remaining.ops new_ops)
(** [process_timeouts s] cancels every computation whose deadline has passed
    and returns the timeout, in seconds, for the next call to [Unix.select]:
    [-1.0] when no timeouts remain, otherwise the time until the earliest
    remaining deadline. *)
let rec process_timeouts s =
let before = Atomic.get s.timeouts in
match Q.pop before with
| None -> -1.0
| Some ((_, Cancel_at e), after) ->
let elapsed = Mtime_clock.elapsed () in
if Mtime.Span.compare e.time elapsed <= 0 then begin
(* The entry has expired. Cancel only if this thread removed it from the
   queue; after a failed compare-and-set the queue is simply read again. *)
if Atomic.compare_and_set s.timeouts before after then
Computation.cancel e.computation e.exn e.bt;
process_timeouts s
end
else
(* Convert the remaining nanoseconds to seconds. *)
Mtime.Span.to_float_ns (Mtime.Span.abs_diff e.time elapsed)
*. (1. /. 1_000_000_000.)
(* Alias for the operations used on the [new_rd], [new_wr] and [new_ex]
   references ([exchange] and [compare_and_set]). *)
module Thread_atomic = Picos_io_thread_atomic
(** Main loop of the select thread. [timeout] is the timeout in seconds given
    to [Unix.select], and [rd], [wr] and [ex] hold the pending read, write and
    exceptional condition operations. The loop runs for as long as [s.state]
    is [`Alive]. *)
let rec select_thread s timeout rd wr ex =
if s.state == `Alive then begin
let rd_fds, wr_fds, ex_fds =
(* Call [Unix.select] only if no wakeup has been requested since the previous
   round, i.e. the phase is still [Continue]. *)
if Atomic.compare_and_set s.phase Continue Select then begin
try
Unix.select
(s.pipe_inn :: rd.unique_fds)
wr.unique_fds ex.unique_fds timeout
(* An interrupted call is treated as if no descriptor was ready. *)
with Unix.Unix_error (EINTR, _, _) -> ([], [], [])
end
else ([], [], [])
in
begin
(* Reset the phase for the next round. [Waking_up] means a waker claimed the
   write of a byte to the pipe; that byte is consumed here. *)
match Atomic.exchange s.phase Continue with
| Select | Process | Continue -> ()
| Waking_up ->
let n = Unix.read s.pipe_inn s.byte 0 1 in
assert (n = 1)
end;
(* Complete ready operations and merge in the newly registered ones. *)
let rd = process_fds rd_fds rd (Thread_atomic.exchange s.new_rd []) in
let wr = process_fds wr_fds wr (Thread_atomic.exchange s.new_wr []) in
let ex = process_fds ex_fds ex (Thread_atomic.exchange s.new_ex []) in
let timeout = process_timeouts s in
let timeout =
let state = Atomic.get intr_pending in
if state.value = 0 then timeout
else begin
(* Interrupts are pending: send the interrupt signal to this process and come
   back after at most [idle] seconds. A negative [timeout] means there was no
   deadline at all. *)
assert (0 < state.value);
Unix.kill (Unix.getpid ()) config.intr_sig;
let idle = 0.000_001 in
if timeout < 0.0 || idle <= timeout then idle else timeout
end
in
select_thread s timeout rd wr ex
end
(** Body of the thread created by [init]. It sets up the signal mask, creates
    the wakeup pipe, runs the select loop, and finally marks [s] as
    [`Stopped] and closes the pipe. *)
let select_thread s =
(* Record that a select thread runs on the main domain; this is consulted by
   [return_on_sigchld]. *)
if Picos_domain.is_main_domain () then
config.bits <- select_thread_running_on_main_domain_bit lor config.bits;
if not Sys.win32 then begin
(* The interrupt signals are blocked in this thread. [SIGCHLD] is unblocked
   here only when handling it has been configured. *)
Thread.sigmask SIG_BLOCK config.intr_sigs |> ignore;
Thread.sigmask
(if config.bits land handle_sigchld_bit <> 0 then SIG_UNBLOCK
else SIG_BLOCK)
[ Sys.sigchld ]
|> ignore
end;
begin
try
let pipe_inn, pipe_out = Unix.pipe ~cloexec:true () in
s.pipe_inn <- pipe_inn;
s.pipe_out <- pipe_out;
(* Run the loop only if [init] is still waiting in the [`Starting] state. *)
if try_transition s `Starting `Alive then
select_thread s (-1.0) fos_empty fos_empty fos_empty
with exn ->
(* Keep the exception so that the at-exit action in [init] can re-raise
   it. *)
let bt = Printexc.get_raw_backtrace () in
s.exn_bt <- (exn, bt)
end;
transition s `Stopped |> ignore;
(* [Unix.stdin] is the placeholder from [key]; it must not be closed. *)
if s.pipe_inn != Unix.stdin then Unix.close s.pipe_inn;
if s.pipe_out != Unix.stdin then Unix.close s.pipe_out
(** [try_configure ~intr_sig ~intr_sigs ~handle_sigchld ~ignore_sigpipe]
    stores the given configuration and returns [true] if [config] has not
    been configured yet, i.e. [config.intr_sigs] is empty. Otherwise it leaves
    [config] unchanged and returns [false]. The function carries the
    [poll error] attribute, like [try_transition]. *)
let[@poll error] [@inline never] try_configure ~intr_sig ~intr_sigs
~handle_sigchld ~ignore_sigpipe =
config.intr_sigs == []
&& begin
config.bits <-
(* [Bool.to_int handle_sigchld] is [handle_sigchld_bit] or [0]. The negation
   of [Bool.to_int ignore_sigpipe] is [-1] (all bits set) or [0], which
   selects [ignore_sigpipe_bit] or nothing. *)
Bool.to_int handle_sigchld
lor (ignore_sigpipe_bit land -Bool.to_int ignore_sigpipe);
config.intr_sig <- intr_sig;
config.intr_sigs <- intr_sigs;
true
end
(** [is_intr_sig signum] tells whether [signum] is the configured interrupt
    signal. *)
let is_intr_sig signum = Int.equal signum config.intr_sig
(** Signal handler installed by [reconfigure_signal_handlers] for [SIGCHLD]
    and for the interrupt signal. *)
let handle_signal signal =
if signal = Sys.sigchld then begin
(* Take all the current awaiters out of the table and complete each one. *)
Htbl.remove_all chld_awaiters
|> Seq.iter @@ fun (_, Return r) ->
r.alive <- false;
Computation.return r.computation r.value
end
else if signal = config.intr_sig then
(* Complete the request registered by [Intr.req] in the thread running the
   handler. [get_exn] presumably raises when no request has been set for
   that thread. *)
let (Req r) = Picos_thread.TLS.get_exn intr_key in
Computation.return r.computation Signaled
(** Installs the signal handlers and the signal mask of the calling thread
    according to [config]. Does nothing on Windows. *)
let reconfigure_signal_handlers () =
if not Sys.win32 then begin
(* The interrupt signal is handled by [handle_signal] and stays blocked in the
   calling thread until [Intr.req] unblocks it. *)
Sys.signal config.intr_sig (Sys.Signal_handle handle_signal) |> ignore;
Thread.sigmask SIG_BLOCK config.intr_sigs |> ignore;
if config.bits land handle_sigchld_bit <> 0 then begin
(* [SIGCHLD] is blocked in the calling thread; the select thread unblocks it
   for itself. *)
Sys.signal Sys.sigchld (Sys.Signal_handle handle_signal) |> ignore;
Thread.sigmask SIG_BLOCK [ Sys.sigchld ] |> ignore
end;
if config.bits land ignore_sigpipe_bit <> 0 then begin
Sys.signal Sys.sigpipe Signal_ignore |> ignore
end
end
(** [configure ?intr_sig ?handle_sigchld ?ignore_sigpipe ()] configures the
    signal handling of the library. It may succeed only once.

    @param intr_sig signal used for interrupts; defaults to [Sys.sigusr2]
    @param handle_sigchld whether to install a [SIGCHLD] handler; defaults to
      [true]
    @param ignore_sigpipe whether to ignore [SIGPIPE]; defaults to [true]
    @raise Invalid_argument
      when not called from the main thread, when [intr_sig] is not an
      acceptable signal number, or when already configured. *)
let configure ?(intr_sig = Sys.sigusr2) ?(handle_sigchld = true)
?(ignore_sigpipe = true) () =
if not (Picos_thread.is_main_thread ()) then
invalid_arg "must be called from the main thread on the main domain";
(* The range check below relies on the signal numbers of [Sys] being negative,
   with [Sys.sigabrt] as the largest and [Sys.sigxfsz] as the smallest
   accepted one. *)
assert (Sys.sigabrt = -1 && Sys.sigxfsz < Sys.sigabrt);
if intr_sig < Sys.sigxfsz || 0 <= intr_sig || intr_sig = Sys.sigchld then
invalid_arg "invalid interrupt signal number";
if
not
(try_configure ~intr_sig ~intr_sigs:[ intr_sig ] ~handle_sigchld
~ignore_sigpipe)
then invalid_arg "already configured";
reconfigure_signal_handlers ()
(** Applies the default configuration if [configure] has not been called yet;
    otherwise reinstalls the signal handlers and the signal mask for the
    calling thread. *)
let check_configured () =
(* NOTE(review): the result is discarded, so this call is made for its side
   effect only; presumably it forces allocation of a domain index. Confirm
   against Multicore_magic. *)
Multicore_magic.instantaneous_domain_index () |> ignore;
if config.intr_sigs == [] then configure ()
else reconfigure_signal_handlers ()
(** [init s] starts the select thread of [s] if it is still in the [`Initial]
    state, and then waits until the thread has left the [`Starting] state.

    @raise Invalid_argument if the state is not [`Alive] afterwards. *)
let[@inline never] init s =
check_configured ();
if try_transition s `Initial `Starting then begin
match Thread.create select_thread s with
| thread ->
(* At domain exit: ask the select thread to stop, wait for it, and re-raise
   any exception it recorded. *)
Picos_domain.at_exit @@ fun () ->
if try_transition s `Alive `Stopping then wakeup s `Stopping;
Thread.join thread;
if s.exn_bt != exit_bt then
Printexc.raise_with_backtrace (fst s.exn_bt) (snd s.exn_bt)
| exception exn ->
(* The thread could not be created, so this state can never become alive. *)
transition s `Stopped |> ignore;
raise exn
end;
(* Wait until the select thread has either become alive or stopped. *)
while s.state == `Starting do
Thread.yield ()
done;
if s.state != `Alive then invalid_arg "domain has been terminated"
(** [get ()] returns the state of the current domain and makes sure first
    that its select thread has been started.

    @raise Invalid_argument if the select thread cannot be made alive. *)
let get () =
  let s = Picos_domain.DLS.get key in
  (match s.state with
  | `Alive -> ()
  | `Initial | `Starting | `Stopping | `Stopped -> init s);
  s
(** [next_id t] returns the current value of [t.next_id] and increments the
    field. It carries the [poll error] attribute so that the read and the
    write contain no poll point. *)
let[@poll error] [@inline never] next_id t =
let id = t.next_id in
t.next_id <- id + 1;
id
(** [add_timeout s id entry] adds [entry] under [id] to the timeout queue of
    [s], retrying until the compare-and-set succeeds. *)
let rec add_timeout s id entry =
let before = Atomic.get s.timeouts in
let after = Q.add id entry before in
if Atomic.compare_and_set s.timeouts before after then
match Q.min after with
(* The new entry is now the earliest one, so wake the select thread up;
   presumably so that it recomputes its timeout. *)
| Some (id', _) -> if id = id' then wakeup s `Alive
| None -> ()
else add_timeout s id entry
(** [remove_action trigger s id] removes the timeout registered under [id]
    from the queue of [s], retrying until the compare-and-set succeeds.

    The signature is that of a trigger action. [trigger] plays no part in the
    removal; it is only handed on to the retry, which avoids fabricating a
    placeholder argument with [Obj.magic]. *)
let rec remove_action trigger s id =
  let before = Atomic.get s.timeouts in
  let after = Q.remove id before in
  if not (Atomic.compare_and_set s.timeouts before after) then
    (* The queue was updated concurrently; start over from its new value. *)
    remove_action trigger s id
(** [to_deadline ~seconds] is the span [Mtime_clock.elapsed ()] will have
    reached [seconds] from now.

    @raise Invalid_argument
      if [seconds], converted to nanoseconds, is not accepted by
      [Mtime.Span.of_float_ns]. *)
let to_deadline ~seconds =
  let nanoseconds = seconds *. 1_000_000_000. in
  match Mtime.Span.of_float_ns nanoseconds with
  | Some delay ->
      let now = Mtime_clock.elapsed () in
      Mtime.Span.add now delay
  | None -> invalid_arg "seconds should be between 0 to pow(2, 53) nanoseconds"
(** [cancel_after computation ~seconds exn bt] arranges for [computation] to
    be canceled with [exn] and [bt] once [seconds] have passed.

    @raise Invalid_argument if [seconds] is rejected by [to_deadline]. *)
let[@alert "-handler"] cancel_after computation ~seconds exn bt =
let time = to_deadline ~seconds in
let entry = Cancel_at { time; exn; bt; computation } in
let s = get () in
let id = next_id s in
add_timeout s id entry;
(* Remove the timeout from the queue as soon as the computation completes. If
   the trigger cannot be attached, run the removal right away. *)
let remover = Trigger.from_action s id remove_action in
if not (Computation.try_attach computation remover) then
Trigger.signal remover
(** [timeout ~seconds] is an event built with [Event.from_request] that is
    driven by a timeout of [seconds]. *)
let[@alert "-handler"] timeout ~seconds =
let request outer to_result =
(* [inner] is the computation that the timeout cancels. Its action returns
   [to_result] to [outer]. *)
let inner =
Computation.with_action to_result outer @@ fun _ to_result outer ->
Computation.return outer to_result
in
(* The action of [canceler] cancels [inner]; the trigger is attached to
   [outer] below. *)
let canceler =
Trigger.from_action () inner @@ fun _ _ inner ->
Computation.cancel inner Exit empty_bt
in
(* Start the timeout only if the trigger could be attached to [outer]. *)
if Computation.try_attach outer canceler then
cancel_after inner ~seconds Exit empty_bt
in
Event.from_request { request }
(** Trigger action attached to the computation of a file descriptor wait. It
    wakes the select thread of [s] up if the operation is still alive. *)
let wakeup_action _trigger s (Return_on { alive; _ }) =
  match alive with true -> wakeup s `Alive | false -> ()
(** [insert_fd s fds op] pushes [op] onto the list of new operations [fds],
    retrying until the compare-and-set succeeds, and then wakes the select
    thread up. If the computation of [op] is no longer running, the reference
    on its file descriptor is released instead. *)
let[@alert "-handler"] rec insert_fd s fds (Return_on r as op) =
let before = !fds in
if Computation.is_running r.computation then
if Thread_atomic.compare_and_set fds before (Return_on r :: before) then
(* Have the select thread woken up again when the computation completes. The
   result is deliberately ignored: if the attachment fails, the wakeup below
   still lets the select thread see the entry. *)
let _ : bool =
Computation.try_attach r.computation
(Trigger.from_action s op wakeup_action)
in
wakeup s `Alive
else insert_fd s fds op
else Picos_io_fd.decr r.file_descr
(** [return_on computation file_descr op value] registers a wait that returns
    [value] to [computation] once [file_descr] becomes ready for [op]: [`R]
    for reading, [`W] for writing, [`E] for exceptional conditions. A
    reference on [file_descr] is taken before the wait is registered. *)
let return_on computation file_descr op value =
  Picos_io_fd.incr file_descr;
  let s = get () in
  let pending =
    match op with `R -> s.new_rd | `W -> s.new_wr | `E -> s.new_ex
  in
  let entry = Return_on { file_descr; value; computation; alive = true } in
  insert_fd s pending entry
(** [await_on file_descr op] waits until [file_descr] becomes ready for [op]
    and then returns [file_descr]. If the wait raises, the internal
    computation is canceled before the exception is re-raised. *)
let await_on file_descr op =
let computation = Computation.create ~mode:`LIFO () in
return_on computation file_descr op file_descr;
try Computation.await computation
with exn ->
(* Cancel the computation so that the select thread drops the operation. *)
Computation.cancel computation Exit empty_bt;
raise exn
(** [on file_descr op] is an event whose request registers, through
    [return_on], a wait for [file_descr] to become ready for [op]. *)
let on file_descr op =
  Event.from_request
    {
      request =
        (fun computation to_result ->
          return_on computation file_descr op to_result);
    }
(** Interrupt requests. [req] arms a request for the calling thread and [clr]
    disarms it. While armed requests are pending, the select thread keeps
    sending the interrupt signal to the process. *)
module Intr = struct
type t = req
(* Marks a request as used, so that [incr_once] will no longer count it. *)
let[@inline] use = function R Nothing -> () | R (Req r) -> r.unused <- false
(** This is used to ensure that the [intr_pending] counter is incremented
exactly once before the counter is decremented. *)
(* Returns [true] only when this call moved the counter from 0 to 1. Nothing
   is done, and [false] is returned, when the request has already been used
   or is already the one recorded in [intr_pending]. *)
let rec incr_once (Req r as req : [ `Req ] tdt) backoff =
let before = Atomic.get intr_pending in
r.unused && before.req != R req
&& begin
(* The previously recorded request must not be counted again. *)
use before.req;
let after = { value = before.value + 1; req = R req } in
if Atomic.compare_and_set intr_pending before after then
after.value = 1
else incr_once req (Backoff.once backoff)
end
(* Action of the computation created in [req]; it is handed to
   [Computation.with_action] together with the request and the id of the
   timeout entry. *)
let intr_action trigger (Req r as req : [ `Req ] tdt) id =
match Computation.await r.computation with
| Cleared ->
(* The request was cleared in time; only the timeout has to go. *)
remove_action trigger r.state id
| Signaled ->
(* The signal handler ran. Remove the timeout and count the interrupt; the
   select thread is woken up when the counter went from 0 to 1. *)
remove_action trigger r.state id;
if incr_once req Backoff.default then
wakeup r.state `Alive
| exception Exit ->
(* The computation was canceled with [Exit], which is the exception used by
   the timeout entry added in [req]. Count the interrupt. *)
let _ : bool = incr_once req Backoff.default in
()
(* The request value that stands for no request at all. *)
let nothing = R Nothing
(* [req ~seconds] arms an interrupt request for the calling thread: it
   registers the request in thread-local storage, adds a timeout of [seconds]
   that cancels the computation of the request with [Exit], and unblocks the
   interrupt signals in the calling thread. Raises [Invalid_argument] on
   Windows or when [seconds] is rejected by [to_deadline]. *)
let[@alert "-handler"] req ~seconds =
if Sys.win32 then invalid_arg "not supported on Windows"
else begin
let time = to_deadline ~seconds in
let state = get () in
let id = next_id state in
(* [cleared] is only a placeholder: the real computation needs the request
   as the argument of its action, so the field is filled in afterwards. *)
let (Req r as req : [ `Req ] tdt) =
Req { state; unused = true; computation = cleared }
in
let computation = Computation.with_action req id intr_action in
r.computation <- computation;
Picos_thread.TLS.set intr_key req;
let entry = Cancel_at { time; exn = Exit; bt = empty_bt; computation } in
add_timeout state id entry;
let was_blocked : int list =
Thread.sigmask SIG_UNBLOCK config.intr_sigs
in
(* The interrupt signal is expected to have been blocked until now. *)
assert (List.exists is_intr_sig was_blocked);
R req
end
(* Decrements the [intr_pending] counter, retrying with backoff until the
   compare-and-set succeeds. *)
let rec decr backoff =
let before = Atomic.get intr_pending in
use before.req;
let after = { value = before.value - 1; req = R Nothing } in
assert (0 <= after.value);
if not (Atomic.compare_and_set intr_pending before after) then
decr (Backoff.once backoff)
(* [clr req] disarms a request: the interrupt signals are blocked again in
   the calling thread and the computation of the request is completed with
   [Cleared]. If the computation had already completed, the interrupt is
   counted, unless it already has been, and the counter is then
   decremented. *)
let clr = function
| R Nothing -> ()
| R (Req r as req) ->
let was_blocked : int list =
Thread.sigmask SIG_BLOCK config.intr_sigs
in
(* The interrupt signal is expected to have been unblocked until now. *)
assert (not (List.exists is_intr_sig was_blocked));
if not (Computation.try_return r.computation Cleared) then begin
let _ : bool = incr_once req Backoff.default in
decr Backoff.default
end
end
(** [insert return] adds [return] to [chld_awaiters] under a fresh random key
    and returns that key. A new key is drawn whenever the insertion fails. *)
let rec insert return =
  let candidate = Random.bits () in
  match Htbl.try_add chld_awaiters candidate return with
  | true -> candidate
  | false -> insert return
(** [return_on_sigchld computation value] registers [computation] to be
    returned [value] when the [SIGCHLD] handler runs. *)
let[@alert "-handler"] return_on_sigchld computation value =
(* If [SIGCHLD] handling is enabled and no select thread has been recorded on
   the main domain, make sure the select thread of the current domain is
   running. NOTE(review): presumably so that some thread has [SIGCHLD]
   unblocked; confirm. *)
if
config.bits
land (select_thread_running_on_main_domain_bit lor handle_sigchld_bit)
= handle_sigchld_bit
then
get () |> ignore;
let return = Return { value; computation; alive = true } in
let id = insert return in
(* When the computation completes by other means, take the entry out of the
   table again. *)
let remover =
Trigger.from_action id return @@ fun _trigger id (Return this_r as this) ->
if this_r.alive then begin
this_r.alive <- false;
match Htbl.remove_exn chld_awaiters id with
| Return that_r as that ->
(* The key was bound to a different entry; that entry has just been removed
   from the table, so complete it here. *)
if this != that then
Computation.return that_r.computation that_r.value
| exception Not_found -> ()
end
in
if not (Computation.try_attach computation remover) then
Trigger.signal remover
(** Event whose request is [return_on_sigchld]. *)
let on_sigchld = Event.from_request { request = return_on_sigchld }