Source file transaction.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
(* [debug fmt] calls [Logging.debug] with the fixed first argument
   "transaction" followed by [fmt].
   NOTE(review): presumably "transaction" is a log category and [fmt] a
   printf-style format -- confirm against [Logging.debug]. The explicit
   [fmt] parameter keeps [debug] a syntactic function. *)
let debug fmt = Logging.debug "transaction" fmt
open Junk
(* Transaction id [0l]. [make] builds a [No] context when given this id, and
   [get_id] returns it for a [No] context. *)
let none = 0l

(* Test switch: while it holds [true], [commit] on a [Full] transaction
   returns [false] whenever [Random.int 3 = 0], without attempting the
   commit. *)
let test_eagain = ref false
(* [check_parents_perms_identical root1 root2 path] returns [true] when every
   path yielded by [Store.Path.get_hierarchy path] resolves to a node under
   both [root1] and [root2] and the two nodes have structurally equal
   permissions. It returns [false] as soon as one path is missing from either
   root or carries different permissions; an empty hierarchy yields [true].

   Implemented with [List.for_all] so the scan stops at the first mismatch
   instead of folding over the whole hierarchy. *)
let check_parents_perms_identical root1 root2 path =
  let same_perms p =
    let n1 = Store.lookup root1 p in
    let n2 = Store.lookup root2 p in
    match n1, n2 with
    | Some n1, Some n2 -> Store.Node.get_perms n1 = Store.Node.get_perms n2
    (* A node missing on either side counts as a difference. *)
    | Some _, None | None, Some _ | None, None -> false
  in
  List.for_all same_perms (Store.Path.get_hierarchy path)
(* [get_lowest path1 path2] folds [path1] into the optional path [path2]:
   with no previous path the result is [path1] itself, otherwise it is
   [Store.Path.get_common_prefix] of the two. The result is always [Some _]. *)
let get_lowest path1 path2 =
  let merged =
    match path2 with
    | Some previous -> Store.Path.get_common_prefix path1 previous
    | None -> path1
  in
  Some merged
(* [test_coalesce oldroot currentroot path] decides whether [path] looks the
   same under [oldroot] and [currentroot]:
   - present in both: the two nodes must be the very same value (physical
     equality) and [check_parents_perms_identical] must hold;
   - absent in both: the parent of [path] must exist under [currentroot];
   - present in only one of them: [false]. *)
let test_coalesce oldroot currentroot path =
  let before = Store.lookup oldroot path in
  let now = Store.lookup currentroot path in
  match before, now with
  | Some before, Some now ->
    (* [==] on purpose: identity of the node, not structural equality. *)
    before == now && check_parents_perms_identical oldroot currentroot path
  | None, None ->
    (match Store.lookup currentroot (Store.Path.get_parent path) with
     | Some _parent -> true
     | None -> false)
  | Some _, None | None, Some _ -> false
(* [can_coalesce oldroot currentroot path] is [test_coalesce] made total:
   any exception raised while testing is turned into [false]. The catch-all
   is deliberate here -- a failed test simply means "do not coalesce". *)
let can_coalesce oldroot currentroot path =
  match test_coalesce oldroot currentroot path with
  | verdict -> verdict
  | exception _ -> false
(* Kind of transaction context.
   [No]: built by [make] when the id is [none]; [commit] always succeeds.
   [Full (id, root, store)]: a real transaction; [make] stores the id, the
   root of the originating store at creation time, and the originating
   store itself (the one [commit] updates). *)
