1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
module Httpaf = Dream_httpaf_.Httpaf
module H2 = Dream_h2.H2
module Stream = Dream_pure.Stream
module Message = Dream_pure.Message
(* Renders a socket address as text.

   - An Internet address becomes "<address>:<port>", where the address part
     is whatever [Unix.string_of_inet_addr] produces.
   - A Unix-domain address is returned as its path, unchanged. *)
let address_to_string (sockaddr : Unix.sockaddr) : string =
  match sockaddr with
  | Unix.ADDR_INET (address, port) ->
    let host = Unix.string_of_inet_addr address in
    Printf.sprintf "%s:%i" host port
  | Unix.ADDR_UNIX path ->
    path
(* Copies the body of [response] to an HTTP body writer, one stream event at a
   time.

   The writer is abstracted as a set of callbacks so that the same loop can
   drive different writer implementations:

   - [_write_string] is accepted but never used by this function.
   - [write_buffer] is called with each data chunk read from the stream.
   - [http_flush] is called with a continuation; the loop resumes when the
     writer invokes that continuation.
   - [close] is handed to [Stream.read] as the close handler, and is also
     called with code [1000] when the stream reports an exception.

   Every callback below ends by either scheduling or performing the next read,
   which is what keeps the loop going. NOTE(review): this relies on
   [Stream.read] invoking exactly one callback per call -- confirm against the
   [Stream] interface. *)
let forward_body_general
    (response : Message.response)
    (_write_string : ?off:int -> ?len:int -> string -> unit)
    (write_buffer : ?off:int -> ?len:int -> Stream.buffer -> unit)
    http_flush
    close =

  (* Number of bytes handed to [write_buffer] since the last flush. *)
  let pending = ref 0 in

  (* The exception itself is discarded; the writer is just closed. The code
     1000 is presumably the WebSocket "normal closure" status -- TODO confirm
     that this is the intended code for the error path. *)
  let on_exn _exn = close 1000 in

  let rec pump () =
    (* The stream is looked up from [response] on every iteration rather than
       once up front. *)
    let stream = Message.client_stream response in
    Stream.read
      stream
      ~data:on_data
      ~flush:on_flush
      ~ping:on_ping
      ~pong:on_pong
      ~close
      ~exn:on_exn

  and on_data chunk off len _binary _fin =
    write_buffer ~off ~len chunk;
    let total = !pending + len in
    if total < 4096 then begin
      (* Not enough written yet to force a flush; keep reading immediately. *)
      pending := total;
      pump ()
    end
    else
      (* 4096 bytes or more are waiting: flush, then continue. *)
      on_flush ()

  and on_flush () =
    (* Reset the counter and resume reading only once the writer calls back. *)
    pending := 0;
    http_flush pump

  (* Ping and pong events carry nothing to forward; just read the next
     event. *)
  and on_ping _buffer _offset _length =
    pump ()

  and on_pong _buffer _offset _length =
    pump ()

  in

  pump ()
(* Forwards the body of [response] to the Httpaf body writer [body], by
   instantiating [forward_body_general] with the [Httpaf.Body.Writer]
   operations applied to [body]. The close code supplied by the forwarding
   loop is ignored: the writer is simply closed. *)
let forward_body
    (response : Message.response)
    (body : Httpaf.Body.Writer.t) =
  let module Writer = Httpaf.Body.Writer in
  let close_body _code = Writer.close body in
  forward_body_general
    response
    (Writer.write_string body)
    (Writer.write_bigstring body)
    (Writer.flush body)
    close_body
(* Forwards the body of [response] to the H2 body writer [body], by
   instantiating [forward_body_general] with the [H2.Body.Writer] operations
   applied to [body]. The close code supplied by the forwarding loop is
   ignored: the writer is simply closed. *)
let forward_body_h2
    (response : Message.response)
    (body : H2.Body.Writer.t) =
  let module Writer = H2.Body.Writer in
  let close_body _code = Writer.close body in
  forward_body_general
    response
    (Writer.write_string body)
    (Writer.write_bigstring body)
    (Writer.flush body)
    close_body