Source file connection_cache.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
exception Retry = Connection.Retry
(** This functor establishes a new connection for each request. *)
module Make_no_cache (Connection : S.Connection) : sig
include S.Connection_cache
val create : ?ctx:Connection.Net.ctx -> unit -> t
(** [create ?ctx ()] creates a connection for handling a single request. The
connection accepts only a single request and will automatically be closed
as soon as possible.
@param ctx See {!Connection.Net.ctx} *)
end = struct
module Net = Connection.Net
module IO = Net.IO
open IO
type t = S.call
let call = Fun.id
let create ?(ctx = Lazy.force Net.default_ctx) () ? ?body meth uri =
Net.resolve ~ctx uri
>>= Connection.connect ~ctx ~persistent:true
>>= fun connection ->
let res = Connection.call connection ?headers ?body meth uri in
Lwt.dont_wait
(fun () ->
res >>= fun (_, body) ->
(match body with
| `Empty | `String _ | `Strings _ -> Lwt.return_unit
| `Stream stream -> Lwt_stream.closed stream)
>>= fun () ->
Connection.close connection;
Lwt.return_unit)
(function Retry -> () | e -> raise e);
res
end
(** This functor keeps a cache of connections for reuse. Connections are reused
based on their remote {!type:Conduit.endp} (effectively IP / port). *)
module Make (Connection : S.Connection) (Sleep : S.Sleep) : sig
include S.Connection_cache
val create :
?ctx:Connection.Net.ctx ->
?keep:int64 ->
?retry:int ->
?parallel:int ->
?depth:int ->
unit ->
t
(** Create a new connection cache
@param ctx Conduit context to use. See {!type:Connection.Net.ctx}.
@param keep Number of nanoseconds to keep an idle connection around.
@param retry
Number of times a {e gracefully} failed request is automatically
retried. {e graceful} means failed with {!exception:Connection.Retry}.
Requests with a [`Stream] {!module:Body} cannot be retried
automatically. Such requests will fail with
{!exception:Connection.Retry} and a new {!module:Body} will need to be
provided to retry.
@param parallel
maximum number of connections to establish to a single endpoint. Beware:
A single hostname may resolve to multiple endpoints. In such a case
connections may be created in excess to what was intended.
@param depth
maximum number of requests to queue and / or send on a single
connection. *)
end = struct
module Net = Connection.Net
module IO = Net.IO
open IO
type ctx = Net.ctx
type t = {
cache : (Net.endp, Connection.t) Hashtbl.t;
ctx : ctx;
keep : int64;
retry : int;
parallel : int;
depth : int;
}
let create ?(ctx = Lazy.force Net.default_ctx) ?(keep = 60_000_000_000L)
?(retry = 2) ?(parallel = 4) ?(depth = 100) () =
{
cache = Hashtbl.create ~random:true 10;
ctx;
keep;
retry;
parallel;
depth;
}
let rec get_connection self endp =
let finalise connection =
let rec remove keep =
let current = Hashtbl.find self.cache endp in
Hashtbl.remove self.cache endp;
if current == connection then
List.iter (Hashtbl.add self.cache endp) keep
else remove (current :: keep)
in
remove [];
Lwt.return_unit
in
let create () =
let connection = Connection.create ~finalise ~ctx:self.ctx endp
and timeout = ref Lwt.return_unit in
let rec busy () =
Lwt.cancel !timeout;
if Connection.length connection = 0 then (
timeout :=
Sleep.sleep_ns self.keep >>= fun () ->
Connection.close connection;
Lwt.return_unit);
Lwt.on_termination (Connection.notify connection) busy
in
busy ();
connection
in
match Hashtbl.find_all self.cache endp with
| [] ->
let connection = create () in
Hashtbl.add self.cache endp connection;
Lwt.return connection
| conns -> (
let rec search length = function
| [ a ] -> (a, length + 1)
| a :: b :: tl when Connection.length a < Connection.length b ->
search (length + 1) (a :: tl)
| _ :: tl -> search (length + 1) tl
| [] -> assert false
in
match search 0 conns with
| shallowest, _ when Connection.length shallowest = 0 ->
Lwt.return shallowest
| _, length when length < self.parallel ->
let connection = create () in
Hashtbl.add self.cache endp connection;
Lwt.return connection
| shallowest, _ when Connection.length shallowest < self.depth ->
Lwt.return shallowest
| _ ->
Lwt.try_bind
(fun () -> Lwt.choose (List.map Connection.notify conns))
(fun _ -> get_connection self endp)
(fun _ -> get_connection self endp))
let call self ? ?body meth uri =
Net.resolve ~ctx:self.ctx uri >>= fun endp ->
let rec request retry =
get_connection self endp >>= fun conn ->
Lwt.catch
(fun () -> Connection.call conn ?headers ?body meth uri)
(function
| Retry -> (
match body with
| Some (`Stream _) -> raise Retry
| None | Some `Empty | Some (`String _) | Some (`Strings _) ->
if retry <= 0 then raise Retry else request (retry - 1))
| e -> Lwt.reraise e)
in
request self.retry
end