1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
(* Signedness of an 8-bit or 16-bit integer value. *)
type sign = Unsigned | Signed
(* Byte order of a multi-byte value. In this file [Le], [Be] and [Ne] select
   the [_le], [_be] and [_ne] accessors respectively. *)
type endian = Le | Be | Ne
(* Witness describing how a value is laid out in bytes; the type parameter is
   the OCaml type that carries the value. *)
type 'a v =
(* 1 byte, carried by an [int]. *)
| Vi8 : sign -> int v
(* 2 bytes, carried by an [int]. *)
| Vi16 : sign * endian -> int v
(* 4 bytes, carried by an [int32]. *)
| Vi32 : endian -> int32 v
(* 8 bytes, carried by an [int64]. *)
| Vi64 : endian -> int64 v
(* 16 raw bytes, carried by a [string]. *)
| Vi128 : string v
(* A value packed together with its layout witness; the carried type is hidden
   so that values of different widths can be stored in one container. *)
type value = Value : 'a v * 'a -> value
(* [length_of_value k] is the number of bytes occupied by a value laid out as
   described by the witness [k]: 1 for [Vi8], 2 for [Vi16], 4 for [Vi32], 8 for
   [Vi64] and 16 for [Vi128]. Signedness and byte order do not change the
   size. *)
let length_of_value : type a. a v -> int =
 fun witness ->
  match witness with
  | Vi128 -> 16
  | Vi64 (Le | Be | Ne) -> 8
  | Vi32 (Le | Be | Ne) -> 4
  | Vi16 ((Unsigned | Signed), (Le | Be | Ne)) -> 2
  | Vi8 (Unsigned | Signed) -> 1
(* [unsafe_value_into_bytes ?off dst k x] serialises [x] into [dst], starting
   at byte offset [off] (default [0]), with the width, signedness and byte
   order given by the witness [k]. For [Vi128] the first 16 bytes of the string
   [x] are copied as they are.

   No bounds are checked here: the caller must make sure that [dst] has enough
   room after [off] (the [Bytes] primitives raise [Invalid_argument]
   otherwise). *)
let unsafe_value_into_bytes : type a. ?off:int -> bytes -> a v -> a -> unit =
 fun ?(off = 0) dst witness x ->
  match witness with
  | Vi128 -> Bytes.blit_string x 0 dst off 16
  | Vi8 sign -> (
      match sign with
      | Unsigned -> Bytes.set_uint8 dst off x
      | Signed -> Bytes.set_int8 dst off x)
  | Vi16 (sign, endian) -> (
      match (endian, sign) with
      | Le, Unsigned -> Bytes.set_uint16_le dst off x
      | Le, Signed -> Bytes.set_int16_le dst off x
      | Be, Unsigned -> Bytes.set_uint16_be dst off x
      | Be, Signed -> Bytes.set_int16_be dst off x
      | Ne, Unsigned -> Bytes.set_uint16_ne dst off x
      | Ne, Signed -> Bytes.set_int16_ne dst off x)
  | Vi32 endian -> (
      match endian with
      | Le -> Bytes.set_int32_le dst off x
      | Be -> Bytes.set_int32_be dst off x
      | Ne -> Bytes.set_int32_ne dst off x)
  | Vi64 endian -> (
      match endian with
      | Le -> Bytes.set_int64_le dst off x
      | Be -> Bytes.set_int64_be dst off x
      | Ne -> Bytes.set_int64_ne dst off x)
(* [unsafe_value_into_bstr ?off dst k x] is the [Bstr.t] counterpart of the
   [bytes] serialiser: it writes [x] into [dst] at byte offset [off] (default
   [0]) with the width, signedness and byte order given by the witness [k].
   For [Vi128], 16 bytes of the string [x] are copied as they are.

   The caller is responsible for [dst] being large enough after [off]. *)
