Source file kqueue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
module Ffi = struct
  external is_available : unit -> bool = "kqueue_stubs_is_available"
  external kqueue : unit -> Unix.file_descr = "kqueue_ml_kqueue_create"

  external kevent :
    Unix.file_descr -> Bigstring.t -> int -> Bigstring.t -> int -> int64 -> int
    = "kqueue_ml_kevent_bytecode" "kqueue_ml_kevent_native"

  module Kevent = struct
    external sizeof : unit -> int = "kqueue_ml_kevent_sizeof"

    external event_ident_offset : unit -> int
      = "kqueue_ml_kevent_offset_event_fd"

    let event_ident_offset = event_ident_offset ()

    external event_filter_offset : unit -> int
      = "kqueue_ml_kevent_offset_filter"

    let event_filter_offset = event_filter_offset ()

    external event_flags_offset : unit -> int = "kqueue_ml_kevent_offset_flags"

    let event_flags_offset = event_flags_offset ()

    external event_fflags_offset : unit -> int
      = "kqueue_ml_kevent_offset_fflags"

    let event_fflags_offset = event_fflags_offset ()

    external event_data_offset : unit -> int = "kqueue_ml_kevent_offset_data"

    let event_data_offset = event_data_offset ()

    external event_udata_offset : unit -> int = "kqueue_ml_kevent_offset_udata"

    let event_udata_offset = event_udata_offset ()
    let sizeof = sizeof ()

    let read_ident_at buf idx =
      Bigstring.unsafe_get_int64_le_trunc buf
        ~pos:((idx * sizeof) + event_ident_offset)

    let write_ident_at buf idx ident =
      Bigstring.unsafe_set_int64_le buf
        ~pos:((idx * sizeof) + event_ident_offset)
        ident

    let read_filter_at buf idx =
      Bigstring.unsafe_get_int16_le buf
        ~pos:((idx * sizeof) + event_filter_offset)

    let write_filter_at buf idx filter =
      Bigstring.unsafe_set_int16_le buf
        ~pos:((idx * sizeof) + event_filter_offset)
        filter

    let read_flags_at buf idx =
      Bigstring.unsafe_get_int16_le buf
        ~pos:((idx * sizeof) + event_flags_offset)

    let write_flags_at buf idx flags =
      Bigstring.unsafe_set_int16_le buf
        ~pos:((idx * sizeof) + event_flags_offset)
        flags

    let read_fflags_at buf idx =
      Bigstring.unsafe_get_int32_le buf
        ~pos:((idx * sizeof) + event_fflags_offset)

    let write_fflags_at buf idx fflags =
      Bigstring.unsafe_set_int32_le buf
        ~pos:((idx * sizeof) + event_fflags_offset)
        fflags

    let read_data_at buf idx =
      Bigstring.unsafe_get_int64_le_trunc buf
        ~pos:((idx * sizeof) + event_data_offset)

    let write_data_at buf idx data =
      Bigstring.unsafe_set_int64_le buf
        ~pos:((idx * sizeof) + event_data_offset)
        data

    let read_udata_at buf idx =
      Bigstring.unsafe_get_int64_le_trunc buf
        ~pos:((idx * sizeof) + event_udata_offset)

    let write_udata_at buf idx data =
      Bigstring.unsafe_set_int64_le buf
        ~pos:((idx * sizeof) + event_udata_offset)
        data
  end
end

module Note = struct
  type t = int

  let equal = Int.equal
  let ( = ) = equal
  let empty = 0

  external seconds : unit -> int = "kqueue_constant_note_seconds"
  external useconds : unit -> int = "kqueue_constant_note_useconds"
  external nseconds : unit -> int = "kqueue_constant_note_nseconds"
  external lowat : unit -> int = "kqueue_constant_note_lowat"
  external exec : unit -> int = "kqueue_constant_note_exec"
  external fork : unit -> int = "kqueue_constant_note_fork"
  external exit : unit -> int = "kqueue_constant_note_exit"
  external revoke : unit -> int = "kqueue_constant_note_revoke"
  external rename : unit -> int = "kqueue_constant_note_rename"
  external link : unit -> int = "kqueue_constant_note_link"
  external attrib : unit -> int = "kqueue_constant_note_attrib"
  external extend : unit -> int = "kqueue_constant_note_extend"
  external write : unit -> int = "kqueue_constant_note_write"
  external delete : unit -> int = "kqueue_constant_note_delete"

  let seconds = seconds ()
  let useconds = useconds ()
  let nseconds = nseconds ()
  let lowat = lowat ()
  let exec = exec ()
  let fork = fork ()
  let exit = exit ()
  let revoke = revoke ()
  let rename = rename ()
  let link = link ()
  let attrib = attrib ()
  let extend = extend ()
  let write = write ()
  let delete = delete ()
end

