1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
open Picos
let[@inline never] completed () = invalid_arg "already completed"
type _ tdt =
| Nothing : [> `Nothing ] tdt
| Bundle : {
config : int Atomic.t;
bundle : Computation.packed;
errors : Control.Errors.t;
finished : Trigger.t;
}
-> [> `Bundle ] tdt
let config_terminated_bit = 0x01
and config_callstack_mask = 0x3E
and config_callstack_shift = 1
and config_one = 0x40
let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create ()
type t = [ `Bundle ] tdt
let terminate ?callstack (Bundle { bundle = Packed bundle; _ } : t) =
Computation.cancel bundle Control.Terminate
(Control.get_callstack_opt callstack)
let terminate_after ?callstack (Bundle { bundle = Packed bundle; _ } : t)
~seconds =
Computation.cancel_after bundle ~seconds Control.Terminate
(Control.get_callstack_opt callstack)
let error ?callstack (Bundle r as t : t) exn bt =
if exn != Control.Terminate then begin
terminate ?callstack t;
Control.Errors.push r.errors exn bt
end
let decr (Bundle r : t) =
let n = Atomic.fetch_and_add r.config (-config_one) in
if n < config_one * 2 then begin
let (Packed bundle) = r.bundle in
Computation.cancel bundle Control.Terminate Control.empty_bt;
Trigger.signal r.finished
end
type _ pass = FLS : unit pass | Arg : t pass
let[@inline never] no_flock () = invalid_arg "no flock"
let get_flock fiber =
match Fiber.FLS.get fiber flock_key ~default:Nothing with
| Bundle _ as t -> t
| Nothing -> no_flock ()
let await (type a) (Bundle r as t : t) fiber packed canceler outer
(pass : a pass) =
decr t;
Fiber.set_computation fiber packed;
let forbid = Fiber.exchange fiber ~forbid:true in
Trigger.await r.finished |> ignore;
Fiber.set fiber ~forbid;
begin
match pass with FLS -> Fiber.FLS.set fiber flock_key outer | Arg -> ()
end;
let (Packed parent) = packed in
Computation.detach parent canceler;
Control.Errors.check r.errors;
Fiber.check fiber
let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
=
let (Bundle r as t : t) =
let terminated =
match on_return with
| None | Some `Wait -> 0
| Some `Terminate -> config_terminated_bit
in
let callstack =
match callstack with
| None -> 0
| Some n ->
if n <= 0 then 0
else
Int.min n (config_callstack_mask lsr config_callstack_shift)
lsl config_callstack_shift
in
let config = Atomic.make (config_one lor callstack lor terminated) in
let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
let errors = Control.Errors.create () in
let finished = Trigger.create () in
Bundle { config; bundle; errors; finished }
in
let fiber = Fiber.current () in
let outer =
match pass with
| Arg -> Nothing
| FLS -> Fiber.FLS.get fiber flock_key ~default:Nothing
in
let (Packed parent as packed) = Fiber.get_computation fiber in
let (Packed bundle) = r.bundle in
let canceler = Computation.attach_canceler ~from:parent ~into:bundle in
match
Fiber.set_computation fiber r.bundle;
fn (match pass with FLS -> Fiber.FLS.set fiber flock_key t | Arg -> t)
with
| value ->
let config = Atomic.get r.config in
if config land config_terminated_bit <> 0 then begin
let callstack =
let n =
(config land config_callstack_mask) lsr config_callstack_shift
in
if n = 0 then None else Some n
in
terminate ?callstack t
end;
await t fiber packed canceler outer pass;
value
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
error t exn bt;
await t fiber packed canceler outer pass;
Printexc.raise_with_backtrace exn bt
let rec incr (Bundle r as t : t) backoff =
let before = Atomic.get r.config in
if before < config_one then completed ()
else if not (Atomic.compare_and_set r.config before (before + config_one))
then incr t (Backoff.once backoff)
let fork_as_promise_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
incr t Backoff.default;
try
let child = Computation.create ~mode:`LIFO () in
let fiber = Fiber.create ~forbid:false child in
let (Packed bundle) = r.bundle in
let canceler = Computation.attach_canceler ~from:bundle ~into:child in
let main =
match pass with
| FLS ->
Fiber.FLS.set fiber flock_key t;
fun fiber ->
begin
match thunk () with
| value -> Computation.return child value
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
Computation.cancel child exn bt;
error (get_flock fiber) exn bt
end;
let (Bundle r as t : t) = get_flock fiber in
let (Packed bundle) = r.bundle in
Computation.detach bundle canceler;
decr t
| Arg ->
fun _ ->
begin
match thunk () with
| value -> Computation.return child value
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
Computation.cancel child exn bt;
error t exn bt
end;
let (Packed bundle) = r.bundle in
Computation.detach bundle canceler;
decr t
in
Fiber.spawn fiber main;
child
with canceled_exn ->
decr t;
raise canceled_exn
let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
incr t Backoff.default;
try
let fiber = Fiber.create_packed ~forbid:false r.bundle in
let main =
match pass with
| FLS ->
Fiber.FLS.set fiber flock_key t;
fun fiber ->
begin
try thunk ()
with exn ->
error (get_flock fiber) exn (Printexc.get_raw_backtrace ())
end;
decr (get_flock fiber)
| Arg ->
fun _ ->
begin
try thunk ()
with exn -> error t exn (Printexc.get_raw_backtrace ())
end;
decr t
in
Fiber.spawn fiber main
with canceled_exn ->
decr t;
raise canceled_exn
let is_running (Bundle { bundle = Packed bundle; _ } : t) =
Computation.is_running bundle
let join_after ?callstack ?on_return fn =
join_after_pass ?callstack ?on_return fn Arg
let fork t thunk = fork_pass t thunk Arg
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg
let unsafe_incr (Bundle r : t) =
Atomic.fetch_and_add r.config config_one |> ignore