Source file append_only_file.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
open Import
include Append_only_file_intf
module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
  module Io = Io
  module Errs = Errs

  (* Buffer length (in bytes) at which [append_exn] triggers a [flush]. *)
  let auto_flush_threshold = 16_384

  type rw_perm = { buf : Buffer.t; mutable fsync_required : bool }
  (** [rw_perm] contains the data necessary to operate in readwrite mode. [buf]
      holds appended bytes that have not been written to [io] yet;
      [fsync_required] is set by [flush] and cleared by [fsync]. *)

  type t = {
    io : Io.t;  (** Underlying file handle. *)
    mutable persisted_end_poff : int63;
        (** End offset of the data already handed to [io], not counting the
            dead header. Advanced by [flush], replaced by [refresh_end_poff]. *)
    dead_header_size : int63;
        (** Number of bytes added to every offset before it is passed to [io]. *)
    rw_perm : rw_perm option;
        (** [Some _] for handles built by [create_rw] / [open_rw], [None] for
            handles built by [open_ro]. *)
  }

  (* Fresh readwrite state: an empty buffer and no pending fsync. *)
  let create_rw_perm () = Some { buf = Buffer.create 0; fsync_required = false }

  (** [create_rw ~path ~overwrite] creates the file through [Io.create] and
      returns a readwrite handle whose end offset and dead header size are
      both zero. Errors of [Io.create] are returned unchanged. *)
  let create_rw ~path ~overwrite =
    let open Result_syntax in
    let+ io = Io.create ~path ~overwrite in
    {
      io;
      persisted_end_poff = Int63.zero;
      dead_header_size = Int63.zero;
      rw_perm = create_rw_perm ();
    }

  (** A store is consistent if the real offset of the suffix/dict files is the
      one recorded in the control file. When opening the store, the offset
      from the control file is passed as the [end_poff] argument to the
      [open_ro], [open_rw] functions. The [end_poff] from the control file is
      then used as the real offset.

      In case of a crash, we can only recover if the [end_poff] is smaller
      than the real offset. We cannot recover otherwise, because we have no
      guarantees that the last object fsynced to disk is written entirely to
      disk.

      Concretely: the size reported by [Io.read_size], minus
      [dead_header_size], is compared with [end_poff]. A smaller size yields
      [Error `Inconsistent_store]; a larger size only logs a warning. *)
  let check_consistent_store ~end_poff ~dead_header_size io =
    let open Result_syntax in
    let* real_offset = Io.read_size io in
    let dead_header_size = Int63.of_int dead_header_size in
    let real_offset_without_header =
      Int63.Syntax.(real_offset - dead_header_size)
    in
    if real_offset_without_header < end_poff then Error `Inconsistent_store
    else (
      if real_offset_without_header > end_poff then
        [%log.warn
          "The end offset in the control file %a is smaller than the offset on \
           disk %a for %s; the store was closed in a inconsistent state."
            Int63.pp end_poff Int63.pp real_offset_without_header (Io.path io)];
      Ok ())

  (* Shared implementation of [open_rw] and [open_ro]: open the file, run the
     consistency check, then build the handle. If the check fails, [io] is
     closed before the error is returned; no [t] exists at that point, so the
     caller would otherwise have no way to release the handle. *)
  let open_checked ~readonly ~path ~end_poff ~dead_header_size =
    let open Result_syntax in
    let* io = Io.open_ ~path ~readonly in
    match check_consistent_store ~end_poff ~dead_header_size io with
    | Error e ->
        (* Best-effort cleanup: the consistency error is the one worth
           reporting, so a failure of [Io.close] is deliberately dropped. *)
        ignore (Io.close io);
        Error e
    | Ok () ->
        Ok
          {
            io;
            persisted_end_poff = end_poff;
            dead_header_size = Int63.of_int dead_header_size;
            rw_perm = (if readonly then None else create_rw_perm ());
          }

  (** [open_rw ~path ~end_poff ~dead_header_size] opens an existing file in
      readwrite mode. [end_poff] becomes the persisted end offset once
      [check_consistent_store] has accepted it. *)
  let open_rw ~path ~end_poff ~dead_header_size =
    open_checked ~readonly:false ~path ~end_poff ~dead_header_size

  (** [open_ro ~path ~end_poff ~dead_header_size] is the readonly counterpart
      of [open_rw]: same checks, but the handle carries no write buffer. *)
  let open_ro ~path ~end_poff ~dead_header_size =
    open_checked ~readonly:true ~path ~end_poff ~dead_header_size

  (** [empty_buffer t] is [true] when [t] has no buffered, unflushed bytes.
      Always [true] for handles without readwrite state. *)
  let empty_buffer t =
    match t.rw_perm with
    | None -> true
    | Some { buf; _ } -> Buffer.length buf = 0

  (** [close t] closes the underlying [io]. Returns [Error `Pending_flush],
      without closing, if buffered bytes have not been flushed. *)
  let close t = if empty_buffer t then Io.close t.io else Error `Pending_flush

  (** [readonly t] is [Io.readonly] of the underlying handle. *)
  let readonly t = Io.readonly t.io

  (** [path t] is [Io.path] of the underlying handle. *)
  let path t = Io.path t.io

  (** [end_poff t] is the persisted end offset, plus the length of the write
      buffer when [t] is a readwrite handle. *)
  let end_poff t =
    match t.rw_perm with
    | None -> t.persisted_end_poff
    | Some rw_perm ->
        let open Int63.Syntax in
        t.persisted_end_poff + (Buffer.length rw_perm.buf |> Int63.of_int)

  (** [refresh_end_poff t new_end_poff] replaces the persisted end offset of a
      readonly handle. Returns [Error `Rw_not_allowed] on a readwrite handle. *)
  let refresh_end_poff t new_end_poff =
    match t.rw_perm with
    | Some _ -> Error `Rw_not_allowed
    | None ->
        t.persisted_end_poff <- new_end_poff;
        Ok ()

  (** [flush t] writes the buffered bytes at the persisted end offset (shifted
      by the dead header size), then advances that offset, empties the buffer
      and marks the handle as needing an fsync. Nothing is updated if the
      write fails. Returns [Error `Ro_not_allowed] on a readonly handle. *)
  let flush t =
    match t.rw_perm with
    | None -> Error `Ro_not_allowed
    | Some rw_perm ->
        let open Result_syntax in
        let open Int63.Syntax in
        let s = Buffer.contents rw_perm.buf in
        let off = t.persisted_end_poff + t.dead_header_size in
        (* The state below is only touched when the write succeeded. *)
        let+ () = Io.write_string t.io ~off s in
        t.persisted_end_poff <-
          t.persisted_end_poff + (String.length s |> Int63.of_int);
        Buffer.truncate rw_perm.buf 0;
        rw_perm.fsync_required <- true

  (** [fsync t] calls [Io.fsync] if a [flush] happened since the last
      successful [fsync], and is a no-op otherwise. The write buffer must be
      empty (checked with [assert]). Returns [Error `Ro_not_allowed] on a
      readonly handle. *)
  let fsync t =
    match t.rw_perm with
    | None -> Error `Ro_not_allowed
    | Some rw ->
        assert (Buffer.length rw.buf = 0);
        if rw.fsync_required then
          let open Result_syntax in
          let+ () = Io.fsync t.io in
          rw.fsync_required <- false
        else Ok ()

  (** [read_exn t ~off ~len b] reads [len] bytes at offset [off] (relative to
      the end of the dead header) into [b] through [Io.read_exn]. Only data up
      to the persisted end offset can be read; buffered bytes are not visible.

      @raise Errors.Pack_error
        with [`Read_out_of_bounds] if [off + len] is past the persisted end
        offset. *)
  let read_exn t ~off ~len b =
    let open Int63.Syntax in
    let off' = off + Int63.of_int len in
    if off' > t.persisted_end_poff then
      raise (Errors.Pack_error `Read_out_of_bounds);
    let off = off + t.dead_header_size in
    Io.read_exn t.io ~off ~len b

  (** [read_to_string t ~off ~len] is the result-returning counterpart of
      [read_exn]: it returns [Error `Read_out_of_bounds] when [off + len] is
      past the persisted end offset, and otherwise defers to
      [Io.read_to_string]. *)
  let read_to_string t ~off ~len =
    let open Int63.Syntax in
    let off' = off + Int63.of_int len in
    if off' > t.persisted_end_poff then Error `Read_out_of_bounds
    else
      let off = off + t.dead_header_size in
      Io.read_to_string t.io ~off ~len

  (** [append_exn t s] adds [s] to the write buffer and flushes once the buffer
      holds at least [auto_flush_threshold] bytes; a flush error is raised
      through [Errs.raise_if_error].

      @raise Errors.RO_not_allowed if [t] is a readonly handle. *)
  let append_exn t s =
    match t.rw_perm with
    | None -> raise Errors.RO_not_allowed
    | Some rw_perm ->
        (* Every append that reaches the threshold flushes, so the buffer is
           expected to be below the threshold on entry. *)
        assert (Buffer.length rw_perm.buf < auto_flush_threshold);
        Buffer.add_string rw_perm.buf s;
        if Buffer.length rw_perm.buf >= auto_flush_threshold then
          flush t |> Errs.raise_if_error
end