Source file server.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
open Base
open Async_kernel
open Async_unix
open Async_extra

module Request = struct
  include Cohttp.Request
  include (Make(Io) : module type of Make(Io) with type t := t)
end

module Response = struct
  include Cohttp.Response
  include (Make(Io) : module type of Make(Io) with type t := t)
end

type ('address, 'listening_on) t = {
  server: ('address, 'listening_on) Tcp.Server.t sexp_opaque;
} [@@deriving sexp_of]

type response = Response.t * Body.t [@@deriving sexp_of]

type response_action =
  [ `Expert of Cohttp.Response.t * (Io.ic -> Io.oc -> unit Deferred.t)
  | `Response of response
  ]

type 'r respond_t =
  ?flush      : bool
  -> ?headers : Cohttp.Header.t
  -> ?body    : Body.t
  -> Cohttp.Code.status_code
  -> 'r Deferred.t

let close t = Tcp.Server.close t.server
let close_finished t = Tcp.Server.close_finished t.server
let is_closed t = Tcp.Server.is_closed t.server
let listening_on t = Tcp.Server.listening_on t.server

let read_body req rd =
  match Request.has_body req with
  (* TODO maybe attempt to read body *)
  | `No | `Unknown -> (`Empty, Deferred.unit)
  | `Yes -> (* Create a Pipe for the body *)
    let reader = Request.make_body_reader req rd in
    let pipe = Body_raw.pipe_of_body Request.read_body_chunk reader in
    (`Pipe pipe, Pipe.closed pipe)

let collect_errors writer ~f =
  let monitor = Writer.monitor writer in
  (* don't propagate errors up, we handle them here *)
  Monitor.detach_and_get_error_stream monitor |> (ignore : exn Stream.t -> unit);
  choose [
    choice (Monitor.get_next_error monitor)
      (fun e -> Error (Exn.Reraised ("Cohttp_async.Server.collect_errors", e)));
    choice (try_with ~name:"Cohttp_async.Server.collect_errors" f) Fn.id;
  ]
;;

let handle_client handle_request sock rd wr =
  collect_errors wr ~f:(fun () ->
  let last_body_pipe_drained = ref Deferred.unit in
    let requests_pipe =
      Reader.read_all rd (fun rd ->
        !last_body_pipe_drained
        >>= fun () ->
        (* [`Expert] responses may close the [Reader.t] *)
        if Reader.is_closed rd
        then return `Eof
        else begin
          Request.read rd
          >>= function
          | `Eof | `Invalid _ -> return `Eof
          | `Ok req ->
            let body, finished = read_body req rd in
            handle_request ~body sock req
            >>| function
            | `Expert (headers, io_handler) ->
              let expert_finished = Ivar.create () in
              last_body_pipe_drained := Deferred.all_unit [Ivar.read expert_finished; finished];
              `Ok (`Expert (headers, io_handler, body, expert_finished))
            | `Response r ->
              last_body_pipe_drained := finished;
              `Ok (`Response (req, body, r))
        end
      )
    in
    Pipe.iter ~continue_on_error:false requests_pipe ~f:(function
      | `Expert (response, io_handler, body, finished) ->
        Response.write_header response wr
        >>= fun () ->
        io_handler rd wr
        >>= fun () ->
        Body.drain body >>| fun () ->
        Ivar.fill_if_empty finished ()
      | `Response(req, body, (res, res_body)) ->
        let keep_alive = Request.is_keep_alive req in
        let flush = Response.flush res in
        let res =
          let headers = Cohttp.Header.add_unless_exists
              (Cohttp.Response.headers res)
              "connection"
              (if keep_alive then "keep-alive" else "close") in
          { res with Response.headers } in
        Response.write ~flush (Body_raw.write_body Response.write_body res_body) res wr
        >>= fun () ->
        Writer.(if keep_alive then flushed else close ?force_close:None) wr
        >>= fun () ->
        Body.drain body
    )
  )
  >>= fun res ->
  Writer.close wr
  >>= fun () ->
  Reader.close rd
  >>| fun () ->
  Result.ok_exn res

let respond ?(flush=true) ?(headers=Cohttp.Header.init ())
    ?(body=`Empty) status : response Deferred.t =
  let encoding = Body.transfer_encoding body in
  let resp = Response.make ~status ~flush ~encoding ~headers () in
  return (resp, body)

let respond_with_pipe ?flush ?headers ?(code=`OK) body =
  respond ?flush ?headers ~body:(`Pipe body) code

let respond_string ?flush ?headers ?(status=`OK) body =
  respond ?flush ?headers ~body:(`String body) status

let respond_with_redirect ?headers uri =
  let headers = Cohttp.Header.add_opt_unless_exists headers "location" (Uri.to_string uri) in
  respond ~flush:false ~headers `Found

let resolve_local_file ~docroot ~uri =
  (* This normalises the Uri and strips out .. characters *)
  Uri.(pct_decode (path (resolve "" (of_string "/") uri)))
  |> Caml.Filename.concat docroot

let error_body_default =
  "<html><body><h1>404 Not Found</h1></body></html>"

let respond_with_file ?flush ?headers ?(error_body=error_body_default) filename =
  Monitor.try_with ~run:`Now
    (fun () ->
       Reader.open_file filename
       >>= fun rd ->
       let body = `Pipe (Reader.pipe rd) in
       let mime_type = Magic_mime.lookup filename in
       let headers = Cohttp.Header.add_opt_unless_exists headers "content-type" mime_type in
       respond ?flush ~headers ~body `OK
    )
  >>= function
  |Ok res -> return res
  |Error _exn -> respond_string ~status:`Not_found error_body

type mode = Conduit_async.server

let create_raw ?max_connections ?backlog ?buffer_age_limit ?(mode=`TCP)
    ~on_handler_error where_to_listen handle_request =
  Conduit_async.serve ?max_connections ?backlog
    ?buffer_age_limit ~on_handler_error mode
    where_to_listen (handle_client handle_request)
  >>| fun server ->
  { server }


let create_expert ?max_connections ?backlog
      ?buffer_age_limit ?(mode=`TCP) ~on_handler_error where_to_listen handle_request =
  create_raw ?max_connections ?backlog
    ?buffer_age_limit ~on_handler_error ~mode where_to_listen
    handle_request

let create
      ?max_connections
      ?backlog
      ?buffer_age_limit
      ?(mode = `TCP)
      ~on_handler_error
      where_to_listen
      handle_request =
  let handle_request ~body address request =
    handle_request ~body address request >>| fun r -> `Response r
  in
  create_raw ?max_connections ?backlog
    ?buffer_age_limit ~on_handler_error ~mode where_to_listen
    handle_request