1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
open Amqp_client_lib
(** Monadic bind on Lwt promises. *)
let ( >>= ) = Lwt.bind

(** Map over an Lwt promise: [t >>| f] applies [f] to the value of [t]. *)
let ( >>| ) t f = Lwt.map f t

(** Wrap a value in an already-resolved promise. *)
let return = Lwt.return

(** [after ms] is a promise that resolves once [ms] milliseconds (given as a
    float) have elapsed. *)
let after ms =
  let seconds = ms /. 1000.0 in
  Lwt_unix.sleep seconds
(** [spawn ?exn_handler t] hands [t] to [Lwt.async] so that nobody has to
    wait on it. When [exn_handler] is supplied, [t] is wrapped in
    [Lwt.catch] with that handler; otherwise [t] is passed on unchanged. *)
let spawn ?exn_handler t =
  let detached () =
    match exn_handler with
    | None -> t
    | Some on_exn -> Lwt.catch (fun () -> t) on_exn
  in
  Lwt.async detached
(** [with_timeout milliseconds deferred] races [deferred] against a timer of
    [milliseconds] (an int) using [Lwt.pick].
    Yields [`Result v] if [deferred] produces [v] first, [`Timeout] if the
    timer fires first. *)
let with_timeout milliseconds deferred =
  let seconds = float_of_int milliseconds /. 1000. in
  let timer = Lwt_unix.sleep seconds >>| fun () -> `Timeout in
  let outcome = deferred >>| fun success -> `Result success in
  Lwt.pick [ timer; outcome ]
