1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
open Common_
type io_mode =
| Read
| Write
module IO_wait = struct
type t = {
mutable active: bool;
f: cancel_handle -> unit;
as_cancel_handle: cancel_handle;
}
(** A single event, waiting on a unix FD *)
let make f : t =
let rec self =
{
active = true;
f;
as_cancel_handle = { cancel = (fun () -> self.active <- false) };
}
in
self
end
module Per_fd = struct
type t = {
fd: Unix.file_descr;
mutable reads: IO_wait.t list;
mutable writes: IO_wait.t list;
}
let[@inline] is_empty self = self.reads = [] && self.writes = []
end
module IO_tbl = struct
type t = {
mutable n_read: int;
mutable n_write: int;
tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t;
}
let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 }
let get_or_create (self : t) fd : Per_fd.t =
try Hashtbl.find self.tbl fd
with Not_found ->
let per_fd = { Per_fd.fd; reads = []; writes = [] } in
Hashtbl.add self.tbl fd per_fd;
per_fd
let add_io_wait (self : t) fd mode (ev : IO_wait.t) =
let per_fd = get_or_create self fd in
match mode with
| Read ->
self.n_read <- 1 + self.n_read;
per_fd.reads <- ev :: per_fd.reads
| Write ->
self.n_write <- 1 + self.n_write;
per_fd.writes <- ev :: per_fd.writes
let prepare_select (self : t) =
let reads = ref [] in
let writes = ref [] in
Hashtbl.iter
(fun _ (per_fd : Per_fd.t) ->
if Per_fd.is_empty per_fd then
Hashtbl.remove self.tbl per_fd.fd
else (
if per_fd.reads <> [] then reads := per_fd.fd :: !reads;
if per_fd.writes <> [] then writes := per_fd.fd :: !writes
))
self.tbl;
!reads, !writes
let trigger_waiter (io : IO_wait.t) =
if io.active then io.f io.as_cancel_handle
let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list)
(writes : Unix.file_descr list) : unit =
List.iter
(fun fd ->
if fd <> ignore_read then (
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.reads;
self.n_read <- self.n_read - List.length per_fd.reads;
per_fd.reads <- []
))
reads;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.writes;
self.n_write <- self.n_write - List.length per_fd.writes;
per_fd.writes <- [])
writes;
()
end
let run_timer_ (t : Timer.t) =
let rec loop () =
match Timer.next t with
| Timer.Empty -> None
| Timer.Run (f, ev_h) ->
f ev_h;
loop ()
| Timer.Wait f ->
if f > 0. then
Some f
else
None
in
loop ()
class unix_ev_loop =
let _timer = Timer.create () in
let _io_wait : IO_tbl.t = IO_tbl.create () in
let _in_blocking_section = ref false in
let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in
let () =
Unix.set_nonblock _magic_pipe_read;
Unix.set_nonblock _magic_pipe_write
in
let[@inline] has_pending_tasks () =
_io_wait.n_read > 0 || _io_wait.n_write > 0 || Timer.has_tasks _timer
in
object
method one_step ~block () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fuseau-unix.iter" in
let delay = run_timer_ _timer in
let delay =
if block then
Option.value delay ~default:10.
else
0.
in
let reads, writes = IO_tbl.prepare_select _io_wait in
if has_pending_tasks () then (
_in_blocking_section := true;
let reads, writes, _ =
Unix.select (_magic_pipe_read :: reads) writes [] delay
in
_in_blocking_section := false;
IO_tbl.handle_ready ~ignore_read:_magic_pipe_read _io_wait reads writes
);
()
method on_readable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Read ev;
ev.as_cancel_handle
method on_writable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Write ev;
ev.as_cancel_handle
method on_timer
: float -> repeat:bool -> (cancel_handle -> unit) -> cancel_handle =
fun delay ~repeat f ->
if repeat then
Timer.run_every _timer delay f
else
Timer.run_after _timer delay f
method interrupt_if_in_blocking_section =
if !_in_blocking_section then (
let b = Bytes.create 1 in
ignore (Unix.write _magic_pipe_write b 0 1 : int)
)
method has_pending_tasks : bool = has_pending_tasks ()
end
open struct
let k_ev_loop : unix_ev_loop option ref TLS.key =
TLS.new_key (fun () -> ref None)
end
(** Access the event loop from within it *)
let[@inline] cur () =
match !(TLS.get k_ev_loop) with
| None -> failwith "must be called from inside Fuseau_unix"
| Some ev -> ev
let with_cur (ev : unix_ev_loop) f =
let r = TLS.get k_ev_loop in
let old = !r in
r := Some ev;
let finally () = r := old in
Fun.protect ~finally f