1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
module Backoff = Miou_backoff
module Trigger : sig
  (** One-shot signal cell stored in an [Atomic.t].

      A trigger starts in the [Initial] state, may move once to [Awaiting]
      (a callback has been installed) and ends in [Signaled]. Every state
      change is done with [Atomic.compare_and_set] in a retry loop. *)

  (** State of a trigger. [Awaiting (fn, x, y)] stores a callback together
      with the two values it will receive, next to the trigger itself, when
      the trigger is signaled. *)
  type state =
    | Signaled
    | Awaiting : (t -> 'x -> 'y -> unit) * 'x * 'y -> state
    | Initial

  and t = state Atomic.t

  (** [create ()] returns a fresh trigger in the [Initial] state. *)
  val create : unit -> t

  (** [is_initial t] is [true] iff [t] is currently in the [Initial] state. *)
  val is_initial : t -> bool

  (** [is_signaled t] is [true] iff [t] is currently in the [Signaled]
      state. *)
  val is_signaled : t -> bool

  (** [signal t] moves [t] to [Signaled]. If a callback was installed, it is
      called as [fn t x y] by the caller of [signal], after the state change.
      Signaling an already signaled trigger does nothing. *)
  val signal : t -> unit

  (** [on_signal t x y fn] tries to install [fn] (with its arguments [x] and
      [y]) on [t].

      @return
        [true] if the callback was installed, [false] if [t] was already
        signaled (in which case [fn] is not called).
      @raise Invalid_argument if a callback is already installed on [t]. *)
  val on_signal : t -> 'x -> 'y -> (t -> 'x -> 'y -> unit) -> bool

  (** Effect performed by {!await} on a trigger still in the [Initial] state.
      The constructor is private: only this module can perform it. *)
  type _ Effect.t +=
    private
    | Await : t -> (exn * Printexc.raw_backtrace) option Effect.t

  (** [await t] returns [None] at once if [t] is already signaled; otherwise,
      if [t] is [Initial], it performs {!Await} and returns whatever the
      effect handler resumes with.

      NOTE(review): the meaning of a [Some (exn, bt)] result is decided by
      the handler, which is not visible here; confirm against the scheduler.

      @raise Invalid_argument if a callback is already installed on [t]. *)
  val await : t -> (exn * Printexc.raw_backtrace) option
end = struct
  type state =
    | Signaled
    | Awaiting : (t -> 'x -> 'y -> unit) * 'x * 'y -> state
    | Initial

  and t = state Atomic.t

  let create () = Atomic.make Initial

  let rec signal t =
    match Atomic.get t with
    | Signaled -> ()
    | Initial ->
        (* Lost the race against another writer: re-read and retry. *)
        if not (Atomic.compare_and_set t Initial Signaled) then signal t
    | Awaiting (fn, x, y) as seen ->
        (* The callback only runs for the caller whose compare-and-set
           succeeded, so it is invoked at most once. *)
        if Atomic.compare_and_set t seen Signaled then fn t x y else signal t

  (* The two predicates below match on the constructor instead of using
     polymorphic equality: the state may carry a closure, and a tag test is
     all that is needed. *)
  let is_signaled t =
    match Atomic.get t with
    | Signaled -> true
    | Awaiting _ | Initial -> false

  let is_initial t =
    match Atomic.get t with
    | Initial -> true
    | Awaiting _ | Signaled -> false

  (* Kept out of line so the error path does not bloat its callers. *)
  let[@inline never] awaiting () = invalid_arg "Trigger: already awaiting"

  let rec on_signal t x y fn =
    match Atomic.get t with
    | Initial ->
        (* Retry on a failed compare-and-set: the state may have become
           [Signaled] or [Awaiting] in the meantime. *)
        Atomic.compare_and_set t Initial (Awaiting (fn, x, y))
        || on_signal t x y fn
    | Signaled -> false
    | Awaiting _ -> awaiting ()

  type _ Effect.t += Await : t -> (exn * Printexc.raw_backtrace) option Effect.t

  let await t =
    match Atomic.get t with
    | Signaled -> None
    | Awaiting _ -> awaiting ()
    | Initial -> Effect.perform (Await t)
