Source file store_migration.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2023 Functori, <contact@functori.com>                       *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Store_version

type version_result = Version_known | Unintialized_version

let messages_store_location ~storage_dir =
  let open Filename.Infix in
  storage_dir // "messages"

let version_of_unversioned_store ~storage_dir =
  let open Lwt_syntax in
  let path = messages_store_location ~storage_dir in
  let* messages_store_v0 = Store_v0.Messages.load ~path ~cache_size:1 Read_only
  and* messages_store_v1 =
    Store_v1.Messages.load ~path ~cache_size:1 Read_only
  in
  let cleanup () =
    let open Lwt_syntax in
    let* (_ : unit tzresult) =
      match messages_store_v0 with
      | Error _ -> Lwt.return_ok ()
      | Ok s -> Store_v0.Messages.close s
    and* (_ : unit tzresult) =
      match messages_store_v1 with
      | Error _ -> Lwt.return_ok ()
      | Ok s -> Store_v1.Messages.close s
    in
    return_unit
  in
  let guess_version () =
    let open Lwt_result_syntax in
    match (messages_store_v0, messages_store_v1) with
    | Ok _, Error _ -> return_some V0
    | Error _, Ok _ -> return_some V1
    | Ok _, Ok _ ->
        (* Empty store, both loads succeed *)
        return_none
    | Error _, Error _ ->
        failwith
          "Cannot determine unversioned store version (no messages decodable)"
  in
  Lwt.finalize guess_version cleanup

let version_of_store ~storage_dir =
  let open Lwt_result_syntax in
  let* version = Store_version.read_version_file ~dir:storage_dir in
  match version with
  | Some v -> return_some (v, Version_known)
  | None ->
      let+ v = version_of_unversioned_store ~storage_dir in
      Option.map (fun v -> (v, Unintialized_version)) v

module type MIGRATION_ACTIONS = sig
  type from_store

  type dest_store

  val migrate_block_action :
    from_store -> dest_store -> Sc_rollup_block.t -> unit tzresult Lwt.t

  val final_actions :
    storage_dir:string ->
    tmp_dir:string ->
    from_store ->
    dest_store ->
    unit tzresult Lwt.t
end

module type S = sig
  val migrate : storage_dir:string -> unit tzresult Lwt.t
end

let migrations = Stdlib.Hashtbl.create 7

