(* Source file: caqti_connect.ml *)
open Printf
(** Hook used to load a driver library by name at run time.

    The initial hook loads nothing: it always returns an [Error] whose
    message names the requested library.  It is replaced through
    {!define_loader}. *)
let dynload_library =
  (* Default used until a real loader has been installed. *)
  let no_loader lib =
    Error
      (Printf.sprintf
         "Neither %s nor the dynamic linker is linked into the application."
         lib)
  in
  ref no_loader
(** [define_loader loader] installs [loader] as the function stored in
    [dynload_library], replacing whatever hook was there before. *)
let define_loader loader = dynload_library := loader
(** Registry of driver functors, keyed by URI scheme.  It is filled by
    {!define_unix_driver} and consulted by [load_driver_functor]. *)
let drivers = Hashtbl.create 11

(** [define_unix_driver scheme p] registers [p] as the driver for [scheme].

    A later registration for the same scheme replaces the earlier one.
    [Hashtbl.replace] is used rather than [Hashtbl.add] so that repeated
    registrations do not pile up shadowed bindings in the table. *)
let define_unix_driver scheme p = Hashtbl.replace drivers scheme p
(** [scheme_driver_name scheme] is the name of the library expected to
    provide the driver for [scheme]: ["caqti-driver-postgresql"] for both
    ["postgres"] and ["postgresql"], and ["caqti-driver-"] followed by the
    scheme itself for anything else. *)
let scheme_driver_name scheme =
  let suffix =
    match scheme with
    | "postgres" | "postgresql" -> "postgresql"
    | other -> other
  in
  "caqti-driver-" ^ suffix
(** [load_driver_functor ~uri scheme] looks up the driver registered for
    [scheme].

    If none is registered yet, the current [dynload_library] hook is asked
    to load the library named by [scheme_driver_name scheme], and the
    registry is consulted a second time.  The result is [Ok driver] on
    success, or [Error] built with [Caqti_error.load_failed ~uri] when the
    hook reports an error or the driver is still missing after the hook
    returned [Ok ()]. *)
let load_driver_functor ~uri scheme =
  let fail_with msg =
    Error (Caqti_error.load_failed ~uri (Caqti_error.Msg msg))
  in
  match Hashtbl.find drivers scheme with
  | registered -> Ok registered
  | exception Not_found ->
    (match !dynload_library (scheme_driver_name scheme) with
     | Error msg -> fail_with msg
     | Ok () ->
       (* The library is expected to have registered itself while loading. *)
       (match Hashtbl.find drivers scheme with
        | registered -> Ok registered
        | exception Not_found ->
          fail_with
            (Printf.sprintf
               "The driver for %s did not register itself \
                after apparently loading."
               scheme)))
(** Connection functions specialised to the futures and streams of
    [System]. *)
