Source file threadext.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
(*
 * Copyright (C) 2006-2009 Citrix Systems Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; version 2.1 only. with the special
 * exception on linking described in file LICENSE.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *)

module M = Mutex

module Mutex = struct
  (** execute the function f with the mutex hold *)
  let execute lock f =
    Mutex.lock lock;
    Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> Mutex.unlock lock)
end

(** Parallel List.iter. Remembers all exceptions and returns an association list mapping input x to an exception.
    Applications of x which succeed will be missing from the returned list. *)
let thread_iter_all_exns f xs =
  let exns = ref [] in
  let m = M.create () in
  List.iter
    Thread.join
    (List.map
       (fun x ->
          Thread.create
            (fun () ->
               try
                 f x
               with e -> Mutex.execute m (fun () -> exns := (x, e) :: !exns)
            )
            ()
       ) xs);
  !exns

(** Parallel List.iter. Remembers one exception (at random) and throws it in the
    error case. *)
let thread_iter f xs = match thread_iter_all_exns f xs with
  | [] -> ()
  | (_, e) :: _ -> raise e

module Delay = struct
  (* Concrete type is the ends of a pipe *)
  type t = {
    (* A pipe is used to wake up a thread blocked in wait: *)
    mutable pipe_out: Unix.file_descr option;
    mutable pipe_in: Unix.file_descr option;
    (* Indicates that a signal arrived before a wait: *)
    mutable signalled: bool;
    m: M.t
  }

  let make () =
    { pipe_out = None;
      pipe_in = None;
      signalled = false;
      m = M.create () }

  exception Pre_signalled

  let wait (x: t) (seconds: float) =
    let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
    let to_close = ref [ ] in
    let close' fd =
      if List.mem fd !to_close then Unix.close fd;
      to_close := List.filter (fun x -> fd <> x) !to_close in
    finally
      (fun () ->
         try
           let pipe_out = Mutex.execute x.m
               (fun () ->
                  if x.signalled then begin
                    x.signalled <- false;
                    raise Pre_signalled;
                  end;
                  let pipe_out, pipe_in = Unix.pipe () in
                  (* these will be unconditionally closed on exit *)
                  to_close := [ pipe_out; pipe_in ];
                  x.pipe_out <- Some pipe_out;
                  x.pipe_in <- Some pipe_in;
                  x.signalled <- false;
                  pipe_out) in
           let r, _, _ = Unix.select [ pipe_out ] [] [] seconds in
           (* flush the single byte from the pipe *)
           if r <> [] then ignore(Unix.read pipe_out (Bytes.create 1) 0 1);
           (* return true if we waited the full length of time, false if we were woken *)
           r = []
         with Pre_signalled -> false
      )
      (fun () ->
         Mutex.execute x.m
           (fun () ->
              x.pipe_out <- None;
              x.pipe_in <- None;
              List.iter close' !to_close)
      )

  let signal (x: t) =
    Mutex.execute x.m
      (fun () ->
         match x.pipe_in with
         | Some fd -> ignore(Unix.write fd (Bytes.of_string "X") 0 1)
         | None -> x.signalled <- true (* If the wait hasn't happened yet then store up the signal *)
      )
end