Source file io_uring_raw_singleton.ml
open! Core
open Import
module Eventfd = Linux_ext.Eventfd
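
(** The result of trying to set up the global io_uring: [Not_supported ()] when
    io_uring is disabled or unavailable, [Ok uring] otherwise. *)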
type t =
  | Not_supported of unit
  | Ok of Io_uring_raw.t

module Eventfd_driver = struct
  (** The submission and completion of tasks are completely autonomous, using two async
      jobs (one for submission and one for completion) that get scheduled when needed.
      Submission happens at the end of every cycle (submitting when the queue is empty
      should be very cheap).  When a syscall makes its way to the completion queue, the
      job that fills the corresponding deferred is scheduled the next time the async
      scheduler checks for I/O through the file-descriptor watcher. *)
  let register_hooks uring eventfd =
    Io_uring_raw.register_eventfd uring (Eventfd.to_file_descr eventfd);
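    (* Submission hook: push any pending submission-queue entries to the kernel at the
       end of every async cycle. *)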
    Async_kernel_scheduler.Expert.run_every_cycle_end (fun () ->
      let (_ : int) = Io_uring_raw.submit uring in
      ());
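    (* Completion hook: wrap the eventfd in an async-managed fd so the scheduler's
       file-descriptor watcher can tell us when completions are available. *)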
    let fd =
      Raw_scheduler.create_fd
        Raw_fd.Kind.Fifo
        (Eventfd.to_file_descr eventfd)
        (Info.create_s [%sexp "io_uring_raw eventfd"])
    in
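    (* The job that runs when the eventfd becomes readable: drain its counter and fill
       the deferreds of completed operations.  [EAGAIN]/[EWOULDBLOCK] just means the
       nonblocking eventfd had nothing to read. *)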
    let eventfd_ready_job =
      Raw_scheduler.create_job
        (Raw_scheduler.the_one_and_only ())
        (fun () ->
          try
            let (_ : Int64.t) = Eventfd.read eventfd in
            let (_ : int) = Io_uring_raw.fill_completions uring in
            ()
          with
          | Unix.Unix_error ((EAGAIN | EWOULDBLOCK), _, _) -> ()
          | exn -> raise exn)
        ()
    in
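    (* Watch the eventfd for readability for the lifetime of the scheduler; it is a bug
       for the watch to fail to start or to stop. *)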
    let finished_watching = Ivar.create () in
    (match
       Raw_scheduler.request_start_watching
         (Raw_scheduler.the_one_and_only ())
         fd
         `Read
         (Raw_fd.Watching.Watch_repeatedly (eventfd_ready_job, finished_watching))
     with
     | `Watching -> ()
     | (`Already_closed | `Already_watching) as result ->
       raise_s
         [%sexp
           (("unexpected result when asked to watch eventfd", result)
            : string * [ `Already_closed | `Already_watching ])]);
    Deferred.upon (Ivar.read finished_watching) (fun reason ->
      raise_s
        [%sexp
          (("unexpectedly stopped watching eventfd", reason)
           : string * [ `Bad_fd | `Closed | `Interrupted | `Unsupported ])])
  ;;
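
  (** Create the ring and its eventfd, wire up the hooks, and return [Ok].  Raises if
      either the eventfd or the ring cannot be created, tearing the ring down if only
      the eventfd creation failed. *)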
  let force_uring_exn () =
    let uring =
      Io_uring_raw.create
        ~queue_depth:
          (Io_uring_max_submission_entries.raw Config.io_uring_max_submission_entries)
        ()
    in
    match Eventfd.create, uring with
    | Error eventfd_error, Error uring_error ->
      Error.raise (Error.of_list [ eventfd_error; uring_error ])
    | Error eventfd_error, Ok uring ->
      Io_uring_raw.exit uring;
      Error.raise eventfd_error
    | Ok _, Error uring_error -> Error.raise uring_error
    | Ok create_eventfd, Ok uring ->
      let eventfd =
        create_eventfd ~flags:Eventfd.Flags.(cloexec + nonblock) (Int32.of_int_exn 0)
      in
      register_hooks uring eventfd;
      Ok uring
  ;;
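
  (* Like [force_uring_exn], but reports any failure as [Not_supported ()] instead of
     raising. *)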
  let force_uring_noraise () =
    try force_uring_exn () with
    | _exn -> Not_supported ()
  ;;
end
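
(* When the async scheduler already owns an io_uring, reuse it rather than creating a
   second ring. *)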
module From_scheduler_driver = struct
  let force_uring () =
    match Raw_scheduler.uring (Raw_scheduler.t ()) with
    | None -> Not_supported ()
    | Some uring -> Ok uring
  ;;
end
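
(* Pick the driver according to [Config.io_uring_mode]. *)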
let create_global_io_uring () =
  match Config.io_uring_mode with
  | Disabled -> Not_supported ()
  | Eventfd -> Eventfd_driver.force_uring_exn ()
  | If_available_eventfd -> Eventfd_driver.force_uring_noraise ()
  | From_scheduler -> From_scheduler_driver.force_uring ()
;;

let global_io_uring = lazy (create_global_io_uring ())
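
(* The singleton is created lazily, the first time [the_one_and_only] is called;
   [None] means io_uring is disabled or not supported here. *)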
let the_one_and_only () =
  match force global_io_uring with
  | Not_supported () -> None
  | Ok io_uring -> Some io_uring
;;
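
(* A minimal usage sketch for callers, assuming they fall back to a non-uring path when
   io_uring is unavailable; [submit_with_uring] and [submit_without_uring] are
   hypothetical helpers, not part of this module:

     let submit request =
       match Io_uring_raw_singleton.the_one_and_only () with
       | Some uring -> submit_with_uring uring request
       | None -> submit_without_uring request
*)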