Source file sel.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
(**************************************************************************)
(*                                                                        *)
(*                                 SEL                                    *)
(*                                                                        *)
(*                   Copyright INRIA and contributors                     *)
(*       (see version control and README file for authors & dates)        *)
(*                                                                        *)
(**************************************************************************)
(*                                                                        *)
(*   This file is distributed under the terms of the MIT License.         *)
(*   See LICENSE file.                                                    *)
(*                                                                        *)
(**************************************************************************)

open Events
open Event

let drop_event_type : 'b -> 'a WithAttributes.t -> 'b WithAttributes.t =
  fun b e -> WithAttributes.map (fun _ -> b) e
  
(* Splits the events intro 3 lists: systm, queue and task *)
let partition_events l =
  let rec partition_events sys queue task = function
    | [] -> sys, queue, task
    | { WithAttributes.it = SystemEvent x; _ } as e :: rest ->
        partition_events ((drop_event_type x e,e.priority) :: sys) queue task rest
    | { WithAttributes.it = QueueEvent x; _ } as e :: rest ->
        partition_events sys ((drop_event_type x e,e.priority) :: queue) task rest
    | { WithAttributes.it = Task x; _ } as e :: rest ->
        partition_events sys queue ((drop_event_type x e,e.priority) :: task) rest
  in
    partition_events [] [] [] l

let next_deadline delta = Unix.gettimeofday() +. delta

module Todo = struct

