Source file mpmc_relaxed_queue.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
include Lockfree.Relaxed_queue
module Spin = struct
let push = push
let pop = pop
end
let ccas cell seen v =
if Atomic.get cell != seen then false else Atomic.compare_and_set cell seen v
module Not_lockfree = struct
let spin_threshold = 30
let try_other_exit_every_n = 10
let time_to_try_push_forward n = n mod try_other_exit_every_n == 0
let push { array; tail; head; mask; _ } item =
let tail_val = Atomic.fetch_and_add tail 1 in
let index = tail_val land mask in
let cell = Array.get array index in
let i = ref 0 in
while
!i < spin_threshold && not (Atomic.compare_and_set cell None (Some item))
do
i := !i + 1
done;
let rec take_or_rollback nth_attempt =
if Atomic.compare_and_set cell None (Some item) then
true
else if ccas tail (tail_val + 1) tail_val then
false
else if
time_to_try_push_forward nth_attempt && ccas head tail_val (tail_val + 1)
then
false
else
take_or_rollback (nth_attempt + 1)
in
if !i < spin_threshold then true else take_or_rollback 0
let take_item cell =
let value = Atomic.get cell in
if Option.is_some value && Atomic.compare_and_set cell value None then value
else None
let pop queue =
let ({ array; head; tail; mask; _ } : 'a t) = queue in
let head_value = Atomic.get head in
let tail_value = Atomic.get tail in
if head_value - tail_value >= 0 then None
else
let old_head = Atomic.fetch_and_add head 1 in
let cell = Array.get array (old_head land mask) in
let i = ref 0 in
let item = ref None in
while !i < spin_threshold && not (Option.is_some !item) do
item := take_item cell;
i := !i + 1
done;
let rec take_or_rollback nth_attempt =
let value = Atomic.get cell in
if Option.is_some value && Atomic.compare_and_set cell value None then
value
else if ccas head (old_head + 1) old_head then
None
else if
time_to_try_push_forward nth_attempt
&& ccas tail old_head (old_head + 1)
then
None
else take_or_rollback (nth_attempt + 1)
in
if Option.is_some !item then !item else take_or_rollback 0
module CAS_interface = struct
let rec push ({ array; tail; head; mask; _ } as t) item =
let tail_val = Atomic.get tail in
let head_val = Atomic.get head in
let size = mask + 1 in
if tail_val - head_val >= size then false
else if ccas tail tail_val (tail_val + 1) then (
let index = tail_val land mask in
let cell = Array.get array index in
while not (Atomic.compare_and_set cell None (Some item)) do
()
done;
true)
else push t item
let rec pop ({ array; tail; head; mask; _ } as t) =
let tail_val = Atomic.get tail in
let head_val = Atomic.get head in
if head_val - tail_val >= 0 then None
else if ccas head head_val (head_val + 1) then (
let index = head_val land mask in
let cell = Array.get array index in
let item = ref (Atomic.get cell) in
while
not (Option.is_some !item && Atomic.compare_and_set cell !item None)
do
item := Atomic.get cell
done;
!item)
else pop t
end
end