Source file server_pool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
(**
This module is built around [Resource_pool]. While a pool of type
[Resource_pool.t] manages a number of resources, here we manage a cluster of
such pools. A typical use case would be a cluster of servers, where for each
server we maintain a number of connections. A user of this module can call [use]
to access one of the connections, which are served in a round-robin fashion.
*)
[@@@ocaml.warning "+A-9-44-48"]
let (>>=) = Lwt.(>>=)
let section = Lwt_log.Section.make "server-pool"
let () = Lwt_log.Section.set_level section Lwt_log.Info
module type CONF = sig
type connection
type server
type serverid
val serverid_to_string : serverid -> string
val connect : server -> connection Lwt.t
val close : connection -> unit Lwt.t
val check_delay : float
val check_server : serverid -> server -> bool Lwt.t
end
module Make (Conf : CONF) = struct
let show = Conf.serverid_to_string
type server_status = {
serverid : Conf.serverid;
desired : int;
current : int;
essential : bool;
suspended : bool;
check_server : unit -> bool Lwt.t;
connections : Conf.connection Resource_pool.t;
}
let mk_server_status
~serverid ~desired ~essential ~check_server ~connections =
{serverid = serverid; desired; current = 0; essential; suspended = false;
check_server; connections}
let servers : (Conf.serverid, server_status) Hashtbl.t = Hashtbl.create 9
let get_status serverid = Hashtbl.find_opt servers serverid
let non_essential_active_connection_pools () =
let accum serverid {essential; suspended; connections} acc =
if not essential && not suspended
then (serverid, connections) :: acc
else acc
in
Hashtbl.fold accum servers []
let remove serverid =
Lwt_log.ign_notice_f ~section "removing server %s" (show serverid);
Hashtbl.remove servers serverid
let update_current_count serverid f =
match get_status serverid with None -> () | Some status ->
let status = {status with current = f status.current} in
Hashtbl.replace servers serverid status;
Lwt_log.ign_debug_f ~section "current number of instances for %s: %d/%d"
(show serverid) status.current status.desired
let server_pool : server_status Resource_pool.t =
let nil () = failwith "Bs_db.server_pool: invalid connection attempt" in
let n = 0
and dispose {serverid} =
update_current_count serverid pred;
Lwt.return_unit
and check {serverid} f =
let _retire_non_essential_servers () =
match get_status serverid with
| None -> f false
| Some status -> f status.essential
in
f true
in Resource_pool.create ~check ~dispose n nil
let server_exists serverid = Hashtbl.mem servers serverid
let add_many
?(essential = false)
?(connect_immediately = false) ~num_conn new_servers =
let mk_connection_pool (serverid, server) : server_status =
Lwt_log.ign_notice_f ~section "adding server: %s" (show serverid);
let connect () =
Lwt_log.ign_info_f ~section "opening connection to %s" (show serverid);
Conf.connect server >>= fun conn ->
Lwt.return conn
in
let dispose conn =
Lwt_log.ign_info_f ~section "closing connection to %s" (show serverid);
Lwt.catch (fun () -> Conf.close conn) (fun _ -> Lwt.return_unit)
in
let check_server () = Conf.check_server serverid server in
let check _ f = f true in
let connections = Resource_pool.create num_conn ~check ~dispose connect in
let status =
mk_server_status
~serverid ~desired:num_conn ~essential ~check_server ~connections
in
Hashtbl.add servers serverid status;
if connect_immediately then
for _ = 1 to num_conn do
Lwt.async @@ fun () ->
connect () >>= fun c ->
try Resource_pool.add connections c; Lwt.return_unit
with Resource_pool.Resource_limit_exceeded -> dispose c
done;
status
in
let pools = List.map mk_connection_pool @@
List.filter (fun l -> not @@ server_exists @@ fst l) new_servers in
for _ = 1 to num_conn do
pools |> List.iter @@ fun conn_pool ->
Resource_pool.add ~omit_max_check:true server_pool conn_pool;
update_current_count conn_pool.serverid succ
done
let add_one ?essential ?connect_immediately ~num_conn serverid server =
add_many ?essential ?connect_immediately ~num_conn [(serverid, server)]
let add_existing
?(essential = false) ?(check_server = fun () -> Lwt.return_true)
~num_conn serverid connections =
Lwt_log.ign_notice_f ~section "adding existing server: %s" (show serverid);
let status =
mk_server_status
~serverid ~desired:num_conn ~essential ~check_server ~connections
in
Hashtbl.add servers serverid status;
for _ = 1 to num_conn do
Resource_pool.add ~omit_max_check:true server_pool status;
update_current_count serverid succ
done
let reactivate_server ~check_server connection_pool =
let serverid = connection_pool.serverid in
let check () =
Lwt_unix.sleep Conf.check_delay >>= fun () ->
Lwt_log.ign_debug_f ~section "checking server health of %s" (show serverid);
Lwt.catch
check_server
(fun e ->
Lwt_log.ign_info_f ~section
"exception during health check of %s: %s"
(show serverid) (Printexc.to_string e);
Lwt.return_false)
in
let reactivate () =
match get_status serverid with None -> Lwt.return_unit | Some status ->
Lwt_log.ign_notice_f ~section
"reactivating healthy server %s" (show serverid);
Hashtbl.replace servers serverid {status with suspended = false};
for _ = status.current to status.desired - 1 do
Resource_pool.add ~omit_max_check:true server_pool connection_pool;
update_current_count serverid succ
done;
Lwt.return_unit
in
let rec loop () =
if not @@ server_exists serverid then Lwt.return_unit else
check () >>= fun healthy -> if healthy then reactivate () else loop ()
in loop ()
let suspend_server ~check_server connection_pool =
let serverid = connection_pool.serverid in
match get_status serverid with None -> () | Some status ->
if status.essential || status.suspended then () else begin
Lwt_log.ign_warning_f ~section "suspending %s" (show serverid);
Hashtbl.replace servers serverid {status with suspended = true};
Lwt.async @@ fun () ->
Resource_pool.clear connection_pool.connections >>= fun () ->
reactivate_server ~check_server connection_pool
end
let use ?usage_attempts f =
Resource_pool.use ~usage_attempts:9 server_pool @@ fun connection_pool ->
let {serverid; connections} = connection_pool in
match get_status serverid with
| None ->
Lwt_log.info_f ~section "cannot use %s (removed)" (show serverid)
>>= fun () ->
Lwt.fail Resource_pool.(Resource_invalid {safe = true})
| Some {suspended = true} ->
Lwt_log.info_f ~section "not using %s (suspended)" (show serverid)
>>= fun () ->
Lwt.fail Resource_pool.(Resource_invalid {safe = true})
| Some {check_server} ->
Lwt_log.debug_f ~section "using connection to %s" (show serverid)
>>= fun () ->
Lwt.catch
(fun () -> Resource_pool.use ?usage_attempts connections f)
(fun e -> match e with
| Resource_pool.(Resource_invalid {safe = true}) ->
Lwt_log.warning
"connection unusable (safe to retry using another server)"
>>= fun () ->
Lwt.fail e
| Resource_pool.(Resource_invalid {safe = false}) ->
Lwt_log.warning
"connection unusable (unsafe to retry using another server)"
>>= fun () ->
suspend_server ~check_server connection_pool;
Lwt.fail e
| e -> Lwt.fail e
)
let server_statuses () =
Hashtbl.fold (fun _ status l -> status :: l) servers []
let servers () =
Hashtbl.fold (fun server _ l -> server :: l) servers []
end