Source file body.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
open! Core
open! Async

module Stream = struct
  type t =
    { encoding : [ `Chunked | `Fixed of int ]
    ; reader : string Pipe.Reader.t
    ; mutable read_started : bool
    }
  [@@deriving sexp_of]

  let of_pipe encoding reader = { encoding; reader; read_started = false }
  let close t = Pipe.close_read t.reader
  let encoding t = t.encoding

  let iter t ~f =
    if t.read_started then raise_s [%message "Only one consumer can read from a stream"];
    t.read_started <- true;
    Pipe.iter t.reader ~f
  ;;

  let read_started t = t.read_started

  let drain t =
    if t.read_started
    then raise_s [%message "Cannot drain a body that's currently being read"];
    Pipe.drain t.reader
  ;;

  let closed t = Pipe.closed t.reader
end

type t =
  | Empty
  | Fixed of string
  | Stream of Stream.t
[@@deriving sexp_of]

let string x = Fixed x
let empty = Empty
let of_pipe encoding reader = Stream { Stream.encoding; reader; read_started = false }
let stream stream = Stream stream

let to_stream = function
  | Empty -> Stream.of_pipe (`Fixed 0) (Pipe.empty ())
  | Fixed x -> Stream.of_pipe (`Fixed (String.length x)) (Pipe.singleton x)
  | Stream x -> x
;;