module Make_unix (System : Caqti_driver_sig.System_unix) = struct
  open System

  (* [fut >>=? k] continues with [k] on [Ok]; an [Error] is passed through
     unchanged and [k] is not called. *)
  let (>>=?) fut k =
    fut >>= fun outcome ->
    (match outcome with
     | Error _ as failure -> return failure
     | Ok v -> k v)

  (* [fut >|=? g] applies [g] to the [Ok] payload and passes any [Error]
     through unchanged. *)
  let (>|=?) fut g =
    fut >|= fun outcome ->
    (match outcome with
     | Error _ as failure -> failure
     | Ok v -> Ok (g v))

  (** Driver signature with futures and streams fixed to those of
      [System]. *)
  module type DRIVER = Caqti_driver_sig.S
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  (* Drivers already instantiated for this [System], keyed by URI scheme.
     This table shadows the top-level registry of driver functors. *)
  let drivers : (string, (module DRIVER)) Hashtbl.t = Hashtbl.create 11

  (** [load_driver uri] returns the driver for the scheme of [uri].

      A driver instantiated earlier is reused; otherwise the functor
      obtained from [load_driver_functor] is applied to [System] and the
      result is remembered.  A URI without a scheme is rejected with
      [Caqti_error.load_rejected]. *)
  let load_driver uri =
    match Uri.scheme uri with
    | None ->
      Error
        (Caqti_error.load_rejected ~uri
           (Caqti_error.Msg "Missing URI scheme."))
    | Some scheme ->
      (match Hashtbl.find drivers scheme with
       | cached -> Ok cached
       | exception Not_found ->
         (match load_driver_functor ~uri scheme with
          | Error _ as failure -> failure
          | Ok make_driver ->
            let module Make_driver =
              (val make_driver : Caqti_driver_sig.Of_system_unix) in
            let module Driver = Make_driver (System) in
            let driver = (module Driver : DRIVER) in
            Hashtbl.add drivers scheme driver;
            Ok driver))

  module type CONNECTION_BASE = Caqti_connection_sig.Base
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  module type CONNECTION = Caqti_connection_sig.S
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  type connection = (module CONNECTION)

  (** [connect ?env uri] loads the driver for [uri] and delegates to its
      [connect] function.  A failure to load the driver is returned as an
      [Error] future. *)
  let connect ?env uri : ((module CONNECTION), _) result future =
    match load_driver uri with
    | Error err -> return (Error err)
    | Ok driver ->
      let module Driver = (val driver : DRIVER) in
      Driver.connect ?env uri

  (** [with_connection ?env uri f] connects to [uri], applies [f] to the
      connection, disconnects, and yields the result of [f].  If
      connecting fails, the error is returned and [f] is not called.

      If an exception escapes while [f conn] and its continuation are
      being evaluated, the connection is disconnected and the exception is
      raised again.

      NOTE(review): the [try] only sees exceptions raised synchronously.
      Whether failures that [System] delivers later through a future are
      covered cannot be told from this file; confirm against the [System]
      implementations in use. *)
  let with_connection ?env uri f =
    connect ?env uri >>=? fun conn ->
    let module Db = (val conn : CONNECTION) in
    try
      f conn >>= fun result ->
      Db.disconnect () >|= fun () -> result
    with exn ->
      Db.disconnect () >|= fun () -> raise exn

  module Pool = Caqti_pool.Make (System)
  module Stream = System.Stream

  (** [connect_pool ?max_size ?max_idle_size ?post_connect ?env uri]
      creates a pool of connections to [uri].

      When [post_connect] is given, it is run on every new connection, and
      its error, if any, becomes the error of the connection attempt.

      The requested limits are adjusted to the driver: when
      [Caqti_driver_info.can_concur] is false the pool size is forced to
      [1], and when [Caqti_driver_info.can_pool] is false no idle
      connections are kept.

      Returns [Error] if the driver cannot be loaded.

      @raise Invalid_argument if [max_size] is negative, if
      [max_idle_size] is given without [max_size], or if [max_idle_size]
      is outside the range from [0] to [max_size]. *)
  let connect_pool ?max_size ?max_idle_size ?post_connect ?env uri =
    let require ok =
      if not ok then invalid_arg "Caqti_connect.Make_unix.connect_pool"
    in
    (match max_size, max_idle_size with
     | None, None -> ()
     | None, Some _ -> require false
     | Some size, None -> require (size >= 0)
     | Some size, Some idle ->
       require (size >= 0);
       require (0 <= idle && idle <= size));
    match load_driver uri with
    | Error err -> Error err
    | Ok driver ->
      let module Driver = (val driver : DRIVER) in
      let open_connection () =
        (Driver.connect ?env uri :> (connection, _) result future)
      in
      let connect =
        match post_connect with
        | None -> open_connection
        | Some post_connect ->
          fun () ->
            open_connection () >>=? fun conn ->
            post_connect conn >|=? fun () -> conn
      in
      let disconnect (module Conn : CONNECTION) = Conn.disconnect () in
      let validate (module Conn : CONNECTION) = Conn.validate () in
      let check (module Conn : CONNECTION) = Conn.check in
      let max_size, max_idle_size =
        let info = Driver.driver_info in
        match
          Caqti_driver_info.can_concur info,
          Caqti_driver_info.can_pool info,
          max_idle_size
        with
        | false, false, _ -> Some 1, Some 0
        (* An explicit request for no idle connections is kept. *)
        | false, true, Some 0 -> Some 1, Some 0
        | false, true, _ -> Some 1, Some 1
        | true, false, _ -> max_size, Some 0
        | true, true, _ -> max_size, max_idle_size
      in
      Ok
        (Pool.create ?max_size ?max_idle_size ~validate ~check
           connect disconnect)
end