Source file caqti_async.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
open Async_kernel
open Async_unix
open Caqti_common_priv
open Core
module System = struct
type 'a future = 'a Deferred.t
let (>>=) m f = Deferred.bind m ~f
let (>|=) = Deferred.(>>|)
let return = Deferred.return
let finally f g =
try_with ~extract_exn:true f >>= function
| Ok y -> g () >|= fun () -> y
| Error exn -> g () >|= fun () -> Error.raise (Error.of_exn exn)
let cleanup f g =
try_with ~extract_exn:true f >>= function
| Ok y -> return y
| Error exn -> g () >|= fun () -> Error.raise (Error.of_exn exn)
let join = Deferred.all_unit
module Mvar = struct
type 'a t = 'a Ivar.t
let create = Ivar.create
let store x v = Ivar.fill v x
let fetch v = Ivar.read v
end
module Unix = struct
type file_descr = Async_unix.Fd.t
let fdinfo = Info.of_string "Caqti_async file descriptor"
let wrap_fd f ufd =
let fd = Fd.create (Fd.Kind.Socket `Active) ufd fdinfo in
let open Deferred in
f fd >>= fun r ->
Fd.(close ~file_descriptor_handling:Do_not_close_file_descriptor) fd
>>= fun () ->
return r
let poll ?(read = false) ?(write = false) ?timeout fd =
let wait_read =
if read then Async_unix.Fd.ready_to fd `Read else Deferred.never () in
let wait_write =
if write then Async_unix.Fd.ready_to fd `Write else Deferred.never () in
let wait_timeout =
(match timeout with
| Some t -> Clock.after (Time.Span.of_sec t)
| None -> Deferred.never ()) in
let did_read, did_write, did_timeout = ref false, ref false, ref false in
let is_ready = function
| `Ready -> true
| `Bad_fd | `Closed -> false in
Deferred.enabled [
Deferred.choice wait_read (fun st -> did_read := is_ready st);
Deferred.choice wait_write (fun st -> did_write := is_ready st);
Deferred.choice wait_timeout (fun () -> did_timeout := true);
] >>|
(fun f ->
ignore (f ());
(!did_read, !did_write, !did_timeout))
end
module Log = struct
type 'a log = ('a, unit Deferred.t) Logs.msgf -> unit Deferred.t
let kmsg ?(src = Logs.default) level msgf =
let count_it () =
(match level with
| Logs.Error -> Logs.incr_err_count ()
| Logs.Warning -> Logs.incr_warn_count ()
| _ -> ()) in
(match Logs.Src.level src with
| None -> return ()
| Some level' when Poly.(level > level') ->
count_it ();
return ()
| Some _ ->
count_it ();
let ivar = Ivar.create () in
let k () = Ivar.read ivar in
let over () = Ivar.fill ivar () in
Logs.report src level ~over k msgf)
let err ?(src = default_log_src) msgf = kmsg ~src Logs.Error msgf
let warn ?(src = default_log_src) msgf = kmsg ~src Logs.Warning msgf
let info ?(src = default_log_src) msgf = kmsg ~src Logs.Info msgf
let debug ?(src = default_log_src) msgf = kmsg ~src Logs.Debug msgf
end
module Preemptive = struct
let detach f x = In_thread.run (fun () -> f x)
let run_in_main f = Thread_safe.block_on_async_exn f
end
module Stream = Caqti_stream.Make (struct
type 'a future = 'a Deferred.t
let (>>=) m f = Deferred.bind m ~f
let (>|=) = Deferred.(>>|)
let return = Deferred.return
end)
end
include Caqti_connect.Make_unix (System)