1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
open Thread
open Amqp_client_lib
open Spec.Queue
(** Handle to a queue. It carries nothing but the queue's name. *)
type t = { name: string }
(** Queue argument pair: the key [x-message-ttl] together with [ttl]
    wrapped in [Types.VLonglong]. *)
let message_ttl ttl = ("x-message-ttl", Types.VLonglong ttl)
(** Queue argument pair: the key [x-expires] together with [expiry]
    wrapped in [Types.VLonglong]. *)
let auto_expire expiry = ("x-expires", Types.VLonglong expiry)
(** Queue argument pair: the key [x-max-length] together with [limit]
    wrapped in [Types.VLonglong]. *)
let max_length limit = ("x-max-length", Types.VLonglong limit)
(** Queue argument pair: the key [x-max-length-bytes] together with
    [limit] wrapped in [Types.VLonglong]. *)
let max_length_bytes limit = ("x-max-length-bytes", Types.VLonglong limit)
(** Queue argument pair: the key [x-dead-letter-exchange] together with
    [exchange_name] wrapped in [Types.VLongstr]. *)
let dead_letter_exchange exchange_name =
  ("x-dead-letter-exchange", Types.VLongstr exchange_name)
(** Queue argument pair: the key [x-dead-letter-routing-key] together with
    [key] wrapped in [Types.VLongstr]. *)
let dead_letter_routing_key key =
  ("x-dead-letter-routing-key", Types.VLongstr key)
(** Queue argument pair: the key [x-max-priority] together with [v]
    wrapped in [Types.VLonglong].

    The key was previously misspelled as [x-max-priotity], which is not
    the argument name RabbitMQ documents for priority queues. *)
let maximum_priority v = "x-max-priority", Types.VLonglong v
(** Declare a queue on [channel] and return a handle to it.

    The optional flags [durable], [exclusive], [auto_delete] and [passive]
    all default to [false], and [arguments] defaults to the empty list.
    They are copied unchanged into the [Declare] request, which is always
    sent with [no_wait = false].

    The returned handle carries the queue name reported in the reply.
    An empty [name] asks the broker to pick a name (AMQP 0-9-1 semantics;
    confirm against the broker in use), so the reply is only required to
    echo [name] when [name] is non-empty. That requirement is checked with
    [assert]. *)
let declare channel ?(durable=false) ?(exclusive=false) ?(auto_delete=false)
    ?(passive=false) ?(arguments=[]) name =
  let channel = Channel.channel channel in
  let req =
    { Declare.queue = name; passive; durable; exclusive;
      auto_delete; no_wait = false; arguments }
  in
  Declare.request channel req >>= fun rep ->
  (* With an empty requested name the reply holds a generated name, so
     the equality check is only meaningful for an explicit name. *)
  assert (name = "" || name = rep.Declare_ok.queue);
  return { name = rep.Declare_ok.queue }
(** Fetch at most one message from queue [t] with a [Get] request.

    @param no_ack forwarded unchanged in the [Get] request.
    @return [None] when the reply is [`Get_empty]; otherwise [Some m],
    where [m] is built from the fields of the [`Get_ok] reply and the
    accompanying [(header, body)] pair. *)
let get ~no_ack channel t =
  let open Spec.Basic in
  let channel = Channel.channel channel in
  Get.request channel { Get.queue=t.name; no_ack } >>= function
  | `Get_empty () ->
    return None
  | `Get_ok (get_ok, (header, body)) ->
    (* [header] must be bound here: it is stored alongside [body] in the
       resulting message. *)
    return (Some { Message.delivery_tag = get_ok.Get_ok.delivery_tag;
                   Message.redelivered = get_ok.Get_ok.redelivered;
                   Message.exchange = get_ok.Get_ok.exchange;
                   Message.routing_key = get_ok.Get_ok.routing_key;
                   Message.message = (header, body) })
(** Send [message] straight to queue [t]: it is published through
    [Exchange.default] with the queue's name as routing key.
    [mandatory] is passed through untouched. *)
let publish channel t ?mandatory message =
  let routing_key = t.name in
  Exchange.publish channel Exchange.default ?mandatory ~routing_key message
(** State kept for an active consumer, as built by [consume]:
    - [channel]: the channel the consumer was started on;
    - [tag]: the consumer tag taken from the [Consume_ok] reply;
    - [writer]: the writing end of the pipe that delivered messages are
      pushed into. *)
type 'a consumer = { channel: 'a Channel.t;
tag: string;
writer: Message.t Pipe.Writer.t }
(** Consume messages from queue [t].

    A consumer tag is formed by joining the channel's unique id and [id]
    with a dot, and sent in a [Consume] request together with [no_local],
    [no_ack] and [exclusive] (all default to [false]); [no_wait] is always
    [false] and [arguments] is always empty.

    @param on_cancel invoked when the consumer's cancel handler fires,
    after the pipe writer has been closed. When it is omitted, the handler
    raises [Types.Consumer_cancelled] carrying the consumer tag instead.
    @return a pair of the consumer record (channel, tag from the
    [Consume_ok] reply, pipe writer) and the pipe reader on which
    delivered messages appear. *)
