Source file conduit_lwt_server.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
open Lwt.Infix
let src = Logs.Src.create "conduit_lwt_server" ~doc:"Conduit Lwt transport"
module Log = (val Logs.src_log src : Logs.LOG)
let safe_close t =
Lwt.catch (fun () -> Lwt_io.close t) (fun _ -> Lwt.return_unit)
let close (ic, oc) = safe_close oc >>= fun () -> safe_close ic
let with_socket sockaddr f =
let fd =
Lwt_unix.socket (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0
in
Lwt.catch
(fun () -> f fd)
(fun e ->
Lwt.catch (fun () -> Lwt_unix.close fd) (fun _ -> Lwt.return_unit)
>>= fun () -> Lwt.reraise e)
let listen ?(backlog = 128) sa =
with_socket sa (fun fd ->
Lwt_unix.(setsockopt fd SO_REUSEADDR true);
Lwt_unix.bind fd sa >|= fun () ->
Lwt_unix.listen fd backlog;
Lwt_unix.set_close_on_exec fd;
fd)
let process_accept ?timeout callback (sa, ic, oc) =
let c = callback sa ic oc in
let events =
match timeout with
| None -> [ c ]
| Some t -> [ c; Lwt_unix.sleep (float_of_int t) ]
in
Lwt.finalize (fun () -> Lwt.pick events) (fun () -> close (ic, oc))
let maxactive = ref None
let active = ref 0
let cond = Lwt_condition.create ()
let connected () = incr active
let disconnected () =
decr active;
Lwt_condition.broadcast cond ()
let rec throttle () =
match !maxactive with
| Some limit when !active > limit -> Lwt_condition.wait cond >>= throttle
| _ -> Lwt.return_unit
let set_max_active max_active =
maxactive := Some max_active;
Lwt_condition.broadcast cond ()
let run_handler handler v =
Lwt.async (fun () ->
Lwt.try_bind
(fun () -> handler v)
(fun () ->
disconnected ();
Lwt.return_unit)
(fun x ->
disconnected ();
(match x with
| Lwt.Canceled -> ()
| ex ->
Log.warn (fun f ->
f "Uncaught exception in handler: %s" (Printexc.to_string ex)));
Lwt.return_unit))
let init ?(stop = fst (Lwt.wait ())) handler fd =
let stop = Lwt.map (fun () -> `Stop) stop in
let rec loop () =
Lwt.try_bind
(fun () ->
connected ();
throttle () >>= fun () ->
let accept = Lwt.map (fun v -> `Accept v) (Lwt_unix.accept fd) in
Lwt.choose [ accept; stop ] >|= function
| `Stop ->
Lwt.cancel accept;
`Stop
| `Accept _ as x -> x)
(function
| `Stop ->
disconnected ();
Lwt.return_unit
| `Accept v ->
run_handler handler v;
loop ())
(fun exn ->
disconnected ();
match exn with
| Lwt.Canceled -> Lwt.return_unit
| ex ->
Log.warn (fun f ->
f "Uncaught exception accepting connection: %s"
(Printexc.to_string ex));
Lwt.pause () >>= loop)
in
Lwt.finalize loop (fun () -> Lwt_unix.close fd)