1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
include Column_types
let trim s = String.trim s
let reader_cache : (string, in_channel -> int -> value array) Hashtbl.t = Hashtbl.create 64
let reader_br_cache : (string, Buffered_reader.t -> int -> value array) Hashtbl.t =
Hashtbl.create 64
let reader_cache_hits = ref 0
let reader_cache_misses = ref 0
let reader_br_cache_hits = ref 0
let reader_br_cache_misses = ref 0
let normalize_spec (spec : string) = String.lowercase_ascii (trim spec)
let rec compile_reader s : in_channel -> int -> value array =
match Column_primitives.reader_primitive_of_spec s with
| Some r -> r
| None -> (
match Column_datetime.reader_datetime_of_spec s with
| Some r -> r
| None -> (
match Column_complex.reader_complex_of_spec ~resolver:reader_of_spec s with
| Some r -> r
| None -> (
match
Column_lowcardinality.reader_lowcardinality_of_spec ~resolver:reader_of_spec s
with
| Some r -> r
| None -> failwith (Printf.sprintf "Unsupported column type: %s" s))))
and reader_of_spec (spec : string) : in_channel -> int -> value array =
let s = normalize_spec spec in
match Hashtbl.find_opt reader_cache s with
| Some r ->
incr reader_cache_hits;
r
| None ->
incr reader_cache_misses;
let r = compile_reader s in
Hashtbl.add reader_cache s r;
r
let rec compile_reader_br s : Buffered_reader.t -> int -> value array =
match Column_primitives.reader_primitive_of_spec_br s with
| Some r -> r
| None -> (
match Column_datetime.reader_datetime_of_spec_br s with
| Some r -> r
| None -> (
match Column_complex.reader_complex_of_spec_br ~resolver:reader_of_spec_br s with
| Some r -> r
| None -> (
match
Column_lowcardinality.reader_lowcardinality_of_spec_br ~resolver:reader_of_spec_br s
with
| Some r -> r
| None -> failwith (Printf.sprintf "Unsupported column type: %s" s))))
and reader_of_spec_br (spec : string) : Buffered_reader.t -> int -> value array =
let s = normalize_spec spec in
match Hashtbl.find_opt reader_br_cache s with
| Some r ->
incr reader_br_cache_hits;
r
| None ->
incr reader_br_cache_misses;
let r = compile_reader_br s in
Hashtbl.add reader_br_cache s r;
r
type cache_stats = {
reader_cache_size : int;
reader_cache_hits : int;
reader_cache_misses : int;
reader_br_cache_size : int;
reader_br_cache_hits : int;
reader_br_cache_misses : int;
}
let get_cache_stats () : cache_stats =
{
reader_cache_size = Hashtbl.length reader_cache;
reader_cache_hits = !reader_cache_hits;
reader_cache_misses = !reader_cache_misses;
reader_br_cache_size = Hashtbl.length reader_br_cache;
reader_br_cache_hits = !reader_br_cache_hits;
reader_br_cache_misses = !reader_br_cache_misses;
}
let cache_stats_to_string (s : cache_stats) =
Printf.sprintf "readers: hits=%d misses=%d entries=%d | buffered: hits=%d misses=%d entries=%d"
s.reader_cache_hits s.reader_cache_misses s.reader_cache_size s.reader_br_cache_hits
s.reader_br_cache_misses s.reader_br_cache_size
let reset_cache_stats () =
reader_cache_hits := 0;
reader_cache_misses := 0;
reader_br_cache_hits := 0;
reader_br_cache_misses := 0
let clear_reader_caches () =
Hashtbl.clear reader_cache;
Hashtbl.clear reader_br_cache