let unsafe_value_into_bstr : type a. ?off:int -> Bstr.t -> a v -> a -> unit =
 fun ?(off = 0) dst witness x ->
  match witness with
  | Vi128 -> Bstr.blit_from_string x ~src_off:0 dst ~dst_off:off ~len:16
  | Vi8 sign -> (
      match sign with
      | Unsigned -> Bstr.set_uint8 dst off x
      | Signed -> Bstr.set_int8 dst off x)
  | Vi16 (sign, endian) -> (
      match (endian, sign) with
      | Le, Unsigned -> Bstr.set_uint16_le dst off x
      | Le, Signed -> Bstr.set_int16_le dst off x
      | Be, Unsigned -> Bstr.set_uint16_be dst off x
      | Be, Signed -> Bstr.set_int16_be dst off x
      | Ne, Unsigned -> Bstr.set_uint16_ne dst off x
      | Ne, Signed -> Bstr.set_int16_ne dst off x)
  | Vi32 endian -> (
      match endian with
      | Le -> Bstr.set_int32_le dst off x
      | Be -> Bstr.set_int32_be dst off x
      | Ne -> Bstr.set_int32_ne dst off x)
  | Vi64 endian -> (
      match endian with
      | Le -> Bstr.set_int64_le dst off x
      | Be -> Bstr.set_int64_be dst off x
      | Ne -> Bstr.set_int64_ne dst off x)
(* [value_to_string k v] is the byte representation of [v] under the layout
   [k], as a fresh string of exactly [length_of_value k] bytes. *)
let value_to_string k v =
  let scratch = Bytes.create (length_of_value k) in
  (* The serialiser writes at offset 0 by default and fills the whole buffer. *)
  unsafe_value_into_bytes scratch k v;
  (* [scratch] is never touched again, so handing it out as a string is safe. *)
  Bytes.unsafe_to_string scratch
(* Function used to write page buffers back to the underlying resource. In this
   file it is always called with [~pos] set to a page-aligned byte address and
   a list of page-sized buffers that are contiguous from that address. *)
type 'fd writev = 'fd -> pos:int -> Bstr.t list -> unit
(* State of a write-back layer on top of a [Cachet.t]: writes are queued in
   memory and reads take the queued writes into account. *)
type 'fd t = {
(* Read cache used for every byte that is not covered by a pending write. *)
cache: 'fd Cachet.t
(* Base-2 logarithm of the page size in bytes ([make] stores
   [unsafe_ctz pagesize]); used as a shift amount everywhere below. *)
; pagesize: int
(* Pending writes as [(byte offset, value)] pairs. *)
; pipeline: (int * value) Dllist.t
(* Intervals of byte offsets covered by the pending writes. *)
; mutable areas: Diet.t
(* Handle passed to [map] and [writev]. *)
; fd: 'fd
(* Number of pages of the underlying resource; [persist] uses it to clamp the
   index of the last page it maps. *)
; number_of_pages: int
(* Function used to obtain the buffer of a page, called as
   [map fd ~pos (1 lsl pagesize)]. *)
; map: 'fd Cachet.map
(* Function used to write modified pages back. *)
; writev: 'fd writev
}
(* [unsafe_ctz n] is the index of the least significant set bit of [n] (the
   number of trailing zero bits). For a power of two this is its base-2
   logarithm, e.g. [unsafe_ctz 4096 = 12].

   Raises [Invalid_argument] if [n = 0]: zero has no set bit, so scanning for
   one would never terminate. *)
let unsafe_ctz n =
  if n = 0 then invalid_arg "unsafe_ctz: zero has no set bit";
  (* [mask] has a single bit set, at position [count]. *)
  let rec scan mask count =
    if n land mask <> 0 then count else scan (mask lsl 1) (succ count)
  in
  scan 1 0
(* [make ?cachesize ?pagesize ~map ~writev ~number_of_pages fd] creates a
   write-back layer over [fd] with an empty queue of pending writes.

   [pagesize] is given in bytes (default [1 lsl 12]) and is forwarded as is to
   [Cachet.make]; the record itself keeps only the index of its lowest set bit,
   which is the base-2 logarithm when [pagesize] is a power of two.
   NOTE(review): [pagesize] is presumably required to be a power of two;
   confirm against [Cachet.make]. *)
let make ?cachesize ?(pagesize = 1 lsl 12) ~map ~writev ~number_of_pages fd =
  (* Build the read cache first so that any failure there happens before the
     rest of the state is created. *)
  let cache = Cachet.make ?cachesize ~pagesize ~map fd in
  {
    cache
  ; pagesize= unsafe_ctz pagesize
  ; pipeline= Dllist.create ()
  ; areas= Diet.empty
  ; fd
  ; number_of_pages
  ; map
  ; writev
  }
