Source file caqti_blocking.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
open Caqti_platform
module Fiber = struct
type 'a t = 'a
module Infix = struct
let (>>=) x f = f x
let (>|=) x f = f x
end
let return x = x
let catch f g = try f () with exn -> g exn
let finally f g =
(match f () with
| y -> g (); y
| exception exn -> g (); raise exn)
let cleanup f g = try f () with exn -> g (); raise exn
end
module Stream = Caqti_platform.Stream.Make (Fiber)
module System_core = struct
module Fiber = Fiber
module Switch = Caqti_platform.Switch.Make (Fiber)
let async ~sw:_ f = f ()
module Stream = Stream
module Semaphore = struct
type t = bool ref
let create () = ref false
let release v = v := true
let acquire v =
if not !v then
failwith "Cannot acquire unreleased semaphore in blocking context."
end
module Log = struct
type 'a log = 'a Logs.log
let err ?(src = Logging.default_log_src) = Logs.err ~src
let warn ?(src = Logging.default_log_src) = Logs.warn ~src
let info ?(src = Logging.default_log_src) = Logs.info ~src
let debug ?(src = Logging.default_log_src) = Logs.debug ~src
end
type stdenv = unit
module Sequencer = struct
type 'a t = 'a
let create m = m
let enqueue m f = f m
end
end
module Pool = Caqti_platform.Pool.Make_without_alarm (System_core)
module System = struct
include System_core
module Net = struct
module Sockaddr = struct
type t = Unix.sockaddr
let unix s = Unix.ADDR_UNIX s
let tcp (addr, port) =
Unix.ADDR_INET (Unix.inet_addr_of_string (Ipaddr.to_string addr), port)
end
type nonrec in_channel = in_channel
type nonrec out_channel = out_channel
let getaddrinfo ~stdenv:() host port =
try
let opts = Unix.[AI_SOCKTYPE SOCK_STREAM] in
Unix.getaddrinfo (Domain_name.to_string host) (string_of_int port) opts
|> List.map (fun ai -> ai.Unix.ai_addr) |> Result.ok
with
| Not_found -> Ok []
| Unix.Unix_error (code, _, _) ->
Error (`Msg ("Cannot resolve host name: " ^ Unix.error_message code))
let connect ~sw:_ ~stdenv:() sockaddr =
try Ok (Unix.open_connection sockaddr) with
| Unix.Unix_error (code, _, _) ->
Error (`Msg ("Cannot connect: " ^ Unix.error_message code))
let output_char = output_char
let output_string = output_string
let flush = flush
let input_char = input_char
let really_input = really_input
let close_in = close_in
end
end
module System_unix = struct
module Unix = struct
type file_descr = Unix.file_descr
let wrap_fd f fd = f fd
let poll ~stdenv:()
?(read = false) ?(write = false) ?(timeout = -1.0) fd =
let read_fds = if read then [fd] else [] in
let write_fds = if write then [fd] else [] in
let read_fds, write_fds, _ = Unix.select read_fds write_fds [] timeout in
(read_fds <> [], write_fds <> [], read_fds = [] && write_fds = [])
end
module Preemptive = struct
let detach f x = f x
let run_in_main f = f ()
end
end
module Loader = Caqti_platform_unix.Driver_loader.Make (System) (System_unix)
include Connector.Make (System) (Pool) (Loader)
open System
module type CONNECTION = Caqti_connection_sig.S
with type 'a fiber := 'a
and type ('a, 'e) stream := ('a, 'e) Stream.t
type connection = (module CONNECTION)
let connect ?env ?tweaks_version uri =
let sw = Switch.create () in
connect ?env ?tweaks_version ~sw ~stdenv:() uri
let with_connection = with_connection ~stdenv:()
let connect_pool ?pool_config ?post_connect ?env ?tweaks_version uri =
let sw = Switch.create () in
connect_pool
?pool_config ?post_connect ?env ?tweaks_version ~sw ~stdenv:() uri
let or_fail = function
| Ok x -> x
| Error (#Caqti_error.t as err) -> raise (Caqti_error.Exn err)