1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
open Picos
(* Raises [Invalid_argument "already completed"].  [incr] calls this when the
   counter of a bundle has already dropped below one unit.  The function is
   marked [@inline never], so the raise is never inlined into its callers. *)
let[@inline never] completed () =
  raise (Invalid_argument "already completed")
(* Representation shared by bundle handles and the fiber-local flock slot.
   The type index is a polymorphic variant row naming the constructors a
   value may be built from, which lets [t] be restricted to [Bundle] only. *)
type _ tdt =
(* Stands for the absence of a bundle; this file passes it as the default
   when reading [flock_key] from a fiber. *)
| Nothing : [> `Nothing ] tdt
| Bundle : {
(* Packed state word: terminate-on-return bit, callstack size field, and a
   counter in units of [config_one] (see the [config_*] constants). *)
config : int Atomic.t;
(* The computation that [terminate] cancels and that forked fibers are
   created on or attached to. *)
bundle : Computation.packed;
(* Collector into which [error] pushes exceptions and backtraces. *)
errors : Control.Errors.t;
(* Trigger signaled by [decr] once the counter runs out; [await] waits on it. *)
finished : Trigger.t;
}
-> [> `Bundle ] tdt
(* Layout of the integer stored in the [config] field of a bundle.  The
   literals are written in binary as [counter_callstack_terminated]. *)