end
module Computation : sig
  (** A computation that ends at most once, either with a value or with an
      exception, and to which triggers can be attached so that they get
      signaled when it ends.

      The whole state lives in one [Atomic.t] and is only changed through
      [Atomic.compare_and_set] retry loops. *)

  (** State of a computation.

      - [Cancelled (exn, bt)]: ended with an exception and its backtrace.
      - [Returned v]: ended with the value [v].
      - [Continue]: still running; [triggers] are the attached triggers and
        [balance] is a counter used to decide when signaled triggers get
        dropped from the list (raised by one per attach, lowered by two per
        detach; a negative value makes the next attach or detach rebuild the
        list). *)
  type 'a state =
    | Cancelled of exn * Printexc.raw_backtrace
    | Returned of 'a
    | Continue of { balance: int; triggers: Trigger.t list }

  type !'a t = 'a state Atomic.t

  (** [create ()] is a running computation with no trigger attached. *)
  val create : unit -> 'a t

  (** [try_return t v] ends [t] with [Returned v] and signals the triggers
      attached at that moment. Returns [false], changing nothing, if [t] had
      already ended. *)
  val try_return : 'a t -> 'a -> bool

  (** [try_capture t fn x] runs [fn x], then ends [t] with the value it
      returned, or with the exception it raised and the backtrace captured
      right after. The result is that of {!try_return} or {!try_cancel}. *)
  val try_capture : 'r t -> ('a -> 'r) -> 'a -> bool

  (** [try_cancel t (exn, bt)] ends [t] with [Cancelled (exn, bt)] and signals
      the triggers attached at that moment. Returns [false], changing
      nothing, if [t] had already ended. *)
  val try_cancel : 'a t -> exn * Printexc.raw_backtrace -> bool

  (** [is_running t] is [true] iff [t] has not ended yet. *)
  val is_running : 'a t -> bool

  (** [cancelled t] is [Some (exn, bt)] if [t] ended with an exception, and
      [None] if it returned a value or is still running. *)
  val cancelled : 'a t -> (exn * Printexc.raw_backtrace) option

  (** [raise_if_errored t] re-raises, with its backtrace, the exception [t]
      was cancelled with; it does nothing in any other state. *)
  val raise_if_errored : 'a t -> unit

  (** [peek t] is the outcome of [t] if it has ended, [None] otherwise. It
      never waits. *)
  val peek : 'a t -> ('a, exn * Printexc.raw_backtrace) result option

  (** [try_attach t trigger] adds [trigger] to the triggers of [t]. Returns
      [false], leaving [t] unchanged, if [t] has already ended or if
      [trigger] is already signaled. *)
  val try_attach : 'a t -> Trigger.t -> bool

  (** [detach t trigger] signals [trigger], then, if [t] is still running,
      updates the bookkeeping of [t]: the list is either left as is with a
      lowered balance, or rebuilt without its signaled triggers. *)
  val detach : 'a t -> Trigger.t -> unit

  (** [clean t] rebuilds the trigger list of a running [t], keeping only the
      triggers that are not signaled. It does nothing once [t] has ended. *)
  val clean : 'a t -> unit

  (** [await t] returns the outcome of [t], waiting on a fresh attached
      trigger while [t] is running. If waiting on that trigger yields
      [Some (exn, bt)], the trigger is detached and [Error (exn, bt)] is
      returned even though [t] itself may still be running. *)
  val await : 'a t -> ('a, exn * Printexc.raw_backtrace) result

  (** [await_exn t] is like {!await} but returns the value directly and
      re-raises the exception, with its backtrace, in the error case. *)
  val await_exn : 'a t -> 'a

  (** [canceller ~from ~into] is a new trigger, not attached to anything,
      that already carries a callback: when signaled, it cancels [into] with
      the exception of [from] if [from] is cancelled at that time, and does
      nothing otherwise. *)
  val canceller : from:'a t -> into:'b t -> Trigger.t
