Source file read.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
open Core
open Async
include Delimited_kernel.Read
open Deferred.Let_syntax
open! Int.Replace_polymorphic_compare

(* 65k was the largest single read/write observed coming off a socket or disk; the read
   buffer is sized at ten times that amount. *)
let buffer_size =
  let largest_observed_io = 65 * 1024 in
  10 * largest_observed_io

(* [fold_reader' builder ~init ~f r] parses delimited rows from [r] in chunks of at most
   [buffer_size] bytes and threads an accumulator through [f], one call per chunk.

   In order:
   - [skip_lines] (default [0]) is passed to [Shared.drop_lines] before anything else is
     read from [r].
   - The header is resolved through [Expert.Parse_header]. When it has to be read from [r]
     and [r] reaches end of input first, a single ["\n"] is fed to the header parser as a
     last attempt; if the header is still unfinished, [r] is closed and
     [failwith "Header is incomplete"] is raised.
   - Input left over from header parsing is fed to the row parser before any further read.
   - After each successful read, [f] receives the current accumulator and the queue of
     rows parsed so far; once the deferred returned by [f] is determined the queue is
     cleared, so [f] must not rely on the queue contents afterwards.
   - On end of input the parser is finished, [f] is called one final time, and [r] is
     closed before the final accumulator is returned. *)
let fold_reader'
      ?strip
      ?(skip_lines = 0)
      ?sep
      ?quote
      ?header
      ?on_invalid_row
      builder
      ~init
      ~f
      r
  =
  (* Produces [Some (header_map, leftover_input)], where [leftover_input] is whatever the
     header parser consumed past the header (absent when nothing was read from [r]). *)
  let read_header () =
    match Expert.Parse_header.create ?strip ?sep ?quote ?header () with
    | Second header_map -> return (Some (header_map, None))
    | First initial_parse ->
      let chunk = Bytes.create buffer_size in
      Deferred.repeat_until_finished initial_parse (fun parse ->
        Reader.read r chunk ~len:buffer_size
        >>= function
        | `Ok len ->
          (match Expert.Parse_header.input parse ~len chunk with
           | First parse -> return (`Repeat parse)
           | Second (headers, leftover) ->
             return (`Finished (Some (headers, Some leftover))))
        | `Eof ->
          (* Give the header parser a terminating newline before giving up. *)
          let newline = "\n" in
          let len = String.length newline in
          (match Expert.Parse_header.input_string parse ~len newline with
           | Second (headers, leftover) ->
             return (`Finished (Some (headers, Some leftover)))
           | First (_ : Expert.Parse_header.t) ->
             let%map () = Reader.close r in
             failwith "Header is incomplete"))
  in
  Shared.drop_lines r skip_lines
  >>= fun () ->
  read_header ()
  >>= function
  | None -> return init
  | Some (header_map, trailing_input) ->
    (* The parser accumulates rows into one mutable queue that is handed to [f]. *)
    let fresh_state =
      Expert.create_parse_state
        ?strip
        ?sep
        ?quote
        ?on_invalid_row
        ~header_map
        builder
        ~init:(Queue.create ())
        ~f:(fun rows row ->
          Queue.enqueue rows row;
          rows)
    in
    let primed_state =
      match trailing_input with
      | None -> fresh_state
      | Some leftover -> Expert.Parse_state.input_string fresh_state leftover
    in
    let chunk = Bytes.create buffer_size in
    Deferred.repeat_until_finished (primed_state, init) (fun (state, acc) ->
      Reader.read r chunk ~len:buffer_size
      >>= function
      | `Ok len ->
        let state = Expert.Parse_state.input state chunk ~len in
        let%map acc = f acc (Expert.Parse_state.acc state) in
        (* Rows already handed to [f] are dropped before the next chunk is parsed. *)
        Queue.clear (Expert.Parse_state.acc state);
        `Repeat (state, acc)
      | `Eof ->
        let state = Expert.Parse_state.finish state in
        let%bind acc = f acc (Expert.Parse_state.acc state) in
        let%map () = Reader.close r in
        `Finished acc)
;;

(* Like [Deferred.bind], except that when [x] is already determined [f] is applied to its
   value directly instead of going through [Deferred.bind]. *)
let bind_without_unnecessary_yielding x ~f =
  match Deferred.peek x with
  | None -> x >>= f
  | Some value -> f value
;;

(* [fold_reader builder ~init ~f r] folds the deferred-returning [f] over every row parsed
   from [r], one row at a time. All optional arguments are forwarded unchanged to
   [fold_reader']. *)
let fold_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder ~init ~f r
  =
  (* Folds [f] over one batch of rows; each step waits for the previous row's result, and
     results that are already determined are chained without an extra bind. *)
  let fold_batch acc rows =
    Queue.fold rows ~init:(return acc) ~f:(fun pending row ->
      bind_without_unnecessary_yielding pending ~f:(fun acc -> f acc row))
  in
  fold_reader'
    ?strip
    ?skip_lines
    ?sep
    ?quote
    ?header
    ?on_invalid_row
    builder
    ~init
    ~f:fold_batch
    r
;;

(* [fold_reader_without_pushback builder ~init ~f r] folds the synchronous [f] over every
   row parsed from [r]. Each batch of rows is folded in one go and the result is wrapped in
   an already-determined deferred. All optional arguments are forwarded unchanged to
   [fold_reader']. *)
let fold_reader_without_pushback
      ?strip
      ?skip_lines
      ?sep
      ?quote
      ?header
      ?on_invalid_row
      builder
      ~init
      ~f
      r
  =
  let fold_batch acc rows = Queue.fold rows ~init:acc ~f |> return in
  fold_reader'
    ?strip
    ?skip_lines
    ?sep
    ?quote
    ?header
    ?on_invalid_row
    builder
    ~init
    ~f:fold_batch
    r
;;

(* [pipe_of_reader builder reader] returns the read end of a pipe that receives every row
   parsed from [reader]. Parsing runs in the background via [don't_wait_for]; the write end
   is closed once [fold_reader'] completes. If the pipe is found closed when a batch is
   about to be written, [reader] is closed and the background job never completes. *)
let pipe_of_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder reader =
  let pipe_r, pipe_w = Pipe.create () in
  (* Moves one batch of parsed rows into the pipe, waiting on [Pipe.transfer_in]. *)
  let push_rows () rows =
    if not (Pipe.is_closed pipe_w)
    then Pipe.transfer_in pipe_w ~from:rows
    else Reader.close reader >>= fun () -> Deferred.never ()
  in
  let write_to_pipe : unit Deferred.t =
    fold_reader'
      ?strip
      ?skip_lines
      ?sep
      ?quote
      ?header
      ?on_invalid_row
      builder
      ~init:()
      ~f:push_rows
      reader
    >>= fun () ->
    Pipe.close pipe_w;
    return ()
  in
  don't_wait_for write_to_pipe;
  pipe_r
;;

(* [create_reader builder filename] opens [filename] with [Reader.open_file] and returns
   the pipe produced by [pipe_of_reader] for it. All optional arguments are forwarded
   unchanged. *)
let create_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder filename
  =
  let%map reader = Reader.open_file filename in
  pipe_of_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder reader
;;