Source file picos_fifos.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
open Picos
module Queue = Picos_mpsc_queue
type ready =
| Spawn of Fiber.t * (unit -> unit)
| Continue of Fiber.t * (unit, unit) Effect.Deep.continuation
| Resume of Fiber.t * (Exn_bt.t option, unit) Effect.Deep.continuation
type t = {
ready : ready Queue.t;
needs_wakeup : bool Atomic.t;
num_alive_fibers : int Atomic.t;
mutex : Mutex.t;
condition : Condition.t;
resume :
Trigger.t ->
Fiber.t ->
(Exn_bt.t option, unit) Effect.Deep.continuation ->
unit;
retc : unit -> unit;
}
let rec spawn t n forbid computation = function
| [] -> Atomic.fetch_and_add t.num_alive_fibers n |> ignore
| main :: mains ->
let fiber = Fiber.create ~forbid computation in
Queue.push t.ready (Spawn (fiber, main));
spawn t (n + 1) forbid computation mains
let continue = Some (fun k -> Effect.Deep.continue k ())
let rec next t =
match Queue.pop_exn t.ready with
| Spawn (fiber, main) ->
let current =
Some (fun k -> Effect.Deep.continue k fiber)
and yield =
Some
(fun k ->
Queue.push t.ready (Continue (fiber, k));
next t)
and discontinue = Some (fun k -> Fiber.continue fiber k ()) in
let[@alert "-handler"] effc (type a) :
a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
| Fiber.Current ->
current
| Fiber.Spawn r ->
if Fiber.is_canceled fiber then discontinue
else begin
spawn t 0 r.forbid r.computation r.mains;
continue
end
| Fiber.Yield -> yield
| Computation.Cancel_after r ->
if Fiber.is_canceled fiber then discontinue
else begin
Picos_select.cancel_after r.computation ~seconds:r.seconds
r.exn_bt;
continue
end
| Trigger.Await trigger ->
Some
(fun k ->
if Fiber.try_suspend fiber trigger fiber k t.resume then next t
else Fiber.resume fiber k)
| _ -> None
in
Effect.Deep.match_with main () { retc = t.retc; exnc = raise; effc }
| Continue (fiber, k) -> Fiber.continue fiber k ()
| Resume (fiber, k) -> Fiber.resume fiber k
| exception Queue.Empty ->
if Atomic.get t.num_alive_fibers <> 0 then begin
if Atomic.get t.needs_wakeup then begin
Mutex.lock t.mutex;
match
if Atomic.get t.needs_wakeup then Condition.wait t.condition t.mutex
with
| () -> Mutex.unlock t.mutex
| exception exn ->
Mutex.unlock t.mutex;
raise exn
end
else Atomic.set t.needs_wakeup true;
next t
end
let run ~forbid main =
let ready = Queue.create ()
and needs_wakeup = Atomic.make false
and num_alive_fibers = Atomic.make 1
and mutex = Mutex.create ()
and condition = Condition.create () in
let rec t =
{ ready; needs_wakeup; num_alive_fibers; mutex; condition; resume; retc }
and retc () =
Atomic.decr t.num_alive_fibers;
next t
and resume trigger fiber k =
let resume = Resume (fiber, k) in
if Fiber.unsuspend fiber trigger then
Queue.push t.ready resume
else
Queue.push_head t.ready resume;
if
Atomic.get t.needs_wakeup
&& Atomic.compare_and_set t.needs_wakeup true false
then begin
Mutex.lock t.mutex;
Mutex.unlock t.mutex;
Condition.broadcast t.condition
end
in
let computation = Computation.create () in
let fiber = Fiber.create ~forbid computation in
let main = Computation.capture computation main in
Queue.push t.ready (Spawn (fiber, main));
next t;
Computation.await computation