Source file obj_container_file.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
(** Shorthand for [Printf.sprintf], used for error messages below. *)
let spf fmt = Printf.sprintf fmt
module Codec = struct
  (** A block compression codec, identified by the name stored under the
      ["avro.codec"] key of the container header. *)
  type t = {
    name: string;
    compress: string -> string;
    decompress: string -> string;
  }

  (** The ["null"] codec. Code that handles blocks checks for it with
      [is_null_] and bypasses it; its functions fail with [Assert_failure]
      if they are ever called. *)
  let null =
    let unreachable (_ : string) : string = assert false in
    { name = "null"; compress = unreachable; decompress = unreachable }

  (** Name of the codec, as written in the header. *)
  let[@inline] name self = self.name

  (** [true] iff [self] is physically the [null] codec value. *)
  let[@inline] is_null_ self = self == null

  (** The ["deflate"] codec, delegating to [Zip_helper] with
      [~header:false].
      Its [decompress] raises [Failure] if the data cannot be inflated. *)
  let deflate =
    let compress str = Zip_helper.compress ~header:false str in
    let decompress str =
      match Zip_helper.decompress ~header:false str with
      | Ok out -> out
      | Error (`Zlib e) ->
        failwith
          (Format.asprintf "Input: cannot read compressed string:@ %a"
             Zip_helper.pp_error e)
    in
    { name = "deflate"; compress; decompress }

  (** Registry of known codecs; [register] prepends to it. *)
  let all_ = ref [ null; deflate ]

  (** Look up a registered codec by name. *)
  let find_by_name name =
    List.find_opt (fun c -> String.equal c.name name) !all_

  (** Like [find_by_name], but raising.
      @raise Not_found if no registered codec has this name. *)
  let find_by_name_exn name =
    match find_by_name name with
    | Some c -> c
    | None -> raise Not_found

  (** Add a new codec to the registry and return it.
      @raise Failure if a codec with the same name is already registered. *)
  let register ~name ~compress ~decompress () : t =
    if List.exists (fun c -> String.equal c.name name) !all_ then
      failwith (spf "codec '%s' already exists" name);
    let codec = { name; compress; decompress } in
    all_ := codec :: !all_;
    codec

  (** Same as [register], discarding the new codec. *)
  let register' ~name ~compress ~decompress () : unit =
    let (_ : t) = register ~name ~compress ~decompress () in
    ()
end
module Decode = struct
  (** State of a streaming decoder over an object container file.
      Rows are produced lazily, one block at a time. *)
  type 'a t = {
    input: Input.t;
    (** The container's underlying input. *)
    row: (Input.t -> 'a);
    (** Decodes a single row from the current block's input. *)
    mutable sync_marker: string;
    (** 16-byte sync marker read from the header; checked before each
        block after the first. *)
    mutable first_block: bool;
    (** [true] until the first block is started. The marker preceding it
        was already consumed by the header reader. *)
    mutable is_done: bool;
    (** Set once [End_of_file] is hit where a block count is expected. *)
    mutable codec: Codec.t;
    (** Codec named by the header's ["avro.codec"] entry ([Codec.null] if
        the entry is absent). *)
    mutable block_input: Input.t;
    (** Where rows of the current block are read from: [input] itself for
        the null codec, otherwise an input over the decompressed block. *)
    mutable block_remaining_count: int;
    (** How many items remain to be read in the current block. *)
  }

  (** Read and validate the container header: the ["Obj\x01"] magic bytes,
      the metadata map (only ["avro.codec"] is used here) and the sync
      marker.
      @raise Failure on a bad magic number or an unknown codec name. *)
  let read_header_ self : unit =
    let magic = Bytes.make 4 '\x00' in
    Input.read_exact self.input magic 0 4;
    let magic = Bytes.unsafe_to_string magic in
    if magic <> "Obj\x01" then (
      failwith (spf "cannot decode file container: expected 'Obj\x01' header, got %S" magic);
    );
    let module OH = Obj_container_header in
    let header = OH.read self.input in
    let codec =
      match OH.Str_map.find_opt "avro.codec" header with
      | None -> Codec.null
      | Some name ->
        (match Codec.find_by_name name with
         | Some c -> c
         | None -> failwith (spf "unknown codec %S" name))
    in
    self.codec <- codec;
    self.sync_marker <- Input.read_string_of_len self.input 16;
    ()

  (** [make input ~read] reads the container header from [input] and
      returns a decoder that uses [read] to decode each row.
      @raise Failure if the header is invalid or names an unknown codec. *)
  let make input ~read:row : _ t =
    let self = {
      input; row; first_block = true; is_done = false;
      codec = Codec.null;
      block_input = input; sync_marker = "";
      block_remaining_count = 0;
    } in
    read_header_ self;
    self

  (** Number of items not yet read from the current block. *)
  let[@inline] cur_block_remaining_count self = self.block_remaining_count

  (** Move to the next block. For every block but the first, first check the
      sync marker that ends the previous one. Then read the item count and
      byte size. Sets [is_done] if the input ends where a count is expected.
      @raise Failure on a sync marker mismatch or a negative item count. *)
  let next_block_ self : unit =
    if self.first_block then (
      self.first_block <- false;
    ) else (
      let syncm = Input.read_string_of_len self.input 16 in
      if syncm <> self.sync_marker then (
        failwith (spf "expected sync marker %S, got %S" self.sync_marker syncm);
      );
    );
    begin match Input.read_int self.input with
      | count ->
        (* A negative count would never count down to 0 in [next_], so rows
           would be decoded past the end of the block: reject corrupt input. *)
        if count < 0 then failwith (spf "invalid block count %d" count);
        self.block_remaining_count <- count;
        let byte_size = Input.read_int self.input in
        let block_input =
          if Codec.is_null_ self.codec then (
            (* uncompressed: rows are decoded straight from [input], so
               [byte_size] is not needed *)
            self.input
          ) else (
            let buf = Bytes.make byte_size '\x00' in
            Input.read_exact self.input buf 0 byte_size;
            let decompressed =
              self.codec.Codec.decompress (Bytes.unsafe_to_string buf) in
            Input.of_string decompressed
          )
        in
        self.block_input <- block_input;
      | exception End_of_file ->
        self.is_done <- true;
    end;
    ()

  let rec next_ self =
    if self.is_done then None
    else if self.block_remaining_count = 0 then (
      (* current block exhausted (or none started yet): load the next one *)
      next_block_ self;
      (next_ [@tailcall]) self
    ) else (
      let r = self.row self.block_input in
      self.block_remaining_count <- self.block_remaining_count - 1;
      Some r
    )

  (** Decode the next row, or return [None] once the input is exhausted. *)
  let[@inline] next self =
    if self.is_done then None else next_ self

  (** Rows as a sequence. Forcing it consumes rows from the decoder, so the
      sequence is ephemeral. *)
  let rec to_seq self () =
    match next self with
    | None -> Seq.Nil
    | Some x -> Seq.Cons (x, to_seq self)

  (** Call [f] on every remaining row. *)
  let rec iter self ~f =
    match next self with
    | None -> ()
    | Some x -> f x; iter self ~f

  (** Fold [f] over every remaining row. *)
  let fold self ~f ~init =
    let r = ref init in
    iter self ~f:(fun x -> r := f !r x);
    !r

  (** All remaining rows, in order. *)
  let to_list self = List.of_seq @@ to_seq self

  (** All remaining rows, in order. *)
  let to_array self = Array.of_seq @@ to_seq self
end
module Encode = struct
  (** Optional parameters shared by the encoder constructors. *)
  type 'a with_params =
    ?max_block_count:int ->
    ?buf_size:int ->
    ?pool:Iobuf.Pool.t ->
    ?codec:Codec.t ->
    'a

  (** State of a streaming encoder writing an object container file. *)
  type 'a t = {
    out: Output.t;
    (** Destination of the whole container. *)
    write: Output.t -> 'a -> unit;
    (** Encodes a single row. *)
    schema: string;
    (** Stored under ["avro.schema"] in the header. *)
    codec: Codec.t;
    (** Applied to each block's payload unless it is [Codec.null]. *)
    pool: Iobuf.Pool.t;
    sync_marker: string;
    max_block_count: int;
    (** A block is flushed as soon as it holds this many rows. *)
    mutable closed: bool;
    mutable block_out: Output.t;
    (** Rows of the current block are written here, backed by [block_chain]. *)
    mutable block_chain : Iobuf.Chain.t;
    mutable block_count: int;
    (** Number of rows in the current, not yet flushed, block. *)
  }

  (** Raised by [push] on a closed encoder. *)
  exception Closed

  (** Write the ["Obj\x01"] magic bytes, the metadata map (schema and codec
      name) and the sync marker to [out]. Does not flush [out]. *)
  let write_header_ self : unit =
    let module OH = Obj_container_header in
    let magic = "Obj\x01" in
    Output.write_string_of_len self.out 4 magic;
    let meta = OH.Str_map.(
        empty
        |> add "avro.schema" self.schema
        |> add "avro.codec" self.codec.Codec.name) in
    OH.write self.out meta;
    Output.write_string_of_len self.out 16 self.sync_marker;
    ()

  (** [make out ~schema ~write] writes the container header to [out] and
      returns an encoder. Add rows with [push], then finish with [close].
      - [max_block_count] (default 50_000) is clamped between 100 and 50_000.
      - [buf_size] (default [16 * 1024], at least 128) is only used to
        create a pool when [pool] is not given.
      - [codec] defaults to [Codec.null]. *)
  let make
      ?(max_block_count=50_000) ?(buf_size=16 * 1024) ?pool ?(codec=Codec.null)
      out ~schema ~write : _ t =
    let max_block_count = max 100 (min max_block_count 50_000) in
    let buf_size = max 128 buf_size in
    let pool = match pool with
      | None -> Iobuf.Pool.create ~buf_size ()
      | Some p -> p
    in
    (* NOTE(review): the Avro spec asks for a randomly generated sync marker.
       This fixed one keeps output deterministic, but the same 16 bytes could
       in principle occur inside row data. *)
    let sync_marker =
      let s = "syncmeup" in
      s ^ s
    in
    let block_out, block_chain = Output.of_iobuf_chain pool in
    let self = {
      out; write; schema;
      closed = false; pool; codec;
      max_block_count; sync_marker;
      block_out; block_chain; block_count = 0;
    } in
    write_header_ self;
    self

  (** Write the current block to [out]: row count, byte size, payload
      (compressed with [codec] unless it is the null codec), then the sync
      marker. Then start a fresh block buffer and flush [out]. *)
  let[@inline never] flush_block_ self : unit =
    if Codec.is_null_ self.codec then (
      let block_size = Iobuf.Chain.len self.block_chain in
      Output.write_int self.out self.block_count;
      Output.write_int self.out block_size;
      Iobuf.Chain.iter self.block_chain
        ~f:(fun (buf:Iobuf.t) ->
            Output.write_slice self.out buf.b buf.i (Iobuf.len buf));
      Iobuf.Chain.dealloc self.block_chain;
    ) else (
      let content = Iobuf.Chain.contents self.block_chain in
      Iobuf.Chain.dealloc self.block_chain;
      let compressed = self.codec.Codec.compress content in
      let block_size = String.length compressed in
      Output.write_int self.out self.block_count;
      Output.write_int self.out block_size;
      Output.write_slice self.out (Bytes.unsafe_of_string compressed) 0 block_size;
    );
    self.block_count <- 0;
    begin
      let out, block_chain = Output.of_iobuf_chain self.pool in
      self.block_out <- out;
      self.block_chain <- block_chain;
    end;
    Output.write_string_of_len self.out 16 self.sync_marker;
    Output.flush self.out

  (** Flush the block once it reaches [max_block_count] rows. *)
  let[@inline never] post_push_ self =
    if self.block_count >= self.max_block_count then (
      flush_block_ self
    )

  (** Number of rows buffered in the current block. *)
  let cur_block_count self = self.block_count

  (** Flush the current block now, if it holds any row. *)
  let flush_block self = if self.block_count > 0 then flush_block_ self

  (** Add one row to the current block.
      @raise Closed if [close] was already called. *)
  let[@inline] push self x : unit =
    if self.closed then raise Closed;
    self.block_count <- 1 + self.block_count;
    self.write self.block_out x;
    post_push_ self

  (** Flush any pending rows and then [out] itself. Subsequent calls to
      [push] raise [Closed]. Closing twice is a no-op. [out] is not closed. *)
  let close self =
    if not self.closed then (
      if self.block_count > 0 then flush_block_ self
      else
        (* No pending block: still flush, otherwise the header (e.g. for an
           empty sequence) may never reach the underlying output. *)
        Output.flush self.out;
      self.closed <- true;
    )

  (** Encode every element of [seq] into [out] as a complete container file. *)
  let write_seq
      ?max_block_count ?buf_size ?pool ?codec
      ~schema ~write out seq : unit =
    let self =
      make ?max_block_count ?buf_size ?pool ?codec
        ~schema ~write out
    in
    Seq.iter (push self) seq;
    close self

  (** Like [write_seq], but returns the container file as a string. *)
  let write_seq_to_string
      ?max_block_count ?buf_size ?pool ?codec
      ~schema ~write seq : string =
    let buf = Buffer.create 1_024 in
    let out = Output.of_buffer buf in
    write_seq
      ?max_block_count ?buf_size ?pool ?codec
      ~schema ~write out seq;
    Buffer.contents buf
end