1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
open Base
open Async_kernel
open Async_unix
module Request = struct
include Cohttp.Request
include (Make (Io) : module type of Make (Io) with type t := t)
end
module Response = struct
include Cohttp.Response
include (Make (Io) : module type of Make (Io) with type t := t)
end
type ('address, 'listening_on) t = {
server : ('address, 'listening_on) Tcp.Server.t; [@sexp.opaque]
}
[@@deriving sexp_of]
type response = Response.t * Body.t [@@deriving sexp_of]
type response_action =
[ `Expert of Cohttp.Response.t * (Io.ic -> Io.oc -> unit Deferred.t)
| `Response of response ]
type 'r respond_t =
?flush:bool ->
?headers:Cohttp.Header.t ->
?body:Body.t ->
Cohttp.Code.status_code ->
'r Deferred.t
let close t = Tcp.Server.close t.server
let close_finished t = Tcp.Server.close_finished t.server
let is_closed t = Tcp.Server.is_closed t.server
let listening_on t = Tcp.Server.listening_on t.server
let read_body req rd =
match Request.has_body req with
| `No | `Unknown -> (`Empty, Deferred.unit)
| `Yes ->
let reader = Request.make_body_reader req rd in
let pipe = Body_raw.pipe_of_body Request.read_body_chunk reader in
(`Pipe pipe, Pipe.closed pipe)
let collect_errors writer ~f =
let monitor = Writer.monitor writer in
Monitor.detach_and_get_error_stream monitor |> (ignore : exn Stream.t -> unit);
choose
[
choice (Monitor.get_next_error monitor) (fun e ->
Error (Exn.Reraised ("Cohttp_async.Server.collect_errors", e)));
choice (try_with ~name:"Cohttp_async.Server.collect_errors" f) Fn.id;
]
let handle_client handle_request sock rd wr =
collect_errors wr ~f:(fun () ->
let last_body_pipe_drained = ref Deferred.unit in
let requests_pipe =
Reader.read_all rd (fun rd ->
!last_body_pipe_drained >>= fun () ->
if Reader.is_closed rd then return `Eof
else
Request.read rd >>= function
| `Eof | `Invalid _ -> return `Eof
| `Ok req -> (
let body, finished = read_body req rd in
handle_request ~body sock req >>| function
| `Expert (, io_handler) ->
let expert_finished = Ivar.create () in
last_body_pipe_drained :=
Deferred.all_unit
[ Ivar.read expert_finished; finished ];
`Ok (`Expert (headers, io_handler, body, expert_finished))
| `Response r ->
last_body_pipe_drained := finished;
`Ok (`Response (req, body, r))))
in
Pipe.iter ~continue_on_error:false requests_pipe ~f:(function
| `Expert (response, io_handler, body, finished) ->
Response.write_header response wr >>= fun () ->
io_handler rd wr >>= fun () ->
Body.drain body >>| fun () -> Ivar.fill_if_empty finished ()
| `Response (req, body, (res, res_body)) ->
(match res_body with
| `Empty | `String _ | `Strings _ -> ()
| `Pipe stream ->
Deferred.any_unit
[ Writer.close_finished wr; Writer.consumer_left wr ]
>>> fun () -> Pipe.close_read stream);
let keep_alive = Request.is_keep_alive req in
let flush = Response.flush res in
let res =
let =
Cohttp.Header.add_unless_exists
(Cohttp.Response.headers res)
"connection"
(if keep_alive then "keep-alive" else "close")
in
{ res with Response.headers }
in
Response.write ~flush
(Body_raw.write_body Response.write_body res_body)
res wr
>>= fun () ->
Writer.(if keep_alive then flushed else close ?force_close:None) wr
>>= fun () -> Body.drain body))
>>= fun res ->
Writer.close wr >>= fun () ->
Reader.close rd >>| fun () -> Result.ok_exn res
let respond ?(flush = true) ?( = Cohttp.Header.init ()) ?(body = `Empty)
status : response Deferred.t =
let encoding = Body.transfer_encoding body in
let resp = Response.make ~status ~flush ~encoding ~headers () in
return (resp, body)
let respond_with_pipe ?flush ? ?(code = `OK) body =
respond ?flush ?headers ~body:(`Pipe body) code
let respond_string ?flush ? ?(status = `OK) body =
respond ?flush ?headers ~body:(`String body) status
let respond_with_redirect ? uri =
let =
Cohttp.Header.add_opt_unless_exists headers "location" (Uri.to_string uri)
in
respond ~flush:false ~headers `Found
let resolve_local_file ~docroot ~uri =
Cohttp.Path.resolve_local_file ~docroot ~uri
let error_body_default = "<html><body><h1>404 Not Found</h1></body></html>"
let respond_with_file ?flush ? ?(error_body = error_body_default)
filename =
Monitor.try_with ~run:`Now (fun () ->
Reader.open_file filename >>= fun rd ->
let body = `Pipe (Reader.pipe rd) in
let mime_type = Magic_mime.lookup filename in
let =
Cohttp.Header.add_opt_unless_exists headers "content-type" mime_type
in
respond ?flush ~headers ~body `OK)
>>= function
| Ok res -> return res
| Error _exn -> respond_string ~status:`Not_found error_body
type mode = Conduit_async.server
let create_raw ?max_connections ?backlog ?buffer_age_limit ?(mode = `TCP)
~on_handler_error where_to_listen handle_request =
Conduit_async.serve ?max_connections ?backlog ?buffer_age_limit
~on_handler_error mode where_to_listen
(handle_client handle_request)
>>| fun server -> { server }
let create_expert ?max_connections ?backlog ?buffer_age_limit ?(mode = `TCP)
~on_handler_error where_to_listen handle_request =
create_raw ?max_connections ?backlog ?buffer_age_limit ~on_handler_error ~mode
where_to_listen handle_request
let create ?max_connections ?backlog ?buffer_age_limit ?(mode = `TCP)
~on_handler_error where_to_listen handle_request =
let handle_request ~body address request =
handle_request ~body address request >>| fun r -> `Response r
in
create_raw ?max_connections ?backlog ?buffer_age_limit ~on_handler_error ~mode
where_to_listen handle_request