1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# 1 "async/src/message.ml"
open Spec.Basic
open Amqp_client_lib
(** A message: a [Content.t] value paired with a [string]. [make] below
    builds such a pair, placing its [body] argument in the string
    component. *)
type message = (Content.t * string)
(* NOTE(review): both bindings had lost their function name (they read
   [let key value = key, ...], leaving [key] unbound and the second
   definition shadowing the first). The names below are reconstructed;
   confirm them against the module interface. *)

(** [string_header key value] is the pair [(key, Types.VLongstr value)]. *)
let string_header key value = key, Types.VLongstr value

(** [int_header key value] is the pair [(key, Types.VLonglong value)]. *)
let int_header key value = key, Types.VLonglong value
(** A message together with delivery information.
    NOTE(review): the field meanings other than [delivery_tag] and
    [message] are inferred from their names — presumably they mirror what
    the broker reports on delivery; confirm against the code that builds
    this record. *)
type t =
{ delivery_tag : int; (** Tag passed back to the broker by [ack] and [reject]. *)
redelivered : bool;
exchange : string;
routing_key : string;
message: message; (** The content/body pair, see {!message}. *)
}
(** [make ?content_type ... ?app_id body] builds a {!message}.

    Every optional argument is stored as-is in the field of the same name
    of the resulting [Content.t] record, so an omitted argument yields
    [None] for that field. Two fields are treated specially:
    - [expiration] is supplied as an [int] and stored as its decimal
      string representation (via [string_of_int]);
    - [reserved] is always [None].

    @param headers optional [Types.table] stored in the [headers] field.
    @param body the string placed in the second component of the result.
    @return the pair of the populated [Content.t] record and [body]. *)
let make
    ?(content_type : string option)
    ?(content_encoding : string option)
    ?(headers : Types.table option)
    ?(delivery_mode : int option)
    ?(priority : int option)
    ?(correlation_id : string option)
    ?(reply_to : string option)
    ?(expiration : int option)
    ?(message_id : string option)
    ?(timestamp : int option)
    ?(amqp_type : string option)
    ?(user_id : string option)
    ?(app_id : string option)
    body : message =
  (* The content record carries the expiration as a string, while callers
     pass it as an integer. *)
  let expiration =
    match expiration with
    | None -> None
    | Some n -> Some (string_of_int n)
  in
  ( { Content.content_type;
      content_encoding;
      headers;
      delivery_mode;
      priority;
      correlation_id;
      reply_to;
      expiration;
      message_id;
      timestamp;
      amqp_type;
      user_id;
      app_id;
      reserved = None;
    },
    body )
(** [ack channel t] issues [Spec.Basic.Ack.request] on the underlying
    channel of [channel] for [t.delivery_tag], with [multiple] set to
    [false]. *)
let ack channel t =
  Spec.Basic.(
    let frame = { Ack.delivery_tag = t.delivery_tag; multiple = false } in
    Ack.request (Channel.channel channel) frame)
(** [reject ~requeue channel t] issues [Spec.Basic.Reject.request] on the
    underlying channel of [channel] for [t.delivery_tag], forwarding
    [requeue] unchanged. *)
let reject ~requeue channel t =
  Spec.Basic.(
    let frame = { Reject.delivery_tag = t.delivery_tag; requeue } in
    Reject.request (Channel.channel channel) frame)
(** [recover ~requeue channel] issues [Spec.Basic.Recover.request] on the
    underlying channel of [channel], forwarding [requeue] unchanged. *)
let recover ~requeue channel =
  let module R = Spec.Basic.Recover in
  R.request (Channel.channel channel) { R.requeue = requeue }