1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend
module Cancel = Eio__core.Cancel
(** A condition. It is represented directly as a [Broadcast.t]; every
    operation in this file delegates to the [Broadcast] module. *)
type t = Broadcast.t
(** [create ()] returns a fresh condition, obtained from
    [Broadcast.create]. *)
let create = fun () ->
  Broadcast.create ()
(** [lock_protected m] locks [m] with the locking call wrapped in
    [Cancel.protect].
    NOTE(review): presumably this keeps a pending cancellation from
    interrupting the lock operation; confirm against [Cancel.protect]. *)
let lock_protected m =
  let acquire () = Eio_mutex.lock m in
  Cancel.protect acquire
(** [await_generic ?mutex t] suspends the calling fiber until a callback
    registered with [t] is invoked, or until the fiber is cancelled.

    If [mutex] is given, it is unlocked inside the suspension (always after
    the attempt to register with [t], or straight away when the fiber context
    already reports an error) and it is locked again with [lock_protected]
    before this function returns or re-raises.

    NOTE(review): the [exception] branch assumes that
    [Suspend.enter_unchecked] raises the exception handed to
    [enqueue (Error _)]; confirm against [Suspend]. *)
let await_generic ?mutex t =
match
Suspend.enter_unchecked (fun ctx enqueue ->
match Fiber_context.get_error ctx with
| Some ex ->
(* The context already carries an error (presumably a pending
   cancellation): release the mutex and resume with that error without
   ever registering on [t]. *)
Option.iter Eio_mutex.unlock mutex;
enqueue (Error ex)
| None ->
(* Register the wake-up callback BEFORE releasing the mutex, so the
   registration is already in place when other users of the mutex can run. *)
match Broadcast.suspend t (fun () -> enqueue (Ok ())) with
| None ->
(* No request handle was returned, so there is nothing to cancel later.
   NOTE(review): presumably the callback has already been (or will be)
   invoked in this case; confirm against [Broadcast.suspend]. *)
Option.iter Eio_mutex.unlock mutex
| Some request ->
Option.iter Eio_mutex.unlock mutex;
(* On cancellation, resume with the error only if the request could still
   be cancelled; otherwise the wake-up callback is responsible for
   resuming the fiber. *)
Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
)
)
with
(* Normal wake-up: take the mutex back before returning. *)
| () -> Option.iter lock_protected mutex
| exception ex ->
(* Capture the backtrace first, since re-locking may run other code, then
   take the mutex back and re-raise with the original backtrace. *)
let bt = Printexc.get_raw_backtrace () in
Option.iter lock_protected mutex;
Printexc.raise_with_backtrace ex bt
(** [await t mutex] waits on [t] through [await_generic], passing [mutex]
    so that it is released while waiting and re-acquired afterwards. *)
let await t mutex =
  await_generic ?mutex:(Some mutex) t
(** [await_no_mutex t] waits on [t] through [await_generic] without any
    mutex to release or re-acquire. *)
let await_no_mutex t =
  await_generic ?mutex:None t
(** [broadcast t] forwards to [Broadcast.resume_all t]. *)
let broadcast t =
  Broadcast.resume_all t
(** Result of [register_immediate]. [Some r] wraps an underlying
    [Broadcast.request] that [cancel] can try to cancel; for [None] there is
    no underlying request and [cancel] reports [false]. *)
type request = Broadcast.request option
(** [register_immediate t fn] registers the callback [fn] on [t] by calling
    [Broadcast.suspend t fn], and returns its result as a [request]. *)
let register_immediate t fn =
  Broadcast.suspend t fn
(** [cancel r] tries to cancel the registration [r]. It returns the result of
    [Broadcast.cancel] when there is an underlying request, and [false] when
    [r] is [None]. *)
let cancel registration =
  match registration with
  | None -> false
  | Some pending -> Broadcast.cancel pending
(** [ensure_cancelled x] calls [cancel x] and deliberately discards the
    boolean outcome. *)
let ensure_cancelled x =
  let (_ : bool) = cancel x in
  ()
(** State of a single wait attempt inside [loop_no_mutex], stored in an
    [Atomic.t]. *)
type state =
(* Initial value: the wake-up callback has not run and the fiber has not
   suspended yet. *)
| Init
(* The fiber is suspended; the payload is the function that resumes it. *)
| Waiting of ((unit, exn) result -> unit)
(* The wake-up callback has run. *)
| Done
(** [loop_no_mutex t fn] calls [fn ()] repeatedly until it returns [Some x],
    and then returns [x]. After each [None] the fiber suspends until the
    wake-up callback registered on [t] fires (or the fiber is cancelled), and
    then a fresh iteration starts.

    The callback is registered on [t] BEFORE [fn] is called, so a wake-up that
    arrives while [fn] is running is recorded in [state] and makes the
    following suspension resume immediately instead of being lost.

    Any exception raised by [fn] is re-raised with its original backtrace
    after the registration has been cancelled.
    NOTE(review): an exception passed to [enqueue (Error _)] presumably
    propagates out of [Suspend.enter_unchecked] and thus out of this function;
    confirm against [Suspend]. *)
let rec loop_no_mutex t fn =
(* Fresh state for every iteration. *)
let state = Atomic.make Init in
let wake () =
match Atomic.exchange state Done with
(* The fiber has not suspended yet. Nothing to resume now: its later
   compare-and-set from [Init] will fail and it will resume itself. *)
| Init -> ()
(* The fiber is suspended: resume it. *)
| Waiting enqueue -> enqueue (Ok ())
(* Would mean [wake] ran twice for the same registration.
   NOTE(review): assumes [Broadcast] invokes a callback at most once. *)
| Done -> assert false
in
let request = Broadcast.suspend t wake in
(* Every branch below either cancels [request] or suspends waiting on it. *)
match fn () with
| exception ex ->
(* Grab the backtrace before running any other code, then clean up. *)
let bt = Printexc.get_raw_backtrace () in
ensure_cancelled request;
Printexc.raise_with_backtrace ex bt
| Some x ->
(* Got a result: the registration is no longer needed. *)
ensure_cancelled request;
x
| None ->
Suspend.enter_unchecked (fun ctx enqueue ->
match Fiber_context.get_error ctx with
| Some ex ->
(* The context already carries an error: drop the registration and resume
   with that error. *)
ensure_cancelled request;
enqueue (Error ex)
| None ->
let waiting = Waiting enqueue in
if Atomic.compare_and_set state Init waiting then (
(* Still [Init], so [wake] has not run yet; it will find [Waiting] and
   resume the fiber. Until then, allow cancellation to resume it instead. *)
Fiber_context.set_cancel_fn ctx (fun ex ->
if cancel request then (
(* The registration was cancelled in time, so resume with the error here.
   If [cancel] returns false, resuming is left to [wake]. *)
enqueue (Error ex)
)
)
) else (
(* [state] was no longer [Init]: [wake] already ran and set [Done] while
   there was no fiber to resume, so resume immediately. *)
enqueue (Ok ())
)
);
(* Woken up: start a new iteration, which registers again and re-runs [fn]. *)
loop_no_mutex t fn