1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
open Core
open Async
include Delimited_kernel.Read
open Deferred.Let_syntax
open! Int.Replace_polymorphic_compare
(* Size in bytes of the scratch buffers passed to [Reader.read] below:
   ten chunks of 65 KiB each. *)
let buffer_size =
  let chunk_bytes = 65 * 1024 in
  10 * chunk_bytes
(* [fold_reader' ... builder ~init ~f r] is the chunk-level fold that the other
   readers in this file are built on.

   Steps, in order:
   - drop [skip_lines] lines (default 0) from [r] via [Shared.drop_lines];
   - resolve the header with [Expert.Parse_header], reading from [r] in chunks
     of up to [buffer_size] bytes when the header has to be parsed from input;
   - parse the remaining input chunk by chunk, collecting rows built by
     [builder] into a queue.

   After each chunk, [f acc queue] is called with the rows accumulated so far;
   the queue is cleared once the deferred returned by [f] is determined, so [f]
   must not retain it. At end of input the parse state is finished, [f] is
   called one last time, and [r] is closed.

   Raises [Failure "Header is incomplete"] (after closing [r]) if input ends
   before a complete header has been read. *)
let fold_reader'
  ?strip
  ?(skip_lines = 0)
  ?sep
  ?quote
  ?header
  ?on_invalid_row
  builder
  ~init
  ~f
  r
  =
  let%bind () = Shared.drop_lines r skip_lines in
  match%bind
    match Expert.Parse_header.create ?strip ?sep ?quote ?header () with
    | Second header_map ->
      (* The header is already known; nothing has to be read for it. *)
      return (Some (header_map, None))
    | First header_parse ->
      let buffer = Bytes.create buffer_size in
      Deferred.repeat_until_finished header_parse (fun header_parse ->
        match%bind Reader.read r buffer ~len:buffer_size with
        | `Eof ->
          (* Feed a final newline so that a header row which is not terminated
             by a line break still gets a chance to complete. *)
          let newline = "\n" in
          (match
             Expert.Parse_header.input_string
               header_parse
               ~len:(String.length newline)
               newline
           with
           | First (_ : Expert.Parse_header.t) ->
             let%map () = Reader.close r in
             failwith "Header is incomplete"
           | Second (headers, input) ->
             return (`Finished (Some (headers, Some input))))
        | `Ok len ->
          return
            (match Expert.Parse_header.input header_parse ~len buffer with
             | First header_parse -> `Repeat header_parse
             | Second (headers, input) ->
               (* [input] is whatever followed the header in this chunk. *)
               `Finished (Some (headers, Some input))))
  with
  | None -> return init
  | Some (header_map, trailing_input) ->
    let state =
      Expert.create_parse_state
        ?strip
        ?sep
        ?quote
        ?on_invalid_row
        ~header_map
        builder
        ~init:(Queue.create ())
        ~f:(fun queue elt ->
          Queue.enqueue queue elt;
          queue)
    in
    (* Input left over from header parsing is fed to the row parser first. *)
    let state =
      Option.fold trailing_input ~init:state ~f:(fun state input ->
        Expert.Parse_state.input_string state input)
    in
    let buffer = Bytes.create buffer_size in
    Deferred.repeat_until_finished (state, init) (fun (state, init) ->
      match%bind Reader.read r buffer ~len:buffer_size with
      | `Eof ->
        let state = Expert.Parse_state.finish state in
        let%bind init = f init (Expert.Parse_state.acc state) in
        let%map () = Reader.close r in
        `Finished init
      | `Ok i ->
        let state = Expert.Parse_state.input state buffer ~len:i in
        let%map init = f init (Expert.Parse_state.acc state) in
        (* The queue is reused for the next chunk, so empty it once [f] is done. *)
        Queue.clear (Expert.Parse_state.acc state);
        `Repeat (state, init))
;;
(* Like [Deferred.bind], except that when [x] is already determined [f] is
   applied to its value directly instead of going through [Deferred.bind]. *)
let bind_without_unnecessary_yielding x ~f =
  Deferred.peek x
  |> function
  | None -> Deferred.bind x ~f
  | Some value -> f value
;;
(* [fold_reader ... builder ~init ~f r] folds [f] over every row parsed from
   [r]. [f] returns a deferred, and the next row is only processed once the
   previous call's result is determined. All optional arguments are forwarded
   unchanged to [fold_reader']. *)
let fold_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder ~init ~f r
  =
  fold_reader'
    ?strip
    ?skip_lines
    ?sep
    ?quote
    ?header
    ?on_invalid_row
    builder
    ~init
    r
    ~f:(fun acc queue ->
      (* Thread a deferred accumulator through the rows of this chunk. *)
      Queue.fold queue ~init:(return acc) ~f:(fun deferred_acc row ->
        bind_without_unnecessary_yielding deferred_acc ~f:(fun acc -> f acc row)))
;;
(* [fold_reader_without_pushback ... builder ~init ~f r] folds the synchronous
   function [f] over every row parsed from [r]. Each chunk of rows is folded
   with [Queue.fold] in one go. All optional arguments are forwarded unchanged
   to [fold_reader']. *)
let fold_reader_without_pushback
  ?strip
  ?skip_lines
  ?sep
  ?quote
  ?header
  ?on_invalid_row
  builder
  ~init
  ~f
  r
  =
  fold_reader'
    ?strip
    ?skip_lines
    ?sep
    ?quote
    ?header
    ?on_invalid_row
    builder
    ~init
    r
    ~f:(fun acc queue -> return (Queue.fold queue ~init:acc ~f))
;;
(* [pipe_of_reader ... builder reader] returns the read end of a fresh pipe and
   starts a background job that parses [reader] with [fold_reader'], moving
   each chunk of rows into the pipe with [Pipe.transfer_in].

   If the pipe is found closed when a chunk is ready, [reader] is closed and
   the job blocks on [Deferred.never ()], so parsing does not continue.
   Otherwise the write end is closed once the fold completes. *)
let pipe_of_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder reader =
  let r, w = Pipe.create () in
  let write_to_pipe : unit Deferred.t =
    let%bind () =
      fold_reader'
        ?strip
        ?skip_lines
        ?sep
        ?quote
        ?header
        ?on_invalid_row
        builder
        ~init:()
        reader
        ~f:(fun () queue ->
          if Pipe.is_closed w
          then (
            let%bind () = Reader.close reader in
            Deferred.never ())
          else Pipe.transfer_in w ~from:queue)
    in
    return (Pipe.close w)
  in
  don't_wait_for write_to_pipe;
  r
;;
(* [create_reader ... builder filename] opens [filename] with
   [Reader.open_file] and returns, as a deferred, the pipe of rows produced by
   [pipe_of_reader] on the resulting reader. *)
let create_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder filename
  =
  Reader.open_file filename
  >>| pipe_of_reader ?strip ?skip_lines ?sep ?quote ?header ?on_invalid_row builder
;;