Source file domain_mgr.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
open Eio.Std
[@@@alert "-unstable"]
let run_event_loop fn x =
Sched.with_sched @@ fun sched ->
let open Effect.Deep in
let : _ effect_handler = {
effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option ->
match e with
| Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k (Time.mono_clock : Eio.Time.Mono.t))
| Eio_unix.Private.Socket_of_fd (sw, close_unix, unix_fd) -> Some (fun k ->
let fd = Fd.of_unix ~sw ~blocking:false ~close_unix unix_fd in
Unix.set_nonblock unix_fd;
continue k (Flow.of_fd fd :> Eio_unix.socket)
)
| Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k ->
match
let unix_a, unix_b = Unix.socketpair ~cloexec:true domain ty protocol in
let a = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_a in
let b = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_b in
Unix.set_nonblock unix_a;
Unix.set_nonblock unix_b;
(Flow.of_fd a :> Eio_unix.socket), (Flow.of_fd b :> Eio_unix.socket)
with
| r -> continue k r
| exception Unix.Unix_error (code, name, arg) ->
discontinue k (Err.wrap code name arg)
)
| Eio_unix.Private.Pipe sw -> Some (fun k ->
match
let r, w = Low_level.pipe ~sw in
let source = (Flow.of_fd r :> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd>) in
let sink = (Flow.of_fd w :> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd>) in
(source, sink)
with
| r -> continue k r
| exception Unix.Unix_error (code, name, arg) ->
discontinue k (Err.wrap code name arg)
)
| _ -> None
}
in
Sched.run ~extra_effects sched fn x
let v = object
inherit Eio.Domain_manager.t
method run_raw fn =
let domain = ref None in
Eio.Private.Suspend.enter (fun _ctx enqueue ->
domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> enqueue (Ok ()))))
);
Domain.join (Option.get !domain)
method run fn =
let domain = ref None in
Eio.Private.Suspend.enter (fun ctx enqueue ->
let cancelled, set_cancelled = Promise.create () in
Eio.Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled);
domain := Some (Domain.spawn (fun () ->
Fun.protect (run_event_loop (fun () -> fn ~cancelled))
~finally:(fun () -> enqueue (Ok ()))))
);
Domain.join (Option.get !domain)
end