Source file net_server.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
module Sem = Picos_std_sync.Semaphore.Counting

type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit

type t = {
  active: bool Atomic.t;
  sock: Unix.file_descr;
  client_handler: client_handler;
  spawn: (unit -> unit) -> unit Picos.Computation.t;
  max_conns: int;
  sem: Sem.t;
  mutable running: unit Picos.Computation.t option;
  exn_handler: exn -> Printexc.raw_backtrace -> unit;
}

let[@inline] join (self : t) : unit =
  Option.iter Picos.Computation.await self.running

let[@inline] max_connections self = self.max_conns

let[@inline] n_active_connections (self : t) : int =
  self.max_conns - Sem.get_value self.sem

let[@inline] running (self : t) : bool = Atomic.get self.active
let shutdown (self : t) = if Atomic.exchange self.active false then ()

open struct
  let default_exn_handler exn bt =
    Printf.eprintf "uncaught exception in network server: %s\n%s%!"
      (Printexc.to_string exn)
      (Printexc.raw_backtrace_to_string bt)

  let run (self : t) () : unit =
    while Atomic.get self.active do
      let client_sock, client_addr = Base.accept self.sock in
      Sem.acquire self.sem;

      let cleanup () =
        (try Unix.shutdown client_sock Unix.SHUTDOWN_ALL with _ -> ());
        (* TODO: close in nanoev too *)
        (try Unix.close client_sock with _ -> ());
        Sem.release self.sem
      in

      let comp : _ Picos.Computation.t =
        self.spawn (fun () ->
            let ic = IO_in.of_unix_fd client_sock in
            let oc = IO_out.of_unix_fd client_sock in
            try
              self.client_handler client_addr ic oc;
              cleanup ()
            with exn ->
              let bt = Printexc.get_raw_backtrace () in
              cleanup ();
              self.exn_handler exn bt)
      in
      ignore (comp : _ Picos.Computation.t)
    done
end

let establish ?backlog ?max_connections ?(exn_handler = default_exn_handler)
    ~spawn ~(client_handler : client_handler) addr : t =
  let ev = Global_ev.get_nanoev_exn () in
  let max_connections =
    match max_connections with
    | None -> Nanoev.max_fds ev
    | Some n -> min (Nanoev.max_fds ev) n
  in
  let sem = Sem.make max_connections in

  let backlog =
    match backlog with
    | Some n -> max 4 n
    | None -> max 4 max_connections
  in

  let domain = Unix.domain_of_sockaddr addr in
  let sock = Unix.socket domain Unix.SOCK_STREAM 0 in

  Unix.bind sock addr;
  Unix.listen sock backlog;
  Unix.set_nonblock sock;
  Unix.setsockopt sock Unix.SO_REUSEADDR true;
  (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ());

  let server =
    {
      active = Atomic.make true;
      max_conns = max_connections;
      sem;
      spawn;
      sock;
      client_handler;
      running = None;
      exn_handler;
    }
  in

  server.running <- Some (spawn (run server));
  server

let with_ ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr f =
  let server =
    establish ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr
  in
  Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server)