Source file nanoev_tiny_httpd.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
module TH = Tiny_httpd_core
module EV = Nanoev_picos
module Log = TH.Log
module Slice = Iostream.Slice
module Pool = TH.Pool
module Buf = TH.Buf
module Sem_ = Picos_std_sync.Semaphore.Counting
(** Non blocking semaphore *)
module Out = struct
open Iostream
class type t = Out_buf.t
class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t)
(fd : Unix.file_descr) :
t =
object
inherit Out_buf.t_from_output ~bytes:buf.bytes ()
method private output_underlying bs i len0 =
let i = ref i in
let len = ref len0 in
while !len > 0 do
match EV.write_once fd bs !i !len with
| 0 -> failwith "write failed"
| n ->
i := !i + n;
len := !len - n
| exception
Unix.Unix_error
( (( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN
| Unix.ECONNRESET | Unix.EPIPE ) as err),
fn,
_ ) ->
failwith
@@ Printf.sprintf "write failed in %s: %s" fn
(Unix.error_message err)
done
method private close_underlying () =
if not (Atomic.exchange closed true) then
if close_noerr then (
try EV.close fd with _ -> ()
) else
EV.close fd
end
end
module In = struct
open Iostream
class type t = In_buf.t
let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t)
(fd : Unix.file_descr) : t =
let eof = ref false in
object
inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes ()
method private refill (slice : Slice.t) =
if not !eof then (
slice.off <- 0;
let continue = ref true in
while !continue do
match EV.read fd slice.bytes 0 (Bytes.length slice.bytes) with
| n ->
slice.len <- n;
continue := false
| exception
Unix.Unix_error
( ( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN
| Unix.ECONNRESET | Unix.EPIPE ),
_,
_ ) ->
eof := true;
continue := false
done;
if slice.len = 0 then eof := true
)
method close () =
if not (Atomic.exchange closed true) then (
eof := true;
if close_noerr then (
try EV.close fd with _ -> ()
) else
EV.close fd
)
end
end
module Unix_tcp_server_ = struct
let get_addr_ sock =
match Unix.getsockname sock with
| Unix.ADDR_INET (addr, port) -> addr, port
| _ -> invalid_arg "httpd: address is not INET"
type t = {
addr: string;
port: int;
buf_pool: Buf.t Pool.t;
slice_pool: Slice.t Pool.t;
max_connections: int;
sem_max_connections: Sem_.t;
(** semaphore to restrict the number of active concurrent connections *)
mutable sock: Unix.file_descr option; (** Socket *)
new_thread: (unit -> unit) -> unit;
timeout: float;
masksigpipe: bool;
running: bool Atomic.t;
}
let shutdown_silent_ fd =
try Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> ()
let close_silent_ fd = try Unix.close fd with _ -> ()
let to_tcp_server (self : t) : TH.IO.TCP_server.builder =
{
TH.IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
if self.masksigpipe && not Sys.win32 then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
let sock, should_bind =
match self.sock with
| Some s ->
s, false
| None ->
let s =
Unix.socket
(if TH.Util.is_ipv6_str self.addr then
Unix.PF_INET6
else
Unix.PF_INET)
Unix.SOCK_STREAM 0
in
s, true
in
Unix.set_nonblock sock;
Unix.setsockopt_optint sock Unix.SO_LINGER None;
if should_bind then (
let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
let n_listen = self.max_connections in
Unix.listen sock n_listen
);
self.sock <- Some sock;
let tcp_server =
{
TH.IO.TCP_server.stop =
(fun () ->
Atomic.set self.running false;
try Unix.close sock with _ -> ());
running = (fun () -> Atomic.get self.running);
active_connections =
(fun () ->
self.max_connections - Sem_.get_value self.sem_max_connections);
endpoint =
(fun () ->
let addr, port = get_addr_ sock in
Unix.string_of_inet_addr addr, port);
}
in
after_init tcp_server;
let handle_client_ (client_sock : Unix.file_descr)
(client_addr : Unix.sockaddr) : unit =
Log.debug (fun k ->
k "t[%d]: serving new client on %s"
(Thread.id @@ Thread.self ())
(TH.Util.show_sockaddr client_addr));
if self.masksigpipe && not Sys.win32 then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
Unix.set_nonblock client_sock;
Unix.setsockopt client_sock Unix.TCP_NODELAY true;
Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
Pool.with_resource self.slice_pool @@ fun ic_buf ->
Pool.with_resource self.slice_pool @@ fun oc_buf ->
let closed = Atomic.make false in
let oc =
new Out.of_unix_fd
~close_noerr:true ~closed ~buf:oc_buf client_sock
in
let ic =
In.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf client_sock
in
handle.handle ~client_addr ic oc
in
Unix.set_nonblock sock;
while Atomic.get self.running do
match EV.accept sock with
| client_sock, client_addr ->
Sem_.acquire self.sem_max_connections;
if not Sys.win32 then
ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]);
self.new_thread (fun () ->
try
handle_client_ client_sock client_addr;
Log.debug (fun k ->
k "t[%d]: done with client on %s, exiting"
(Thread.id @@ Thread.self ())
@@ TH.Util.show_sockaddr client_addr);
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem_.release self.sem_max_connections
with e ->
let bt = Printexc.get_raw_backtrace () in
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem_.release self.sem_max_connections;
Log.error (fun k ->
k
"@[<v>Handler: uncaught exception for client %s:@ \
%s@ %s@]"
(TH.Util.show_sockaddr client_addr)
(Printexc.to_string e)
(Printexc.raw_backtrace_to_string bt)));
if not Sys.win32 then
ignore Unix.(sigprocmask SIG_UNBLOCK Sys.[ sigint; sighup ])
| exception e ->
Log.error (fun k ->
k "Unix.accept raised an exception: %s" (Printexc.to_string e))
done;
(try Unix.close sock with _ -> ());
());
}
end
open struct
let get_max_connection_ ?(max_connections = 2048) () : int =
let max_connections = min (max 4 @@ EV.max_fds ()) max_connections in
max_connections
let clear_slice (slice : Slice.t) =
Bytes.fill slice.bytes 0 (Bytes.length slice.bytes) '\x00';
slice.off <- 0;
slice.len <- 0
end
let create ?enable_logging ?(masksigpipe = not Sys.win32) ?max_connections
?max_buf_pool_size ?(timeout = 0.0) ?buf_size
?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(port = 8080) ?sock
?head_middlewares ?middlewares ~new_thread () : TH.Server.t =
let max_connections = get_max_connection_ ?max_connections () in
let max_pool_size =
match max_buf_pool_size with
| None -> min 4096 max_connections * 2
| Some m -> m
in
let server =
{
Unix_tcp_server_.addr;
new_thread;
buf_pool =
Pool.create ~clear:Buf.clear_and_zero ~max_size:max_pool_size
~mk_item:(fun () -> Buf.create ?size:buf_size ())
();
slice_pool =
Pool.create ~clear:clear_slice
~mk_item:
(let buf_size = Option.value buf_size ~default:4096 in
fun () -> Slice.create buf_size)
();
running = Atomic.make true;
port;
sock;
max_connections;
sem_max_connections = Sem_.make max_connections;
masksigpipe;
timeout;
}
in
let tcp_server_builder = Unix_tcp_server_.to_tcp_server server in
let module B = struct
let init_addr () = addr
let init_port () = port
let get_time_s = get_time_s
let tcp_server () = tcp_server_builder
end in
let backend = (module B : TH.Server.IO_BACKEND) in
TH.Server.create_from ?enable_logging ?buf_size ?head_middlewares ?middlewares
~backend ()