1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
open Picos
type state = Empty | Queue of { head : Trigger.t list; tail : Trigger.t list }
type t = state Atomic.t
let create ?(padded = false) () =
let t = Atomic.make Empty in
if padded then Multicore_magic.copy_as_padded t else t
let broadcast t =
if Atomic.get t != Empty then
match Atomic.exchange t Empty with
| Empty -> ()
| Queue r ->
List.iter Trigger.signal r.head;
List.iter Trigger.signal (List.rev r.tail)
let rec signal t backoff =
match Atomic.get t with
| Empty -> ()
| Queue r as before -> begin
match r.head with
| trigger :: head ->
signal_cas t backoff before
(if head == [] && r.tail == [] then Empty else Queue { r with head })
trigger
| [] -> begin
match List.rev r.tail with
| trigger :: head ->
signal_cas t backoff before
(if head == [] then Empty else Queue { head; tail = [] })
trigger
| [] -> failwith "impossible"
end
end
and signal_cas t backoff before after trigger =
if Atomic.compare_and_set t before after then Trigger.signal trigger
else signal t (Backoff.once backoff)
let signal t = signal t Backoff.default
let rec cleanup backoff trigger t =
match Atomic.get t with
| Empty -> ()
| Queue r as before -> begin
if r.head != [] then
match List_ext.drop_first_or_not_found trigger r.head with
| head ->
cleanup_cas backoff trigger t before
(if head == [] && r.tail == [] then Empty
else Queue { r with head })
| exception Not_found -> begin
match List_ext.drop_first_or_not_found trigger r.tail with
| tail ->
cleanup_cas backoff trigger t before (Queue { r with tail })
| exception Not_found -> signal t
end
else
match List_ext.drop_first_or_not_found trigger r.tail with
| tail ->
cleanup_cas backoff trigger t before
(if tail == [] then Empty else Queue { head = []; tail })
| exception Not_found -> signal t
end
and cleanup_cas backoff trigger t before after =
if not (Atomic.compare_and_set t before after) then
cleanup (Backoff.once backoff) trigger t
let rec wait t mutex trigger fiber backoff =
let before = Atomic.get t in
let after =
match before with
| Empty -> Queue { head = [ trigger ]; tail = [] }
| Queue r ->
if r.head != [] then Queue { r with tail = trigger :: r.tail }
else Queue { head = List.rev_append r.tail [ trigger ]; tail = [] }
in
if Atomic.compare_and_set t before after then begin
Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
let result = Trigger.await trigger in
let forbid = Fiber.exchange fiber ~forbid:true in
Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
Fiber.set fiber ~forbid;
match result with
| None -> ()
| Some exn_bt ->
cleanup Backoff.default trigger t;
Exn_bt.raise exn_bt
end
else wait t mutex trigger fiber (Backoff.once backoff)
let wait t mutex =
let fiber = Fiber.current () in
let trigger = Trigger.create () in
wait t mutex trigger fiber Backoff.default