1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
open Utils
module IO = Io.IO
type body = Body.t
type conn = IO.conn * Cohttp.Connection.t [@@warning "-3"]
type response_action =
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit IO.t)
| `Response of Http.Response.t * body ]
type t = {
conn_closed : conn -> unit;
handler : conn -> Http.Request.t -> body -> response_action IO.t;
}
let make_response_action ?(conn_closed = fun _ -> ()) ~callback () =
{ conn_closed; handler = callback }
let make_expert ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
IO.(callback conn request body >>= fun expert -> `Expert expert))
()
let make ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
IO.(callback conn request body >>= fun response -> `Response response))
()
let respond ? ?flush ~status ~body () =
let response = Cohttp.Response.make ?headers ?flush ~status () in
(response, body)
let respond_string ? ?flush ~status ~body () =
respond ?headers ?flush ~status ~body:(Body.of_string body) ()
let read input =
match Io.Request.read input with
| (`Eof | `Invalid _) as e -> e
| `Ok request -> (
match Http.Request.has_body request with
| `No -> `Ok (request, Eio.Flow.string_source "")
| _ ->
let body =
let reader = Io.Request.make_body_reader request input in
flow_of_reader (fun () -> Io.Request.read_body_chunk reader)
in
`Ok (request, body))
let write output (response : Cohttp.Response.t) body =
let response =
let content_length =
let (Eio.Resource.T (body, ops)) = body in
let module X = (val Eio.Resource.get ops Eio.Flow.Pi.Source) in
List.find_map
(function
| Body.String get -> Some (String.length (get body)) | _ -> None)
X.read_methods
in
match
(Cohttp.Header.get_transfer_encoding response.headers, content_length)
with
| Unknown, None ->
{ response with encoding = Chunked } [@ocaml.warning "-3"]
| Unknown, Some size ->
{ response with encoding = Fixed (Int64.of_int size) }
[@ocaml.warning "-3"]
| , _ ->
{ response with encoding = from_headers } [@ocaml.warning "-3"]
in
let () = Logs.debug (fun m -> m "send headers") in
let () =
Io.Response.write
(fun writer ->
let () =
Logs.debug (fun m ->
(m "send body (%a)" Cohttp.Transfer.pp_encoding response.encoding
[@ocaml.warning "-3"]))
in
flow_to_writer body writer Io.Response.write_body)
response output
in
Eio.Buf_write.flush output
let callback { conn_closed; handler } conn input output =
let id = (Cohttp.Connection.create () [@ocaml.warning "-3"]) in
let rec handle () =
match read input with
| `Eof -> conn_closed (conn, id)
| `Invalid e ->
write output
(Http.Response.make ~status:`Bad_request ())
(Body.of_string e)
| `Ok (request, body) ->
let () =
match handler (conn, id) request body with
| `Response (response, body) -> write output response body
| `Expert (response, handler) ->
let () = Io.Response.write_header response output in
handler input output
in
if Cohttp.Request.is_keep_alive request then handle ()
in
handle ()
let run ?max_connections ?additional_domains ?stop ~on_error socket server =
Eio.Net.run_server socket ?max_connections ?additional_domains ?stop ~on_error
(fun socket peer_address ->
try
Eio.Switch.run @@ fun sw ->
let () =
Logs.info (fun m ->
m "%a: accept connection" Eio.Net.Sockaddr.pp peer_address)
and input = Eio.Buf_read.of_flow ~max_size:max_int socket in
Eio.Buf_write.with_flow socket @@ fun output ->
callback server (sw, peer_address) input output
with Eio.Io (Eio.Net.E (Connection_reset _), _) ->
Logs.info (fun m ->
m "%a: disconnected" Eio.Net.Sockaddr.pp peer_address))