1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
open Printf
(** Tweaks version handed to drivers when a caller omits the optional
    [?tweaks_version] argument of the connect functions.
    NOTE(review): presumably a (major, minor) pair; the meaning of the two
    components is not visible in this file. *)
let default_tweaks_version = 1, 7
(** Mutable slot holding the function used to load a driver library by
    name. Until a loader is installed with [define_loader], the stored
    function always fails with an explanatory message that mentions the
    requested library. *)
let dynload_library =
  ref (fun lib ->
    Error
      (Printf.sprintf
         "Neither %s nor the dynamic linker is linked into the \
          application."
         lib))
(** [define_loader loader] stores [loader] in [dynload_library], replacing
    whichever function was stored there before. Subsequent calls to
    [load_library] go through [loader]. *)
let define_loader loader = dynload_library := loader
(** [load_library name] passes [name] to the loader currently stored in
    [dynload_library] and returns that loader's result unchanged. *)
let load_library name =
  let loader = !dynload_library in
  loader name
(** [scheme_driver_name scheme] maps a URI scheme to the name of the driver
    library expected to handle it: the prefix [caqti-driver-] followed by
    the scheme, where both [postgres] and [postgresql] select the
    [postgresql] driver. *)
let scheme_driver_name scheme =
  let suffix =
    match scheme with
    | "postgres" | "postgresql" -> "postgresql"
    | other -> other
  in
  "caqti-driver-" ^ suffix
(** [retry_with_library load_driver ~uri scheme] asks [load_driver] for the
    driver of [scheme]. When that attempt reports [`Load_failed], the driver
    library named by [scheme_driver_name scheme] is loaded through
    [load_library] and [load_driver] is tried once more.

    Any result other than [`Load_failed] is returned as is. A failure to
    load the library, or a second [`Load_failed] after the library was
    loaded, is reported as a load failure for [uri]. *)
let retry_with_library load_driver ~uri scheme =
  (* Wrap a plain message into the load-failure error for this URI. *)
  let load_failed msg =
    Error (Caqti_error.load_failed ~uri (Caqti_error.Msg msg))
  in
  match load_driver ~uri scheme with
  | Error (`Load_failed _) ->
    let driver_name = scheme_driver_name scheme in
    begin match load_library driver_name with
    | Error msg -> load_failed msg
    | Ok () ->
      (* The library is in; the driver should have registered itself. *)
      begin match load_driver ~uri scheme with
      | Error (`Load_failed _) ->
        load_failed
          (Printf.sprintf
             "The %s did not register a handler for the URI scheme %s."
             driver_name scheme)
      | second_attempt -> second_attempt
      end
    end
  | first_attempt -> first_attempt
(** [Make (System) (Pool) (Loader)] provides [connect], [with_connection]
    and [connect_pool] for the concurrency primitives of [System]. Drivers
    are obtained through [Loader] and pools are created with [Pool]. *)
module Make
  (System : System_sig.S)
  (Pool : Pool.S
    with type 'a fiber := 'a System.Fiber.t
     and type switch := System.Switch.t
     and type stdenv := System.stdenv)
  (Loader : Driver_loader.S
    with type 'a fiber := 'a System.Fiber.t
     and type switch := System.Switch.t
     and type stdenv := System.stdenv
     and type ('a, 'e) stream := ('a, 'e) System.Stream.t) =
struct
  open System
  open System.Fiber.Infix

  (** Connection signature specialised to the fibers and streams of
      [System]. *)
  module type CONNECTION = Caqti_connection_sig.S
    with type 'a fiber := 'a Fiber.t
     and type ('a, 'err) stream := ('a, 'err) Stream.t

  (** A connection packed as a first-class module. *)
  type connection = (module CONNECTION)

  (* Result-aware bind on fibers: continue with [f] on [Ok], pass an
     [Error] through without calling [f]. *)
  let (>>=?) m f = m >>= function Ok x -> f x | Error _ as r -> Fiber.return r

  (* Result-aware map on fibers: apply [f] under [Ok], keep [Error]. *)
  let (>|=?) m f = m >|= function Ok x -> (Ok (f x)) | Error _ as r -> r

  (* Binding-operator form of the result-aware map above. *)
  let (let+?) = (>|=?)

  (** Driver signature specialised to the types of [System]. *)
  module type DRIVER = Driver_loader.DRIVER
    with type 'a fiber := 'a Fiber.t
     and type ('a, 'err) stream := ('a, 'err) Stream.t
     and type switch := System.Switch.t
     and type stdenv := System.stdenv

  (* Drivers that have already been loaded, keyed by URI scheme. *)
  let drivers : (string, (module DRIVER)) Hashtbl.t = Hashtbl.create 11

  (** [load_driver uri] returns the driver for the scheme of [uri].

      A driver found in the [drivers] table is returned directly. Otherwise
      it is obtained with [retry_with_library Loader.load_driver] and, on
      success, recorded in the table for later calls. A URI without a
      scheme is rejected with a load-rejected error. *)
  let load_driver uri =
    (match Uri.scheme uri with
     | None ->
        let msg = "Missing URI scheme." in
        Error (Caqti_error.load_rejected ~uri (Caqti_error.Msg msg))
     | Some scheme ->
        (match Hashtbl.find_opt drivers scheme with
         | Some driver -> Ok driver
         | None ->
            (match retry_with_library Loader.load_driver ~uri scheme with
             | Ok driver ->
                (* [replace] keeps at most one binding per scheme. *)
                Hashtbl.replace drivers scheme driver;
                Ok driver
             | Error _ as r -> r)))

  (** [connect ?env ?tweaks_version ~sw ~stdenv uri] checks [sw], loads the
      driver for [uri] and opens a connection with it.

      The [disconnect] of the returned connection is wrapped: when the
      connection is created, a release hook that disconnects it is
      registered on [sw]; calling the wrapped [disconnect] first removes
      that hook and then disconnects.

      [tweaks_version] defaults to [default_tweaks_version]. Driver loading
      errors and connection errors are returned as [Error]. *)
  let connect
        ?env ?(tweaks_version = default_tweaks_version) ~sw ~stdenv uri
      : ((module CONNECTION), _) result Fiber.t =
    Switch.check sw;
    (match load_driver uri with
     | Ok driver ->
        let module Driver = (val driver) in
        let+? conn = Driver.connect ~sw ~stdenv ?env ~tweaks_version uri in
        let module Conn = (val conn : CONNECTION) in
        let module Conn' = struct
          include Conn
          let disconnect =
            (* Evaluated once, when the module is built: the hook is in
               place before the connection is handed to the caller. *)
            let hook = Switch.on_release_cancellable sw disconnect in
            fun () -> Switch.remove_hook hook; disconnect ()
        end in
        (module Conn' : CONNECTION)
     | Error err ->
        Fiber.return (Error err))

  (** [with_connection ?env ?tweaks_version ~stdenv uri f] runs inside a
      switch created by [Switch.run]: it connects to [uri] on that switch
      and applies [f] to the connection. If connecting fails, the error is
      returned and [f] is not called. *)
  let with_connection
        ?env ?(tweaks_version = default_tweaks_version) ~stdenv uri f =
    Switch.run begin fun sw ->
      connect ~sw ~stdenv ?env ~tweaks_version uri >>=? f
    end

  (** [connect_pool ?pool_config ?post_connect ?env ?tweaks_version ~sw
      ~stdenv uri] creates a pool of connections to [uri].

      - [pool_config] defaults to [Caqti_pool_config.default_from_env ()].
        Before it is passed to [Pool.create], its [max_size] and
        [max_idle_size] are adjusted according to the [can_concur] and
        [can_pool] flags of the driver.
      - [post_connect], when given, is run on every new connection before
        it enters the pool. If it fails, the connection is disconnected and
        the error is returned in place of the connection.
      - A release hook draining the pool is registered on [sw]; the hook is
        removed again when the pool is finalised by the GC.

      Returns [Error] if the driver cannot be loaded.

      @raise Invalid_argument if [max_size] is negative, if
      [max_idle_size] is set without [max_size], or if [max_idle_size] is
      not within [0] and [max_size]. *)
  let connect_pool
        ?pool_config ?post_connect
        ?env ?(tweaks_version = default_tweaks_version) ~sw ~stdenv uri =
    let pool_config =
      (match pool_config with
       | None -> Caqti_pool_config.default_from_env ()
       | Some pool_config -> pool_config)
    in
    Switch.check sw;
    let check_arg cond =
      if not cond then invalid_arg "Caqti_connect.Make.connect_pool"
    in
    (match Caqti_pool_config.(get max_size) pool_config,
           Caqti_pool_config.(get max_idle_size) pool_config with
     | None, None -> ()
     | Some max_size, None -> check_arg (max_size >= 0)
     | None, Some _ -> check_arg false
     | Some max_size, Some max_idle_size ->
        check_arg (max_size >= 0);
        check_arg (0 <= max_idle_size && max_idle_size <= max_size));
    (match load_driver uri with
     | Ok driver ->
        let module Driver = (val driver) in
        let disconnect (module Db : CONNECTION) = Db.disconnect () in
        let validate (module Db : CONNECTION) = Db.validate () in
        let check (module Db : CONNECTION) = Db.check in
        (* Open one connection with the driver, without any post-processing. *)
        let raw_connect () =
          (Driver.connect ~sw ~stdenv ?env ~tweaks_version uri
            :> (connection, _) result Fiber.t)
        in
        let connect =
          (match post_connect with
           | None -> raw_connect
           | Some post_connect ->
              fun () ->
                raw_connect () >>=? fun conn ->
                post_connect conn >>= (function
                  | Ok () -> Fiber.return (Ok conn)
                  | Error err ->
                     (* The pool never sees this connection, so it must be
                        closed here or it would be leaked. *)
                     disconnect conn >|= fun () -> Error err))
        in
        let di = Driver.driver_info in
        let pool_config =
          (match Caqti_driver_info.can_concur di,
                 Caqti_driver_info.can_pool di,
                 Caqti_pool_config.(get max_idle_size) pool_config with
           | true, true, _ ->
              pool_config
           | true, false, _ ->
              pool_config |> Caqti_pool_config.(set max_idle_size) 0
           | false, true, Some 0 ->
              pool_config
                |> Caqti_pool_config.(set max_size) 1
                |> Caqti_pool_config.(set max_idle_size) 0
           | false, true, _ ->
              pool_config
                |> Caqti_pool_config.(set max_size) 1
                |> Caqti_pool_config.(set max_idle_size) 1
           | false, false, _ ->
              pool_config
                |> Caqti_pool_config.(set max_size) 1
                |> Caqti_pool_config.(set max_idle_size) 0)
        in
        let pool =
          Pool.create
            ~config:pool_config ~validate ~check ~sw ~stdenv connect disconnect
        in
        let hook =
          Switch.on_release_cancellable sw (fun () -> Pool.drain pool)
        in
        (* Once the pool itself is collected, the drain hook is dropped. *)
        Gc.finalise (fun _ -> Switch.remove_hook hook) pool;
        Ok pool
     | Error err ->
        Error err)
end