Source file character_separated_without_quoting.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
open Core
open Async
open Shared
module Row = Delimited_kernel.Read.Row
(** [of_reader ?strip ?skip_lines ?on_parse_error ~header ?quote ~sep reader]
    reads [reader] as [sep]-separated text and returns the read end of a pipe
    onto which parsed rows are pushed.

    There is no quoting of whole fields: [quote] (default ['\\']) is an escape
    character, and the single character following it is appended to the
    current field verbatim, even if it is [sep] or [quote].  An escape
    character immediately followed by a newline is a parse error.

    - [strip] is forwarded to [make_emit_field] (presumably trims whitespace
      around each field; confirm in [Shared]).
    - [skip_lines] is forwarded to [drop_lines] before parsing starts.
    - [header] is forwarded to [make_emit_row].
    - [on_parse_error] selects what happens on a parse error: [`Raise]
      closes the reader and pipe and re-raises; [`Handle f] calls
      [f current_row exn] and either resumes reading ([`Continue]) or closes
      everything ([`Finish]).

    A ['\r'] directly before ['\n'] is dropped; any other ['\r'] is kept as
    field content.  A ['\r'] that is the very last byte of the input is also
    dropped.

    Fails an assertion if [quote] equals [sep]. *)
let of_reader
      ?(strip = false)
      ?(skip_lines = 0)
      ?(on_parse_error = `Raise)
      ~header
      ?(quote = '\\')
      ~sep
      reader
  =
  assert (Char.O.(quote <> sep));
  let lineno = ref 1 in
  let pipe_r, pipe_w = Pipe.create () in
  let buffer = Bytes.create buffer_size in
  let field = Buffer.create 1 in
  (* [quoted] is true when the previous character was the escape character. *)
  let quoted = ref false in
  let current_row = Queue.create () in
  let row_queue = Queue.create () in
  let emit_field = unstage (make_emit_field ~strip current_row field) in
  let `on_eof on_eof, emit_row = make_emit_row current_row row_queue header ~lineno in
  let flush_rows () = Pipe.transfer_in pipe_w ~from:row_queue in
  (* A carriage return is held back until we know whether a newline follows:
     "\r\n" is a line ending, while a lone '\r' is ordinary field content. *)
  let prev_was_cr = ref false in
  let emit_pending_cr () =
    if !prev_was_cr
    then (
      Buffer.add_char field '\r';
      prev_was_cr := false)
  in
  let add_char c =
    emit_pending_cr ();
    if Char.equal c '\r' then prev_was_cr := true else Buffer.add_char field c
  in
  let close () =
    don't_wait_for (flush_rows ());
    don't_wait_for (Reader.close reader);
    Pipe.close pipe_w
  in
  let rec loop () =
    Reader.read reader buffer
    >>> function
    | `Eof ->
      (* Emit the final row when the input does not end with a newline.
         Checking [field] as well as [current_row] keeps a trailing
         single-field line (e.g. "a\nb") from being silently dropped. *)
      if Queue.length current_row <> 0 || Buffer.length field <> 0
      then (
        emit_field ();
        emit_row ());
      on_eof ();
      close ()
    | `Ok n ->
      let res =
        Result.try_with (fun () ->
          for i = 0 to n - 1 do
            let c = Bytes.get buffer i in
            if Char.equal c '\n'
            then (
              prev_was_cr := false;
              if !quoted
              then
                (* Do not close here: the [on_parse_error] policy below
                   decides whether reading continues or everything is shut
                   down. *)
                failwithf
                  "escape character found at the end of a line (lineno=%d)"
                  !lineno
                  ()
              else (
                emit_field ();
                emit_row ()))
            else if !quoted
            then (
              (* Character following the escape: taken literally. *)
              quoted := false;
              add_char c)
            else if Char.equal c sep
            then (
              emit_pending_cr ();
              emit_field ())
            else if Char.equal c quote
            then (
              emit_pending_cr ();
              quoted := true)
            else add_char c
          done)
      in
      (* Push the rows completed in this batch before acting on any error. *)
      flush_rows ()
      >>> fun () ->
      (match res with
       | Ok () -> loop ()
       | Error e ->
         (match on_parse_error with
          | `Raise ->
            close ();
            raise e
          | `Handle f ->
            emit_field ();
            (match f current_row e with
             | `Continue -> loop ()
             | `Finish -> close ())))
  in
  upon (drop_lines reader skip_lines) loop;
  pipe_r
;;
(** [create_reader ?strip ?skip_lines ?on_parse_error ~header ?quote ~sep filename]
    opens [filename] with [Reader.open_file] and parses it with [of_reader],
    passing every argument through unchanged.  The result is a deferred that
    becomes determined with the pipe returned by [of_reader] once the file
    has been opened. *)
let create_reader ?strip ?skip_lines ?on_parse_error ~header ?quote ~sep filename =
  let%map r = Reader.open_file filename in
  of_reader ?strip ?skip_lines ?on_parse_error ~header ?quote ~sep r
;;