Source file database_pools.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
exception Exception of string
type connection_type =
| SinglePool of string
| MultiPools of (string * string) list
let with_log ?(log_level = Logs.Error) ?(msg_prefix = "Error") err =
let msg = Caqti_error.show err in
Logs.msg log_level (fun m -> m "%s: %s" msg_prefix msg);
msg
;;
let get_or_raise ?log_level ?msg_prefix () = function
| Ok result -> result
| Error error -> failwith (with_log ?log_level ?msg_prefix error)
;;
let map_or_raise ?log_level ?msg_prefix fcn result =
result |> CCResult.map fcn |> get_or_raise ?log_level ?msg_prefix ()
;;
module type ConfigSig = sig
val database : connection_type
val database_pool_size : int
end
module DefaultConfig : ConfigSig = struct
let database = SinglePool "mariadb://root@database:3306/test"
let database_pool_size = 5
end
module Make (Config : ConfigSig) = struct
let main_pool_ref
: (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt.Pool.t option ref
=
ref None
;;
let pools
: (string, (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt.Pool.t) Hashtbl.t
=
let spare_for_pools = 5 in
Hashtbl.create
(match Config.database with
| SinglePool _ -> 1
| MultiPools pools -> CCList.length pools + spare_for_pools)
;;
let print_pool_usage pool =
let n_connections = Caqti_lwt.Pool.size pool in
let max_connections = Config.database_pool_size in
Logs.debug (fun m -> m "Pool usage: %i/%i" n_connections max_connections)
;;
let connect_or_failwith
?(pool_size = Config.database_pool_size)
ok_fun
database_url
=
database_url
|> Uri.of_string
|> Caqti_lwt.connect_pool ~max_size:pool_size
|> map_or_raise ~msg_prefix:"Failed to connect to DB pool" ok_fun
;;
let add_pool ?pool_size name database_url =
match Config.database, Hashtbl.find_opt pools name with
| SinglePool _, _ ->
failwith "SinglePool is selected: Switch to 'MultiPools' first"
| MultiPools _, Some _ ->
let msg =
Format.asprintf
"Failed to create pool: Connection pool with name '%s' exists already"
name
in
Logs.err (fun m -> m "%s" msg);
failwith msg
| MultiPools _, None ->
database_url |> connect_or_failwith ?pool_size (Hashtbl.add pools name)
;;
let initialize () =
match Config.database with
| SinglePool database_url when CCOption.is_none !main_pool_ref ->
database_url
|> connect_or_failwith (fun pool ->
main_pool_ref := Some pool;
())
| SinglePool _ -> ()
| MultiPools pools' ->
pools'
|> CCList.filter (fun (name, _) ->
CCOption.is_none (Hashtbl.find_opt pools name))
|> CCList.iter (fun (name, url) ->
url |> connect_or_failwith (Hashtbl.add pools name))
;;
let fetch_pool ?(ctx = []) () =
let open CCOption in
let () = initialize () in
match Config.database with
| SinglePool _ ->
!main_pool_ref |> get_exn_or "Initialization missed: run 'initialize'"
| MultiPools _ ->
CCList.assoc_opt ~eq:CCString.equal "pool" ctx
>>= Hashtbl.find_opt pools
|> (function
| Some pool -> pool
| None -> failwith "Unknown Pool: Please 'add_pool' first!")
;;
let transaction ?ctx f =
let open Lwt.Infix in
let pool = fetch_pool ?ctx () in
print_pool_usage pool;
Caqti_lwt.Pool.use
(fun connection ->
Logs.debug (fun m -> m "Fetched connection from pool");
let (module Connection : Caqti_lwt.CONNECTION) = connection in
let open Caqti_error in
match%lwt Connection.start () with
| Error msg ->
Logs.debug (fun m -> m "Failed to start transaction: %s" (show msg));
Lwt.return_error msg
| Ok () ->
Logs.debug (fun m -> m "Started transaction");
Lwt.catch
(fun () ->
match%lwt Connection.commit () with
| Ok () ->
Logs.debug (fun m -> m "Successfully committed transaction");
f connection |> Lwt_result.return
| Error error ->
Exception
(with_log ~msg_prefix:"Failed to commit transaction" error)
|> Lwt.fail)
(fun e ->
match%lwt Connection.rollback () with
| Ok () ->
Logs.debug (fun m -> m "Successfully rolled back transaction");
Lwt.fail e
| Error error ->
Exception
(with_log ~msg_prefix:"Failed to rollback transaction" error)
|> Lwt.fail))
pool
>|= get_or_raise ()
;;
let transaction' ?ctx f = transaction ?ctx f |> Lwt.map (get_or_raise ())
let query ?ctx f =
let open Lwt.Infix in
let pool = fetch_pool ?ctx () in
print_pool_usage pool;
Caqti_lwt.Pool.use
(fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
f connection >|= CCResult.return)
pool
>|= get_or_raise ()
;;
let query' ?ctx f = query ?ctx f |> Lwt.map (get_or_raise ())
let find_opt ?ctx request input =
query' ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.find_opt request input)
;;
let find ?ctx request input =
query' ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.find request input)
;;
let collect ?ctx request input =
query' ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.collect_list request input)
;;
let exec ?ctx request input =
query' ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.exec request input)
;;
end
module type Sig = sig
val initialize : unit -> unit
val fetch_pool
: ?ctx:(string * string) list
-> unit
-> (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt.Pool.t
val add_pool : ?pool_size:int -> string -> string -> unit
val find_opt
: ?ctx:(string * string) list
-> ('a, 'b, [< `One | `Zero ]) Caqti_request.t
-> 'a
-> 'b option Lwt.t
val find
: ?ctx:(string * string) list
-> ('a, 'b, [< `One ]) Caqti_request.t
-> 'a
-> 'b Lwt.t
val collect
: ?ctx:(string * string) list
-> ('a, 'b, [< `Many | `One | `Zero ]) Caqti_request.t
-> 'a
-> 'b list Lwt.t
val exec
: ?ctx:(string * string) list
-> ('a, unit, [< `Zero ]) Caqti_request.t
-> 'a
-> unit Lwt.t
end