(* [cache t] is the read cache underlying [t]. *)
let cache t = t.cache
(* [unroll t ~at k] reads the value laid out as [k] at byte offset [at], as it
   will look once the pending writes queued in [t.pipeline] are applied.

   A 48-byte scratch buffer is used. The [length_of_value k] bytes currently
   visible through the cache at [at] are loaded at scratch offset 16. Every
   pending write whose offset lies in [at - 16, at + 16) is then replayed, in
   pipeline order, at the matching relative position. As a value is at most 16
   bytes long, such a write always fits in the scratch buffer. The result is
   finally decoded from scratch offset 16. *)
let unroll : type a. 'fd t -> at:int -> a v -> a =
 fun t ~at k ->
  let buf = Bytes.make (16 * 3) '\000' in
  let len = length_of_value k in
  Cachet.blit_to_bytes t.cache ~src_off:at buf ~dst_off:16 ~len;
  let a = at - 16 and b = at + 16 in
  let replay node =
    let at', Value (k, v) = Dllist.data node in
    if at' >= a && at' < b then
      (* Position of the pending write relative to [at], shifted by the 16
         bytes of slack kept in front of the requested value. *)
      unsafe_value_into_bytes ~off:(16 + (at' - at)) buf k v
  in
  Dllist.iter replay t.pipeline;
  match k with
  | Vi8 Unsigned -> Bytes.get_uint8 buf 16
  | Vi8 Signed -> Bytes.get_int8 buf 16
  | Vi16 (Unsigned, Le) -> Bytes.get_uint16_le buf 16
  | Vi16 (Unsigned, Be) -> Bytes.get_uint16_be buf 16
  | Vi16 (Unsigned, Ne) -> Bytes.get_uint16_ne buf 16
  | Vi16 (Signed, Le) -> Bytes.get_int16_le buf 16
  | Vi16 (Signed, Be) -> Bytes.get_int16_be buf 16
  | Vi16 (Signed, Ne) -> Bytes.get_int16_ne buf 16
  | Vi32 Le -> Bytes.get_int32_le buf 16
  | Vi32 Be -> Bytes.get_int32_be buf 16
  | Vi32 Ne -> Bytes.get_int32_ne buf 16
  | Vi64 Le -> Bytes.get_int64_le buf 16
  (* The decoder must use the same byte order as the encoder that replayed the
     pending writes, otherwise the 64-bit result comes out byte-swapped. *)
  | Vi64 Be -> Bytes.get_int64_be buf 16
  | Vi64 Ne -> Bytes.get_int64_ne buf 16
  | Vi128 -> Bytes.sub_string buf 16 16
(* [get t offset k] reads the value laid out as [k] at byte offset [offset].

   The interval built from [offset] and [offset + length_of_value k] is
   intersected with the areas covered by pending writes. If they intersect, the
   value is rebuilt by [unroll] so that the pending writes are visible;
   otherwise it is read straight from the cache. *)
let get : type a. 'fd t -> int -> a v -> a =
 fun t offset k ->
  let requested = Diet.singleton offset (offset + length_of_value k) in
  let pending = Diet.inter requested t.areas in
  if not (Diet.is_empty pending) then unroll t ~at:offset k
  else
    match k with
    | Vi128 -> Cachet.get_string t.cache ~len:16 offset
    | Vi8 sign -> (
        match sign with
        | Unsigned -> Cachet.get_uint8 t.cache offset
        | Signed -> Cachet.get_int8 t.cache offset)
    | Vi16 (sign, endian) -> (
        match (endian, sign) with
        | Le, Unsigned -> Cachet.get_uint16_le t.cache offset
        | Le, Signed -> Cachet.get_int16_le t.cache offset
        | Be, Unsigned -> Cachet.get_uint16_be t.cache offset
        | Be, Signed -> Cachet.get_int16_be t.cache offset
        | Ne, Unsigned -> Cachet.get_uint16_ne t.cache offset
        | Ne, Signed -> Cachet.get_int16_ne t.cache offset)
    | Vi32 endian -> (
        match endian with
        | Le -> Cachet.get_int32_le t.cache offset
        | Be -> Cachet.get_int32_be t.cache offset
        | Ne -> Cachet.get_int32_ne t.cache offset)
    | Vi64 endian -> (
        match endian with
        | Le -> Cachet.get_int64_le t.cache offset
        | Be -> Cachet.get_int64_be t.cache offset
        | Ne -> Cachet.get_int64_ne t.cache offset)
(* [set t off k v] queues the write of [v], laid out as [k], at byte offset
   [off]. Nothing is written to the underlying resource here: the interval
   from [off] to [off + length_of_value k] is recorded in [t.areas] and the
   write itself is added to [t.pipeline]. *)
let set : type a. 'fd t -> int -> a v -> a -> unit =
 fun t off k v ->
  let upper = off + length_of_value k in
  t.areas <- Diet.add off upper t.areas;
  Dllist.add (off, Value (k, v)) t.pipeline
(* [persist t off bstrs value] copies the byte representation of [value] into
   the page buffers [bstrs]. [off] is a byte offset counted from the start of
   [bstrs.(0)], each buffer being addressed as a page of [1 lsl t.pagesize]
   bytes. A value that crosses a page boundary is split between consecutive
   buffers. Copying stops silently as soon as the page index falls outside
   [bstrs]. *)
let persist t off bstrs (Value (k, v)) =
  let payload = value_to_string k v in
  let total = String.length payload in
  let page_len = 1 lsl t.pagesize in
  let consumed = ref 0 in
  let cursor = ref off in
  let in_range = ref true in
  while !in_range && !consumed < total do
    let idx = !cursor lsr t.pagesize in
    if idx >= Array.length bstrs then in_range := false
    else begin
      let src_off = !consumed in
      let dst_off = !cursor land (page_len - 1) in
      (* Copy whichever is smaller: what is left of the page or of the
         payload. *)
      let len = Int.min (page_len - dst_off) (total - src_off) in
      Bstr.blit_from_string payload ~src_off bstrs.(idx) ~dst_off ~len;
      consumed := src_off + len;
      cursor := !cursor + len
    end
  done
(* Classification of the relative position of two intervals given as
   [(low, high)] pairs. *)
module Interval = struct
  type t = Inclusion | Overlap | Disjoint

  (* [compare (x, y) (u, v)] is:
     - [Inclusion] when one interval lies entirely within the other (equal
       intervals included);
     - [Disjoint] when one interval ends strictly before the other starts;
     - [Overlap] otherwise, which includes intervals that merely share an
       endpoint. *)
  let compare (x, y) (u, v) =
    let first_in_second = x >= u && y <= v in
    let second_in_first = u >= x && v <= y in
    if first_in_second || second_in_first then Inclusion
    else
      let apart = y < u || x > v in
      if apart then Disjoint else Overlap
end
(* [area t i0] widens the interval [i0] so that it also covers every interval
   of [t.areas] that touches it. The intervals of [t.areas] are visited in the
   order given by [Diet.fold]; each one that is not disjoint from the interval
   accumulated so far extends it to the union of both. *)
let area t i0 =
  let widen ((u, v) as pending) ((x, y) as acc) =
    match Interval.compare acc pending with
    | Interval.Disjoint -> acc
    | Interval.Inclusion | Interval.Overlap -> (Int.min x u, Int.max y v)
  in
  Diet.fold widen t.areas i0
(* [persist t ~off ~len] writes back the pending writes that fall in the byte
   range starting at [off] and spanning [len] bytes, once that range has been
   widened by [area] to cover the pending areas it touches.

   The pages spanned by the widened range are mapped (the last page index is
   clamped to [t.number_of_pages - 1]). Every pending write starting inside
   the range is copied into those pages and removed from the pipeline. If at
   least one write was copied, the pages are handed to [t.writev] and the
   widened range is invalidated in the cache. The flushed range is removed
   from [t.areas] in every case. *)
let persist t ~off ~len =
  let lo, hi = area t (off, off + len) in
  let page_len = 1 lsl t.pagesize in
  let first_page = lo lsr t.pagesize in
  let last_page = Int.min (hi lsr t.pagesize) (t.number_of_pages - 1) in
  (* Byte address of the first mapped page. *)
  let base = first_page lsl t.pagesize in
  let pages =
    Array.init
      (last_page - first_page + 1)
      (fun i -> t.map t.fd ~pos:(base + (i * page_len)) page_len)
  in
  let dirty = ref false in
  (* Highest byte offset reached by a flushed write; it can exceed [hi] when a
     write starts inside the range and ends after it. *)
  let top = ref hi in
  let flush node =
    let at, (Value (k, _) as value) = Dllist.data node in
    let size = length_of_value k in
    if at >= lo && at + 1 <= hi then begin
      dirty := true;
      top := Int.max !top (at + size);
      (* Refers to the page-level [persist] defined earlier in the file: this
         definition is not recursive. *)
      persist t (at - base) pages value;
      Dllist.remove node
    end
  in
  Dllist.iter flush t.pipeline;
  t.areas <- Diet.remove (lo, !top) t.areas;
  if !dirty then begin
    t.writev t.fd ~pos:base (Array.to_list pages);
    Cachet.invalidate t.cache ~off:lo ~len:(hi - lo)
  end
(* A run of contiguous pages to write back: [off] is the byte address of the
   first page, [len] the total length in bytes, and [rchunks] the page buffers
   in reverse order (the last page of the run comes first). *)
type chunk = { off: int; len: int; rchunks: Bstr.t list }
(* [commit t] writes every pending write back to the underlying resource and
   empties the pipeline.

   It proceeds in four steps:
   1. map each distinct page that holds the first byte of a pending write;
   2. replay the pending writes, in pipeline order, into the mapped pages (a
      value that crosses a page boundary is split, the following page being
      mapped on demand);
   3. clear the pipeline and the set of pending areas;
   4. group the touched pages into runs of contiguous pages, call [t.writev]
      once per run and invalidate the matching range of the cache.

   Raises whatever [t.map] or [t.writev] raise. *)
let commit t =
  let page_len = 1 lsl t.pagesize in
  (* Step 1: collect and map the pages where pending writes start. *)
  let fn node acc =
    let logical_address, _ = Dllist.data node in
    ((logical_address lsr t.pagesize) lsl t.pagesize) :: acc
  in
  let ps = Dllist.fold fn t.pipeline [] in
  let ps = List.sort_uniq Int.compare ps in
  let tbl = Hashtbl.create (List.length ps) in
  let fn physical_address =
    let bstr = t.map t.fd ~pos:physical_address page_len in
    Hashtbl.add tbl physical_address bstr
  in
  List.iter fn ps;
  (* Step 2: apply each pending write to its page(s). *)
  let fn node =
    let logical_address, Value (k, v) = Dllist.data node in
    let len = length_of_value k in
    let off = logical_address land (page_len - 1) in
    if page_len - off >= len then begin
      (* The value fits entirely in one page. *)
      let physical_address = (logical_address lsr t.pagesize) lsl t.pagesize in
      let bstr = Hashtbl.find tbl physical_address in
      unsafe_value_into_bstr ~off bstr k v
    end
    else begin
      (* The value crosses the boundary with the following page. *)
      let p0 = logical_address lsr t.pagesize in
      let p1 = p0 + 1 in
      let p0 = p0 lsl t.pagesize in
      let p1 = p1 lsl t.pagesize in
      let bstr0 = Hashtbl.find tbl p0 in
      let bstr1 =
        match Hashtbl.find_opt tbl p1 with
        | Some bstr1 -> bstr1
        | None ->
            (* No pending write starts in the following page: map it now. *)
            let bstr1 = t.map t.fd ~pos:p1 page_len in
            Hashtbl.add tbl p1 bstr1;
            bstr1
      in
      let str = value_to_string k v in
      let pre = page_len - off in
      let rem = len - pre in
      Bstr.blit_from_string str ~src_off:0 bstr0 ~dst_off:off ~len:pre;
      Bstr.blit_from_string str ~src_off:pre bstr1 ~dst_off:0 ~len:rem
    end
  in
  Dllist.iter fn t.pipeline;
  (* Step 3: nothing is pending any more. *)
  Dllist.clear t.pipeline;
  t.areas <- Diet.empty;
  (* Step 4: sort the touched pages by address and merge neighbours. *)
  let fn p bstr acc = (p, bstr) :: acc in
  let ps = Hashtbl.fold fn tbl [] in
  let ps = List.sort (fun (a, _) (b, _) -> Int.compare a b) ps in
  let ps =
    List.map (fun (off, chunk) -> { off; len= page_len; rchunks= [ chunk ] }) ps
  in
  let coalesce (acc, curr) next =
    (* Structural integer equality: [next] starts right where [curr] ends. *)
    if Int.equal (curr.off + curr.len) next.off then
      let curr =
        {
          curr with
          len= curr.len + next.len
        ; rchunks= List.rev_append next.rchunks curr.rchunks
        }
      in
      (acc, curr)
    else (curr :: acc, next)
  in
  match ps with
  | [] -> ()
  | p :: ps ->
      let ps, p = List.fold_left coalesce ([], p) ps in
      let fn { off; len; rchunks } =
        (* [rchunks] is kept in reverse order; restore ascending addresses. *)
        t.writev t.fd ~pos:off (List.rev rchunks);
        Cachet.invalidate t.cache ~off ~len
      in
      List.iter fn (p :: ps)
(* Typed readers. Each one is [get] specialised to a fixed layout, so pending
   writes that intersect the requested bytes are visible in the result.
   [t] is the write-back layer and [off] the byte offset to read from. *)

(* 8-bit readers; the result is an [int]. *)
let get_uint8 t off = get t off (Vi8 Unsigned)
let get_int8 t off = get t off (Vi8 Signed)
(* 16-bit readers; the result is an [int]. *)
let get_uint16_ne t off = get t off (Vi16 (Unsigned, Ne))
let get_uint16_le t off = get t off (Vi16 (Unsigned, Le))
let get_uint16_be t off = get t off (Vi16 (Unsigned, Be))
let get_int16_ne t off = get t off (Vi16 (Signed, Ne))
let get_int16_le t off = get t off (Vi16 (Signed, Le))
let get_int16_be t off = get t off (Vi16 (Signed, Be))
(* 32-bit readers; the result is an [int32]. *)
let get_int32_ne t off = get t off (Vi32 Ne)
let get_int32_le t off = get t off (Vi32 Le)
let get_int32_be t off = get t off (Vi32 Be)
(* 64-bit readers; the result is an [int64]. *)
let get_int64_ne t off = get t off (Vi64 Ne)
let get_int64_le t off = get t off (Vi64 Le)
let get_int64_be t off = get t off (Vi64 Be)
(* 128-bit reader; the result is the 16 raw bytes as a [string]. *)
let get_int128 t off = get t off Vi128
(* Typed writers. Each one is [set] specialised to a fixed layout: the write of
   [v] at byte offset [off] is only queued in [t]; it reaches the underlying
   resource when [commit] or [persist] is called. *)

(* 8-bit writers; [v] is an [int]. *)
let set_int8 t off v = set t off (Vi8 Signed) v
let set_uint8 t off v = set t off (Vi8 Unsigned) v
(* 16-bit writers; [v] is an [int]. *)
let set_uint16_ne t off v = set t off (Vi16 (Unsigned, Ne)) v
let set_uint16_le t off v = set t off (Vi16 (Unsigned, Le)) v
let set_uint16_be t off v = set t off (Vi16 (Unsigned, Be)) v
let set_int16_ne t off v = set t off (Vi16 (Signed, Ne)) v
let set_int16_le t off v = set t off (Vi16 (Signed, Le)) v
let set_int16_be t off v = set t off (Vi16 (Signed, Be)) v
(* 32-bit writers; [v] is an [int32]. *)
let set_int32_ne t off v = set t off (Vi32 Ne) v
let set_int32_le t off v = set t off (Vi32 Le) v
let set_int32_be t off v = set t off (Vi32 Be) v
(* 64-bit writers; [v] is an [int64]. *)
let set_int64_ne t off v = set t off (Vi64 Ne) v
let set_int64_le t off v = set t off (Vi64 Le) v
let set_int64_be t off v = set t off (Vi64 Be) v
(* 128-bit writer; [v] is a [string] of which 16 bytes are written. *)
let set_int128 t off v = set t off Vi128 v