1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
type _ t =
{ faraday : Faraday.t
; mutable scheduled : bool
; mutable on_eof : unit -> unit
; mutable on_read : Bigstring.t -> off:int -> len:int -> unit
; mutable when_ready_to_write : unit -> unit
; buffered_bytes : int ref
}
let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())
let default_ready_to_write = Sys.opaque_identity (fun () -> ())
let of_faraday faraday =
{ faraday
; scheduled = false
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_write = default_ready_to_write
; buffered_bytes = ref 0
}
let create buffer =
of_faraday (Faraday.of_bigstring buffer)
let create_empty () =
let t = create Bigstring.empty in
Faraday.close t.faraday;
t
let empty = create_empty ()
let write_char t c =
Faraday.write_char t.faraday c
let write_string t ?off ?len s =
Faraday.write_string ?off ?len t.faraday s
let write_bigstring t ?off ?len b =
Faraday.write_bigstring ?off ?len t.faraday b
let schedule_bigstring t ?off ?len (b:Bigstring.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b
let ready_to_write t =
let callback = t.when_ready_to_write in
t.when_ready_to_write <- default_ready_to_write;
callback ()
let flush t kontinue =
Faraday.flush t.faraday kontinue;
ready_to_write t
let is_closed t =
Faraday.is_closed t.faraday
let close t =
Faraday.close t.faraday;
ready_to_write t
let unsafe_faraday t =
t.faraday
let rec do_execute_read t on_eof on_read =
match Faraday.operation t.faraday with
| `Yield -> ()
| `Close ->
t.scheduled <- false;
t.on_eof <- default_on_eof;
t.on_read <- default_on_read;
on_eof ()
| `Writev [] -> assert false
| `Writev (iovec::_) ->
t.scheduled <- false;
t.on_eof <- default_on_eof;
t.on_read <- default_on_read;
let { IOVec.buffer; off; len } = iovec in
Faraday.shift t.faraday len;
on_read buffer ~off ~len;
execute_read t
and execute_read t =
if t.scheduled then do_execute_read t t.on_eof t.on_read
let schedule_read t ~on_eof ~on_read =
if t.scheduled
then failwith "Body.schedule_read: reader already scheduled";
if is_closed t
then do_execute_read t on_eof on_read
else begin
t.scheduled <- true;
t.on_eof <- on_eof;
t.on_read <- on_read
end
let has_pending_output t =
Faraday.has_pending_output t.faraday
let when_ready_to_write t callback =
if is_closed t then callback ();
if not (t.when_ready_to_write == default_ready_to_write)
then failwith "Body.when_ready_to_write: only one callback can be registered at a time";
t.when_ready_to_write <- callback
let transfer_to_writer_with_encoding t ~encoding writer =
let faraday = t.faraday in
begin match Faraday.operation faraday with
| `Yield | `Close -> ()
| `Writev iovecs ->
let buffered = t.buffered_bytes in
let iovecs = IOVec.shiftv iovecs !buffered in
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match encoding with
| `Fixed _ | `Close_delimited -> Serialize.Writer.schedule_fixed writer iovecs
| `Chunked -> Serialize.Writer.schedule_chunk writer iovecs
end;
Serialize.Writer.flush writer (fun () ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end