1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
open Thread
open Amqp_client_lib
open Spec.Exchange
type _ exchange_type =
| Direct: [`Queue of string] exchange_type
| Fanout: unit exchange_type
| Topic: [`Topic of string] exchange_type
| Match: [`Headers of Types.header list] exchange_type
let direct_t = Direct
let fanout_t = Fanout
let topic_t = Topic
let match_t = Match
type 'a t = { name : string;
exchange_type: 'a exchange_type }
(** Predefined Default exchange *)
let default = { name=""; exchange_type = Direct }
(** Predefined Direct exchange *)
let amq_direct = { name = "amq.direct"; exchange_type = Direct }
(** Predefined Fanout exchange *)
let amq_fanout = { name = "amq.fanout"; exchange_type = Fanout }
(** Predefined topic exchange *)
let amq_topic = { name = "amq.topic"; exchange_type = Topic }
(** Predefined match (header) exchange *)
let amq_match = { name = "amq.match"; exchange_type = Match }
let string_of_exchange_type: type a. a exchange_type -> string = function
| Direct -> "direct"
| Fanout -> "fanout"
| Topic -> "topic"
| Match -> "match"
module Internal = struct
let bind_queue: type a. _ Channel.t -> a t -> string -> a -> unit Deferred.t =
let open Spec.Queue in
fun channel { name; exchange_type} queue ->
let bind ?(routing_key="") ?(arguments=[]) () =
let query = { Bind.queue;
exchange = name;
routing_key;
no_wait = false;
arguments;
}
in
Bind.request (Channel.channel channel) query
in
match exchange_type with
| Direct -> fun (`Queue routing_key) -> bind ~routing_key ()
| Fanout -> fun () -> bind ()
| Topic -> fun (`Topic routing_key) -> bind ~routing_key ()
| Match -> fun (`Headers arguments) -> bind ~arguments ()
let unbind_queue: type a. _ Channel.t -> a t -> string -> a -> unit Deferred.t =
let open Spec.Queue in
fun channel { name; exchange_type} queue ->
let unbind ?(routing_key="") ?(arguments=[]) () =
let query = { Unbind.queue;
exchange = name;
routing_key;
arguments;
}
in
Unbind.request (Channel.channel channel) query
in
match exchange_type with
| Direct -> fun (`Queue routing_key) -> unbind ~routing_key ()
| Fanout -> fun () -> unbind ()
| Topic -> fun (`Topic routing_key) -> unbind ~routing_key ()
| Match -> fun (`Headers arguments) -> unbind ~arguments ()
end
let declare: type a. ?passive:bool -> ?durable:bool -> ?auto_delete:bool -> ?internal:bool ->
_ Channel.t -> a exchange_type -> ?arguments:Types.table -> string -> a t Deferred.t =
fun ?(passive=false) ?(durable=false) ?(auto_delete=false) ?(internal=false)
channel exchange_type ?(arguments=[]) name ->
Declare.request (Channel.channel channel)
{ Declare.exchange = name;
amqp_type = (string_of_exchange_type exchange_type);
passive;
durable;
auto_delete;
internal;
no_wait = false;
arguments; } >>= fun () ->
return { name; exchange_type }
let delete ?(if_unused=false) channel t =
Delete.request (Channel.channel channel)
{ Delete.exchange = t.name;
if_unused;
no_wait = false;
}
let bind: type a. _ Channel.t -> destination:_ t -> source:a t -> a -> unit Deferred.t=
fun channel ~destination ~source ->
let bind ?(routing_key="") ?(arguments=[]) () =
let query = { Bind.destination = destination.name;
source = source.name;
routing_key;
no_wait = false;
arguments;
}
in
Bind.request (Channel.channel channel) query
in
match source.exchange_type with
| Direct -> fun (`Queue routing_key) -> bind ~routing_key ()
| Fanout -> fun () -> bind ()
| Topic -> fun (`Topic routing_key) -> bind ~routing_key ()
| Match -> fun (`Headers arguments) -> bind ~arguments ()
let unbind: type a. _ Channel.t -> destination:_ t -> source:a t -> a -> unit Deferred.t=
fun channel ~destination ~source ->
let unbind ?(routing_key="") ?(arguments=[]) () =
let query = { Unbind.destination = destination.name;
source = source.name;
routing_key;
no_wait = false;
arguments;
}
in
Unbind.request (Channel.channel channel) query
in
match source.exchange_type with
| Direct -> fun (`Queue routing_key) -> unbind ~routing_key ()
| Fanout -> fun () -> unbind ()
| Topic -> fun (`Topic routing_key) -> unbind ~routing_key ()
| Match -> fun (`Headers arguments) -> unbind ~arguments ()
let publish channel t
?(mandatory=false)
~routing_key
(, body) =
let open Spec.Basic in
let = match header.Content.app_id with
| Some _ -> header
| None -> { header with Content.app_id = Some (Channel.id channel) }
in
let wait_for_confirm = Channel.Internal.wait_for_confirm channel in
Publish.request (Channel.channel channel)
({Publish.exchange = t.name;
routing_key;
mandatory;
immediate=false},
header, body) >>= fun () ->
wait_for_confirm ~routing_key ~exchange_name:t.name
let name t = t.name