Source file picos_randos.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
open Picos
(** An unordered bag of values stored in a [Picos_htbl] under integer keys
    drawn from [Random.bits].  Values are taken out under whichever key
    [Picos_htbl.find_random_exn] reports. *)
module Collection = struct
  type 'a t = (int, 'a) Picos_htbl.t

  (* Integer keys are used directly as their own hash. *)
  module Key = struct
    type t = int

    let equal (lhs : t) (rhs : t) = Int.equal lhs rhs
    let hash (key : t) : int = key
  end

  (** [create ()] returns a new empty collection. *)
  let create () = Picos_htbl.create ~hashed_type:(module Key) ()

  (** [push t value] adds [value] to [t], drawing fresh keys until one is
      found that is not already bound in the table. *)
  let push t value =
    while not (Picos_htbl.try_add t (Random.bits ()) value) do
      ()
    done

  (** [pop_exn t] removes and returns some value of [t].

      @raise Not_found
        if [Picos_htbl.find_random_exn] finds no key, i.e. the table looked
        empty. *)
  let rec pop_exn t =
    (* The lookup is deliberately outside of the handler below: a
       [Not_found] raised here must propagate to the caller. *)
    let candidate = Picos_htbl.find_random_exn t in
    match Picos_htbl.remove_exn t candidate with
    | value -> value
    | exception Not_found ->
        (* The binding disappeared between the lookup and the removal, so
           try again with another key. *)
        pop_exn t

  (** [is_empty t] is [true] when no key could be found in [t]. *)
  let is_empty t =
    try
      ignore (Picos_htbl.find_random_exn t);
      false
    with Not_found -> true
end
(** An entry of the ready collection: a unit of work that [next] can pick
    up and run. *)