type 'a t = {
  (* The pop API may need to postpone ready events *)
  ready : 'a WithAttributes.t Sorted.t;
  (* The three queues of events *)
  system : ('a system_event) WithAttributes.t Sorted.t;
  queue  : ('a queue_event) WithAttributes.t Sorted.t;
  tasks  : ('a task_event) WithAttributes.t Sorted.t;
}
[@@deriving show]
let empty = {
  system = Sorted.nil;
  queue = Sorted.nil ;
  tasks = Sorted.nil;
  ready = Sorted.nil;
}

let prune_cancelled { system; queue; tasks; ready } =
  let not_cancelled { WithAttributes.cancelled; _ } = !cancelled = false in
  let system = Sorted.filter not_cancelled system in
  let queue  = Sorted.filter not_cancelled queue in
  let tasks  = Sorted.filter not_cancelled tasks in
  let ready  = Sorted.filter not_cancelled ready in
  { system; queue; tasks; ready }

let size todo =
  let { system; queue; tasks; ready } = prune_cancelled todo in
  Sorted.length system + Sorted.length queue + Sorted.length tasks + Sorted.length ready

let is_empty todo =
  let { system; queue; tasks; ready } = prune_cancelled todo in
  Sorted.is_nil system &&
  Sorted.is_nil queue &&
  Sorted.is_nil tasks &&
  Sorted.is_nil ready

(* In order to preserve insertion order we tag each event with a tick *)
let tick = ref 0

let add { system; queue; tasks; ready } l =
  let tick ({ WithAttributes.priority; _ } as e) =
    incr tick;
    let priority = { priority with insertion = !tick } in
    { e with priority } in
  let l = List.map tick l in
  let new_sys, new_queue, new_tasks = partition_events l in
  {
    ready;
    system = Sorted.append system (Sorted.of_list new_sys);
    queue = Sorted.append queue (Sorted.of_list new_queue);
    tasks = Sorted.append tasks (Sorted.of_list new_tasks);
  }
  
end

(* Like List.filter but also returns the minimum priority of ready events.
   Moreover ~advance can make the event advance (whilst not being ready yet)*)
let pull_ready ~advance l =
  let rec pull_ready yes no min_priority l =
    match Sorted.look l with
    | Sorted.Nil -> yes, no, min_priority
    | Sorted.Cons(({ WithAttributes.it; cancelled; priority; _ } as e, _), rest) ->
        match advance cancelled it with
        | Yes y ->
          let min_priority = min min_priority priority in
          let e = drop_event_type y e in
          pull_ready (Sorted.cons e e.priority yes) no min_priority rest
        | No x  ->
          pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority rest 
  in
    pull_ready Sorted.nil Sorted.nil Sorted.max_priority l
  
type ('a,'b) ev_checker =
  'a WithAttributes.t Sorted.t -> 'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority

let file_descriptors_of l =
  Sorted.map_filter (function { WithAttributes.it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None) l

(* For fairness reasons, even if there are immediately ready events we
   give a shot to system events with 0 wait, otherwise we wait until a
   system event is ready. We never sleep forever, since process death events
   do not wakeup select: we anyway wake up 10 times per second *)
let check_for_system_events : ('a system_event,'a) ev_checker = fun waiting ->
    let fds = file_descriptors_of waiting in
    let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
    let new_ready, waiting, min_prio  = pull_ready ~advance:(advance_system ~ready_fds) waiting in
    new_ready, waiting, min_prio
  
let check_for_queue_events : ('a queue_event,'a) ev_checker =
  fun waiting ->
    let new_ready, waiting, min_prio = pull_ready ~advance:advance_queue waiting in
    new_ready, waiting, min_prio  

(* This is blocking wait (modulo a deadline). We check for system events
   (io, process death) or a queue (in case some thread puts a token there). *)
let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
  if Unix.gettimeofday () > deadline then Sorted.nil, Sorted.nil, sys, queue, Sorted.max_priority
  else
    let ready_fds, _, _ = Unix.select fds [] [] 0.1 in
    let ready_sys, waiting_sys, min_prio_sys = pull_ready ~advance:(advance_system ~ready_fds) sys in
    let ready_queue, waiting_queue, min_prio_queue = pull_ready ~advance:advance_queue queue in
    if ready_sys <> Sorted.nil || ready_queue <> Sorted.nil
    then ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_queue min_prio_sys
    else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue

let wait_for_system_or_queue_events ~deadline sys queue =
  let fds = file_descriptors_of sys in
  wait_for_system_or_queue_events ~deadline (fds,sys) queue

let rec pull_tasks min_prio l =
  match Sorted.look l with
  | Sorted.Nil -> Sorted.nil, Sorted.nil
  | Sorted.Cons((x,p),l) when Sorted.le_user p min_prio ->
      let tasks, l = pull_tasks min_prio l in
      Sorted.cons x p tasks, l
  | _ -> Sorted.nil, l
  
(* Keep only events with a user priority equal to the given one (assumed to be the minimum) *)
let postpone p ready =
  let leq_user_prio { WithAttributes.priority = q; _} = Sorted.le_user q p in
  let ready, postponed = Sorted.partition leq_user_prio ready in
  ready, postponed
        
let wait ?(deadline=max_float) todo : 'a WithAttributes.t list * 'a Todo.t =
  let open Todo in
  let { system; queue; tasks; ready } as todo = prune_cancelled todo in
  if is_empty todo then
    [], todo
  else
    let ready_sys, waiting_sys, min_prio_sys = check_for_system_events system in
    let ready_queue, waiting_queue, min_prio_queue = check_for_queue_events queue in
    if Sorted.is_nil ready_sys &&
       Sorted.is_nil ready_queue &&
       Sorted.is_nil ready &&
       Sorted.is_nil tasks
    then
      let ready_sys, ready_queue, waiting_sys, waiting_queue, min_prio =
        wait_for_system_or_queue_events ~deadline waiting_sys waiting_queue in
      let ready_sys, postponed_sys = postpone min_prio ready_sys in
      let ready_queue, postponed_queue = postpone min_prio ready_queue in
      let postponed = Sorted.append postponed_sys postponed_queue in
      let ready = Sorted.to_list (Sorted.append ready_sys ready_queue) in
      ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }
    else
      let min_prio, ready = Sorted.min ready in
      let min_prio = Sorted.min_priority min_prio min_prio_sys in
      let min_prio = Sorted.min_priority min_prio min_prio_queue in
      let ready_old, postponed_ready = pull_tasks min_prio ready in
      let ready_tasks, tasks = pull_tasks min_prio tasks in
      let ready_sys, postponed_sys = postpone min_prio ready_sys in
      let ready_queue, postponed_queue = postpone min_prio ready_queue in
      let postponed = Sorted.concat [postponed_sys; postponed_queue; postponed_ready] in
      let ready = Sorted.to_list (Sorted.concat [ready_sys; ready_queue; ready_tasks; ready_old]) in
      ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }

let pop_return (ready, todo) =
  match ready with
  | { WithAttributes.it; _} :: rest ->
    let rest_w_prio = List.map (fun x -> x, x.WithAttributes.priority) rest in
    let ready = Sorted.append todo.Todo.ready (Sorted.of_list rest_w_prio) in
    let todo = { todo with Todo.ready } in
    Some it, todo
  | [] ->
    None, todo

let pop l =
  match pop_return @@ wait l with
  | None, _ -> raise @@ Failure "nothing to pop"
  | Some x, t -> x, t
let pop_opt l = pop_return @@ wait l

let pop_timeout ~stop_after_being_idle_for:delta l = 
  let deadline = next_deadline delta in
  pop_return @@ wait ~deadline l

let wait_return (l,todo) =
  l |> List.map (fun x -> x.WithAttributes.it), todo

let wait_timeout ~stop_after_being_idle_for:delta l =
  let deadline = next_deadline delta in
  wait_return @@ wait ~deadline l

let wait l = wait_return @@ wait l

include Events