Source file store_migration.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
open Store_version
(** How the version of a store was obtained: [Version_known] when it was read
    from the store's version file, [Unintialized_version] when no version file
    was found and the version had to be guessed from the store contents (see
    [version_of_store]). The constructor spelling is part of the interface. *)
type version_result = Version_known | Unintialized_version
(** [messages_store_location ~storage_dir] is the path of the ["messages"]
    entry located directly under [storage_dir]. *)
let messages_store_location ~storage_dir =
  Filename.Infix.(storage_dir // "messages")
(** [version_of_unversioned_store ~storage_dir] guesses the version of a store
    that has no version file by trying to load its messages store, read-only,
    with both the V0 and the V1 implementations.

    - only the V0 load succeeds: [Some V0];
    - only the V1 load succeeds: [Some V1];
    - both succeed: [None] (the two formats cannot be told apart; presumably
      the store is empty -- to be confirmed);
    - both fail: an error.

    Every store that was successfully loaded is closed before returning;
    errors reported while closing are ignored. *)
let version_of_unversioned_store ~storage_dir =
  let open Lwt_syntax in
  let path = messages_store_location ~storage_dir in
  let* messages_store_v0 = Store_v0.Messages.load ~path ~cache_size:1 Read_only
  and* messages_store_v1 =
    Store_v1.Messages.load ~path ~cache_size:1 Read_only
  in
  (* Close a store only if its load succeeded. *)
  let close_if_loaded close = function
    | Ok store -> close store
    | Error _ -> Lwt.return_ok ()
  in
  let release_stores () =
    let* (_ : unit tzresult) =
      close_if_loaded Store_v0.Messages.close messages_store_v0
    and* (_ : unit tzresult) =
      close_if_loaded Store_v1.Messages.close messages_store_v1
    in
    return_unit
  in
  let infer_version () =
    let open Lwt_result_syntax in
    match (messages_store_v0, messages_store_v1) with
    | Error _, Error _ ->
        failwith
          "Cannot determine unversioned store version (no messages decodable)"
    | Ok _, Ok _ -> return_none
    | Error _, Ok _ -> return_some V1
    | Ok _, Error _ -> return_some V0
  in
  Lwt.finalize infer_version release_stores
(** [version_of_store ~storage_dir] returns the version of the store in
    [storage_dir] together with how it was determined: the content of the
    version file tagged [Version_known] when that file provides one, otherwise
    the version guessed by [version_of_unversioned_store] tagged
    [Unintialized_version]. The result is [None] when no version can be
    guessed either. *)
let version_of_store ~storage_dir =
  let open Lwt_result_syntax in
  let* recorded = Store_version.read_version_file ~dir:storage_dir in
  match recorded with
  | None ->
      let* guessed = version_of_unversioned_store ~storage_dir in
      return
        (Option.map (fun version -> (version, Unintialized_version)) guessed)
  | Some version -> return_some (version, Version_known)
(** Version-specific actions plugged into the [Make] functor to describe one
    migration from a source store to a destination store. *)
module type MIGRATION_ACTIONS = sig
(** Type of the store being migrated from. *)
type from_store
(** Type of the store being migrated to. *)
type dest_store
(** Action run by [Make] for each L2 block enumerated from the source store
    (it is the callback given to [iter_l2_blocks]). *)
val migrate_block_action :
from_store -> dest_store -> Sc_rollup_block.t -> unit tzresult Lwt.t
(** Action run by [Make] once, after every block has been processed.
    [storage_dir] is the directory of the store being migrated and [tmp_dir]
    the temporary directory in which the destination store was created. *)
val final_actions :
storage_dir:string ->
tmp_dir:string ->
from_store ->
dest_store ->
unit tzresult Lwt.t
end
(** Output signature of the [Make] functor: a single migration step. *)
module type S = sig
(** [migrate ~storage_dir] runs the migration on the store located in
    [storage_dir]. *)
val migrate : storage_dir:string -> unit tzresult Lwt.t
end
(** Registry of the available migrations. Each instantiation of [Make] adds a
    binding from its source version to the pair of its destination version and
    its [migrate] function; [migration_path] looks bindings up with
    [Hashtbl.find_all], so several migrations may share a source version.
    [7] is only the initial size of the table. *)
let migrations = Stdlib.Hashtbl.create 7
(** [Make (S_from) (S_dest) (Actions)] builds the migration from the store
    version [S_from.version] to [S_dest.version] and, as a side effect of the
    functor application, registers it in the [migrations] table. *)
module Make
    (S_from : Store_sig.S)
    (S_dest : Store_sig.S)
    (Actions : MIGRATION_ACTIONS
                 with type from_store := Store_sigs.ro S_from.t
                  and type dest_store := Store_sigs.rw S_dest.t) : S = struct
  (** Temporary directory in which the destination store is built. Its name
      embeds both versions, so every migration step has its own directory. *)
  let tmp_dir ~storage_dir =
    Filename.concat (Configuration.default_storage_dir storage_dir)
    @@ Format.asprintf
         "migration_%a_%a"
         Store_version.pp
         S_from.version
         Store_version.pp
         S_dest.version

  (** [migrate ~storage_dir] migrates the store in [storage_dir]:

      + load the source store read-only;
      + fail if the temporary directory already exists (a migration is
        presumably ongoing or was interrupted), otherwise create it and load
        the destination store in it;
      + run [Actions.migrate_block_action] on every L2 block of the source
        store, then [Actions.final_actions];
      + remove the temporary directory and write the new version file.

      Once loaded, each store is closed on every exit path, including the
      failures that happen before the migration proper starts (temporary
      directory already present, destination store failing to load). The
      destination store is closed first, then the source store. Errors
      reported while closing are ignored. *)
  let migrate ~storage_dir =
    let open Lwt_result_syntax in
    let* source_store =
      S_from.load Read_only ~l2_blocks_cache_size:1 storage_dir
    in
    let close_source () =
      let open Lwt_syntax in
      let* (_ : unit tzresult) = S_from.close source_store in
      return_unit
    in
    (* Everything below runs under a finalizer that closes the source store,
       so that an early failure does not leak it. *)
    let migrate_with_source () =
      let tmp_dir = tmp_dir ~storage_dir in
      let*! tmp_dir_exists = Lwt_utils_unix.dir_exists tmp_dir in
      let*? () =
        if tmp_dir_exists then
          error_with
            "Store migration (from %a to %a) is already ongoing. Wait for it \
             to finish or remove %S and restart."
            Store_version.pp
            S_from.version
            Store_version.pp
            S_dest.version
            tmp_dir
        else Ok ()
      in
      let*! () = Lwt_utils_unix.create_dir tmp_dir in
      let* dest_store =
        S_dest.load Read_write ~l2_blocks_cache_size:1 tmp_dir
      in
      let close_dest () =
        let open Lwt_syntax in
        let* (_ : unit tzresult) = S_dest.close dest_store in
        return_unit
      in
      let run_migration () =
        let* () =
          S_from.iter_l2_blocks
            source_store
            (Actions.migrate_block_action source_store dest_store)
        in
        let* () =
          Actions.final_actions ~storage_dir ~tmp_dir source_store dest_store
        in
        let*! () = Lwt_utils_unix.remove_dir tmp_dir in
        Store_version.write_version_file ~dir:storage_dir S_dest.version
      in
      Lwt.finalize run_migration close_dest
    in
    Lwt.finalize migrate_with_source close_source

  (* Register this migration so that [migration_path] can find it. *)
  let () =
    Stdlib.Hashtbl.add migrations S_from.version (S_dest.version, migrate)
end
(** [migration_path ~from ~dest] searches the registered [migrations] for a
    sequence of steps leading from version [from] to version [dest].

    The result is [Some steps], where each step is a triple
    [(source, target, migrate)], or [None] when no path exists. An empty list
    is returned when [from = dest].

    At every version all registered steps are explored and the shortest
    resulting path is kept. Ties are resolved by the order in which candidate
    steps are tried: targets that do not exceed [dest] come first, highest
    version first, followed by targets beyond [dest] in increasing order. *)
let migration_path ~from ~dest =
  (* Ordering of the candidate steps described above. *)
  let compare_steps (va, _) (vb, _) =
    match (va > dest, vb > dest) with
    | true, true -> Stdlib.compare va vb
    | true, false -> 1
    | false, true -> -1
    | false, false -> Stdlib.compare vb va
  in
  (* [steps_rev] holds the steps taken so far, most recent first. *)
  let rec search steps_rev current =
    if current = dest then Some (List.rev steps_rev)
    else
      Stdlib.Hashtbl.find_all migrations current
      |> List.stable_sort compare_steps
      |> List.filter_map (fun (next, migration) ->
             search ((current, next, migration) :: steps_rev) next)
      |> List.stable_sort List.compare_lengths
      |> List.hd
  in
  search [] from
(** [maybe_run_migration ~storage_dir] brings the store in [storage_dir] to
    the latest version, [Store.version]:

    - no version could be determined: the version file is written with the
      latest version;
    - the store is already at the latest version: the version file is written
      if the version had only been guessed, nothing is done otherwise;
    - otherwise the migrations given by [migration_path] are run in sequence,
      with progress printed on standard output. An error is returned when
      there is no migration path. *)
let maybe_run_migration ~storage_dir =
  let open Lwt_result_syntax in
  let* current_version = version_of_store ~storage_dir in
  let last_version = Store.version in
  let record_last_version () =
    Store_version.write_version_file ~dir:storage_dir last_version
  in
  let run_step (vx, vy, migrate) =
    Format.printf
      "- Migrating store from %a to %a@."
      Store_version.pp
      vx
      Store_version.pp
      vy ;
    migrate ~storage_dir
  in
  match current_version with
  | None -> record_last_version ()
  | Some (current, versioned) ->
      if last_version = current then (
        match versioned with
        | Version_known -> return_unit
        | Unintialized_version -> record_last_version ())
      else (
        match migration_path ~from:current ~dest:last_version with
        | Some steps ->
            Format.printf "Starting store migration@." ;
            let* () = List.iter_es run_step steps in
            Format.printf "Store migration completed@." ;
            return_unit
        | None ->
            failwith
              "Store version %a is not supported by this rollup node because \
               there is no migration path from it to %a."
              Store_version.pp
              current
              Store_version.pp
              last_version)
(** Migration of the store from version V0 to version V1. *)
module V1_migrations = struct
  (** Convert an entry of the V0 messages store into its V1 form: the messages
      are kept as they are and the header gains a leading boolean, always
      [false] for migrated entries.
      NOTE(review): the meaning of this flag is defined by the V1 messages
      store and is not visible from here -- to be confirmed. *)
  let convert_store_messages
      (messages, (block_hash, timestamp, number_of_messages)) =
    (messages, (false, block_hash, timestamp, number_of_messages))

  (** Copy the messages of [l2_block], looked up by the block's inbox witness,
      from the V0 store to the V1 store. Nothing is done when the V0 store has
      no messages for this block. *)
  let migrate_messages (v0_store : _ Store_v0.t) (v1_store : _ Store_v1.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    let* v0_messages =
      Store_v0.Messages.read v0_store.messages l2_block.header.inbox_witness
    in
    match v0_messages with
    | None -> return_unit
    | Some v0_messages ->
        (* [convert_store_messages] returns the pair (messages, header). *)
        let value, header = convert_store_messages v0_messages in
        Store_v1.Messages.append
          v1_store.messages
          ~key:l2_block.header.inbox_witness
          ~header
          ~value

  (** In the Irmin store of [v1_store], put the tree found at the V0
      [Dal_processed_slots] path under the V1 [Dal_slots_statuses] path.
      Nothing is done when there is no tree at the old path. *)
  let migrate_dal_processed_slots_irmin (v1_store : _ Store_v1.t) =
    let open Lwt_syntax in
    let open Store_v1 in
    (* Commit information attached to the Irmin update. *)
    let info () =
      let date =
        Tezos_base.Time.(
          System.now () |> System.to_protocol |> Protocol.to_seconds)
      in
      let author =
        Format.asprintf
          "Rollup node %a"
          Tezos_version_parser.pp
          Tezos_version_value.Current_git_info.version
      in
      let message = "Migration store from v0 to v1" in
      Irmin_store.Raw_irmin.Info.v ~author ~message date
    in
    let store = Irmin_store.Raw_irmin.unsafe v1_store.irmin_store in
    let old_root = Store_v0.Dal_processed_slots.path in
    let new_root = Dal_slots_statuses.path in
    let* old_tree = Irmin_store.Raw_irmin.find_tree store old_root in
    match old_tree with
    | None -> return_unit
    | Some _ ->
        Irmin_store.Raw_irmin.with_tree_exn
          ~info
          store
          new_root
          (fun _new_tree -> return old_tree)

  (** Final step of the migration: replace the messages store of
      [storage_dir] by the one built in [tmp_dir], then migrate the DAL slots
      information in the Irmin store. The V0 store argument is unused. *)
  let final_actions ~storage_dir ~tmp_dir (_v0_store : _ Store_v0.t)
      (v1_store : _ Store_v1.t) =
    let open Lwt_result_syntax in
    let*! () =
      Lwt_utils_unix.remove_dir (messages_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (messages_store_location ~storage_dir:tmp_dir)
        (messages_store_location ~storage_dir)
    in
    let*! () = migrate_dal_processed_slots_irmin v1_store in
    return_unit

  (** The V0 to V1 migration; applying [Make] registers it. *)
  module From_v0 =
    Make (Store_v0) (Store_v1)
      (struct
        let migrate_block_action = migrate_messages

        let final_actions = final_actions
      end)
end
(** Migrations of the store to version V2, from either V1 or V0. *)
module V2_migrations = struct
  (** Path of the ["messages"] entry under [storage_dir]. *)
  let messages_store_location ~storage_dir =
    let open Filename.Infix in
    storage_dir // "messages"

  (** Path of the ["commitments"] entry under [storage_dir]. *)
  let commitments_store_location ~storage_dir =
    let open Filename.Infix in
    storage_dir // "commitments"

  (** Path of the ["inboxes"] entry under [storage_dir]. *)
  let inboxes_store_location ~storage_dir =
    let open Filename.Infix in
    storage_dir // "inboxes"

  (** Copy the messages of [l2_block] into the V2 store. [read_messages]
      fetches them from the old store by inbox witness; the header it returns
      is dropped and the V2 entry is written with the block hash as its
      header. Nothing is done when the old store has no messages for this
      block. *)
  let migrate_messages read_messages (v2_store : _ Store_v2.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    let* v1_messages = read_messages l2_block.header.inbox_witness in
    match v1_messages with
    | None -> return_unit
    | Some (messages, _old_header) ->
        let header = l2_block.header.block_hash in
        Store_v2.Messages.append
          v2_store.messages
          ~key:l2_block.header.inbox_witness
          ~header
          ~value:messages

  (** Copy the commitment of [l2_block], if the block has one and
      [read_commitment] finds it in the old store, into the V2 store. *)
  let migrate_commitment read_commitment (v2_store : _ Store_v2.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    match l2_block.header.commitment_hash with
    | None -> return_unit
    | Some commitment_hash -> (
        let* v1_commitment = read_commitment commitment_hash in
        match v1_commitment with
        | None -> return_unit
        | Some commitment ->
            Store_v2.Commitments.append
              v2_store.commitments
              ~key:commitment_hash
              ~value:commitment)

  (** Copy the inbox of [l2_block], if [read_inbox] finds it in the old store,
      into the V2 store. *)
  let migrate_inbox read_inbox (v2_store : _ Store_v2.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    let* v1_inbox = read_inbox l2_block.header.inbox_hash in
    match v1_inbox with
    | None -> return_unit
    | Some (inbox, ()) ->
        Store_v2.Inboxes.append
          v2_store.inboxes
          ~key:l2_block.header.inbox_hash
          ~value:inbox

  (** Final step of the migration: remove the messages, commitments and
      inboxes stores of [storage_dir], then move the ones built in [tmp_dir]
      in their place. Both store arguments are unused. *)
  let final_actions ~storage_dir ~tmp_dir _ _ =
    let open Lwt_result_syntax in
    let*! () =
      Lwt_utils_unix.remove_dir (messages_store_location ~storage_dir)
    in
    let*! () =
      Lwt_utils_unix.remove_dir (commitments_store_location ~storage_dir)
    in
    let*! () =
      Lwt_utils_unix.remove_dir (inboxes_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (messages_store_location ~storage_dir:tmp_dir)
        (messages_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (commitments_store_location ~storage_dir:tmp_dir)
        (commitments_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (inboxes_store_location ~storage_dir:tmp_dir)
        (inboxes_store_location ~storage_dir)
    in
    return_unit

  (** The V1 to V2 migration; applying [Make] registers it. *)
  module From_v1 =
    Make (Store_v1) (Store_v2)
      (struct
        let migrate_block_action v1_store v2_store l2_block =
          let open Lwt_result_syntax in
          let* () =
            migrate_messages
              Store_v1.(Messages.read v1_store.messages)
              v2_store
              l2_block
          and* () =
            migrate_commitment
              Store_v1.(Commitments.find v1_store.commitments)
              v2_store
              l2_block
          and* () =
            migrate_inbox
              Store_v1.(Inboxes.read v1_store.inboxes)
              v2_store
              l2_block
          in
          return_unit

        let final_actions = final_actions
      end)

  (** The direct V0 to V2 migration; applying [Make] registers it. *)
  module From_v0 =
    Make (Store_v0) (Store_v2)
      (struct
        let migrate_block_action v0_store v2_store l2_block =
          let open Lwt_result_syntax in
          let* () =
            migrate_messages
              Store_v0.(Messages.read v0_store.messages)
              v2_store
              l2_block
          and* () =
            migrate_commitment
              Store_v0.(Commitments.find v0_store.commitments)
              v2_store
              l2_block
          and* () =
            migrate_inbox
              Store_v0.(Inboxes.read v0_store.inboxes)
              v2_store
              l2_block
          in
          return_unit

        let final_actions = final_actions
      end)
end
(** Migration of the store from version V2 to version V3. *)
module V3_migrations = struct
  (** Path of the ["levels_to_hashes"] entry under [storage_dir]. *)
  let levels_store_location ~storage_dir =
    Filename.Infix.(storage_dir // "levels_to_hashes")

  (** Record, in the levels-to-hashes index of the V3 store, the hash of
      [l2_block] under the block's level. The index is flushed on every
      addition. *)
  let recompute_level (v3_store : _ Store_v3.t) (l2_block : Sc_rollup_block.t) =
    let header = l2_block.header in
    Store.Levels_to_hashes.add
      ~flush:true
      v3_store.levels_to_hashes
      header.level
      header.block_hash

  (** Final step of the migration: replace the levels-to-hashes index of
      [storage_dir] by the one built in [tmp_dir]. Both store arguments are
      unused. *)
  let final_actions ~storage_dir ~tmp_dir _ _ =
    let open Lwt_result_syntax in
    let current_index = levels_store_location ~storage_dir in
    let rebuilt_index = levels_store_location ~storage_dir:tmp_dir in
    let*! () = Lwt_utils_unix.remove_dir current_index in
    let*! () = Lwt_unix.rename rebuilt_index current_index in
    return_unit

  (** The V2 to V3 migration; applying [Make] registers it. *)
  module From_v2 =
    Make (Store_v2) (Store_v3)
      (struct
        let migrate_block_action _v2_store v3_store l2_block =
          recompute_level v3_store l2_block

        let final_actions = final_actions
      end)
end