let consume ~id ?(no_local=false) ?(no_ack=false) ?(exclusive=false)
    ?on_cancel channel t =
  let open Spec.Basic in
  let (reader, writer) = Pipe.create () in
  let consumer_tag =
    Printf.sprintf "%s.%s" (Channel.Internal.unique_id channel) id
  in
  (* Cancellation handler: always close the pipe first, then either run
     the caller's callback or raise. *)
  let on_cancel () =
    Pipe.close_without_pushback writer;
    match on_cancel with
    | Some f -> f ()
    | None -> raise (Types.Consumer_cancelled consumer_tag)
  in
  (* Delivery handler: turn a (deliver, header, body) triple into a
     message and push it into the pipe without waiting. [header] must be
     bound here because it is stored in the message. *)
  let to_writer (deliver, header, body) =
    { Message.delivery_tag = deliver.Deliver.delivery_tag;
      Message.redelivered = deliver.Deliver.redelivered;
      Message.exchange = deliver.Deliver.exchange;
      Message.routing_key = deliver.Deliver.routing_key;
      Message.message = (header, body) }
    |> Pipe.write_without_pushback writer
  in
  let req = { Consume.queue=t.name;
              consumer_tag;
              no_local;
              no_ack;
              exclusive;
              no_wait = false;
              arguments = [];
            }
  in
  let var = Ivar.create () in
  (* On [Consume_ok]: register the delivery and cancel handlers under the
     tag from the reply, then hand the reply to the waiting code below. *)
  let on_receive consume_ok =
    Channel.Internal.register_consumer_handler channel consume_ok.Consume_ok.consumer_tag to_writer on_cancel;
    Ivar.fill var consume_ok
  in
  let read = snd Consume_ok.Internal.read in
  (* The one-shot reply handler is installed before the request is
     written (presumably so the reply cannot arrive unobserved). *)
  read ~once:true on_receive (Channel.channel channel);
  Consume.Internal.write (Channel.channel channel) req >>= fun () ->
  Ivar.read var >>= fun rep ->
  let tag = rep.Consume_ok.consumer_tag in
  return ({ channel; tag; writer }, reader)
(** Stop [consumer]: send a [Cancel] request for its tag (with
    [no_wait = false]), and once the reply arrives deregister the
    consumer handler on its channel and close its pipe writer. *)
let cancel consumer =
  let { channel; tag; writer } = consumer in
  let request = { Spec.Basic.Cancel.consumer_tag = tag; no_wait = false } in
  Spec.Basic.Cancel.request (Channel.channel channel) request >>= fun _ ->
  Channel.Internal.deregister_consumer_handler channel tag;
  Pipe.close writer
(** Bind queue [t] to [exchange]; delegates to
    [Exchange.Internal.bind_queue] with the queue's name. *)
let bind channel t exchange =
  let queue_name = t.name in
  Exchange.Internal.bind_queue channel exchange queue_name
(** Unbind queue [t] from [exchange]; delegates to
    [Exchange.Internal.unbind_queue] with the queue's name. *)
let unbind channel t exchange =
  let queue_name = t.name in
  Exchange.Internal.unbind_queue channel exchange queue_name
(** Purge queue [t]: issue a [Purge] request (with [no_wait = false]) on
    [channel] and discard the reply. *)
let purge channel t =
  let request = { Purge.queue = t.name; no_wait = false } in
  Purge.request (Channel.channel channel) request >>= fun _ -> return ()
(** Delete queue [t]: issue a [Delete] request (with [no_wait = false]) on
    [channel] and discard the reply. [if_unused] and [if_empty] both
    default to [false] and are forwarded unchanged. *)
let delete ?(if_unused=false) ?(if_empty=false) channel t =
  let request =
    { Delete.queue = t.name; if_unused; if_empty; no_wait = false }
  in
  Delete.request (Channel.channel channel) request >>= fun _ -> return ()
(** Return the name stored in the queue handle. *)
let name { name } = name
(** Build a queue handle for [queue_name] without any validation: the
    channel argument is ignored and no request is made. *)
let fake _channel queue_name = return { name = queue_name }