Source file picos_fifos.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
open Picos
module Queue = Picos_mpscq
(* An entry of the ready queue: a fiber paired with what [next] needs in order
   to start it or to pick it up again. *)
type ready =
(* A fiber that has not run yet, together with its main function; [next] runs
   that function under the scheduler effect handler. *)
| Spawn of Fiber.t * (unit -> unit)
(* A fiber captured by the [Fiber.Yield] handler in [next]; it is later
   continued with [()] through [Fiber.continue]. *)
| Continue of Fiber.t * (unit, unit) Effect.Deep.continuation
(* A fiber queued by the [resume] callback built in [run]; [next] hands the
   continuation to [Fiber.resume]. *)
| Resume of Fiber.t * (Exn_bt.t option, unit) Effect.Deep.continuation
(* State of one scheduler instance.  It is created by [run] and threaded
   through [spawn] and [next]. *)
type t = {
(* Queue of entries that are ready to be started, continued, or resumed. *)
ready : ready Queue.t;
(* Set to [true] by [next] after it has found [ready] empty; while it is
   [true], [next] may block on [condition].  The [resume] callback resets it
   to [false] with a compare-and-set before broadcasting. *)
needs_wakeup : bool Atomic.t;
(* Counter that starts at 1 in [run], is increased by [spawn] for the fibers
   it queues, and is decreased by [retc] when a fiber main function returns.
   [next] returns instead of waiting once it is 0 and [ready] is empty. *)
num_alive_fibers : int Atomic.t;
(* Mutex held by [next] around [Condition.wait]; also locked and released by
   the [resume] callback right before it broadcasts. *)
mutex : Mutex.t;
(* Condition on which [next] waits while idle and which the [resume] callback
   broadcasts. *)
condition : Condition.t;
(* Callback passed to [Fiber.try_suspend] by the [Trigger.Await] handler.
   [run] implements it to queue a [Resume] entry and wake the scheduler when
   needed.  Presumably it is invoked when the awaited trigger is signaled;
   confirm against the Picos documentation. *)
resume :
Trigger.t ->
Fiber.t ->
(Exn_bt.t option, unit) Effect.Deep.continuation ->
unit;
(* Return handler given to [Effect.Deep.match_with] for every fiber; [run]
   implements it to decrement [num_alive_fibers] and call [next]. *)
retc : unit -> unit;
}
(* [spawn t n forbid packed mains] creates one fiber per function in [mains],
   each built with [Fiber.create_packed ~forbid packed], and queues it on
   [t.ready] as a [Spawn] entry, in list order.  [n] is the number of fibers
   queued so far; callers start with [0].  Once the list is exhausted the
   total is added to [t.num_alive_fibers] in a single atomic step. *)
let rec spawn t n forbid packed mains =
  match mains with
  | [] ->
      (* All fibers are queued: account for them at once. *)
      ignore (Atomic.fetch_and_add t.num_alive_fibers n)
  | main :: rest ->
      let fiber = Fiber.create_packed ~forbid packed in
      Queue.push t.ready (Spawn (fiber, main));
      spawn t (succ n) forbid packed rest
(* Handler result shared by the effects that complete immediately: it resumes
   the captured continuation with [()].  Being a top-level value, it is built
   once and returned as is by the handlers that need it. *)
let continue = Some (fun cont -> Effect.Deep.continue cont ())
(* [next t] pops one entry from [t.ready] and runs it.

   - [Spawn]: starts the fiber main function under an effect handler that
     implements the scheduler side of the Picos effects matched below.
   - [Continue] and [Resume]: pick up a previously captured continuation.
   - Empty queue: returns [()] when [t.num_alive_fibers] is 0; otherwise
     publishes [t.needs_wakeup], polls the queue once more, and then blocks
     on [t.condition] until the flag has been reset.

   The handlers and [t.retc] call [next] again (or continue a fiber), which is
   how the scheduler moves from one fiber to the next. *)
let rec next t =
match Queue.pop_exn t.ready with
| Spawn (fiber, main) ->
(* The three handler results below are built once per spawned fiber and then
   returned as is from [effc]. *)
(* [current] answers [Fiber.Current] with the fiber itself.  It uses
   [Effect.Deep.continue] directly, so no cancelation check is made here. *)
let current =
Some (fun k -> Effect.Deep.continue k fiber)
(* [yield] queues the fiber again as a [Continue] entry and moves on to the
   next ready entry. *)
and yield =
Some
(fun k ->
Queue.push t.ready (Continue (fiber, k));
next t)
(* [discontinue] is returned when the fiber is found canceled.  It goes
   through [Fiber.continue], which presumably raises the cancelation
   exception into the fiber in that case; confirm against the Picos
   documentation. *)
and discontinue = Some (fun k -> Fiber.continue fiber k ()) in
(* The attribute below turns off the alert named handler for this binding;
   presumably Picos raises that alert on operations reserved for scheduler
   implementations. *)
let[@alert "-handler"] effc (type a) :
a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
| Fiber.Current ->
current
| Fiber.Spawn r ->
(* Cancelation is checked once: either the fiber is discontinued or the whole
   spawn is performed and the fiber continues. *)
