(* Source file b_sync.ml *)

(** The queue of actions to be executed before everything else in the main
    loop. *)
(* NOTE: if some actions take too much time, the remaining ones are postponed
   to the next iteration of the main loop. *)
(* FIFO queue: the order of the actions is preserved. *)

(* Warning: the first Sync execution is done slightly before the main loop
   starts, with a little more time available... Make sure this is not a
   problem. *)

open B_utils
module Time = B_time
module Var = B_var
module Trigger =  B_trigger

(** An action stored in the Sync queue: a thunk taking [unit] and returning
    [unit], hence executed only for its side effects. *)
type action = unit -> unit

(** The global FIFO queue of pending Sync actions, wrapped in a [Var.t] so
    that accesses can go through the [Var] protection functions. *)
let queue : action Queue.t Var.t = Queue.create () |> Var.create

(** [is_empty ()] is [true] when no action is currently waiting in the Sync
    queue. The queue is read through [Var.get]. *)
let is_empty () =
  Var.get queue |> Queue.is_empty

(** [push action] appends [action] at the end of the Sync queue (through
    [Var.protect_fn]) and then calls [Trigger.push_action ()] so that the
    queue gets processed. *)
(* Warning: a Sync action cannot push another Sync action, because it would
   wait forever for the mutex to be released. *)
(* TODO: add a test of this using Mutex.try_lock *)
let push action =
  let enqueue pending = Queue.add action pending in
  printd debug_thread "Sync push action";
  Var.protect_fn queue enqueue;
  Trigger.push_action ()

(** [option o future f] applies [f] right away to the content of [o] when [o]
    is [Some _]. When [o] is [None], nothing is computed immediately: an action
    is pushed to the Sync queue which, once executed, computes [future ()] and
    passes the result to [f]. *)
let option o future f =
  (* The deferred work: evaluate [future] first, then hand its value to [f]. *)
  let deferred () =
    let computed = future () in
    f computed
  in
  match o with
  | None -> push deferred
  | Some value -> f value

(** [execute timeout] pops and runs the actions of the Sync queue, in FIFO
    order, until the queue is empty or more than [timeout] has elapsed since
    the call started. In the latter case the remaining actions stay in the
    queue and [Trigger.push_action ()] is called so that they get processed at
    the next iteration.

    Returns [false] if the queue was found empty on entry, and [true]
    otherwise. *)
(* We assume that the whole process does not need to be mutex protected. It
   should be OK if other threads are adding to the Queue, as long as we (the
   main thread) are the only ones popping from the queue. *)
let execute timeout =
  if is_empty () then false (* quick test, meant to avoid taking the lock *)
  else begin
    let t = Time.now () in
    let rec drain () =
      match Var.with_protect queue Queue.is_empty with
      | true -> () (* nothing left: we are done *)
      | false ->
        if Time.(now () - t > timeout) then begin
          (* Out of time: stop here, but request another pass so that the
             rest of the queue is handled at the next iteration. *)
          printd debug_thread "Didn't have time to finish Sync queue.";
          Trigger.push_action ()
        end
        else begin
          (* Only the pop goes through [Var.with_protect]; the action itself
             is called afterwards. *)
          let job = Var.with_protect queue Queue.pop in
          printd debug_thread "Popping one action from the Sync Queue.";
          job ();
          drain ()
        end
    in
    drain ();
    true
  end