type ty = No | Full of (int32 * Store.Node.t * Store.t)
(* State carried by one transaction context. *)
type t = {
(* Kind of context, see [ty]. *)
ty: ty;
(* Store the operations act on: the caller's store for [No], a
   [Store.copy] of it for [Full] (see [make]). *)
store: Store.t;
(* [Quota.copy] of the originating store's quota, taken in [make]. *)
quota: Quota.t;
(* Operations recorded by [add_wop], most recent first. *)
mutable paths: (Xs_protocol.Op.t * Store.Name.t) list;
(* Request/response pairs recorded by [add_operation], most recent first;
   [get_operations] returns them reversed. *)
mutable operations: (Xs_protocol.Request.payload * Xs_protocol.Response.payload) list;
(* Accumulated via [get_lowest] over every path passed to
   [set_read_lowpath]; [None] until the first call. *)
mutable read_lowpath: Store.Path.t option;
(* Same as [read_lowpath], for paths passed to [set_write_lowpath]. *)
mutable write_lowpath: Store.Path.t option;
}
(* [make id store] creates a fresh context over [store].
   With [id = none] the context is [No] and works directly on [store];
   otherwise it is [Full], remembers [store]'s current root and [store]
   itself, and works on a [Store.copy]. In both cases the quota is a
   [Quota.copy] of [store]'s quota and nothing has been recorded yet. *)
let make id store =
  let transactional = not (id = none) in
  let ty =
    if transactional then Full (id, store.Store.root, store) else No
  in
  let quota = Quota.copy store.Store.quota in
  let working_store = if transactional then Store.copy store else store in
  {
    ty;
    store = working_store;
    quota;
    paths = [];
    operations = [];
    read_lowpath = None;
    write_lowpath = None;
  }
(* [get_id t] is the transaction id of [t]: the id stored in [Full], or
   [none] for a [No] context. *)
let get_id t =
  match t.ty with
  | Full (id, _, _) -> id
  | No -> none
(* [get_store t] is the store that [t]'s operations act on. *)
let get_store { store; _ } = store
(* [get_paths t] is the list of operations recorded by [add_wop], most
   recent first. *)
let get_paths { paths; _ } = paths
(* [add_wop t ty path] records operation [ty] on [path] (converted with
   [Store.Path.to_name]) at the head of [t.paths]. *)
let add_wop t ty path =
  let entry = (ty, Store.Path.to_name path) in
  t.paths <- entry :: t.paths
(* [add_operation t request response] records the pair at the head of
   [t.operations]. *)
let add_operation t request response =
  let exchange = (request, response) in
  t.operations <- exchange :: t.operations
(* [get_operations t] is the list of recorded request/response pairs in the
   order they were added (the stored list is newest-first, hence the
   reversal). *)
let get_operations { operations; _ } = List.rev operations
(* [set_read_lowpath t path] merges [path] into [t.read_lowpath] using
   [get_lowest]. *)
let set_read_lowpath t path =
  let lowest = get_lowest path t.read_lowpath in
  t.read_lowpath <- lowest
(* [set_write_lowpath t path] merges [path] into [t.write_lowpath] using
   [get_lowest]. *)
let set_write_lowpath t path =
  let lowest = get_lowest path t.write_lowpath in
  t.write_lowpath <- lowest
(* [exists t _perms path] is [Store.exists] on [t]'s store for [path].
   [_perms] is accepted but not used. *)
let exists t _perms path =
  let { store; _ } = t in
  Store.exists store path
(* [write t creator perm path value] performs [Store.write] on [t]'s store,
   then updates the write low path -- with [path] if it already existed
   before the write, otherwise with its parent -- and finally records a
   [Write] operation on [path]. *)
let write t creator perm path value =
  let path_existed = exists t perm path in
  Store.write t.store creator perm path value;
  (* A freshly created node changes its parent, so track the parent. *)
  let touched = if path_existed then path else Store.Path.get_parent path in
  set_write_lowpath t touched;
  add_wop t Xs_protocol.Op.Write path
(* [mkdir ?with_watch t creator perm path] performs [Store.mkdir] on [t]'s
   store and merges [path] into the write low path. A [Mkdir] operation is
   recorded on [path] only when [with_watch] is [true] (the default).
   NOTE(review): unlike [write] on a non-existing path, the low path used
   here is [path] itself rather than its parent -- confirm this is
   intended. *)
let mkdir ?(with_watch=true) t creator perm path =
  Store.mkdir t.store creator perm path;
  set_write_lowpath t path;
  match with_watch with
  | true -> add_wop t Xs_protocol.Op.Mkdir path
  | false -> ()
(* [setperms t perm path perms] performs [Store.setperms] on [t]'s store,
   merges [path] into the write low path and records a [Setperms]
   operation on [path]. *)
let setperms t perm path perms =
  let { store; _ } = t in
  Store.setperms store perm path perms;
  set_write_lowpath t path;
  add_wop t Xs_protocol.Op.Setperms path
(* [rm t perm path] performs [Store.rm] on [t]'s store, merges the parent of
   [path] into the write low path and records an [Rm] operation on
   [path]. *)
let rm t perm path =
  Store.rm t.store perm path;
  let parent = Store.Path.get_parent path in
  set_write_lowpath t parent;
  add_wop t Xs_protocol.Op.Rm path
(* [list t perm path] returns the result of [Store.ls] on [t]'s store and,
   once that call has returned, merges [path] into the read low path. *)
let list t perm path =
  let entries = Store.ls t.store perm path in
  let () = set_read_lowpath t path in
  entries
(* [read t perm path] returns the result of [Store.read] on [t]'s store and,
   once that call has returned, merges [path] into the read low path. *)
let read t perm path =
  let value = Store.read t.store perm path in
  let () = set_read_lowpath t path in
  value
(* [getperms t perm path] returns the result of [Store.getperms] on [t]'s
   store and, once that call has returned, merges [path] into the read low
   path. *)
let getperms t perm path =
  let perms = Store.getperms t.store perm path in
  let () = set_read_lowpath t path in
  perms
(* [commit ~con t] tries to apply [t] to the store it was created from and
   returns [true] on success, [false] otherwise.

   - A [No] context always succeeds.
   - For [Full (_, oldroot, cstore)]:
     - while [test_eagain] is set, the commit fails whenever
       [Random.int 3 = 0], before anything else is tried;
     - if [cstore]'s root is still physically [oldroot], the commit succeeds
       and, when write operations were recorded, [cstore] receives the
       transaction store's root and quota;
     - otherwise the commit succeeds only if both recorded low paths pass
       [can_coalesce]. In that case the node found at the write low path in
       the transaction store is installed into [cstore] with
       [Store.set_node] and the coalesce counter of [cstore] is bumped; on
       failure the abort counter is bumped instead.

   Logging: a failed commit is logged with [Logging.conflict]; a successful
   non-coalesced one with [Logging.commit]; a coalesced one only through
   [Logging.write_coalesce] / [Logging.read_coalesce].

   [con] is passed through unchanged to the [Logging] calls. *)
let commit ~con t =
  (* Emptiness test by pattern match: same answer as
     [List.length t.paths > 0] without traversing the list.
     NOTE(review): [t.paths] is only extended by [add_wop], and
     [mkdir ~with_watch:false] modifies the store without calling it, so a
     transaction made solely of such calls counts as read-only here --
     confirm this is intended. *)
  let has_write_ops =
    match t.paths with
    | [] -> false
    | _ :: _ -> true
  in
  let has_coalesced = ref false in
  let has_commited =
    match t.ty with
    | No -> true
    | Full (_id, oldroot, cstore) ->
      (* The live root moved since the transaction started: merge only the
         touched subtree, provided it is unchanged in the live store. *)
      let commit_partial oldroot cstore store =
        let lowpath_ok = function
          | None -> true
          | Some path -> can_coalesce oldroot cstore.Store.root path
        in
        (* Both checks are always evaluated, read first, then write. *)
        let readpath_ok = lowpath_ok t.read_lowpath in
        let writepath_ok = lowpath_ok t.write_lowpath in
        if readpath_ok && writepath_ok then begin
          maybe (fun p ->
            let n = Store.lookup store.Store.root p in
            (* Nothing is installed when the node is absent from the
               transaction store; the coalesce is logged either way. *)
            maybe (fun n -> Store.set_node cstore p n t.quota store.Store.quota) n;
            Logging.write_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p)
          ) t.write_lowpath;
          maybe (fun p ->
            Logging.read_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p)
          ) t.read_lowpath;
          has_coalesced := true;
          cstore.Store.stat_transaction_coalesce <- cstore.Store.stat_transaction_coalesce + 1;
          true
        end else begin
          cstore.Store.stat_transaction_abort <- cstore.Store.stat_transaction_abort + 1;
          false
        end
      in
      let try_commit oldroot cstore store =
        (* Physical equality: the live root is the very node captured when
           the transaction was created. *)
        if oldroot == cstore.Store.root then begin
          if has_write_ops then begin
            Store.set_root cstore store.Store.root;
            Store.set_quota cstore store.Store.quota
          end;
          true
        end else
          commit_partial oldroot cstore store
      in
      if !test_eagain && Random.int 3 = 0 then
        false
      else
        try_commit oldroot cstore t.store
  in
  if not has_commited
  then Logging.conflict ~tid:(get_id t) ~con
  else if not !has_coalesced
  then Logging.commit ~tid:(get_id t) ~con;
  has_commited