if Fiber.is_canceled fiber then discontinue
else begin
spawn t 0 r.forbid (Packed r.computation) r.mains;
continue
end
| Fiber.Yield -> yield
| Computation.Cancel_after r -> begin
(* Same single cancelation check as for [Fiber.Spawn]. *)
if Fiber.is_canceled fiber then discontinue
else
match
Select.cancel_after r.computation ~seconds:r.seconds r.exn_bt
with
| () -> continue
| exception exn ->
(* A failure of [Select.cancel_after] is captured with [Exn_bt.get] and
   raised inside the fiber rather than in the scheduler. *)
let exn_bt = Exn_bt.get exn in
Some (fun k -> Exn_bt.discontinue k exn_bt)
end
| Trigger.Await trigger ->
Some
(fun k ->
(* When [Fiber.try_suspend] returns [true] the scheduler runs another entry;
   the continuation was handed over together with [t.resume].  Otherwise the
   fiber is resumed right away.  Presumably [true] means the resume action
   was attached to the trigger; confirm against the Picos documentation. *)
if Fiber.try_suspend fiber trigger fiber k t.resume then next t
else Fiber.resume fiber k)
(* Any other effect is not handled by this scheduler. *)
| _ -> None
in
(* Run the fiber.  A normal return goes to [t.retc]; an exception escaping
   [main] is re-raised by [exnc]. *)
Effect.Deep.match_with main () { retc = t.retc; exnc = raise; effc }
| Continue (fiber, k) -> Fiber.continue fiber k ()
| Resume (fiber, k) -> Fiber.resume fiber k
| exception Queue.Empty ->
(* Nothing is ready.  With no fibers alive the function returns [()]. *)
if Atomic.get t.num_alive_fibers <> 0 then begin
if Atomic.get t.needs_wakeup then begin
(* Second empty poll in a row: block.  The flag is read again under the
   mutex so that the wait is skipped if it has been reset in the meantime. *)
Mutex.lock t.mutex;
match
if Atomic.get t.needs_wakeup then
Condition.wait t.condition t.mutex
with
| () -> Mutex.unlock t.mutex
| exception exn ->
(* Release the mutex before letting the exception propagate. *)
Mutex.unlock t.mutex;
raise exn
end
(* First empty poll: only publish the flag.  The queue is popped once more
   by the call to [next] below before any blocking takes place. *)
else Atomic.set t.needs_wakeup true;
(* Runs after either branch above: try the queue again. *)
next t
end
(* [run ?forbid main] builds a fresh scheduler state, queues [main] as the
   first fiber, drives the scheduler with [next] until it returns, and finally
   evaluates [Computation.await] on the computation created for [main].

   [forbid] defaults to [false] and is passed to [Fiber.create] for the main
   fiber.  Presumably the final [Computation.await] returns the result of
   [main] or raises its exception; confirm against the Picos documentation. *)
let run ?(forbid = false) main =
(* NOTE(review): presumably fails when [Select] has not been configured;
   confirm against the [Select] module. *)
Select.check_configured ();
let ready = Queue.create ()
and needs_wakeup = Atomic.make false
(* The counter starts at 1 to account for the main fiber queued below. *)
and num_alive_fibers = Atomic.make 1
and mutex = Mutex.create ()
and condition = Condition.create () in
(* The record and its two callbacks refer to each other, hence the recursive
   definition. *)
let rec t =
{ ready; needs_wakeup; num_alive_fibers; mutex; condition; resume; retc }
(* Return handler of every fiber: one fiber less is alive, then run the next
   ready entry. *)
and retc () =
Atomic.decr t.num_alive_fibers;
next t
(* Callback handed to [Fiber.try_suspend]: queues the fiber as a [Resume]
   entry and wakes the scheduler if it is, or is about to be, waiting. *)
and resume trigger fiber k =
let resume = Resume (fiber, k) in
if Fiber.unsuspend fiber trigger then
(* Queued with the regular [Queue.push]. *)
Queue.push t.ready resume
else
(* Queued at the head so that it is popped before the other entries.
   Presumably a [false] result means the fiber has been canceled; confirm
   against the Picos documentation. *)
Queue.push_head t.ready resume;
(* Wake-up handshake with the empty-queue branch of [next].  The plain read
   is followed by a compare-and-set, so among concurrent callers only the one
   that flips the flag from [true] to [false] goes on to broadcast. *)
if
Atomic.get t.needs_wakeup
&& Atomic.compare_and_set t.needs_wakeup true false
then begin
begin
(* Taking and releasing the mutex orders this call with [next], which reads
   the flag and enters [Condition.wait] while holding the same mutex. *)
match Mutex.lock t.mutex with
| () -> Mutex.unlock t.mutex
| exception Sys_error _ ->
(* [Mutex.lock] failed with [Sys_error]; the failure is ignored and the
   broadcast below is still performed.  Presumably this happens when the
   calling thread already holds the mutex, i.e. [resume] runs on the
   scheduler thread itself; confirm. *)
()
end;
Condition.broadcast t.condition
end
in
(* Set up the main fiber.  Presumably [Computation.capture] wraps [main] so
   that its outcome is stored in [computation]; confirm against the Picos
   documentation. *)
let computation = Computation.create () in
let fiber = Fiber.create ~forbid computation in
let main = Computation.capture computation main in
Queue.push t.ready (Spawn (fiber, main));
next t;
Computation.await computation