(** Write-once variable: an Lwt promise paired with its resolver. *)
module Ivar = struct
  type 'a t = {
    t : 'a Lwt.t;  (* promise side, handed out by [read] *)
    u : 'a Lwt.u;  (* resolver side, used by [fill] *)
  }

  (** [create ()] makes an empty ivar from a fresh [Lwt.wait] pair. *)
  let create () =
    let promise, resolver = Lwt.wait () in
    { t = promise; u = resolver }

  (** [create_full v] makes an ivar that already holds [v]. *)
  let create_full v =
    let ivar = create () in
    Lwt.wakeup_later ivar.u v;
    ivar

  (** [is_full ivar] is [true] once the underlying promise is no longer
      sleeping. *)
  let is_full { t = promise; _ } = not (Lwt.is_sleeping promise)

  (** [fill ivar v] stores [v] in an empty ivar.
      @raise Failure if the ivar is already full. *)
  let fill ivar v =
    if is_full ivar then failwith "Var already filled"
    else Lwt.wakeup_later ivar.u v

  (** [read ivar] is the promise for the ivar's value. *)
  let read ivar = ivar.t

  (** [fill_if_empty ivar v] behaves like [fill] on an empty ivar and does
      nothing on a full one. *)
  let fill_if_empty ivar v =
    if not (is_full ivar) then fill ivar v
end
(** Async-style [Deferred] helpers implemented on top of Lwt promises. *)
module Deferred = struct
  type 'a t = 'a Lwt.t

  (** Wait for every promise in a list of unit promises. *)
  let all_unit = Lwt.join

  (** [try_with f] runs [f ()] and yields [`Ok r] when it produces [r], or
      [`Error exn] when [f] raises, its promise is rejected, or an exception
      reaches [Lwt.async_exception_hook] while [f] is running.

      The global hook is replaced for the duration of the call and restored
      afterwards. *)
  let try_with f =
    let open Lwt in
    let var = Ivar.create () in
    let hook = !async_exception_hook in
    (* Only the first asynchronous exception is recorded. [fill_if_empty]
       keeps later ones from making the hook itself raise
       [Failure "Var already filled"]. *)
    async_exception_hook := (Ivar.fill_if_empty var);
    catch (fun () -> (f () >>= fun r -> return (`Ok r)) <?>
                     (Ivar.read var >>= fun e -> return (`Error e)))
      (fun exn -> return (`Error exn)) >>= fun x ->
    async_exception_hook := hook;
    return x

  module List = struct
    (** [init ~f n] applies [f] to the indices [0 .. n - 1] with
        [Lwt_list.map_p] and collects the results in index order.
        Yields the empty list when [n <= 0]. *)
    let init ~f n =
      (* Built back-to-front with an accumulator: tail-recursive, and
         terminates for negative [n]. *)
      let rec indices acc i =
        if i < 0 then acc else indices (i :: acc) (i - 1)
      in
      indices [] (n - 1) |> Lwt_list.map_p f

    (** [iter ?how ~f l] applies [f] to every element of [l], one after the
        other with [`Sequential] or via [Lwt_list.iter_p] with [`Parallel]
        (the default). *)
    let iter ?(how:[>`Sequential | `Parallel] = `Parallel) ~f l =
      match how with
      | `Sequential -> Lwt_list.iter_s f l
      | `Parallel -> Lwt_list.iter_p f l
  end
end
(** Logging helpers that send printf-style messages to the ["amqp-client"]
    [Lwt_log] section. *)
module Log = struct
  let section = "amqp-client" |> Lwt_log.Section.make

  (** Debug-level message. *)
  let debug format = Lwt_log.ign_debug_f ~section format

  (** Info-level message. *)
  let info format = Lwt_log.ign_info_f ~section format

  (** Error-level message. *)
  let error format = Lwt_log.ign_error_f ~section format
end
(** In-memory pipe: a queue of items plus a condition variable used to wake
    readers. [create] returns the same value as both reader and writer end. *)
module Pipe = struct
  (** A queued item: a payload, or a flush marker whose condition is
      signalled when [read] dequeues it. *)
  type 'a elem = Data of 'a
               | Flush of unit Lwt_condition.t

  type 'a t = { cond: unit Lwt_condition.t;       (* broadcast on write and on close *)
                queue: 'a elem Ocaml_lib.Queue.t; (* pending items *)
                mutable closed: bool;             (* set by [close] / [close_without_pushback] *)
              }

  module Reader = struct
    type nonrec 'a t = 'a t
  end

  module Writer = struct
    type nonrec 'a t = 'a t
  end

  (** [create ()] makes an empty, open pipe and returns it twice: once as the
      reader end and once as the writer end. *)
  let create () =
    let t = { cond = Lwt_condition.create ();
              queue = Ocaml_lib.Queue.create ();
              closed = false;
            } in
    (t, t)

  (** Not supported yet *)
  let set_size_budget _t _budget = ()

  (** [flush t] resolves immediately when the queue is empty. Otherwise it
      enqueues a [Flush] marker and waits for that marker's condition to be
      signalled. *)
  let flush t =
    match Ocaml_lib.Queue.is_empty t.queue with
    | true -> return ()
    | false ->
      let cond = Lwt_condition.create () in
      Ocaml_lib.Queue.push (Flush cond) t.queue;
      Lwt_condition.wait cond

  (** [read_raw t] pops the next queued element, markers included.
      On an empty queue it yields [`Eof] if the pipe is closed, and otherwise
      waits on [t.cond] and retries. *)
  let rec read_raw t =
    match Ocaml_lib.Queue.is_empty t.queue with
    | true ->
      begin match t.closed with
        | true -> return `Eof
        | false ->
          Lwt_condition.wait t.cond >>= fun () ->
          read_raw t
      end
    | false ->
      return (`Ok (Ocaml_lib.Queue.pop t.queue))

  (** [read t] yields the next payload as [`Ok d], or [`Eof].
      Flush markers met on the way are signalled and skipped. *)
  let rec read t =
    read_raw t >>= function
    | `Eof -> return `Eof
    | `Ok (Data d) -> return @@ `Ok d
    | `Ok (Flush cond) ->
      Lwt_condition.signal cond ();
      read t

  (** [write_raw t data] enqueues an element and wakes all waiting readers. *)
  let write_raw t data =
    Ocaml_lib.Queue.push data t.queue;
    Lwt_condition.broadcast t.cond ()

  (** [write_without_pushback t data] enqueues the payload [data]. *)
  let write_without_pushback t data =
    write_raw t (Data data)

  (** [write t data] enqueues [data] and returns an already-resolved
      promise. *)
  let write t data =
    write_without_pushback t data;
    return ()

  (** [iter t ~f] calls [f] on each payload read from [t], waiting for each
      call to finish, until [`Eof]. *)
  let rec iter t ~f =
    read t >>= function
    | `Eof -> return ()
    | `Ok d -> f d >>= fun () -> iter t ~f

  (** Same as [iter], for an [f] that returns [unit] directly. *)
  let rec iter_without_pushback t ~f =
    read t >>= function
    | `Eof -> return ()
    | `Ok d -> f d; iter_without_pushback t ~f

  (** [interleave_pipe t] takes a pipe of pipes and returns a reader onto
      which the elements of every inner pipe are copied as they arrive.
      Elements are forwarded raw, so flush markers are forwarded too. The
      returned pipe is not closed by this function. *)
  let interleave_pipe t =
    let (reader, writer) = create () in
    let rec copy t =
      read_raw t >>= function
      | `Eof -> return ()
      | `Ok data ->
        write_raw writer data;
        copy t
    in
    spawn (iter_without_pushback t ~f:(fun p -> spawn (copy p)));
    reader

  (** [transfer_in ~from t] writes every element of the queue [from] to
      [t]. *)
  let transfer_in ~from:queue t =
    Ocaml_lib.Queue.iter (write_without_pushback t) queue;
    return ()

  (** [close t] marks the pipe closed and resolves once items already queued
      have been passed by the reader (see [flush]). *)
  let close t =
    t.closed <- true;
    (* Wake readers blocked in [read_raw] on an empty queue so they observe
       the closed flag and yield [`Eof]; without this they wait forever. *)
    Lwt_condition.broadcast t.cond ();
    match Ocaml_lib.Queue.is_empty t.queue with
    | true -> return ()
    | false -> flush t

  (** [close_without_pushback t] marks the pipe closed and wakes all waiting
      readers, without waiting for queued items to be consumed. *)
  let close_without_pushback t =
    t.closed <- true;
    Lwt_condition.broadcast t.cond ()
end
(** Input side of a connection, backed by an [Lwt_io] input channel. *)
module Reader = struct
  type t = Lwt_io.input_channel

  (** Close the underlying channel. *)
  let close channel = Lwt_io.close channel

  (** [read input buf] keeps reading from [input] until [buf] is completely
      filled, then yields [`Ok].
      If a read returns zero bytes, or raises (any exception is treated as a
      zero-byte read), it yields [`Eof n] where [n] is the number of bytes
      stored in [buf] so far. *)
  let read input buf : [ `Eof of int | `Ok ] Deferred.t =
    let total = Bytes.length buf in
    let rec fill_from offset =
      if offset = total then return `Ok
      else
        let attempt () =
          Lwt_io.read_into input buf offset (total - offset)
        in
        Lwt.catch attempt (fun _exn -> return 0) >>= fun count ->
        if count = 0 then return (`Eof offset)
        else fill_from (offset + count)
    in
    fill_from 0
end
(** Output side of a connection: the writer end of a pipe of strings. *)
module Writer = struct
  type t = string Pipe.Writer.t

  (** Close the underlying pipe; delegates to [Pipe.close]. *)
  let close writer = Pipe.close writer

  (** Delegates to [Pipe.flush]. *)
  let flush writer = Pipe.flush writer

  (** [write writer data] enqueues [data] via
      [Pipe.write_without_pushback]. *)
  let write writer data = Pipe.write_without_pushback writer data
end
module Tcp = struct
  (** [connect ~exn_handler ?nodelay host port] resolves [host], opens an
      IPv4 TCP connection to its first address on [port], and returns
      [(ic, writer)]: the input channel of the connection and the writer end
      of a pipe whose strings are written to the output channel by a
      background task.

      @param exn_handler passed to [spawn] for the background write task.
      @param nodelay when given, [TCP_NODELAY] is enabled on the socket.

      Every failure is reported through the returned promise. An empty
      address list surfaces as the [Invalid_argument] raised by the array
      access. *)
  let connect ~exn_handler ?nodelay host port =
    Lwt_unix.gethostbyname host >>= fun entry ->
    let sock_addr = (Lwt_unix.ADDR_INET (entry.Lwt_unix.h_addr_list.(0), port)) in
    (* The socket is created only after the address is known, so a failed
       lookup or an empty address list cannot leak a file descriptor. *)
    let fd = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in
    Lwt_io.open_connection ~fd sock_addr >>= fun (ic, oc) ->
    let (reader, writer) = Pipe.create () in
    (* Background task: drain the pipe into the output channel. *)
    spawn ~exn_handler (Pipe.iter ~f:(fun str ->
        Lwt_io.write oc str) reader);
    (match nodelay with
     | Some () -> Lwt_unix.(setsockopt fd TCP_NODELAY true)
     | None -> ());
    return (ic, writer)
end
(** Entry point and shutdown signal for the Lwt main loop. *)
module Scheduler = struct
  (* Condition that [go] waits on and [shutdown] signals. *)
  let cond = Lwt_condition.create ()

  (** [go ()] runs [Lwt_main.run] until [cond] is signalled; the signalled
      value is discarded. *)
  let go () =
    let _signalled = Lwt_main.run (Lwt_condition.wait cond) in
    ()

  (** [shutdown n] signals [cond] with [n]. *)
  let shutdown (n : int) = Lwt_condition.signal cond n
end