1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
open! Import
module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
(** Codec for the [(key, value)] entries handled by this module, specialised
to the functor's [Key] and [Value] arguments. Elsewhere in this functor it
provides [encoded_size], [decode], [decode_key], [encode'] and [compare]. *)
module Entry = Data.Entry.Make (Key) (Value)
(** The [IO] functor argument, extended with the helpers of [Io.Extend] plus a
    key-only iterator. *)
module IO = struct
  include Io.Extend (IO)

  (** [iter_keys ?min f io] walks the file through [iter] and calls
      [f off key] for every visited entry, where [off] is the offset handed
      over by [iter] and [key] is decoded from the page buffer (the value is
      not decoded). Pages are sized to hold 1000 encoded entries.

      The visitor returns [Entry.encoded_size] to [iter] -- presumably the
      number of bytes consumed; confirm against [Io.Extend.iter]. *)
  let iter_keys ?min f =
    let entries_per_page = Int63.of_int 1_000 in
    let page_size = Int63.mul Entry.encoded_sizeL entries_per_page in
    let visit ~off ~buf ~buf_off =
      let key, _ = Entry.decode_key buf buf_off in
      f off key;
      Entry.encoded_size
    in
    iter ~page_size ?min visit
end
(** Reusable read buffer, so that decoding many entries in a row does not
    allocate a fresh buffer for each of them. *)
module Scratch = struct
  type t = {
    buffer : bytes;
        (** Invariant: [Bytes.length buffer = Entry.encoded_size] *)
  }
  [@@unboxed]

  (** [create ()] allocates a buffer large enough for one encoded entry. Its
      initial content is unspecified. *)
  let create () =
    let buffer = Bytes.create Entry.encoded_size in
    { buffer }
end
(** State of a log file: the on-disk file plus an in-memory hashtable that
maps keys to the file offsets of their entries. *)
type t = {
io : IO.t; (** Handle on the on-disk file holding the entries *)
append_io : string -> unit; (** Pre-allocated [IO.append io] closure *)
mutable hashtbl : int63 Small_list.t Array.t;
(** Hashtable of (key, value) pairs in [io], stored using just their
file offsets for memory compactness. Length is always a power of
two. *)
mutable bucket_count_log2 : int;
(** Invariant: equal to [log_2 (Array.length hashtbl)] *)
mutable cardinal : int;
(** Binding counter: reset to [0] by [clear_memory] and incremented by
[replace_memory] each time it inserts a key that was not already in its
bucket. Also drives the resize decision in [replace_memory]. *)
}
(** [io t] is the file handle that backs [t]. *)
let io { io; _ } = io
(** [cardinal t] is the current value of [t]'s binding counter. *)
let cardinal { cardinal; _ } = cardinal
(** [clear_memory t] forgets every in-memory binding: the table shrinks back
    to a single empty bucket (hence [bucket_count_log2 = 0]) and the binding
    counter returns to zero. The file itself is not touched. *)
let clear_memory t =
  t.cardinal <- 0;
  t.bucket_count_log2 <- 0;
  t.hashtbl <- Array.make 1 Small_list.empty
(** [clear ~generation ?hook ~reopen t] first clears the underlying file with
    [IO.clear] (all three labelled arguments are forwarded unchanged), then
    drops the in-memory bindings. *)
let clear ~generation ?hook ~reopen ({ io; _ } as t) =
  IO.clear ~generation ?hook ~reopen io;
  clear_memory t
(** [close t] closes the underlying file, then drops the in-memory bindings. *)
let close ({ io; _ } as t) =
  IO.close io;
  clear_memory t
(** [flush ?no_callback ~with_fsync t] forwards to [IO.flush] on the file
    backing [t]; the in-memory table is not involved. *)
let flush ?no_callback ~with_fsync { io; _ } =
  IO.flush ?no_callback ~with_fsync io
(** [key_of_offset t scratch off] reads [Key.encoded_size] bytes at offset
    [off] of the file into [scratch] and decodes them as a key. Only the key
    part of the entry is read; the previous content of [scratch] is
    overwritten.

    The read is asserted to be complete. *)
let key_of_offset t (scratch : Scratch.t) off =
  let buffer = scratch.buffer in
  let bytes_read = IO.read t.io ~off ~len:Key.encoded_size buffer in
  assert (bytes_read = Key.encoded_size);
  (* The buffer is viewed as a string without copying. *)
  let key, _ = Entry.decode_key (Bytes.unsafe_to_string buffer) 0 in
  key
(** [entry_of_offset t scratch off] reads one full entry
    ([Entry.encoded_size] bytes) at offset [off] of the file into [scratch]
    and decodes it. The previous content of [scratch] is overwritten.

    The read is asserted to be complete. *)
let entry_of_offset t (scratch : Scratch.t) off =
  let buffer = scratch.buffer in
  let bytes_read = IO.read t.io ~off ~len:Entry.encoded_size buffer in
  assert (bytes_read = Entry.encoded_size);
  (* The buffer is viewed as a string without copying. *)
  Entry.decode (Bytes.unsafe_to_string buffer) 0
(** [elt_index t key] is the index of the bucket of [t.hashtbl] in which [key]
    belongs: the hash of [key] shifted right by
    [Key.hash_size - t.bucket_count_log2] bits, then masked down to
    [t.bucket_count_log2] bits. Assuming [Key.hash] yields [Key.hash_size]
    significant bits, these are the most significant bits of the hash. *)
let elt_index t key =
  let used_bits = t.bucket_count_log2 in
  let mask = (1 lsl used_bits) - 1 in
  let top_bits = Key.hash key lsr (Key.hash_size - used_bits) in
  top_bits land mask
(** [resize t scratch] doubles the number of buckets of [t.hashtbl] and
    re-distributes the stored offsets: old bucket [i] is split between new
    buckets [2 * i] and [2 * i + 1] according to the extra hash bit that
    [elt_index] now takes into account. Keys are re-read from the file through
    [scratch].

    The operation is all-or-nothing: if it raises, [t] still satisfies
    [bucket_count_log2 = log_2 (Array.length hashtbl)].

    @raise Failure
      if the doubled bucket count would exceed [Sys.max_array_length]. *)
let resize t scratch =
  let old_bucket_count_log2 = t.bucket_count_log2 in
  let new_bucket_count_log2 = old_bucket_count_log2 + 1 in
  let new_bucket_count = 1 lsl new_bucket_count_log2 in
  (* Validate and allocate before touching [t], so that a failure at this
     point cannot leave [bucket_count_log2] out of sync with the table. *)
  if new_bucket_count > Sys.max_array_length then
    Fmt.failwith
      "Log_file.resize: can't construct a hashtbl with %d buckets \
       (Sys.max_array_length = %d)"
      new_bucket_count Sys.max_array_length;
  let new_hashtbl = Array.make new_bucket_count Small_list.empty in
  let split_bucket i bucket =
    let bucket_2i, bucket_2i_plus_1 =
      Small_list.to_list bucket
      |> List.partition (fun offset ->
             let key = key_of_offset t scratch offset in
             let new_index = elt_index t key in
             (* Dropping the new low bit must give back the old index. *)
             assert (new_index lsr 1 = i);
             new_index land 1 = 0)
    in
    new_hashtbl.(2 * i) <- Small_list.of_list bucket_2i;
    new_hashtbl.((2 * i) + 1) <- Small_list.of_list bucket_2i_plus_1
  in
  (* [elt_index] reads [t.bucket_count_log2], so the new value has to be in
     place while the old buckets are being split. It is rolled back if the
     split fails (I/O error, failed assertion), since [t.hashtbl] is only
     swapped at the very end. *)
  t.bucket_count_log2 <- new_bucket_count_log2;
  (try ArrayLabels.iteri t.hashtbl ~f:split_bucket
   with exn ->
     t.bucket_count_log2 <- old_bucket_count_log2;
     raise exn);
  t.hashtbl <- new_hashtbl
(** [replace_memory t scratch key offset] binds [key] to [offset] in the
    in-memory table only; nothing is written to disk.

    The table is first grown with [resize] when the binding counter exceeds
    twice the number of buckets. Then, if the bucket of [key] already holds an
    offset whose on-disk key equals [key], the first such offset is replaced
    in place by [offset]. Otherwise [offset] is added to the bucket and the
    binding counter is incremented. *)
let replace_memory t scratch key offset =
  if t.cardinal > 2 * Array.length t.hashtbl then resize t scratch;
  let idx = elt_index t key in
  let old_bucket = t.hashtbl.(idx) in
  let replaced = ref false in
  let substitute existing =
    (* At most one binding per key: once a match has been replaced, the
       remaining offsets are kept as they are, without reading the file. *)
    if !replaced then existing
    else if Key.equal key (key_of_offset t scratch existing) then (
      replaced := true;
      offset)
    else existing
  in
  let updated_bucket = Small_list.map old_bucket ~f:substitute in
  let new_bucket =
    if !replaced then updated_bucket
    else (
      (* [key] was not in the bucket yet: this is a new binding. *)
      t.cardinal <- t.cardinal + 1;
      Small_list.cons offset old_bucket)
  in
  t.hashtbl.(idx) <- new_bucket
(** [replace t key value] appends the encoded binding to the file through
    [t.append_io], then binds [key] in memory to the offset that [IO.offset]
    reported just before the append -- presumably the position at which the
    new entry starts; confirm against [IO.offset]. *)
let replace t key value =
  let entry_offset = IO.offset t.io in
  Entry.encode' key value t.append_io;
  let scratch = Scratch.create () in
  replace_memory t scratch key entry_offset
(** [sync_entries ~min t] brings the in-memory table up to date with the file:
    every key that [IO.iter_keys ~min] yields is (re)bound in memory to the
    offset it was found at. A single scratch buffer is shared by all the
    insertions. *)
let sync_entries ~min t =
  let scratch = Scratch.create () in
  let index_entry offset key = replace_memory t scratch key offset in
  IO.iter_keys ~min index_entry t.io
(** [reload t] rebuilds the in-memory table from the file: all in-memory
    bindings are dropped, then the file is re-indexed starting from offset
    zero. *)
let reload t =
  let () = clear_memory t in
  sync_entries ~min:Int63.zero t
(** [create io] builds the in-memory table for the entries already present in
    [io].

    The initial number of buckets is the smallest power of two, starting at
    16, that is at least the number of entries on disk, unless doubling it
    again would exceed [Sys.max_array_length]. Every key of the file is then
    indexed with [replace_memory]. *)
let create io =
  (* Number of entries on disk. It is only an upper bound on the number of
     distinct keys, so it is used for sizing the table and nothing else. *)
  let entry_count = Int63.(to_int_exn (IO.offset io / Entry.encoded_sizeL)) in
  let bucket_count_log2, bucket_count =
    let rec aux n_log2 n =
      if n >= entry_count then (n_log2, n)
      else if n * 2 > Sys.max_array_length then (n_log2, n)
      else aux (n_log2 + 1) (n * 2)
    in
    aux 4 16
  in
  let hashtbl = Array.make bucket_count Small_list.empty in
  (* [cardinal] starts at zero: [replace_memory] increments it once per new
     key while the file is loaded below. Seeding it with [entry_count] would
     count every loaded binding twice. *)
  let t =
    { io; append_io = IO.append io; hashtbl; bucket_count_log2; cardinal = 0 }
  in
  let scratch = Scratch.create () in
  IO.iter_keys (fun offset key -> replace_memory t scratch key offset) io;
  t
(** [find t key] is the value bound to [key] in [t]. The entries of the bucket
    of [key] are read back from the file one by one until one with an equal
    key is found.

    @raise Not_found if no entry of the bucket has a key equal to [key]. *)
let find t key =
  let bucket = t.hashtbl.(elt_index t key) in
  let scratch = Scratch.create () in
  let value_if_bound offset =
    let entry = entry_of_offset t scratch offset in
    if Key.equal key entry.key then Some entry.value else None
  in
  match Small_list.find_map bucket ~f:value_if_bound with
  | Some value -> value
  | None -> raise Not_found
(** [fold t ~f ~init] folds [f] over every entry referenced by the in-memory
    table, bucket after bucket in index order, reading each entry back from
    the file. *)
let fold t ~f ~init =
  let scratch = Scratch.create () in
  let fold_bucket acc bucket =
    Small_list.fold_left bucket ~init:acc ~f:(fun acc offset ->
        f acc (entry_of_offset t scratch offset))
  in
  ArrayLabels.fold_left t.hashtbl ~init ~f:fold_bucket
(** [iter t ~f] applies [f] to every entry referenced by the in-memory table,
    bucket after bucket in index order, reading each entry back from the
    file. *)
let iter t ~f =
  let scratch = Scratch.create () in
  let visit offset = f (entry_of_offset t scratch offset) in
  ArrayLabels.iter t.hashtbl ~f:(fun bucket -> Small_list.iter bucket ~f:visit)
(** [to_sorted_seq t] is the sequence of the entries of [t], produced bucket
    after bucket in index order; the entries of each bucket are sorted with
    [Entry.compare].

    NOTE(review): the sequence is globally sorted only if [Entry.compare]
    agrees with the bucket order induced by [elt_index] -- confirm against
    [Data.Entry].

    The sequence is lazy: a bucket is read from the file only when the
    consumer reaches it. The table captured is the one in place when
    [to_sorted_seq] is called. *)
let to_sorted_seq t =
  let scratch = Scratch.create () in
  let sorted_bucket bucket =
    let entries =
      Array.map (entry_of_offset t scratch) (Small_list.to_array bucket)
    in
    Array.sort Entry.compare entries;
    Array.to_seq entries
  in
  Seq.flat_map sorted_bucket (Array.to_seq t.hashtbl)
end