module Flag = struct
  type t = int

  let equal = Int.equal
  let ( = ) = equal
  let ( + ) = ( lor )
  let intersect t1 t2 = t1 land t2 <> 0

  external receipt : unit -> int = "kqueue_constant_ev_receipt"
  external add : unit -> int = "kqueue_constant_ev_add"
  external error : unit -> int = "kqueue_constant_ev_error"
  external eof : unit -> int = "kqueue_constant_ev_eof"
  external clear : unit -> int = "kqueue_constant_ev_clear"
  external oneshot : unit -> int = "kqueue_constant_ev_oneshot"
  external delete : unit -> int = "kqueue_constant_ev_delete"
  external disable : unit -> int = "kqueue_constant_ev_disable"
  external enable : unit -> int = "kqueue_constant_ev_enable"

  let receipt = receipt ()
  let add = add ()
  let error = error ()
  let eof = eof ()
  let clear = clear ()
  let oneshot = oneshot ()
  let delete = delete ()
  let disable = disable ()
  let enable = enable ()

  let known_flags =
    [
      (add, "EV_ADD");
      (receipt, "EV_RECEIPT");
      (error, "EV_ERROR");
      (eof, "EV_EOF");
      (clear, "EV_CLEAR");
      (oneshot, "EV_ONESHOT");
      (delete, "EV_DELETE");
      (disable, "EV_DISABLE");
      (enable, "EV_ENABLE");
    ]

  let is_subset t ~of_:flags = t = t land flags

  let pp fmt t =
    let known_flags =
      List.filter_map
        (fun (k, label) -> if is_subset k ~of_:t then Some label else None)
        known_flags
    in
    Format.pp_print_list
      ~pp_sep:(fun fmt () -> Format.fprintf fmt ", ")
      Format.pp_print_string fmt known_flags
end

module Filter = struct
  type t = int

  let pp fmt x = Format.pp_print_int fmt x
  let equal a b = Int.equal a b
  let ( = ) = equal
  let compare a b = Int.compare a b

  external read : unit -> int = "kqueue_constant_evfilt_read"
  external write : unit -> int = "kqueue_constant_evfilt_write"
  external timer : unit -> int = "kqueue_constant_evfilt_timer"
  external proc : unit -> int = "kqueue_constant_evfilt_proc"
  external vnode : unit -> int = "kqueue_constant_evfilt_vnode"

  let read = read ()
  let write = write ()
  let timer = timer ()
  let proc = proc ()
  let vnode = vnode ()
end

module Timeout = struct
  type t = int64

  let never = -1L
  let immediate = 0L

  let of_ns x =
    if x < 0L then invalid_arg "Timeout cannot be negative";
    x
end

module Event_list = struct
  type t = { buffer : Bigstring.t; mutable length : int; max_length : int }

  let null = { buffer = Bigstring.create 0; length = 0; max_length = 0 }
  let length t = t.length

  module Event = struct
    type t = { buf : Bigstring.t; idx : int }

    let get_ident t = Ffi.Kevent.read_ident_at t.buf t.idx
    let set_ident t ident = Ffi.Kevent.write_ident_at t.buf t.idx ident
    let get_filter t = Ffi.Kevent.read_filter_at t.buf t.idx
    let set_filter t filter = Ffi.Kevent.write_filter_at t.buf t.idx filter
    let get_flags t = Ffi.Kevent.read_flags_at t.buf t.idx
    let set_flags t flags = Ffi.Kevent.write_flags_at t.buf t.idx flags
    let get_fflags t = Ffi.Kevent.read_fflags_at t.buf t.idx
    let set_fflags t fflags = Ffi.Kevent.write_fflags_at t.buf t.idx fflags
    let get_data t = Ffi.Kevent.read_data_at t.buf t.idx
    let set_data t data = Ffi.Kevent.write_data_at t.buf t.idx data
    let get_udata t = Ffi.Kevent.read_udata_at t.buf t.idx
    let set_udata t udata = Ffi.Kevent.write_udata_at t.buf t.idx udata
  end

  let create size =
    if size < 1 then
      invalid_arg "Kqueue.create: changelist_size cannot be less than 1";
    if not (Ffi.is_available ()) then begin
      failwith "Kqueue is not available"
    end;
    {
      buffer = Bigstring.create (Ffi.Kevent.sizeof * size);
      length = 0;
      max_length = size;
    }

  let clear t = t.length <- 0

  let fill_events t count f =
    if count < 1 then
      invalid_arg
        (Printf.sprintf "Number of events (%d) cannot be less than 1" count);
    if count > t.max_length then
      invalid_arg
        (Printf.sprintf "count cannot be greater than max event list size (%d)"
           t.max_length);
    for i = 0 to count - 1 do
      f i { Event.buf = t.buffer; idx = i }
    done;
    t.length <- count

  let iter_events t f =
    if t.length > 0 then begin
      for i = 0 to t.length - 1 do
        f { Event.buf = t.buffer; idx = i }
      done
    end
end

type t = { fd : Unix.file_descr; mutable is_closed : bool }

let ensure_open t =
  if t.is_closed then failwith "Attempting to use a closed kqueue"

let is_available = Ffi.is_available ()

let create () =
  let fd = Ffi.kqueue () in
  { fd; is_closed = false }

let close t =
  if not t.is_closed then begin
    t.is_closed <- true;
    Unix.close t.fd
  end

let kevent t ~(changelist : Event_list.t) ~(eventlist : Event_list.t) timeout =
  ensure_open t;
  let events_to_write = changelist.length in
  changelist.length <- 0;
  let count =
    Ffi.kevent t.fd changelist.Event_list.buffer events_to_write
      eventlist.Event_list.buffer eventlist.max_length timeout
  in
  eventlist.length <- count;
  count