Source file body_raw.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
open Base
open Async_kernel
module B = Cohttp.Body

type t = [
  | B.t
  | `Pipe of string Pipe.Reader.t
] [@@deriving sexp_of]

let empty = `Empty
let of_string s = ((B.of_string s) :> t)
let of_pipe p = `Pipe p

let to_string = function
  | #B.t as body -> return (B.to_string body)
  | `Pipe s -> Pipe.to_list s >>| String.concat

let to_string_list = function
  | #B.t as body -> return (B.to_string_list body)
  | `Pipe s -> Pipe.to_list s

let drain = function
  | #B.t -> return ()
  | `Pipe p -> Pipe.drain p

let is_empty (body:t) =
  match body with
  | #B.t as body -> return (B.is_empty body)
  | `Pipe s ->
    Pipe.values_available s
    >>| function
    |`Eof -> false
    |`Ok ->
      match Pipe.peek s with
      | Some "" -> true
      | Some _ | None -> false

let to_pipe = function
  | `Empty -> Pipe.of_list []
  | `String s -> Pipe.singleton s
  | `Strings sl -> Pipe.of_list sl
  | `Pipe p -> p

let disable_chunked_encoding = function
  | #B.t as body -> return (body, B.length body)
  | `Pipe s ->
    Pipe.to_list s >>| fun l ->
    let body = `Strings l in
    let len = B.length body in
    body, len

let transfer_encoding = function
  | #B.t as t -> B.transfer_encoding t
  | `Pipe _ -> Cohttp.Transfer.Chunked

let of_string_list strings = `Pipe (Pipe.of_list strings)

let map t ~f =
  match t with
  | #B.t as t -> (B.map f t :> t)
  | `Pipe p -> `Pipe (Pipe.map p ~f)

let as_pipe t ~f = `Pipe (t |> to_pipe |> f)

let write_body write_body (body : t) writer =
  match body with
  | `Empty -> return ()
  | `String s -> write_body writer s
  | `Strings sl -> Deferred.List.iter sl ~f:(write_body writer)
  | `Pipe p -> Pipe.iter p ~f:(write_body writer)

let pipe_of_body read_chunk ic =
  let open Cohttp.Transfer in
  Pipe.create_reader ~close_on_exception:false (fun writer ->
      Deferred.repeat_until_finished () (fun () ->
          read_chunk ic >>= function
          | Chunk buf ->
            (* Even if [writer] has been closed, the loop must continue reading
             * from the input channel to ensure that it is left in a proper state
             * for the next request to be processed (in the case of keep-alive).
             *
             * The only case where [writer] will be closed is when
             * [Pipe.close_read] has been called on its read end. This could be
             * done by a request handler to signal that it does not need to
             * inspect the remainder of the body to fulfill the request.
            *)
            Pipe.write_when_ready writer ~f:(fun write -> write buf)
            >>| fun _ -> `Repeat ()
          | Final_chunk buf ->
            Pipe.write_when_ready writer ~f:(fun write -> write buf)
            >>| fun _ -> `Finished ()
          | Done -> return (`Finished ()))
    )