type ready =
(* A fiber that has not started yet together with its main function;
   [next] runs the function under the scheduler's effect handler. *)
| Spawn of Fiber.t * (unit -> unit)
(* A continuation waiting for the answer to [Fiber.Current]; [next]
   continues it with the fiber itself. *)
| Current of Fiber.t * (Fiber.t, unit) Effect.Deep.continuation
(* A continuation to be continued through [Fiber.continue fiber k ()]. *)
| Continue of Fiber.t * (unit, unit) Effect.Deep.continuation
(* A continuation of an await to be resumed through
   [Fiber.resume fiber k]. *)
| Resume of Fiber.t * (Exn_bt.t option, unit) Effect.Deep.continuation
(* A continuation to be discontinued with the given exception and
   backtrace via [Exn_bt.discontinue]. *)
| Raise of (unit, unit) Effect.Deep.continuation * Exn_bt.t
(* A continuation to be continued with [()] via [Effect.Deep.continue]. *)
| Return of (unit, unit) Effect.Deep.continuation
(** State of a scheduler context, shared by every thread that runs fibers
    on it. *)
type t = {
(* Work that is ready to be run; consumed by [next]. *)
ready : ready Collection.t;
(* Mirrors whether [num_waiters] is non-zero.  Written by [next] while
   holding [mutex]; read by [spawn] without the mutex and by the [resume]
   callback and [await] with the mutex held. *)
num_waiters_non_zero : bool ref;
(* Number of fibers that have not yet returned: starts at [1] in
   [context], increased by [spawn], decreased by [retc]. *)
num_alive_fibers : int Atomic.t;
(* Callback handed to [Fiber.try_suspend]; it pushes a [Resume] entry and
   signals [condition] when a waiter is recorded. *)
resume :
Trigger.t ->
Fiber.t ->
(Exn_bt.t option, unit) Effect.Deep.continuation ->
unit;
(* Return handler installed for every fiber; it decrements
   [num_alive_fibers] and then calls [next]. *)
retc : unit -> unit;
(* Number of threads currently blocked in [Condition.wait] inside [next];
   only modified while holding [mutex]. *)
num_waiters : int ref;
(* Signalled when work is pushed while a waiter is recorded, and
   broadcast by [next] when it finds no alive fibers. *)
condition : Condition.t;
(* Protects [num_waiters], [num_waiters_non_zero] and [run], and is the
   mutex used with [condition]. *)
mutex : Mutex.t;
(* Set by [run] so that a second [run] on the same context is rejected. *)
mutable run : bool;
}
(** [spawn t n forbid packed mains] creates one fiber per element of [mains]
    (each with [Fiber.create_packed ~forbid packed]), makes it ready to run
    on [t], and signals [t.condition] when a waiting thread is recorded.

    In total [t.num_alive_fibers] is increased by [n] plus the number of
    [mains]; callers in this file pass [n = 0]. *)
let rec spawn t n forbid packed = function
  | [] ->
      (* Add whatever extra count the caller asked for on top of the fibers
         spawned here. *)
      if n <> 0 then ignore (Atomic.fetch_and_add t.num_alive_fibers n)
  | main :: mains ->
      let fiber = Fiber.create_packed ~forbid packed in
      (* Count the fiber as alive BEFORE publishing it.  Once it is in
         [t.ready] another thread may pop it, run it to completion and
         decrement the counter; if the increment came later the counter
         could transiently reach zero while fibers are still alive, and a
         runner thread would then conclude that all work is done. *)
      Atomic.incr t.num_alive_fibers;
      Collection.push t.ready (Spawn (fiber, main));
      (* NOTE(review): this flag is read without holding [t.mutex], unlike
         in the [resume] callback, so a wakeup may occasionally be missed.
         The spawning thread remains able to run the new fiber itself. *)
      if !(t.num_waiters_non_zero) then Condition.signal t.condition;
      spawn t n forbid packed mains
(** [next t] takes one entry out of [t.ready] and runs it.  Every effect
    handler below ends by calling [next t] again, so a call only returns
    once [t.ready] is found empty while [t.num_alive_fibers] is zero, or
    when an exception escapes. *)
let rec next t =
match Collection.pop_exn t.ready with
| Spawn (fiber, main) ->
(* The three handlers below do not depend on the effect payload, so they
   are allocated once per fiber.  Each one re-enqueues the continuation
   instead of continuing it directly and then picks the next entry. *)
let current =
Some
(fun k ->
Collection.push t.ready (Current (fiber, k));
next t)
and yield =
Some
(fun k ->
Collection.push t.ready (Continue (fiber, k));
next t)
and return =
Some
(fun k ->
Collection.push t.ready (Return k);
next t)
in
let[@alert "-handler"] effc (type a) :
a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
| Fiber.Current -> current
| Fiber.Spawn r ->
(* A canceled fiber does not spawn; it is rescheduled as [Continue]
   instead (presumably [Fiber.continue] then raises the cancelation
   exception into the fiber -- confirm against Picos). *)
if Fiber.is_canceled fiber then yield
else begin
spawn t 0 r.forbid (Packed r.computation) r.mains;
return
end
| Fiber.Yield -> yield
| Computation.Cancel_after r -> begin
(* Same cancelation check as for [Fiber.Spawn]. *)
if Fiber.is_canceled fiber then yield
else
match
Select.cancel_after r.computation ~seconds:r.seconds r.exn_bt
with
| () -> return
| exception exn ->
(* Capture the exception with its backtrace now; it is raised into the
   fiber later, when the [Raise] entry is picked up. *)
let exn_bt = Exn_bt.get exn in
Some
(fun k ->
Collection.push t.ready (Raise (k, exn_bt));
next t)
end
| Trigger.Await trigger ->
Some
(fun k ->
(* When suspension succeeds, [t.resume] is the callback that will later
   make the fiber ready again; otherwise the fiber is made ready right
   away as a [Resume] entry. *)
if Fiber.try_suspend fiber trigger fiber k t.resume then next t
else begin
Collection.push t.ready (Resume (fiber, k));
next t
end)
(* Any other effect is not handled by this scheduler. *)
| _ -> None
in
Effect.Deep.match_with main () { retc = t.retc; exnc = raise; effc }
| Raise (k, exn_bt) -> Exn_bt.discontinue k exn_bt
| Return k -> Effect.Deep.continue k ()
| Current (fiber, k) -> Effect.Deep.continue k fiber
| Continue (fiber, k) -> Fiber.continue fiber k ()
| Resume (fiber, k) -> Fiber.resume fiber k
| exception Not_found ->
(* Nothing was ready. *)
if Atomic.get t.num_alive_fibers <> 0 then begin
Mutex.lock t.mutex;
(* Re-check both conditions with the mutex held before going to sleep. *)
if Collection.is_empty t.ready && Atomic.get t.num_alive_fibers <> 0
then begin
(* Register this thread as a waiter; the flag is only written on the
   0 -> 1 and 1 -> 0 transitions of the counter. *)
let n = !(t.num_waiters) + 1 in
t.num_waiters := n;
if n = 1 then t.num_waiters_non_zero := true;
match Condition.wait t.condition t.mutex with
| () ->
let n = !(t.num_waiters) - 1 in
t.num_waiters := n;
if n = 0 then t.num_waiters_non_zero := false;
Mutex.unlock t.mutex;
next t
| exception async_exn ->
(* Undo the waiter registration and release the mutex before letting
   the exception propagate. *)
let n = !(t.num_waiters) - 1 in
t.num_waiters := n;
if n = 0 then t.num_waiters_non_zero := false;
Mutex.unlock t.mutex;
raise async_exn
end
else begin
Mutex.unlock t.mutex;
next t
end
end
else begin
(* No fiber is alive any more: wake every thread waiting on the condition
   and return.  NOTE(review): the empty lock/unlock pair presumably
   ensures a thread that is about to wait has actually started waiting
   before the broadcast -- confirm. *)
Mutex.lock t.mutex;
Mutex.unlock t.mutex;
Condition.broadcast t.condition
end
(** [context ()] creates a fresh scheduler context after checking that
    [Select] has been configured.  The context starts with an empty ready
    collection, no waiters, an alive-fiber count of [1], and [run] unset. *)
let context () =
  Select.check_configured ();
  (* The record and its two callbacks refer to each other, hence the
     recursive definition. *)
  let rec t =
    {
      ready = Collection.create ();
      num_waiters_non_zero = Multicore_magic.copy_as_padded (ref false);
      num_alive_fibers = Multicore_magic.copy_as_padded (Atomic.make 1);
      resume;
      retc;
      num_waiters = Multicore_magic.copy_as_padded (ref 0);
      condition = Condition.create ();
      mutex = Mutex.create ();
      run = false;
    }
  (* Return handler of every fiber: one fewer fiber is alive, then go on
     with whatever is ready. *)
  and retc () =
    Atomic.decr t.num_alive_fibers;
    next t
  (* Callback given to [Fiber.try_suspend]: makes the fiber ready again
     and wakes up a waiting thread, if one is recorded. *)
  and resume trigger fiber k =
    let entry = Resume (fiber, k) in
    ignore (Fiber.unsuspend fiber trigger);
    Collection.push t.ready entry;
    (* Only a [Sys_error] raised by the lock attempt itself is tolerated;
       in that case no signal is sent. *)
    let locked =
      match Mutex.lock t.mutex with
      | () -> true
      | exception Sys_error _ -> false
    in
    if locked then begin
      let has_waiters = !(t.num_waiters_non_zero) in
      Mutex.unlock t.mutex;
      if has_waiters then Condition.signal t.condition
    end
  in
  t
(** [runner_on_this_thread t] checks that [Select] has been configured and
    then runs ready work of the context [t] on the calling thread by
    entering [next t]. *)
let runner_on_this_thread t =
  let () = Select.check_configured () in
  next t
(** [await t computation] must be called with [t.mutex] held.  It waits on
    [t.condition] for as long as [t.num_waiters_non_zero] is set, then
    releases the mutex and returns the result of
    [Computation.await computation].  If waiting raises, the mutex is
    released and the exception is re-raised. *)
let rec await t computation =
  if not !(t.num_waiters_non_zero) then begin
    Mutex.unlock t.mutex;
    Computation.await computation
  end
  else
    match Condition.wait t.condition t.mutex with
    | () -> await t computation
    | exception exn ->
        Mutex.unlock t.mutex;
        raise exn
(** [run ?context ?forbid main] runs [main] as the first fiber of a
    scheduler context and returns the result awaited from its computation.

    @param context
      the context to run on; a new one is created with [context ()] when
      omitted.
    @param forbid passed to [Fiber.create]; defaults to [false].
    @raise Invalid_argument if the context has already been run. *)
let run ?context:t_opt ?(forbid = false) main =
  let t =
    match t_opt with
    | None -> context ()
    | Some given ->
        Select.check_configured ();
        given
  in
  (* Claim the context under the mutex; a context can be run only once. *)
  Mutex.lock t.mutex;
  let already_run = t.run in
  if not already_run then t.run <- true;
  Mutex.unlock t.mutex;
  if already_run then invalid_arg "already run";
  let computation = Computation.create () in
  let main_fiber = Fiber.create ~forbid computation in
  let captured = Computation.capture computation main in
  Collection.push t.ready (Spawn (main_fiber, captured));
  (* Take part in running fibers until [next] returns. *)
  next t;
  (* [await] expects the mutex to be held and releases it itself. *)
  Mutex.lock t.mutex;
  await t computation