Source file body.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
(* State of a message body backed by a Faraday serializer.  The type
   parameter is a phantom: no field below mentions it (presumably it tags
   the body's direction or role at use sites -- TODO confirm). *)
type _ t =
  (* Underlying Faraday serializer holding the body bytes. *)
  { faraday                     : Faraday.t
  (* [true] while a reader registered through [schedule_read] is waiting
     to be run by [execute_read]. *)
  ; mutable scheduled           : bool
  (* Reader callback invoked when the serializer reports [`Close]. *)
  ; mutable on_eof              : unit -> unit
  (* Reader callback invoked with the first pending iovec's buffer, offset
     and length. *)
  ; mutable on_read             : Bigstring.t -> off:int -> len:int -> unit
  (* One-shot callback fired (and reset to the default) by
     [ready_to_write]. *)
  ; mutable when_ready_to_write : unit -> unit
  (* Number of bytes handed to the writer by
     [transfer_to_writer_with_encoding] that have not yet been shifted out
     of [faraday]. *)
  ; buffered_bytes              : int ref
  }

(* No-op callbacks used as "nothing registered" sentinels.
   [default_ready_to_write] is compared by physical equality in
   [when_ready_to_write]; wrapping the closures in [Sys.opaque_identity] is
   presumably meant to keep the optimizer from treating them as known
   constants so that each keeps one stable identity -- TODO confirm. *)
let default_on_eof         = Sys.opaque_identity (fun () -> ())
let default_on_read        = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())
let default_ready_to_write = Sys.opaque_identity (fun () -> ())

(** [of_faraday faraday] wraps [faraday] in a fresh body: no reader is
    scheduled, every callback is the matching default no-op, and the
    buffered-byte counter starts at zero. *)
let of_faraday faraday =
  let buffered_bytes = ref 0 in
  { faraday
  ; buffered_bytes
  ; scheduled           = false
  ; when_ready_to_write = default_ready_to_write
  ; on_read             = default_on_read
  ; on_eof              = default_on_eof
  }

(** [create buffer] builds a body whose serializer is obtained from
    [buffer] via [Faraday.of_bigstring]. *)
let create buffer =
  let faraday = Faraday.of_bigstring buffer in
  of_faraday faraday

(** [create_empty ()] returns a new body created over [Bigstring.empty]
    whose serializer has already been closed. *)
let create_empty () =
  let body = create Bigstring.empty in
  let () = Faraday.close body.faraday in
  body

(* A single already-closed body built once at module initialisation; every
   reference to [empty] sees this same (mutable) record. *)
let empty = create_empty ()

(** [write_char t c] forwards [c] to [Faraday.write_char] on the body's
    serializer. *)
let write_char { faraday; _ } c =
  Faraday.write_char faraday c

(** [write_string t ?off ?len s] forwards [s], together with the optional
    [off] and [len], to [Faraday.write_string] on the body's serializer. *)
let write_string { faraday; _ } ?off ?len s =
  Faraday.write_string faraday ?off ?len s

(** [write_bigstring t ?off ?len b] forwards [b], together with the optional
    [off] and [len], to [Faraday.write_bigstring] on the body's
    serializer. *)
let write_bigstring { faraday; _ } ?off ?len b =
  Faraday.write_bigstring faraday ?off ?len b

(** [schedule_bigstring t ?off ?len b] forwards [b], together with the
    optional [off] and [len], to [Faraday.schedule_bigstring] on the body's
    serializer. *)
let schedule_bigstring { faraday; _ } ?off ?len (b:Bigstring.t) =
  Faraday.schedule_bigstring faraday ?off ?len b

(** [ready_to_write t] fires the currently registered ready-to-write
    callback exactly once.  The registration is reset to the default
    {e before} the callback runs, so the callback is free to register a new
    one through [when_ready_to_write]. *)
let ready_to_write ({ when_ready_to_write = pending; _ } as t) =
  t.when_ready_to_write <- default_ready_to_write;
  pending ()

(** [flush t kontinue] passes [kontinue] to [Faraday.flush] on the body's
    serializer, then fires the pending ready-to-write callback. *)
let flush t kontinue =
  let () = Faraday.flush t.faraday kontinue in
  ready_to_write t

(** [is_closed t] is [Faraday.is_closed] applied to the body's
    serializer. *)
let is_closed { faraday; _ } =
  Faraday.is_closed faraday

(** [close t] closes the body's serializer, then fires the pending
    ready-to-write callback. *)
let close t =
  let () = Faraday.close t.faraday in
  ready_to_write t

(** [unsafe_faraday t] exposes the serializer stored inside [t] without
    copying it. *)
let unsafe_faraday { faraday; _ } =
  faraday

(** [do_execute_read t on_eof on_read] inspects the serializer's next
    operation and dispatches to the supplied callbacks:
    - [`Yield]: nothing is pending; returns without touching [t].
    - [`Close]: clears the reader registration, then calls [on_eof].
    - [`Writev (iovec :: _)]: clears the reader registration, shifts the
      serializer past the first iovec, calls [on_read] on it, then calls
      [execute_read] so that a reader re-registered by [on_read] runs.

    The registration is cleared before the callback runs so the callback
    may call [schedule_read] again without tripping its "already
    scheduled" check.

    [execute_read t] runs the stored callbacks if, and only if, a reader is
    currently scheduled. *)
let rec do_execute_read t on_eof on_read =
  (* Return [t] to the "no reader registered" state. *)
  let clear_reader () =
    t.scheduled <- false;
    t.on_eof    <- default_on_eof;
    t.on_read   <- default_on_read
  in
  match Faraday.operation t.faraday with
  | `Yield -> ()
  | `Close ->
    clear_reader ();
    on_eof ()
  | `Writev [] -> assert false
  | `Writev ({ IOVec.buffer; off; len } :: _) ->
    clear_reader ();
    Faraday.shift t.faraday len;
    on_read buffer ~off ~len;
    execute_read t
and execute_read t =
  if t.scheduled then do_execute_read t t.on_eof t.on_read

(** [schedule_read t ~on_eof ~on_read] registers a one-shot reader.

    If the body is already closed the callbacks are run immediately through
    [do_execute_read]; otherwise they are stored in [t] and the body is
    marked as having a scheduled reader.

    @raise Failure if a reader is already scheduled. *)
let schedule_read t ~on_eof ~on_read =
  if t.scheduled then
    failwith "Body.schedule_read: reader already scheduled";
  match is_closed t with
  | true  -> do_execute_read t on_eof on_read
  | false ->
    t.scheduled <- true;
    t.on_eof    <- on_eof;
    t.on_read   <- on_read

(** [has_pending_output t] is [Faraday.has_pending_output] applied to the
    body's serializer. *)
let has_pending_output { faraday; _ } =
  Faraday.has_pending_output faraday

(** [when_ready_to_write t callback] registers [callback] to be fired by the
    next [ready_to_write].  If the body is already closed, [callback] is
    also invoked immediately.

    NOTE(review): on a closed body the callback is invoked and then still
    stored (or invoked and followed by the failure below when another
    callback is registered); confirm this is intended.

    @raise Failure if a callback other than the default is already
    registered. *)
let when_ready_to_write t callback =
  if is_closed t then callback ();
  (* Physical comparison against the sentinel: any other closure means a
     callback is already registered. *)
  if t.when_ready_to_write != default_ready_to_write then
    failwith "Body.when_ready_to_write: only one callback can be registered at a time";
  t.when_ready_to_write <- callback

(** [transfer_to_writer_with_encoding t ~encoding writer] hands the body
    bytes that have not yet been given to [writer] over to it.

    - On [`Yield] or [`Close] nothing is done.
    - On [`Writev iovecs] the first [!(t.buffered_bytes)] bytes are skipped
      (they were scheduled by an earlier call and are still awaiting a
      flush); the remainder is scheduled with [schedule_fixed] for
      [`Fixed _] / [`Close_delimited] or [schedule_chunk] for [`Chunked].
      A flush callback then shifts the serializer and decrements the
      counter once the writer has flushed those bytes.

    If nothing remains after skipping, the function returns without
    scheduling anything.  Otherwise an empty list would be scheduled: for
    [`Chunked] that is presumably written as a zero-length chunk, which in
    chunked transfer coding terminates the body (verify against
    [Serialize.Writer.schedule_chunk]); for the other encodings it would
    only register a redundant flush callback. *)
let transfer_to_writer_with_encoding t ~encoding writer =
  let faraday = t.faraday in
  begin match Faraday.operation faraday with
  | `Yield | `Close -> ()
  | `Writev iovecs ->
    let buffered = t.buffered_bytes in
    begin match IOVec.shiftv iovecs !buffered with
    | [] ->
      (* Everything pending is already buffered in the writer. *)
      ()
    | iovecs ->
      let lengthv = IOVec.lengthv iovecs in
      buffered := !buffered + lengthv;
      begin match encoding with
      | `Fixed _ | `Close_delimited -> Serialize.Writer.schedule_fixed writer iovecs
      | `Chunked                    -> Serialize.Writer.schedule_chunk writer iovecs
      end;
      Serialize.Writer.flush writer (fun () ->
        (* The writer has flushed these bytes: release them from the
           serializer and stop counting them as buffered. *)
        Faraday.shift faraday lengthv;
        buffered := !buffered - lengthv)
    end
  end