Source file domain_mgr.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
open Eio.Std
[@@@alert "-unstable"]
module Fd = Eio_unix.Fd
(** [socketpair k ~sw ~domain ~ty ~protocol wrap_a wrap_b] creates a pair of
    connected Unix sockets, registers each with [Fd.of_unix] on [sw], puts
    both descriptors in non-blocking mode and resumes [k] with
    [(wrap_a a, wrap_b b)].

    If a [Unix.Unix_error] is raised while building the pair, [k] is
    discontinued with [Err.wrap code name arg] instead. *)
let socketpair k ~sw ~domain ~ty ~protocol wrap_a wrap_b =
  let make_pair () =
    let unix_a, unix_b = Unix.socketpair ~cloexec:true domain ty protocol in
    let import unix_fd = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_fd in
    let a = import unix_a in
    let b = import unix_b in
    Unix.set_nonblock unix_a;
    Unix.set_nonblock unix_b;
    (wrap_a a, wrap_b b)
  in
  (* The continuation is resumed outside of the exception handler, so only
     errors raised while building the pair are converted. *)
  match make_pair () with
  | pair -> Effect.Deep.continue k pair
  | exception Unix.Unix_error (code, name, arg) ->
    Effect.Deep.discontinue k (Err.wrap code name arg)
(** [run_event_loop fn x] runs [fn x] with [Sched.run] on the scheduler
    supplied by [Sched.with_sched], installing [extra_effects] as an
    additional effect handler.

    The handler covers the [Eio_unix] effects matched below (monotonic clock,
    socket import, socket pairs and pipes). For any other effect it returns
    [None]. *)
let run_event_loop fn x =
  Sched.with_sched @@ fun sched ->
  let open Effect.Deep in
  let extra_effects : _ effect_handler = {
    effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option ->
      match e with
      | Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k Time.mono_clock)
      | Eio_unix.Net.Import_socket_stream (sw, close_unix, unix_fd) -> Some (fun k ->
          (* Wrap the existing descriptor, then switch it to non-blocking mode. *)
          let fd = Fd.of_unix ~sw ~blocking:false ~close_unix unix_fd in
          Unix.set_nonblock unix_fd;
          continue k (Flow.of_fd fd :> _ Eio_unix.Net.stream_socket)
        )
      | Eio_unix.Net.Import_socket_datagram (sw, close_unix, unix_fd) -> Some (fun k ->
          (* Same as the stream case, but exposed as a datagram socket. *)
          let fd = Fd.of_unix ~sw ~blocking:false ~close_unix unix_fd in
          Unix.set_nonblock unix_fd;
          continue k (Net.datagram_socket fd)
        )
      | Eio_unix.Net.Socketpair_stream (sw, domain, protocol) -> Some (fun k ->
          let wrap fd = (Flow.of_fd fd :> _ Eio_unix.Net.stream_socket) in
          socketpair k ~sw ~domain ~protocol ~ty:Unix.SOCK_STREAM wrap wrap
        )
      | Eio_unix.Net.Socketpair_datagram (sw, domain, protocol) -> Some (fun k ->
          let wrap fd = Net.datagram_socket fd in
          socketpair k ~sw ~domain ~protocol ~ty:Unix.SOCK_DGRAM wrap wrap
        )
      | Eio_unix.Private.Pipe sw -> Some (fun k ->
          (* Only errors raised while creating the pipe are converted with
             [Err.wrap]; the continuation itself is resumed outside of the
             exception handler. *)
          match
            let r, w = Low_level.pipe ~sw in
            let source = Flow.of_fd r in
            let sink = Flow.of_fd w in
            (source, sink)
          with
          | r -> continue k r
          | exception Unix.Unix_error (code, name, arg) ->
            discontinue k (Err.wrap code name arg)
        )
      | _ -> None
  }
  in
  Sched.run ~extra_effects sched fn x
(** [wrap_backtrace fn x] evaluates [fn x] and returns [Ok result].
    If [fn x] raises [ex], the result is [Error (ex, bt)] where [bt] is the
    raw backtrace captured in the handler. No exception escapes. *)
let wrap_backtrace fn x =
  try Ok (fn x)
  with ex -> Error (ex, Printexc.get_raw_backtrace ())
(** [unwrap_backtrace r] returns [x] for [Ok x]. For [Error (ex, bt)] it
    re-raises [ex] with the backtrace [bt]. *)
let unwrap_backtrace outcome =
  match outcome with
  | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
  | Ok value -> value
(** Implementation module that is passed to [Eio.Domain_manager.Pi.mgr]. *)
module Impl = struct
  type t = unit

  (** [run_raw () fn] spawns a new domain that runs [fn ()] directly, with no
      event loop. The spawning happens inside [Eio.Private.Suspend.enter];
      the new domain calls [enqueue (Ok ())] once [fn] has finished, whether
      it returned or raised. The domain is then joined and its result
      returned, re-raising any exception of [fn] with its backtrace. *)
  let run_raw () fn =
    let spawned = ref None in
    Eio.Private.Suspend.enter (fun _ctx enqueue ->
        let wake () = enqueue (Ok ()) in
        let body () = Fun.protect ~finally:wake (wrap_backtrace fn) in
        spawned := Some (Domain.spawn body)
      );
    !spawned |> Option.get |> Domain.join |> unwrap_backtrace

  (** [run () fn] is like [run_raw], but the new domain runs
      [fn ~cancelled] inside [run_event_loop]. [cancelled] is a promise that
      is resolved through the cancel function registered on the calling
      fiber context. *)
  let run () fn =
    let spawned = ref None in
    Eio.Private.Suspend.enter (fun ctx enqueue ->
        let cancelled, set_cancelled = Promise.create () in
        Eio.Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled);
        let wake () = enqueue (Ok ()) in
        let main () = fn ~cancelled in
        let body () =
          Fun.protect ~finally:wake (run_event_loop (wrap_backtrace main))
        in
        spawned := Some (Domain.spawn body)
      );
    !spawned |> Option.get |> Domain.join |> unwrap_backtrace
end
(** The domain manager resource: the unit state paired with the handler
    built from [Impl] by [Eio.Domain_manager.Pi.mgr]. *)
let v = Eio.Resource.T ((), Eio.Domain_manager.Pi.mgr (module Impl))