1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
type _ Effect.t += Fork : Cancel.fiber_context * (unit -> unit) -> unit Effect.t
let yield () =
let fiber = Suspend.enter (fun fiber enqueue -> enqueue (Ok fiber)) in
Cancel.check fiber.cancel_context
let fork_raw new_fiber f =
Effect.perform (Fork (new_fiber, f))
let fork ~sw f =
Switch.check_our_domain sw;
if Cancel.is_on sw.cancel then (
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in
fork_raw new_fiber @@ fun () ->
Switch.with_op sw @@ fun () ->
match f () with
| () ->
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
| exception ex ->
Switch.fail sw ex;
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
)
let fork_daemon ~sw f =
Switch.check_our_domain sw;
if Cancel.is_on sw.cancel then (
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in
fork_raw new_fiber @@ fun () ->
Switch.with_daemon sw @@ fun () ->
match f () with
| `Stop_daemon ->
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
| exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) ->
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
| exception ex ->
Switch.fail sw ex;
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
)
let fork_promise ~sw f =
Switch.check_our_domain sw;
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in
let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in
fork_raw new_fiber (fun () ->
match Switch.with_op sw f with
| x -> Promise.resolve_ok r x
| exception ex -> Promise.resolve_error r ex
);
p
let fork_promise_exn ~sw f =
Switch.check_our_domain sw;
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in
let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in
fork_raw new_fiber (fun () ->
match Switch.with_op sw f with
| x -> Promise.resolve r x
| exception ex ->
Switch.fail sw ex
);
p
let all xs =
Switch.run @@ fun sw ->
List.iter (fork ~sw) xs
let both f g = all [f; g]
let pair f g =
Switch.run @@ fun sw ->
let x = fork_promise ~sw f in
let y = g () in
(Promise.await_exn x, y)
let fork_sub ~sw ~on_error f =
fork ~sw (fun () ->
try Switch.run f
with
| ex when Cancel.is_on sw.cancel ->
Switch.run_in sw @@ fun () ->
try on_error ex
with ex2 ->
Switch.fail sw ex;
Switch.fail sw ex2
)
exception Not_first
let await_cancel () =
Suspend.enter @@ fun fiber enqueue ->
Cancel.Fiber_context.set_cancel_fn fiber (fun ex -> enqueue (Error ex))
let any fs =
let r = ref `None in
let parent_c =
Cancel.sub_unchecked (fun cc ->
let wrap h =
match h () with
| x ->
begin match !r with
| `None -> r := `Ok x; Cancel.cancel cc Not_first
| `Ex _ | `Ok _ -> ()
end
| exception Cancel.Cancelled _ when not (Cancel.is_on cc) ->
()
| exception ex ->
begin match !r with
| `None -> r := `Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel cc ex
| `Ok _ -> r := `Ex (ex, Printexc.get_raw_backtrace ())
| `Ex prev ->
let bt = Printexc.get_raw_backtrace () in
r := `Ex (Exn.combine prev (ex, bt))
end
in
let vars = Cancel.Fiber_context.get_vars () in
let rec aux = function
| [] -> await_cancel ()
| [f] -> wrap f; []
| f :: fs ->
let new_fiber = Cancel.Fiber_context.make ~cc ~vars in
let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in
fork_raw new_fiber (fun () ->
match wrap f with
| x -> Promise.resolve_ok r x
| exception ex -> Promise.resolve_error r ex
);
p :: aux fs
in
let ps = aux fs in
Cancel.protect (fun () -> List.iter Promise.await_exn ps)
)
in
match !r, Cancel.get_error parent_c with
| `Ok r, None -> r
| (`Ok _ | `None), Some ex -> raise ex
| `Ex (ex, bt), None -> Printexc.raise_with_backtrace ex bt
| `Ex ex1, Some ex2 ->
let bt2 = Printexc.get_raw_backtrace () in
let ex, bt = Exn.combine ex1 (ex2, bt2) in
Printexc.raise_with_backtrace ex bt
| `None, None -> assert false
let first f g = any [f; g]
let check () =
let ctx = Effect.perform Cancel.Get_context in
Cancel.check ctx.cancel_context
module List = struct
let opt_cons x xs =
match x with
| None -> xs
| Some x -> x :: xs
module Limiter : sig
(** This is a bit like using a semaphore, but it assumes that there is only a
single fiber using it. e.g. you must not call {!use}, {!fork}, etc from
two different fibers. *)
type t
val create : sw:Switch.t -> int -> t
(** [create ~sw n] is a limiter that allows running up to [n] jobs at once. *)
val use : t -> ('a -> 'b) -> 'a -> 'b
(** [use t fn x] runs [fn x] in this fiber, counting it as one use of [t]. *)
val fork : t -> ('a -> unit) -> 'a -> unit
(** [fork t fn x] runs [fn x] in a new fibre, once a fiber is free. *)
val fork_promise_exn : t -> ('a -> 'b) -> 'a -> 'b Promise.t
(** [fork_promise_exn t fn x] runs [fn x] in a new fibre, once a fiber is free,
and returns a promise for the result. *)
end = struct
type t = {
mutable free_fibers : int;
cond : unit Single_waiter.t;
sw : Switch.t;
}
let max_fibers_err n =
Fmt.failwith "max_fibers must be positive (got %d)" n
let create ~sw max_fibers =
if max_fibers <= 0 then max_fibers_err max_fibers;
{
free_fibers = max_fibers;
cond = Single_waiter.create ();
sw;
}
let await_free t =
if t.free_fibers = 0 then Single_waiter.await t.cond t.sw.id;
assert (t.free_fibers > 0);
t.free_fibers <- t.free_fibers - 1
let release t =
t.free_fibers <- t.free_fibers + 1;
if t.free_fibers = 1 then Single_waiter.wake t.cond (Ok ())
let use t fn x =
await_free t;
let r = fn x in
release t;
r
let fork_promise_exn t fn x =
await_free t;
fork_promise_exn ~sw:t.sw (fun () -> let r = fn x in release t; r)
let fork t fn x =
await_free t;
fork ~sw:t.sw (fun () -> fn x; release t)
end
let filter_map ?(max_fibers=max_int) fn items =
match items with
| [] -> []
| items ->
Switch.run @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> []
| [x] -> Option.to_list (Limiter.use limiter fn x)
| x :: xs ->
let x = Limiter.fork_promise_exn limiter fn x in
let xs = aux xs in
opt_cons (Promise.await x) xs
in
aux items
let map ?max_fibers fn = filter_map ?max_fibers (fun x -> Some (fn x))
let filter ?max_fibers fn = filter_map ?max_fibers (fun x -> if fn x then Some x else None)
let iter ?(max_fibers=max_int) fn items =
match items with
| [] -> ()
| items ->
Switch.run @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> ()
| [x] -> Limiter.use limiter fn x
| x :: xs ->
Limiter.fork limiter fn x;
aux xs
in
aux items
end
include List
type 'a key = 'a Hmap.key
let create_key () = Hmap.Key.create ()
let get key = Hmap.find key (Cancel.Fiber_context.get_vars ())
let with_binding var value fn =
let ctx = Effect.perform Cancel.Get_context in
Cancel.Fiber_context.with_vars ctx (Hmap.add var value ctx.vars) fn
let without_binding var fn =
let ctx = Effect.perform Cancel.Get_context in
Cancel.Fiber_context.with_vars ctx (Hmap.rem var ctx.vars) fn