end = struct
  type 'a state =
    | Cancelled of exn * Printexc.raw_backtrace
    | Returned of 'a
    | Continue of { balance: int; triggers: Trigger.t list }

  type 'a t = 'a state Atomic.t

  let create () = Atomic.make (Continue { balance= 0; triggers= [] })

  let cancelled t =
    match Atomic.get t with
    | Continue _ | Returned _ -> None
    | Cancelled (exn, bt) -> Some (exn, bt)

  (* [gc count kept pending] builds a [Continue] state out of [kept] plus the
     triggers of [pending] that are not signaled; [count] must be the length
     of [kept] and ends up as the new balance. Survivors of [pending] come
     out in reverse order since they are consed onto [kept]. *)
  let rec gc count kept pending =
    match pending with
    | [] -> Continue { balance= count; triggers= kept }
    | trigger :: rest ->
        if Trigger.is_signaled trigger then gc count kept rest
        else gc (count + 1) (trigger :: kept) rest

  (* Retry loop behind [try_attach].
     NOTE(review): [Backoff.once] comes from [Miou_backoff] and is presumably
     a short pause returning the next backoff value; confirm there. *)
  let rec attach_loop backoff t trigger =
    match Atomic.get t with
    | Cancelled _ | Returned _ -> false
    | Continue { balance; triggers } as before ->
        (* Look at the trigger before allocating the next state. *)
        if Trigger.is_signaled trigger then false
        else
          let after =
            if balance < 0 then gc 1 [ trigger ] triggers
            else
              Continue { balance= balance + 1; triggers= trigger :: triggers }
          in
          if Atomic.compare_and_set t before after then true
          else attach_loop (Backoff.once backoff) t trigger

  let try_attach t trigger = attach_loop Backoff.default t trigger

  (* Retry loop behind [detach]: either lower the balance by two or, when it
     is already negative, rebuild the list without signaled triggers. *)
  let rec detach_loop backoff t =
    match Atomic.get t with
    | Cancelled _ | Returned _ -> ()
    | Continue { balance; triggers } as before ->
        let after =
          if balance < 0 then gc 0 [] triggers
          else Continue { balance= balance - 2; triggers }
        in
        if Atomic.compare_and_set t before after then ()
        else detach_loop (Backoff.once backoff) t

  let detach t trigger =
    (* Signal first, so that a later rebuild of the list drops [trigger]. *)
    Trigger.signal trigger;
    detach_loop Backoff.default t

  (* Retry loop behind [clean]. *)
  let rec clean_loop backoff t =
    match Atomic.get t with
    | Cancelled _ | Returned _ -> ()
    | Continue { triggers; _ } as before ->
        let after = gc 0 [] triggers in
        if Atomic.compare_and_set t before after then ()
        else clean_loop (Backoff.once backoff) t

  let clean t = clean_loop Backoff.default t

  let is_running t =
    match Atomic.get t with
    | Continue _ -> true
    | Returned _ | Cancelled _ -> false

  (* [terminate_loop backoff t final] installs the final state [final] if [t]
     is still running, then signals, in list order, the triggers that were
     attached to the state it replaced. *)
  let rec terminate_loop backoff t final =
    match Atomic.get t with
    | Cancelled _ | Returned _ -> false
    | Continue { triggers; _ } as before ->
        if not (Atomic.compare_and_set t before final) then
          terminate_loop (Backoff.once backoff) t final
        else begin
          List.iter Trigger.signal triggers;
          true
        end

  let try_return t value = terminate_loop Backoff.default t (Returned value)

  let try_cancel t (exn, bt) =
    terminate_loop Backoff.default t (Cancelled (exn, bt))

  let try_capture t fn x =
    match fn x with
    | exception exn ->
        (* Grab the backtrace before anything else can overwrite it. *)
        let bt = Printexc.get_raw_backtrace () in
        try_cancel t (exn, bt)
    | value -> try_return t value

  let raise_if_errored t =
    match Atomic.get t with
    | Continue _ | Returned _ -> ()
    | Cancelled (exn, bt) -> Printexc.raise_with_backtrace exn bt

  let peek t =
    match Atomic.get t with
    | Continue _ -> None
    | Returned value -> Some (Ok value)
    | Cancelled (exn, bt) -> Some (Error (exn, bt))

  (* Callback stored in the trigger made by [canceller]; the trigger itself
     is not needed. *)
  let propagate _trigger from into =
    match Atomic.get from with
    | Cancelled (exn, bt) -> ignore (try_cancel into (exn, bt) : bool)
    | Returned _ | Continue _ -> ()

  let canceller ~from ~into =
    Atomic.make (Trigger.Awaiting (propagate, from, into))

  let rec await t =
    match Atomic.get t with
    | Cancelled (exn, bt) -> Error (exn, bt)
    | Returned value -> Ok value
    | Continue _ ->
        let trigger = Trigger.create () in
        (* A failed attach means [t] ended in the meantime: read it again. *)
        if not (try_attach t trigger) then await t
        else begin
          match Trigger.await trigger with
          | Some (exn, bt) ->
              detach t trigger;
              Error (exn, bt)
          | None -> await t
        end

  let await_exn t =
    match await t with
    | Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
    | Ok value -> value
end