(* Bit 0: set when the scope was created with [~on_return:`Terminate]. *)
let config_terminated_bit = 0b0_00000_1

(* Bits 1 to 5: the callstack size given at creation, clamped to fit. *)
let config_callstack_mask = 0b0_11111_0

(* Right shift that brings the callstack field down to bit 0. *)
let config_callstack_shift = 1

(* Bit 6: one unit of the counter that occupies the remaining high bits;
   [incr] and [decr] add and subtract this amount. *)
let config_one = 0b1_00000_0
(* Fiber-local storage key for the flock of a fiber.  The slot can hold
   either a [Bundle] or [Nothing]; every read in this file supplies
   [Nothing] as the default value. *)
let flock_key =
  (Fiber.FLS.create () : [ `Bundle | `Nothing ] tdt Fiber.FLS.t)
(* A bundle handle: the [tdt] representation restricted to the [Bundle]
   constructor, so that [Nothing] is never accepted where a bundle is
   required. *)
type t = [ `Bundle ] tdt
(* Cancels the computation of the bundle with [Control.Terminate].

   [?callstack] is handed to [Control.get_callstack_opt], whose result is
   used as the backtrace of the cancelation. *)
let terminate ?callstack (Bundle r : t) =
  let (Packed computation) = r.bundle in
  let bt = Control.get_callstack_opt callstack in
  Computation.cancel computation Control.Terminate bt
(* Timed variant of [terminate]: asks [Computation.cancel_after] to cancel
   the computation of the bundle with [Control.Terminate] using the given
   [~seconds].

   [?callstack] is handed to [Control.get_callstack_opt], whose result is
   used as the backtrace of the cancelation. *)
let terminate_after ?callstack (Bundle r : t) ~seconds =
  let (Packed computation) = r.bundle in
  let bt = Control.get_callstack_opt callstack in
  Computation.cancel_after computation ~seconds Control.Terminate bt
(* Reports a failure to the bundle.

   Unless [exn] is physically the [Control.Terminate] exception, the bundle
   is first terminated (forwarding [?callstack]) and then [exn] with its
   backtrace [bt] is pushed onto the error collector of the bundle.
   [Control.Terminate] itself is ignored. *)
let error ?callstack (Bundle { errors; _ } as t : t) exn bt =
  if exn == Control.Terminate then ()
  else begin
    terminate ?callstack t;
    Control.Errors.push errors exn bt
  end
(* Releases one unit of the counter of the bundle.

   [Atomic.fetch_and_add] returns the value from before the subtraction, so
   a result below two units means the unit just released was the last one.
   In that case the computation of the bundle is canceled with
   [Control.Terminate] and an empty backtrace, and then the [finished]
   trigger is signaled. *)
let decr (Bundle { config; bundle = Packed computation; finished; _ } : t) =
  let before = Atomic.fetch_and_add config (-config_one) in
  if before < 2 * config_one then begin
    Computation.cancel computation Control.Terminate Control.empty_bt;
    Trigger.signal finished
  end
(* Selects how a bundle is made available to user code.  With [FLS] the
   bundle is stored under [flock_key] in the fiber and the scope body of
   [join_after_pass] receives [unit]; with [Arg] the bundle itself is passed
   as the argument.  The type index is that argument type. *)
type _ pass = FLS : unit pass | Arg : t pass
(* Raises [Invalid_argument "no flock"].  [get_flock] calls this when the
   fiber has no bundle stored under [flock_key].  The function is marked
   [@inline never], so the raise is never inlined into its callers. *)
let[@inline never] no_flock () = raise (Invalid_argument "no flock")
(* Returns the bundle stored under [flock_key] in [fiber].

   Raises [Invalid_argument "no flock"] (through [no_flock]) when the slot
   is unset or holds [Nothing].

   The alias on the [Bundle _] pattern is kept on purpose: the result is
   returned through it rather than through the scrutinee. *)
let get_flock fiber =
  match Fiber.FLS.get fiber flock_key ~default:Nothing with
  | Nothing -> no_flock ()
  | Bundle _ as flock -> flock
(* Completes a scope: releases one unit of the counter of the bundle, waits
   for the bundle to finish, restores the fiber state captured by the
   caller, and finally checks the recorded errors and the fiber.

   - [fiber] is the fiber running the scope.
   - [packed] is the computation to put back on the fiber.
   - [canceler] is the trigger to detach from that computation.
   - [outer] is the [flock_key] value to restore.

   In this file the arguments come from [join_after_pass], which captures
   them before installing the bundle on the fiber. *)
let await (Bundle r as t : t) fiber packed canceler outer =
(* Give up one unit of the counter; the initial config word built by
   [join_after_pass] contains one such unit. *)
decr t;
(* Put the previous computation back on the fiber before waiting. *)
Fiber.set_computation fiber packed;
(* Set the [forbid] flag of the fiber to [true] for the duration of the
   wait and restore its previous value afterwards.  The result of
   [Trigger.await] is deliberately discarded. *)
let forbid = Fiber.exchange fiber ~forbid:true in
Trigger.await r.finished |> ignore;
Fiber.set fiber ~forbid;
(* Write the outer flock back only when the slot no longer holds it
   (physical comparison). *)
if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then
Fiber.FLS.set fiber flock_key outer;
let (Packed parent) = packed in
Computation.detach parent canceler;
(* NOTE(review): presumably raises when errors were pushed by [error];
   confirm against [Control.Errors.check]. *)
Control.Errors.check r.errors;
(* NOTE(review): presumably raises when the fiber has been canceled;
   confirm against [Fiber.check]. *)
Fiber.check fiber
(* Exceptional exit of a scope body.

   The raw backtrace is captured first, before any other call can disturb
   it.  The exception is then reported to the bundle with [error], the
   scope is completed with [await], and finally [exn] is re-raised with the
   captured backtrace.  An exception raised by [await] itself takes
   precedence because the final re-raise is then never reached. *)
let[@inline never] raised exn t fiber packed canceler outer =
  let backtrace = Printexc.get_raw_backtrace () in
  let () = error t exn backtrace in
  await t fiber packed canceler outer;
  Printexc.raise_with_backtrace exn backtrace
(* Normal exit of a scope body that produced [value].

   When the terminated bit of the config word is set, the bundle is
   terminated before waiting.  The callstack field of the config word
   selects the callstack size: a field of zero means no callstack argument
   is passed to [terminate].  The scope is then completed with [await] and
   [value] is returned. *)
let[@inline never] returned value (Bundle { config; _ } as t : t) fiber packed
    canceler outer =
  let bits = Atomic.get config in
  if bits land config_terminated_bit <> 0 then begin
    match (bits land config_callstack_mask) lsr config_callstack_shift with
    | 0 -> terminate t
    | depth -> terminate ~callstack:depth t
  end;
  await t fiber packed canceler outer;
  value
(* Runs the scope body [fn] on [x] and dispatches on its outcome: a normal
   result goes to [returned], an exception goes to [raised].  The remaining
   arguments are passed through unchanged.

   The exception arm belongs to the [match] rather than to a surrounding
   [try], so exceptions raised by [returned] are not routed to [raised]. *)
let join_after_realloc x fn t fiber packed canceler outer =
  match fn x with
  | exception exn -> raised exn t fiber packed canceler outer
  | result -> returned result t fiber packed canceler outer
(* Creates a fresh bundle, runs [fn] inside it on the current fiber, and
   does not return until the bundle has finished.

   - [?callstack]: callstack size to record in the config word; values that
     are not positive count as 0 and larger values are clamped to the
     largest value that fits in the callstack field.
   - [?on_return]: [`Terminate] sets the terminated bit, which makes a
     normal return of [fn] terminate the bundle; [`Wait] and the default
     leave the bit clear.
   - [fn]: the scope body; it receives [unit] when [pass] is [FLS] and the
     bundle when [pass] is [Arg].
   - [pass]: how the bundle is made available to [fn].

   The outcome of [fn] is handled by [join_after_realloc]. *)
let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
=
let (Bundle r as t : t) =
(* Contribution of [?on_return] to the config word. *)
let terminated =
match on_return with
| None | Some `Wait -> 0
| Some `Terminate -> config_terminated_bit
in
(* Contribution of [?callstack] to the config word.  Function application
   binds tighter than [lsl], so the clamped value is what gets shifted. *)
let callstack =
match callstack with
| None -> 0
| Some n ->
if n <= 0 then 0
else
Int.min n (config_callstack_mask lsr config_callstack_shift)
lsl config_callstack_shift
in
(* The counter starts at one unit, which [await] releases via [decr]. *)
let config = Atomic.make (config_one lor callstack lor terminated) in
let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
let errors = Control.Errors.create () in
let finished = Trigger.create () in
Bundle { config; bundle; errors; finished }
in
let fiber = Fiber.current () in
(* Remember the current flock so that [await] can restore it. *)
let outer = Fiber.FLS.get fiber flock_key ~default:Nothing in
(* NOTE(review): presumably prepares the FLS slot ahead of the later
   [Fiber.FLS.set]; confirm against [Fiber.FLS.reserve]. *)
begin
match pass with FLS -> Fiber.FLS.reserve fiber flock_key | Arg -> ()
end;
(* Remember the current computation of the fiber, link it to the bundle
   with a canceler, and make the bundle the computation of the fiber. *)
let (Packed parent as packed) = Fiber.get_computation fiber in
let (Packed bundle) = r.bundle in
let canceler = Computation.attach_canceler ~from:parent ~into:bundle in
Fiber.set_computation fiber r.bundle;
(* Build the argument for [fn]: with [FLS] the bundle is stored in the
   fiber and the unit result of the store is the argument; with [Arg] the
   bundle itself is the argument. *)
let x : a =
match pass with FLS -> Fiber.FLS.set fiber flock_key t | Arg -> t
in
join_after_realloc x fn t fiber packed canceler outer
(* Adds one unit to the counter of the bundle with a compare-and-set loop.

   Raises [Invalid_argument "already completed"] (through [completed]) when
   the counter is already below one unit.  When the compare-and-set loses a
   race, the operation is retried after [Backoff.once backoff]. *)
let rec incr (Bundle { config; _ } as t : t) backoff =
  let seen = Atomic.get config in
  if config_one <= seen then begin
    if not (Atomic.compare_and_set config seen (seen + config_one)) then
      incr t (Backoff.once backoff)
  end
  else completed ()
(* Common epilogue of a forked promise: detaches [canceler] from the
   computation of the bundle and then releases one unit of the counter of
   the bundle. *)
let finish (Bundle r as t : t) canceler =
  let (Packed computation) = r.bundle in
  Computation.detach computation canceler;
  decr t
(* Exceptional exit of a forked promise.

   The raw backtrace is captured first.  The [child] computation is then
   canceled with the exception and that backtrace, the failure is reported
   to the bundle with [error], and the promise is wound up with [finish]. *)
let[@inline never] raised exn child t canceler =
  let backtrace = Printexc.get_raw_backtrace () in
  let () = Computation.cancel child exn backtrace in
  let () = error t exn backtrace in
  finish t canceler
(* Normal exit of a forked promise: completes the [child] computation with
   the produced value and then winds up the promise with [finish]. *)
let[@inline never] returned result promise bundle canceler =
  let () = Computation.return promise result in
  finish bundle canceler
(* Body of a fiber forked as a promise: runs [thunk] and dispatches on its
   outcome.  A value goes to [returned] and an exception goes to [raised],
   both of which settle [child] and wind up the promise.

   The exception arm belongs to the [match] rather than to a surrounding
   [try], so exceptions raised by [returned] are not routed to [raised]. *)
let[@inline never] plug t thunk child canceler =
  match thunk () with
  | exception exn -> raised exn child t canceler
  | result -> returned result child t canceler
(* Forks a fiber in the bundle that runs [thunk] and returns the child
   computation through which the outcome of [thunk] is delivered: [plug]
   returns the value to it or cancels it with the raised exception.

   - [thunk]: the work to run in the new fiber.
   - [pass]: with [FLS] the bundle is stored under [flock_key] in the new
     fiber and read back with [get_flock] when the fiber starts; with [Arg]
     the bundle is captured directly by the fiber body.

   Raises [Invalid_argument "already completed"] (through [incr]) when the
   counter of the bundle is already below one unit. *)
let fork_as_promise_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
(* Take one unit of the counter up front; it is released by [finish] when
   the fiber body is done, or by the handler below. *)
incr t Backoff.default;
try
let child = Computation.create ~mode:`LIFO () in
let fiber = Fiber.create ~forbid:false child in
let (Packed bundle) = r.bundle in
(* Link the bundle to the child with a canceler; [finish] detaches it. *)
let canceler = Computation.attach_canceler ~from:bundle ~into:child in
let main =
match pass with
| FLS ->
Fiber.FLS.set fiber flock_key t;
fun fiber -> plug (get_flock fiber) thunk child canceler
| Arg -> fun _ -> plug t thunk child canceler
in
Fiber.spawn fiber main;
child
(* The handler catches any exception raised by the setup above, not only
   cancelation: it gives back the unit taken by [incr] and re-raises. *)
with canceled_exn ->
decr t;
raise canceled_exn
(* Exceptional exit of a plainly forked fiber: reports [exn] to the bundle
   together with the current raw backtrace, then releases one unit of the
   counter of the bundle. *)
let[@inline never] raised exn t =
  let backtrace = Printexc.get_raw_backtrace () in
  let () = error t exn backtrace in
  decr t
(* Body of a plainly forked fiber: runs [thunk] and releases one unit of the
   counter of the bundle afterwards.  A normal return calls [decr] directly;
   an exception goes through [raised], which also reports it. *)
let[@inline never] plug t thunk =
  match thunk () with
  | exception exn -> raised exn t
  | () -> decr t
(* Forks a fiber that runs [thunk] directly on the computation of the
   bundle.  Nothing is returned to the caller; failures of [thunk] are
   reported to the bundle by [plug].

   - [thunk]: the work to run in the new fiber.
   - [pass]: with [FLS] the bundle is stored under [flock_key] in the new
     fiber and read back with [get_flock] when the fiber starts; with [Arg]
     the bundle is captured directly by the fiber body.

   Raises [Invalid_argument "already completed"] (through [incr]) when the
   counter of the bundle is already below one unit. *)
let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
(* Take one unit of the counter up front; it is released by [plug] when the
   fiber body is done, or by the handler below. *)
incr t Backoff.default;
try
let fiber = Fiber.create_packed ~forbid:false r.bundle in
let main =
match pass with
| FLS ->
Fiber.FLS.set fiber flock_key t;
fun fiber -> plug (get_flock fiber) thunk
| Arg -> fun _ -> plug t thunk
in
Fiber.spawn fiber main
(* The handler catches any exception raised by the setup above, not only
   cancelation: it gives back the unit taken by [incr] and re-raises. *)
with canceled_exn ->
decr t;
raise canceled_exn
(* Reports whether the computation of the bundle is still running, as
   answered by [Computation.is_running]. *)
let is_running (Bundle r : t) =
  let (Packed computation) = r.bundle in
  Computation.is_running computation
(* [join_after_pass] specialised to [Arg]: the new bundle is passed to [fn]
   as its argument.  [?callstack] and [?on_return] are forwarded unchanged. *)
let join_after ?callstack ?on_return fn =
join_after_pass ?callstack ?on_return fn Arg
(* [fork_pass] specialised to [Arg]: runs [thunk] in a new fiber of the
   bundle [t] without storing the bundle in fiber-local storage. *)
let fork t thunk = fork_pass t thunk Arg
(* [fork_as_promise_pass] specialised to [Arg]: runs [thunk] in a new fiber
   of the bundle [t] and returns the child computation that receives the
   outcome, without storing the bundle in fiber-local storage. *)
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg
(* Adds one unit to the counter of the bundle unconditionally.  Unlike
   [incr], it does not check whether the counter has already run out, which
   is what makes it unsafe.  The previous value is discarded. *)
let unsafe_incr (Bundle { config; _ } : t) =
  ignore (Atomic.fetch_and_add config config_one)