1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
open Events
open Event
let drop_event_type : 'b -> 'a WithAttributes.t -> 'b WithAttributes.t =
fun b e -> WithAttributes.map (fun _ -> b) e
let partition_events l =
let rec partition_events sys queue task = function
| [] -> sys, queue, task
| { WithAttributes.it = SystemEvent x; _ } as e :: rest ->
partition_events ((drop_event_type x e,e.priority) :: sys) queue task rest
| { WithAttributes.it = QueueEvent x; _ } as e :: rest ->
partition_events sys ((drop_event_type x e,e.priority) :: queue) task rest
| { WithAttributes.it = Task x; _ } as e :: rest ->
partition_events sys queue ((drop_event_type x e,e.priority) :: task) rest
in
partition_events [] [] [] l
let next_deadline delta = Unix.gettimeofday() +. delta
module Todo = struct
type 'a t = {
ready : 'a WithAttributes.t Sorted.t;
system : ('a system_event) WithAttributes.t Sorted.t;
queue : ('a queue_event) WithAttributes.t Sorted.t;
tasks : ('a task_event) WithAttributes.t Sorted.t;
}
[@@deriving show]
let empty = {
system = Sorted.nil;
queue = Sorted.nil ;
tasks = Sorted.nil;
ready = Sorted.nil;
}
let prune_cancelled { system; queue; tasks; ready } =
let not_cancelled { WithAttributes.cancelled; _ } = !cancelled = false in
let system = Sorted.filter not_cancelled system in
let queue = Sorted.filter not_cancelled queue in
let tasks = Sorted.filter not_cancelled tasks in
let ready = Sorted.filter not_cancelled ready in
{ system; queue; tasks; ready }
let size todo =
let { system; queue; tasks; ready } = prune_cancelled todo in
Sorted.length system + Sorted.length queue + Sorted.length tasks + Sorted.length ready
let is_empty todo =
let { system; queue; tasks; ready } = prune_cancelled todo in
Sorted.is_nil system &&
Sorted.is_nil queue &&
Sorted.is_nil tasks &&
Sorted.is_nil ready
let tick = ref 0
let add { system; queue; tasks; ready } l =
let tick ({ WithAttributes.priority; _ } as e) =
incr tick;
let priority = { priority with insertion = !tick } in
{ e with priority } in
let l = List.map tick l in
let new_sys, new_queue, new_tasks = partition_events l in
{
ready;
system = Sorted.append system (Sorted.of_list new_sys);
queue = Sorted.append queue (Sorted.of_list new_queue);
tasks = Sorted.append tasks (Sorted.of_list new_tasks);
}
end
let pull_ready ~advance l =
let rec pull_ready yes no min_priority l =
match Sorted.look l with
| Sorted.Nil -> yes, no, min_priority
| Sorted.Cons(({ WithAttributes.it; cancelled; priority; _ } as e, _), rest) ->
match advance cancelled it with
| Yes y ->
let min_priority = min min_priority priority in
let e = drop_event_type y e in
pull_ready (Sorted.cons e e.priority yes) no min_priority rest
| No x ->
pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority rest
in
pull_ready Sorted.nil Sorted.nil Sorted.max_priority l
type ('a,'b) ev_checker =
'a WithAttributes.t Sorted.t -> 'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority
let file_descriptors_of l =
Sorted.map_filter (function { WithAttributes.it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None) l
let check_for_system_events : ('a system_event,'a) ev_checker = fun waiting ->
let fds = file_descriptors_of waiting in
let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
let new_ready, waiting, min_prio = pull_ready ~advance:(advance_system ~ready_fds) waiting in
new_ready, waiting, min_prio
let check_for_queue_events : ('a queue_event,'a) ev_checker =
fun waiting ->
let new_ready, waiting, min_prio = pull_ready ~advance:advance_queue waiting in
new_ready, waiting, min_prio
let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
if Unix.gettimeofday () > deadline then Sorted.nil, Sorted.nil, sys, queue, Sorted.max_priority
else
let ready_fds, _, _ = Unix.select fds [] [] 0.1 in
let ready_sys, waiting_sys, min_prio_sys = pull_ready ~advance:(advance_system ~ready_fds) sys in
let ready_queue, waiting_queue, min_prio_queue = pull_ready ~advance:advance_queue queue in
if ready_sys <> Sorted.nil || ready_queue <> Sorted.nil
then ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_queue min_prio_sys
else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue
let wait_for_system_or_queue_events ~deadline sys queue =
let fds = file_descriptors_of sys in
wait_for_system_or_queue_events ~deadline (fds,sys) queue
let rec pull_tasks min_prio l =
match Sorted.look l with
| Sorted.Nil -> Sorted.nil, Sorted.nil
| Sorted.Cons((x,p),l) when Sorted.le_user p min_prio ->
let tasks, l = pull_tasks min_prio l in
Sorted.cons x p tasks, l
| _ -> Sorted.nil, l
let postpone p ready =
let leq_user_prio { WithAttributes.priority = q; _} = Sorted.le_user q p in
let ready, postponed = Sorted.partition leq_user_prio ready in
ready, postponed
let wait ?(deadline=max_float) todo : 'a WithAttributes.t list * 'a Todo.t =
let open Todo in
let { system; queue; tasks; ready } as todo = prune_cancelled todo in
if is_empty todo then
[], todo
else
let ready_sys, waiting_sys, min_prio_sys = check_for_system_events system in
let ready_queue, waiting_queue, min_prio_queue = check_for_queue_events queue in
if Sorted.is_nil ready_sys &&
Sorted.is_nil ready_queue &&
Sorted.is_nil ready &&
Sorted.is_nil tasks
then
let ready_sys, ready_queue, waiting_sys, waiting_queue, min_prio =
wait_for_system_or_queue_events ~deadline waiting_sys waiting_queue in
let ready_sys, postponed_sys = postpone min_prio ready_sys in
let ready_queue, postponed_queue = postpone min_prio ready_queue in
let postponed = Sorted.append postponed_sys postponed_queue in
let ready = Sorted.to_list (Sorted.append ready_sys ready_queue) in
ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }
else
let min_prio, ready = Sorted.min ready in
let min_prio = Sorted.min_priority min_prio min_prio_sys in
let min_prio = Sorted.min_priority min_prio min_prio_queue in
let ready_old, postponed_ready = pull_tasks min_prio ready in
let ready_tasks, tasks = pull_tasks min_prio tasks in
let ready_sys, postponed_sys = postpone min_prio ready_sys in
let ready_queue, postponed_queue = postpone min_prio ready_queue in
let postponed = Sorted.concat [postponed_sys; postponed_queue; postponed_ready] in
let ready = Sorted.to_list (Sorted.concat [ready_sys; ready_queue; ready_tasks; ready_old]) in
ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }
let pop_return (ready, todo) =
match ready with
| { WithAttributes.it; _} :: rest ->
let rest_w_prio = List.map (fun x -> x, x.WithAttributes.priority) rest in
let ready = Sorted.append todo.Todo.ready (Sorted.of_list rest_w_prio) in
let todo = { todo with Todo.ready } in
Some it, todo
| [] ->
None, todo
let pop l =
match pop_return @@ wait l with
| None, _ -> raise @@ Failure "nothing to pop"
| Some x, t -> x, t
let pop_opt l = pop_return @@ wait l
let pop_timeout ~stop_after_being_idle_for:delta l =
let deadline = next_deadline delta in
pop_return @@ wait ~deadline l
let wait_return (l,todo) =
l |> List.map (fun x -> x.WithAttributes.it), todo
let wait_timeout ~stop_after_being_idle_for:delta l =
let deadline = next_deadline delta in
wait_return @@ wait ~deadline l
let wait l = wait_return @@ wait l
include Events