1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
open Events
open Event
let drop_event_type : 'b -> 'a WithAttributes.t -> 'b WithAttributes.t =
fun b e -> WithAttributes.map (fun _ -> b) e
let partition_events l =
let rec partition_events sys queue task = function
| [] -> sys, queue, task
| { WithAttributes.it = SystemEvent x; _ } as e :: rest ->
partition_events ((drop_event_type x e,e.priority) :: sys) queue task rest
| { WithAttributes.it = QueueEvent x; _ } as e :: rest ->
partition_events sys ((drop_event_type x e,e.priority) :: queue) task rest
| { WithAttributes.it = Task x; _ } as e :: rest ->
partition_events sys queue ((drop_event_type x e,e.priority) :: task) rest
in
partition_events [] [] [] l
let next_deadline delta = Unix.gettimeofday() +. delta
module Todo = struct
type 'a t = {
ready : 'a WithAttributes.t Sorted.t;
system : ('a system_event) WithAttributes.t Sorted.t;
queue : ('a queue_event) WithAttributes.t Sorted.t;
tasks : ('a task_event) WithAttributes.t Sorted.t;
}
[@@deriving show]
let empty = {
system = Sorted.nil;
queue = Sorted.nil ;
tasks = Sorted.nil;
ready = Sorted.nil;
}
let prune_cancelled { system; queue; tasks; ready } =
let not_cancelled { WithAttributes.cancelled; _ } = !cancelled = false in
let system = Sorted.filter not_cancelled system in
let queue = Sorted.filter not_cancelled queue in
let tasks = Sorted.filter not_cancelled tasks in
let ready = Sorted.filter not_cancelled ready in
{ system; queue; tasks; ready }
let size todo =
let { system; queue; tasks; ready } = prune_cancelled todo in
Sorted.length system + Sorted.length queue + Sorted.length tasks + Sorted.length ready
let is_empty todo =
let { system; queue; tasks; ready } = prune_cancelled todo in
Sorted.is_nil system &&
Sorted.is_nil queue &&
Sorted.is_nil tasks &&
Sorted.is_nil ready
let tick = ref 0
let add { system; queue; tasks; ready } l =
let tick ({ WithAttributes.priority; _ } as e) =
incr tick;
let priority = { priority with insertion = !tick } in
{ e with priority } in
let l = List.map tick l in
let new_sys, new_queue, new_tasks = partition_events l in
{
ready;
system = Sorted.append system (Sorted.of_list new_sys);
queue = Sorted.append queue (Sorted.of_list new_queue);
tasks = Sorted.append tasks (Sorted.of_list new_tasks);
}
end
let pull_ready ~advance st l =
let rec pull_ready yes min_priority_ready no st l =
match Sorted.look l with
| Sorted.Nil -> yes, no, min_priority_ready
| Sorted.Cons(({ WithAttributes.it; cancelled; priority; _ } as e, _), rest) ->
match advance st cancelled it with
| st, Yes y ->
let min_priority_ready = Sorted.min_user min_priority_ready priority in
let e = drop_event_type y e in
pull_ready (Sorted.cons e e.priority yes) min_priority_ready no st rest
| st, Advanced x ->
pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest
| st, No x ->
pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest
in
pull_ready Sorted.nil Sorted.max_priority Sorted.nil st l
type ('a,'b) ev_checker =
'a WithAttributes.t Sorted.t -> 'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority
let file_descriptors_of l =
Sorted.map_filter (function { WithAttributes.it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None) l
let filter_file_descriptor fds = function
| { WithAttributes.it = ReadInProgress(fd,_); _ } -> List.mem fd fds
| _ -> false
let check_for_system_events min_prio_task_queue : ('a system_event,'a) ev_checker = fun waiting ->
let rec check_for_system_events new_ready waiting_skipped min_prio_ready waiting =
let fds = file_descriptors_of waiting in
let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
let new_ready_1, waiting, min_prio_ready_1 = pull_ready ~advance:advance_system ready_fds waiting in
let new_ready = Sorted.append new_ready_1 new_ready in
let min_prio_ready = Sorted.min_user min_prio_ready_1 min_prio_ready in
if ready_fds = [] then
new_ready, Sorted.append waiting waiting_skipped, min_prio_ready
else
let waiting, waiting_skipped_1 = Sorted.partition (filter_file_descriptor ready_fds) waiting in
let waiting_skipped = Sorted.concat [waiting_skipped_1; waiting_skipped] in
check_for_system_events new_ready waiting_skipped min_prio_ready waiting
in
let waiting, waiting_skipped = Sorted.partition_priority (fun x -> Sorted.le_user x min_prio_task_queue) waiting in
check_for_system_events Sorted.nil waiting_skipped Sorted.max_priority waiting
let check_for_queue_events : ('a queue_event,'a) ev_checker =
fun waiting ->
let new_ready, waiting, min_prio = pull_ready ~advance:advance_queue () waiting in
new_ready, waiting, min_prio
let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
if Unix.gettimeofday () > deadline then Sorted.nil, Sorted.nil, sys, queue, Sorted.max_priority
else
let ready_fds, _, _ = Unix.select fds [] [] 0.1 in
let ready_sys, waiting_sys, min_prio_sys = pull_ready ~advance:advance_system ready_fds sys in
let ready_queue, waiting_queue, min_prio_queue = pull_ready ~advance:advance_queue () queue in
if not(Sorted.is_nil ready_sys) || not(Sorted.is_nil ready_queue)
then
let min_prio = Sorted.min_priority min_prio_queue min_prio_sys in
let new_ready_sys, waiting_sys, min_prio_new_ready_sys = check_for_system_events min_prio waiting_sys in
Sorted.append new_ready_sys ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_new_ready_sys min_prio
else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue
let wait_for_system_or_queue_events ~deadline sys queue =
let fds = file_descriptors_of sys in
wait_for_system_or_queue_events ~deadline (fds,sys) queue
let rec pull_tasks min_prio l =
match Sorted.look l with
| Sorted.Nil -> Sorted.nil, Sorted.nil
| Sorted.Cons((x,p),l) when Sorted.le_user p min_prio ->
let tasks, l = pull_tasks min_prio l in
Sorted.cons x p tasks, l
| _ -> Sorted.nil, l
let postpone p ready =
let leq_user_prio { WithAttributes.priority = q; _} = Sorted.le_user q p in
let ready, postponed = Sorted.partition leq_user_prio ready in
ready, postponed
let wait ?(deadline=max_float) todo : 'a WithAttributes.t list * 'a Todo.t =
let open Todo in
let { system; queue; tasks; ready } as todo = prune_cancelled todo in
if is_empty todo then
[], todo
else
let min_prio, ready = Sorted.min ready in
let ready_queue, waiting_queue, min_prio_queue = check_for_queue_events queue in
let min_prio = Sorted.min_priority min_prio min_prio_queue in
let ready_sys, waiting_sys, min_prio_sys = check_for_system_events min_prio system in
if Sorted.is_nil ready_sys &&
Sorted.is_nil ready_queue &&
Sorted.is_nil ready &&
Sorted.is_nil tasks
then
let ready_sys, ready_queue, waiting_sys, waiting_queue, min_prio =
wait_for_system_or_queue_events ~deadline waiting_sys waiting_queue in
let ready_sys, postponed_sys = postpone min_prio ready_sys in
let ready_queue, postponed_queue = postpone min_prio ready_queue in
let postponed = Sorted.append postponed_sys postponed_queue in
let ready = Sorted.to_list (Sorted.append ready_sys ready_queue) in
ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }
else
let min_prio = Sorted.min_priority min_prio min_prio_sys in
let ready_old, postponed_ready = pull_tasks min_prio ready in
let ready_tasks, tasks = pull_tasks min_prio tasks in
let ready_sys, postponed_sys = postpone min_prio ready_sys in
let ready_queue, postponed_queue = postpone min_prio ready_queue in
let postponed = Sorted.concat [postponed_sys; postponed_queue; postponed_ready] in
let ready = Sorted.to_list (Sorted.concat [ready_sys; ready_queue; ready_tasks; ready_old]) in
ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }
let pop_return (ready, todo) =
match ready with
| { WithAttributes.it; _} :: rest ->
let rest_w_prio = List.map (fun x -> x, x.WithAttributes.priority) rest in
let ready = Sorted.append todo.Todo.ready (Sorted.of_list rest_w_prio) in
let todo = { todo with Todo.ready } in
Some it, todo
| [] ->
None, todo
let pop l =
match pop_return @@ wait l with
| None, _ -> raise @@ Failure "nothing to pop"
| Some x, t -> x, t
let pop_opt l = pop_return @@ wait l
let pop_timeout ~stop_after_being_idle_for:delta l =
let deadline = next_deadline delta in
pop_return @@ wait ~deadline l
let wait_return (l,todo) =
l |> List.map (fun x -> x.WithAttributes.it), todo
let wait_timeout ~stop_after_being_idle_for:delta l =
let deadline = next_deadline delta in
wait_return @@ wait ~deadline l
let wait l = wait_return @@ wait l
include Events