Source file thread.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
open Amqp_client_lib
let (>>=) = Lwt.(>>=)
let (>>|) = Lwt.(>|=)
let return = Lwt.return
let after ms = Lwt_unix.sleep (ms /. 1000.0)
let spawn ?exn_handler t = Lwt.async (fun () ->
    match exn_handler with
    | Some handler -> Lwt.catch (fun () -> t) handler
    | None -> t
  )

let with_timeout milliseconds deferred =
  Lwt.pick [
    Lwt_unix.sleep (float_of_int milliseconds /. 1000.) >>| (fun () -> `Timeout);
    deferred >>| (fun success -> `Result success)
  ]

(* Replace with simpler Lwt.wait *)
module Ivar = struct
  type 'a t = { t: 'a Lwt.t;
                u: 'a Lwt.u;
              }
  let create () =
    let (t,u) = Lwt.wait () in
    { t; u }

  let create_full v =
    let t = create () in
    Lwt.wakeup_later t.u v;
    t

  let is_full t =
    Lwt.is_sleeping t.t |> not

  let fill t v =
    match is_full t with
    | false ->
      Lwt.wakeup_later t.u v;
    | true -> failwith "Var already filled"

  let read t = t.t

  let fill_if_empty t v =
    match is_full t with
    | false -> fill t v
    | true -> ()
end

module Deferred = struct
  type 'a t = 'a Lwt.t
  let all_unit = Lwt.join
  let try_with f =
    let open Lwt in
    let var = Ivar.create () in
    let hook = !async_exception_hook in
    async_exception_hook := (Ivar.fill var);
    catch (fun () -> (f () >>= fun r -> return (`Ok r)) <?>
           (Ivar.read var >>= fun e -> return (`Error e)))
      (fun exn -> return (`Error exn)) >>= fun x ->
    async_exception_hook := hook;
    return x

  module List = struct
    let init ~f n =
      let rec inner = function
        | i when i = n -> []
        | i -> i :: inner (i + 1)
      in
      inner 0 |> Lwt_list.map_p f

    let iter ?(how:[>`Sequential | `Parallel] = `Parallel) ~f l =
      match how with
      | `Sequential -> Lwt_list.iter_s f l
      | `Parallel -> Lwt_list.iter_p f l
  end
end

module Log = struct
  let section = Lwt_log.Section.make "amqp-client"

  let debug fmt = Lwt_log.ign_debug_f ~section fmt
  let info fmt = Lwt_log.ign_info_f ~section fmt
  let error fmt = Lwt_log.ign_error_f ~section fmt
end

(* Pipes. Bounds are not implemented yet. *)
module Pipe = struct
  type 'a elem = Data of 'a
               | Flush of unit Lwt_condition.t

  type 'a t = { cond: unit Lwt_condition.t;
                queue: 'a elem Ocaml_lib.Queue.t;
                mutable closed: bool;
              }

  module Reader = struct
    type nonrec 'a t = 'a t
  end

  module Writer = struct
    type nonrec 'a t = 'a t
  end

  let create () =
    let t = { cond = Lwt_condition.create ();
              queue = Ocaml_lib.Queue.create ();
              closed = false;
            } in
    (t, t)

  (** Not supported yet *)
  let set_size_budget _t _budget = ()

  (* Can be readers and writers. *)
  let flush t =
    match Ocaml_lib.Queue.is_empty t.queue with
    | true -> return ()
    | false ->
      let cond = Lwt_condition.create () in
      Ocaml_lib.Queue.push (Flush cond) t.queue;
      Lwt_condition.wait cond

  let rec read_raw t =
    match Ocaml_lib.Queue.is_empty t.queue with
    | true ->
      begin match t.closed with
      | true -> return `Eof
      | false ->
        Lwt_condition.wait t.cond >>= fun () ->
        read_raw t
      end
    | false ->
      return (`Ok (Ocaml_lib.Queue.pop t.queue))

  let rec read t =
    read_raw t >>= function
    | `Eof -> return `Eof
    | `Ok (Data d) -> return @@ `Ok d
    | `Ok (Flush cond) ->
      Lwt_condition.signal cond ();
      read t

  let write_raw t data =
    Ocaml_lib.Queue.push data t.queue;
    Lwt_condition.broadcast t.cond ()

  let write_without_pushback t data =
    write_raw t (Data data)

  let write t data =
    write_without_pushback t data;
    return ()

  let rec iter t ~f =
    read t >>= function
    | `Eof -> return ()
    | `Ok d -> f d >>= fun () -> iter t ~f

  let rec iter_without_pushback t ~f =
    read t >>= function
    | `Eof -> return ()
    | `Ok d -> f d; iter_without_pushback t ~f

  (* Pipe of pipes. Must spawn more *)
  let interleave_pipe t =
    let (reader, writer) = create () in
    let rec copy t =
      read_raw t >>= function
      | `Eof -> return ()
      | `Ok data ->
        write_raw writer data;
        copy t
    in
    spawn (iter_without_pushback t ~f:(fun p -> spawn (copy p)));
    reader

  let transfer_in ~from:queue t =
    Ocaml_lib.Queue.iter (write_without_pushback t) queue;
    return ()

  let close t =
    t.closed <- true;
    begin match Ocaml_lib.Queue.is_empty t.queue with
    | true -> return ()
    | false -> flush t
    end >>= fun () ->
    return ()

  let close_without_pushback t =
    t.closed <- true;
    Lwt_condition.broadcast t.cond ()

end

module Reader = struct
  type t = Lwt_io.input_channel
  let close t = Lwt_io.close t

  let read input buf : [ `Eof of int | `Ok ] Deferred.t =
    let len = Bytes.length buf in
    let rec inner = function
      | n when n = len ->
          return `Ok
      | n -> begin
          Lwt.catch
            (fun () -> Lwt_io.read_into input buf n (len - n))
            (fun _exn -> return 0) >>= function
          | 0 -> return (`Eof n)
          | read -> inner (n + read)
        end
    in
    inner 0
end

module Writer = struct
  type t = string Pipe.Writer.t
  let close t = Pipe.close t
  let flush t = Pipe.flush t
  let write t data = Pipe.write_without_pushback t data
end

module Tcp = struct

  let connect  ~exn_handler ?nodelay host port =
    let fd = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in
    Lwt_unix.gethostbyname host >>= fun entry ->
    let sock_addr = (Lwt_unix.ADDR_INET (entry.Lwt_unix.h_addr_list.(0), port)) in
    Lwt_io.open_connection ~fd sock_addr >>= fun (ic, oc) ->
    (* Start a process that writes *)
    let (reader, writer) = Pipe.create () in
    spawn ~exn_handler (Pipe.iter ~f:(fun str ->
        Lwt_io.write oc str) reader);

    (match nodelay with
     | Some () -> Lwt_unix.(setsockopt fd TCP_NODELAY true)
     | None -> ());
    return (ic, writer)

end

module Scheduler = struct
  let cond = Lwt_condition.create ()
  let go () = Lwt_main.run (Lwt_condition.wait cond) |> ignore
  let shutdown (n : int) = Lwt_condition.signal cond n
end