module Make
    (S_from : Store_sig.S)
    (S_dest : Store_sig.S)
    (Actions : MIGRATION_ACTIONS
                 with type from_store := Store_sigs.ro S_from.t
                  and type dest_store := Store_sigs.rw S_dest.t) : S = struct
  let tmp_dir ~storage_dir =
    Filename.concat (Configuration.default_storage_dir storage_dir)
    @@ Format.asprintf
         "migration_%a_%a"
         Store_version.pp
         S_from.version
         Store_version.pp
         S_dest.version

  let migrate ~storage_dir =
    let open Lwt_result_syntax in
    let* source_store =
      S_from.load Read_only ~l2_blocks_cache_size:1 storage_dir
    in
    let tmp_dir = tmp_dir ~storage_dir in
    let*! tmp_dir_exists = Lwt_utils_unix.dir_exists tmp_dir in
    let*? () =
      if tmp_dir_exists then
        error_with
          "Store migration (from %a to %a) is already ongoing. Wait for it to \
           finish or remove %S and restart."
          Store_version.pp
          S_from.version
          Store_version.pp
          S_dest.version
          tmp_dir
      else Ok ()
    in
    let*! () = Lwt_utils_unix.create_dir tmp_dir in
    let* dest_store = S_dest.load Read_write ~l2_blocks_cache_size:1 tmp_dir in
    let cleanup () =
      let open Lwt_syntax in
      let* (_ : unit tzresult) = S_from.close source_store
      and* (_ : unit tzresult) = S_dest.close dest_store in
      (* Don't remove migration dir to allow for later resume. *)
      return_unit
    in
    let run_migration () =
      let* () =
        S_from.iter_l2_blocks
          source_store
          (Actions.migrate_block_action source_store dest_store)
      in
      let* () =
        Actions.final_actions ~storage_dir ~tmp_dir source_store dest_store
      in
      let*! () = Lwt_utils_unix.remove_dir tmp_dir in
      Store_version.write_version_file ~dir:storage_dir S_dest.version
    in
    Lwt.finalize run_migration cleanup

  let () = Stdlib.Hashtbl.add migrations S_from.version (S_dest.version, migrate)
end

let migration_path ~from ~dest =
  let rec path acc from dest =
    if from = dest then Some (List.rev acc)
    else
      let first_steps =
        Stdlib.Hashtbl.find_all migrations from
        |> List.stable_sort (fun (va, _) (vb, _) ->
               (* Try biggest jumps first that don't go beyond the
                  destination *)
               if va > dest && vb > dest then Stdlib.compare va vb
               else if va > dest then 1
               else if vb > dest then -1
               else Stdlib.compare vb va)
      in
      (* Recursively look for migration sub-paths *)
      let paths =
        List.filter_map
          (fun (step, migration) ->
            path ((from, step, migration) :: acc) step dest)
          first_steps
      in
      (* Choose shortest migration path *)
      List.stable_sort List.compare_lengths paths |> List.hd
  in
  path [] from dest

let maybe_run_migration ~storage_dir =
  let open Lwt_result_syntax in
  let* current_version = version_of_store ~storage_dir in
  let last_version = Store.version in
  match (current_version, last_version) with
  | None, _ ->
      (* Store not initialized, write last version *)
      Store_version.write_version_file ~dir:storage_dir last_version
  | Some (current, versioned), last when last = current -> (
      match versioned with
      | Unintialized_version ->
          Store_version.write_version_file ~dir:storage_dir last_version
      | Version_known ->
          (* Up to date, nothing to do *)
          return_unit)
  | Some (current, _), last -> (
      let migrations = migration_path ~from:current ~dest:last in
      match migrations with
      | None ->
          failwith
            "Store version %a is not supported by this rollup node because \
             there is no migration path from it to %a."
            Store_version.pp
            current
            Store_version.pp
            last
      | Some migrations ->
          Format.printf "Starting store migration@." ;
          let+ () =
            List.iter_es
              (fun (vx, vy, migrate) ->
                Format.printf
                  "- Migrating store from %a to %a@."
                  Store_version.pp
                  vx
                  Store_version.pp
                  vy ;
                migrate ~storage_dir)
              migrations
          in
          Format.printf "Store migration completed@.")

module V1_migrations = struct
  let convert_store_messages
      (messages, (block_hash, timestamp, number_of_messages)) =
    ( messages,
      (false (* is migration block *), block_hash, timestamp, number_of_messages)
    )

  let migrate_messages (v0_store : _ Store_v0.t) (v1_store : _ Store_v1.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    let* v0_messages =
      Store_v0.Messages.read v0_store.messages l2_block.header.inbox_witness
    in
    match v0_messages with
    | None -> return_unit
    | Some v0_messages ->
        let value, header = convert_store_messages v0_messages in
        Store_v1.Messages.append
          v1_store.messages
          ~key:l2_block.header.inbox_witness
          ~header
          ~value

  (* In place migration of processed slots under new key name by hand *)
  let migrate_dal_processed_slots_irmin (v1_store : _ Store_v1.t) =
    let open Lwt_syntax in
    let open Store_v1 in
    let info () =
      let date =
        Tezos_base.Time.(
          System.now () |> System.to_protocol |> Protocol.to_seconds)
      in
      let author =
        Format.asprintf
          "Rollup node %a"
          Tezos_version_parser.pp
          Tezos_version_value.Current_git_info.version
      in
      let message = "Migration store from v0 to v1" in
      Irmin_store.Raw_irmin.Info.v ~author ~message date
    in
    let store = Irmin_store.Raw_irmin.unsafe v1_store.irmin_store in
    let old_root = Store_v0.Dal_processed_slots.path in
    let new_root = Dal_slots_statuses.path in
    let* old_tree = Irmin_store.Raw_irmin.find_tree store old_root in
    match old_tree with
    | None -> return_unit
    | Some _ ->
        (* Move the tree in the new key *)
        Irmin_store.Raw_irmin.with_tree_exn
          ~info
          store
          new_root
          (fun _new_tree -> return old_tree)

  let final_actions ~storage_dir ~tmp_dir (_v0_store : _ Store_v0.t)
      (v1_store : _ Store_v1.t) =
    let open Lwt_result_syntax in
    let*! () =
      Lwt_utils_unix.remove_dir (messages_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (messages_store_location ~storage_dir:tmp_dir)
        (messages_store_location ~storage_dir)
    in
    let*! () = migrate_dal_processed_slots_irmin v1_store in
    return_unit

  module From_v0 =
    Make (Store_v0) (Store_v1)
      (struct
        let migrate_block_action = migrate_messages

        let final_actions = final_actions
      end)
end

module V2_migrations = struct
  let messages_store_location ~storage_dir =
    let open Filename.Infix in
    storage_dir // "messages"

  let commitments_store_location ~storage_dir =
    let open Filename.Infix in
    storage_dir // "commitments"

  let inboxes_store_location ~storage_dir =
    let open Filename.Infix in
    storage_dir // "inboxes"

  let migrate_messages read_messages (v2_store : _ Store_v2.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    let* v1_messages = read_messages l2_block.header.inbox_witness in
    match v1_messages with
    | None -> return_unit
    | Some (messages, _v1_header) ->
        let header = l2_block.header.block_hash in
        Store_v2.Messages.append
          v2_store.messages
          ~key:l2_block.header.inbox_witness
          ~header
          ~value:messages

  let migrate_commitment read_commitment (v2_store : _ Store_v2.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    match l2_block.header.commitment_hash with
    | None -> return_unit
    | Some commitment_hash -> (
        let* v1_commitment = read_commitment commitment_hash in
        match v1_commitment with
        | None -> return_unit
        | Some commitment ->
            Store_v2.Commitments.append
              v2_store.commitments
              ~key:commitment_hash
              ~value:commitment)

  let migrate_inbox read_inbox (v2_store : _ Store_v2.t)
      (l2_block : Sc_rollup_block.t) =
    let open Lwt_result_syntax in
    let* v1_inbox = read_inbox l2_block.header.inbox_hash in
    match v1_inbox with
    | None -> return_unit
    | Some (inbox, ()) ->
        Store_v2.Inboxes.append
          v2_store.inboxes
          ~key:l2_block.header.inbox_hash
          ~value:inbox

  let final_actions ~storage_dir ~tmp_dir _ _ =
    let open Lwt_result_syntax in
    let*! () =
      Lwt_utils_unix.remove_dir (messages_store_location ~storage_dir)
    in
    let*! () =
      Lwt_utils_unix.remove_dir (commitments_store_location ~storage_dir)
    in
    let*! () =
      Lwt_utils_unix.remove_dir (inboxes_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (messages_store_location ~storage_dir:tmp_dir)
        (messages_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (commitments_store_location ~storage_dir:tmp_dir)
        (commitments_store_location ~storage_dir)
    in
    let*! () =
      Lwt_unix.rename
        (inboxes_store_location ~storage_dir:tmp_dir)
        (inboxes_store_location ~storage_dir)
    in
    return_unit

  module From_v1 =
    Make (Store_v1) (Store_v2)
      (struct
        let migrate_block_action v1_store v2_store l2_block =
          let open Lwt_result_syntax in
          let* () =
            migrate_messages
              Store_v1.(Messages.read v1_store.messages)
              v2_store
              l2_block
          and* () =
            migrate_commitment
              Store_v1.(Commitments.find v1_store.commitments)
              v2_store
              l2_block
          and* () =
            migrate_inbox
              Store_v1.(Inboxes.read v1_store.inboxes)
              v2_store
              l2_block
          in
          return_unit

        let final_actions = final_actions
      end)

  module From_v0 =
    Make (Store_v0) (Store_v2)
      (struct
        let migrate_block_action v0_store v2_store l2_block =
          let open Lwt_result_syntax in
          let* () =
            migrate_messages
              Store_v0.(Messages.read v0_store.messages)
              v2_store
              l2_block
          and* () =
            migrate_commitment
              Store_v0.(Commitments.find v0_store.commitments)
              v2_store
              l2_block
          and* () =
            migrate_inbox
              Store_v0.(Inboxes.read v0_store.inboxes)
              v2_store
              l2_block
          in
          return_unit

        let final_actions = final_actions
      end)
end

module V3_migrations = struct
  let levels_store_location ~storage_dir =
    let open Filename.Infix in
    storage_dir // "levels_to_hashes"

  let recompute_level (v3_store : _ Store_v3.t) (l2_block : Sc_rollup_block.t) =
    Store.Levels_to_hashes.add
      ~flush:true
      v3_store.levels_to_hashes
      l2_block.header.level
      l2_block.header.block_hash

  let final_actions ~storage_dir ~tmp_dir _ _ =
    let open Lwt_result_syntax in
    let*! () = Lwt_utils_unix.remove_dir (levels_store_location ~storage_dir) in
    let*! () =
      Lwt_unix.rename
        (levels_store_location ~storage_dir:tmp_dir)
        (levels_store_location ~storage_dir)
    in
    return_unit

  module From_v2 =
    Make (Store_v2) (Store_v3)
      (struct
        let migrate_block_action _v2_store v3_store l2_block =
          recompute_level v3_store l2_block

        let final_actions = final_actions
      end)
end