Source file tiny_httpd_unix.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
module Dir = Dir
module Sem = Sem
module Unix_tcp_server_ = struct
let get_addr_ sock =
match Unix.getsockname sock with
| Unix.ADDR_INET (addr, port) -> addr, port
| _ -> invalid_arg "httpd: address is not INET"
type t = {
addr: string;
port: int;
buf_pool: Buf.t Pool.t;
slice_pool: IO.Slice.t Pool.t;
max_connections: int;
sem_max_connections: Sem.t;
(** semaphore to restrict the number of active concurrent connections *)
mutable sock: Unix.file_descr option; (** Socket *)
new_thread: (unit -> unit) -> unit;
timeout: float;
masksigpipe: bool;
mutable running: bool;
}
let shutdown_silent_ fd =
try Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> ()
let close_silent_ fd = try Unix.close fd with _ -> ()
let to_tcp_server (self : t) : IO.TCP_server.builder =
{
IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
if self.masksigpipe && not Sys.win32 then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
let sock, should_bind =
match self.sock with
| Some s ->
( s,
false
)
| None ->
( Unix.socket
(if Util.is_ipv6_str self.addr then
Unix.PF_INET6
else
Unix.PF_INET)
Unix.SOCK_STREAM 0,
true )
in
Unix.clear_nonblock sock;
Unix.setsockopt_optint sock Unix.SO_LINGER None;
if should_bind then (
let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
let n_listen = 2 * self.max_connections in
Unix.listen sock n_listen
);
self.sock <- Some sock;
let tcp_server =
{
IO.TCP_server.stop = (fun () -> self.running <- false);
running = (fun () -> self.running);
active_connections =
(fun () -> Sem.num_acquired self.sem_max_connections - 1);
endpoint =
(fun () ->
let addr, port = get_addr_ sock in
Unix.string_of_inet_addr addr, port);
}
in
after_init tcp_server;
let handle_client_unix_ (client_sock : Unix.file_descr)
(client_addr : Unix.sockaddr) : unit =
Log.debug (fun k ->
k "t[%d]: serving new client on %s"
(Thread.id @@ Thread.self ())
(Util.show_sockaddr client_addr));
if self.masksigpipe && not Sys.win32 then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
Unix.set_nonblock client_sock;
Unix.setsockopt client_sock Unix.TCP_NODELAY true;
Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
Pool.with_resource self.slice_pool @@ fun ic_buf ->
Pool.with_resource self.slice_pool @@ fun oc_buf ->
let closed = ref false in
let oc =
new IO.Output.of_unix_fd
~close_noerr:true ~closed ~buf:oc_buf client_sock
in
let ic =
IO.Input.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf
client_sock
in
handle.handle ~client_addr ic oc
in
Unix.set_nonblock sock;
while self.running do
match Unix.accept sock with
| client_sock, client_addr ->
Sem.acquire 1 self.sem_max_connections;
if not Sys.win32 then
ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]);
self.new_thread (fun () ->
try
handle_client_unix_ client_sock client_addr;
Log.debug (fun k ->
k "t[%d]: done with client on %s, exiting"
(Thread.id @@ Thread.self ())
@@ Util.show_sockaddr client_addr);
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem.release 1 self.sem_max_connections
with e ->
let bt = Printexc.get_raw_backtrace () in
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem.release 1 self.sem_max_connections;
Log.error (fun k ->
k
"@[<v>Handler: uncaught exception for client %s:@ \
%s@ %s@]"
(Util.show_sockaddr client_addr)
(Printexc.to_string e)
(Printexc.raw_backtrace_to_string bt)));
if not Sys.win32 then
ignore Unix.(sigprocmask SIG_UNBLOCK Sys.[ sigint; sighup ])
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _)
->
ignore (Unix.select [ sock ] [] [ sock ] 1.0 : _ * _ * _)
| exception e ->
Log.error (fun k ->
k "Unix.accept raised an exception: %s" (Printexc.to_string e));
Thread.delay 0.01
done;
Unix.close sock;
Sem.acquire self.sem_max_connections.max self.sem_max_connections;
());
}
end