1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# 1 "async/src/rpc.ml"
open Thread
open Amqp_client_lib
open Types
open Spec.Basic
module Client = struct
type t = { queue: Queue.t;
channel: [ `Ok ] Channel.t;
id: string;
outstanding: (string, Message.message option Ivar.t) Hashtbl.t;
mutable counter: int;
consumer: [ `Ok ] Queue.consumer;
}
let handle_reply t ok (content, body) =
let reply = match ok with
| true -> Some (content, body)
| false -> None
in
match content.Content.correlation_id with
| Some id ->
begin match Hashtbl.find t.outstanding id with
| var ->
Ivar.fill var reply;
Hashtbl.remove t.outstanding id;
return ()
| exception Not_found ->
return ()
end
| None -> failwith "No correlation id set"
let init ~id connection =
Connection.open_channel ~id:"rpc_client" Channel.no_confirm connection >>= fun channel ->
let id = Printf.sprintf "%s.%s" (Channel.id channel) id in
Queue.declare channel
~exclusive:true
~auto_delete:true
id >>= fun queue ->
Queue.bind channel queue Exchange.amq_match (`Headers ["reply_to", VLongstr (Queue.name queue)]) >>= fun () ->
Queue.consume ~id:"rpc_client" ~no_ack:true ~exclusive:true channel queue >>= fun (consumer, reader) ->
let t = { queue; channel; id; outstanding = Hashtbl.create 0; counter = 0; consumer } in
spawn (Pipe.iter reader ~f:(fun { Message.message; routing_key; _ } -> handle_reply t (routing_key = Queue.name queue) message));
spawn (Pipe.iter (Channel.on_return channel) ~f:(fun (_, message) -> handle_reply t false message));
return t
let call t ?correlation_id ~ttl ~routing_key ~ exchange (, body) =
let correlation_id_prefix = match correlation_id with
| Some cid -> cid
| None -> t.id
in
let correlation_id = Printf.sprintf "%s.%d" correlation_id_prefix t.counter in
t.counter <- t.counter + 1;
let var = Ivar.create () in
Hashtbl.add t.outstanding correlation_id var;
let expiration = Some (string_of_int ttl) in
let = { header with Content.correlation_id = Some correlation_id;
expiration;
reply_to = Some (Queue.name t.queue);
headers = Some (Message.string_header "reply_to" (Queue.name t.queue) :: headers)
}
in
Exchange.publish t.channel ~mandatory:true ~routing_key exchange (header, body) >>= function
| `Ok -> with_timeout ttl (Ivar.read var) >>= function
| `Timeout ->
Hashtbl.remove t.outstanding correlation_id;
return None
| `Result a -> return a
(** Release resources *)
let close t =
Hashtbl.iter (fun _ var -> Ivar.fill var None) t.outstanding;
Queue.cancel t.consumer >>= fun () ->
Queue.delete t.channel t.queue >>= fun () ->
Channel.close t.channel >>= fun () ->
return ()
end
module Server = struct
open Spec.Basic
type 'a t = { consumer: 'a Queue.consumer }
let queue_argument = Queue.dead_letter_exchange (Exchange.name Exchange.amq_match)
let start ?(async=false) ?(discard_redelivered=false) channel queue handler =
let handler ({ Message.message = (content, body); redelivered; _} as message) =
let routing_key = match content.Content.reply_to with
| Some r -> r
| None -> failwith "Missing reply_to in reposnse"
in
let correlation_id = content.Content.correlation_id in
match redelivered && discard_redelivered with
| false -> begin
handler (content, body) >>= fun (content, body) ->
let content = { content with Content.correlation_id } in
Exchange.publish channel Exchange.default ~routing_key (content, body) >>= function
| `Ok -> Message.ack channel message
| `Failed -> Message.reject ~requeue:true channel message
end
| true ->
Message.reject ~requeue:false channel message
in
Queue.consume ~id:"rpc_server" channel queue >>= fun (consumer, reader) ->
let read = match async with
| true -> Pipe.iter_without_pushback reader ~f:(fun m -> spawn (handler m))
| false -> Pipe.iter reader ~f:handler
in
spawn read;
return { consumer }
let stop t =
Queue.cancel t.consumer
end