Source file snapshots.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
(*****************************************************************************)
(*                                                                           *)
(* SPDX-License-Identifier: MIT                                              *)
(* Copyright (c) 2023 Functori <contact@functori.com>                        *)
(*                                                                           *)
(*****************************************************************************)

open Snapshot_utils

type compression = No | On_the_fly | After

let check_store_version store_dir =
  let open Lwt_result_syntax in
  let* store_version = Store_version.read_version_file ~dir:store_dir in
  let*? () =
    match store_version with
    | None -> error_with "Unversionned store, cannot produce snapshot."
    | Some v when v <> Store.version ->
        error_with
          "Incompatible store version %a, expected %a. Cannot produce \
           snapshot. Please restart your rollup node to migrate."
          Store_version.pp
          v
          Store_version.pp
          Store.version
    | Some _ -> Ok ()
  in
  return_unit

let get_head (store : _ Store.t) =
  let open Lwt_result_syntax in
  let* head = Store.L2_head.read store.l2_head in
  let*? head =
    match head with
    | None ->
        error_with
          "There is no head in the rollup node store, cannot produce snapshot."
    | Some head -> Ok head
  in
  return head

let check_head (head : Sc_rollup_block.t) context =
  let open Lwt_result_syntax in
  (* Ensure head context is available. *)
  let*! head_ctxt =
    Context.checkout
      context
      (Smart_rollup_context_hash.to_context_hash head.header.context)
  in
  let*? () =
    error_when (Option.is_none head_ctxt)
    @@ error_of_fmt "Head context cannot be checkouted, won't produce snapshot."
  in
  return head

let pre_export_checks_and_get_snapshot_metadata ~data_dir =
  let open Lwt_result_syntax in
  let store_dir = Configuration.default_storage_dir data_dir in
  let context_dir = Configuration.default_context_dir data_dir in
  (* Load context and stores in read-only to check they are valid. *)
  let* () = check_store_version store_dir in
  let* metadata = Metadata.read_metadata_file ~dir:data_dir in
  let*? metadata =
    match metadata with
    | None -> error_with "No rollup node metadata in %S." data_dir
    | Some m -> Ok m
  in
  let*? () = Context.Version.check metadata.context_version in
  let* store =
    Store.load Read_only ~index_buffer_size:0 ~l2_blocks_cache_size:1 store_dir
  in

  let* head = get_head store in
  let level = head.Sc_rollup_block.header.level in
  let* (module Plugin) =
    Protocol_plugins.proto_plugin_for_level_with_store store level
  in
  let (module C) = Plugin.Pvm.context metadata.kind in
  let* context = Context.load (module C) ~cache_size:1 Read_only context_dir in
  let* history_mode = Store.History_mode.read store.history_mode in
  let*? history_mode =
    match history_mode with
    | None -> error_with "No history mode information in %S." data_dir
    | Some h -> Ok h
  in

  let* head = check_head head context in
  (* Closing context and stores after checks *)
  let*! () = Context.close context in
  let* () = Store.close store in
  return
    {
      history_mode;
      address = metadata.rollup_address;
      head_level = head.header.level;
      last_commitment = Sc_rollup_block.most_recent_commitment head.header;
    }

let first_available_level ~data_dir store =
  let open Lwt_result_syntax in
  let* gc_levels = Store.Gc_levels.read store.Store.gc_levels in
  match gc_levels with
  | Some {first_available_level; _} -> return first_available_level
  | None -> (
      let* metadata = Metadata.read_metadata_file ~dir:data_dir in
      match metadata with
      | None -> failwith "No metadata (needs rollup genesis info)."
      | Some {genesis_info = {level; _}; _} -> return level)

let check_some hash what = function
  | Some x -> Ok x
  | None ->
      error_with "Could not read %s at %a after import." what Block_hash.pp hash

let check_block_data_and_get_content (store : _ Store.t) context hash =
  let open Lwt_result_syntax in
  let* b = Store.L2_blocks.read store.l2_blocks hash in
  let*? _b, header = check_some hash "L2 block" b in
  let* messages = Store.Messages.read store.messages header.inbox_witness in
  let*? _messages, _ = check_some hash "messages" messages in
  let* inbox = Store.Inboxes.read store.inboxes header.inbox_hash in
  let*? inbox, () = check_some hash "inbox" inbox in
  let* commitment =
    match header.commitment_hash with
    | None -> return_none
    | Some commitment_hash ->
        let* commitment =
          Store.Commitments.read store.commitments commitment_hash
        in
        let*? commitment, () = check_some hash "commitment" commitment in
        return_some commitment
  in
  (* Ensure head context is available. *)
  let*! head_ctxt =
    Context.checkout
      context
      (Smart_rollup_context_hash.to_context_hash header.context)
  in
  let*? head_ctxt = check_some hash "context" head_ctxt in
  return (header, inbox, commitment, head_ctxt)

let get_pvm_state_from_store head_ctxt hash =
  let open Lwt_result_syntax in
  let*! pvm_state = Context.PVMState.find head_ctxt in
  let*? pvm_state = check_some hash "pvm_state" pvm_state in
  return pvm_state

let compute_pvm_state_for_genenis cctxt dest (store : _ Store.t) context
    (header : Sc_rollup_block.header) plugin =
  let open Lwt_result_syntax in
  let* current_protocol =
    Node_context.protocol_of_level_with_store store header.level
  in
  let (module Plugin : Protocol_plugin_sig.PARTIAL) = plugin in
  let* constants =
    Plugin.Layer1_helpers.retrieve_constants cctxt ~block:(`Level header.level)
  in
  let current_protocol =
    {
      Node_context.hash = current_protocol.protocol;
      proto_level = current_protocol.proto_level;
      constants;
    }
  in
  let* node_context =
    Node_context_loader.For_snapshots.create_node_context
      cctxt
      current_protocol
      store
      context
      ~data_dir:dest
  in
  Interpreter.genesis_state plugin node_context

let check_genesis_pvm_state_and_return cctxt dest store context header
    (module Plugin : Protocol_plugin_sig.PARTIAL) (metadata : Metadata.t)
    head_ctxt hash =
  let open Lwt_result_syntax in
  let* patched_pvm_state, Original pvm_state =
    compute_pvm_state_for_genenis
      cctxt
      dest
      store
      context
      header
      (module Plugin)
  in
  let* context_pvm_state = get_pvm_state_from_store head_ctxt hash in
  let*! context_state_hash =
    Plugin.Pvm.state_hash metadata.kind context_pvm_state
  in
  let*! patched_state_hash =
    Plugin.Pvm.state_hash metadata.kind patched_pvm_state
  in
  let*? () =
    error_unless State_hash.(context_state_hash = patched_state_hash)
    @@ error_of_fmt
         "Erroneous state hash %a for originated rollup (level %ld) instead of \
          %a."
         State_hash.pp
         context_state_hash
         header.level
         State_hash.pp
         patched_state_hash
  in
  return pvm_state

let check_block_data_consistency cctxt dest (metadata : Metadata.t)
    (store : _ Store.t) context hash next_commitment =
  let open Lwt_result_syntax in
  let* header, inbox, commitment, head_ctxt =
    check_block_data_and_get_content store context hash
  in
  let* (module Plugin) =
    Protocol_plugins.proto_plugin_for_level_with_store store header.level
  in
  let* pvm_state_of_commitment =
    if metadata.genesis_info.level = header.level then
      check_genesis_pvm_state_and_return
        cctxt
        dest
        store
        context
        header
        (module Plugin)
        metadata
        head_ctxt
        hash
    else get_pvm_state_from_store head_ctxt hash
  in
  let*! state_hash =
    Plugin.Pvm.state_hash metadata.kind pvm_state_of_commitment
  in
  let* () =
    match (commitment, header.commitment_hash) with
    | None, None -> return_unit
    | Some _, None | None, Some _ ->
        (* The commitment is fetched from the header value *)
        assert false
    | Some commitment, Some commitment_hash ->
        let hash_of_commitment = Commitment.hash commitment in
        let*? () =
          error_unless Commitment.Hash.(hash_of_commitment = commitment_hash)
          @@ error_of_fmt
               "Erroneous commitment hash %a for level %ld instead of %a."
               Commitment.Hash.pp
               hash_of_commitment
               header.level
               Commitment.Hash.pp
               commitment_hash
        in
        let*? () =
          error_unless State_hash.(state_hash = commitment.compressed_state)
          @@ error_of_fmt
               "Erroneous state hash %a for level %ld instead of %a."
               State_hash.pp
               state_hash
               header.level
               State_hash.pp
               commitment.compressed_state
        in
        let*? () =
          error_unless (commitment.inbox_level = header.level)
          @@ error_of_fmt
               "Erroneous inbox level %ld in commitment instead of level %ld."
               commitment.inbox_level
               header.level
        in
        let*? () =
          if header.level = metadata.genesis_info.level then Ok ()
          else
            error_unless
              Commitment.Hash.(
                header.previous_commitment_hash = commitment.predecessor)
            @@ error_of_fmt
                 "Erroneous previous commitment hash %a for level %ld instead \
                  of %a."
                 Commitment.Hash.pp
                 header.previous_commitment_hash
                 header.level
                 Commitment.Hash.pp
                 commitment.predecessor
        in
        return_unit
  in
  let*? () =
    match (next_commitment, header.commitment_hash) with
    | None, _ | _, None ->
        (* If there is no commitment for this block there is no check to do. *)
        Ok ()
    | Some next_commitment, Some commitment_hash ->
        error_unless
          Commitment.Hash.(
            next_commitment.Commitment.predecessor = commitment_hash)
        @@ error_of_fmt
             "Commitment hash %a for level %ld was expected to be %a in the \
              chain of commitments."
             Commitment.Hash.pp
             commitment_hash
             header.level
             Commitment.Hash.pp
             next_commitment.predecessor
  in
  let hash_of_inbox = Inbox.hash inbox in
  let*? () =
    error_unless Inbox.Hash.(hash_of_inbox = header.inbox_hash)
    @@ error_of_fmt
         "Erroneous inbox %a for level %ld instead of %a."
         Inbox.Hash.pp
         hash_of_inbox
         header.level
         Inbox.Hash.pp
         header.inbox_hash
  in
  return (header, commitment)

let check_block_data (store : _ Store.t) context hash _next_commitment =
  let open Lwt_result_syntax in
  let* header, _inbox, commitment, _head_ctxt =
    check_block_data_and_get_content store context hash
  in
  return (header, commitment)

let check_l2_chain ~message ~data_dir (store : _ Store.t) context
    (head : Sc_rollup_block.t) check_block =
  let open Lwt_result_syntax in
  let* first_available_level = first_available_level ~data_dir store in
  let blocks_to_check =
    Int32.sub head.header.level first_available_level |> Int32.to_int |> succ
  in
  let progress_bar =
    Progress_bar.progress_bar
      ~counter:`Int
      ~message
      ~color:(Terminal.Color.rgb 3 252 132)
      blocks_to_check
  in
  Progress_bar.Lwt.with_reporter progress_bar @@ fun count_progress ->
  let rec check_chain hash next_commitment =
    let* header, commitment = check_block store context hash next_commitment in
    let*! () = count_progress 1 in
    if header.Sc_rollup_block.level <= first_available_level then return_unit
    else
      check_chain header.predecessor (Option.either commitment next_commitment)
  in
  check_chain head.header.block_hash None

let check_last_commitment head snapshot_metadata =
  let last_snapshot_commitment =
    Sc_rollup_block.most_recent_commitment head.Sc_rollup_block.header
  in
  error_unless
    Commitment.Hash.(
      snapshot_metadata.last_commitment = last_snapshot_commitment)
  @@ error_of_fmt
       "Last commitment in snapshot is %a but should be %a."
       Commitment.Hash.pp
       last_snapshot_commitment
       Commitment.Hash.pp
       snapshot_metadata.last_commitment

let check_last_commitment_published cctxt snapshot_metadata =
  let open Lwt_result_syntax in
  Error.trace_lwt_result_with
    "Last commitment of snapshot is not published on L1."
  @@ let* {current_protocol; _} =
       Tezos_shell_services.Shell_services.Blocks.protocols
         cctxt
         ~block:(`Head 0)
         ()
     in
     let*? (module Plugin) =
       Protocol_plugins.proto_plugin_for_protocol current_protocol
     in
     let* (_commitment : Commitment.t) =
       Plugin.Layer1_helpers.get_commitment
         cctxt
         snapshot_metadata.address
         snapshot_metadata.last_commitment
     in
     return_unit

let check_lcc metadata cctxt (store : _ Store.t) (head : Sc_rollup_block.t)
    (module Plugin : Protocol_plugin_sig.S) =
  let open Lwt_result_syntax in
  let* lcc =
    Plugin.Layer1_helpers.get_last_cemented_commitment
      cctxt
      metadata.Metadata.rollup_address
  in
  if lcc.level > head.header.level then
    (* The snapshot is older than the current LCC *)
    return_unit
  else
    let* lcc_block_hash =
      Store.Levels_to_hashes.find store.levels_to_hashes lcc.level
    in
    let*? lcc_block_hash =
      match lcc_block_hash with
      | None -> error_with "No block for LCC level %ld" lcc.level
      | Some h -> Ok h
    in
    let* lcc_block_header =
      Store.L2_blocks.header store.l2_blocks lcc_block_hash
    in
    match lcc_block_header with
    | None ->
        failwith
          "Unknown block %a for LCC level %ld"
          Block_hash.pp
          lcc_block_hash
          lcc.level
    | Some {commitment_hash = None; _} ->
        failwith
          "No commitment for block %a for LCC level %ld"
          Block_hash.pp
          lcc_block_hash
          lcc.level
    | Some {commitment_hash = Some commitment_hash; _} ->
        fail_unless Commitment.Hash.(lcc.commitment = commitment_hash)
        @@ error_of_fmt
             "Snapshot contains %a for LCC at level %ld but was expected to be \
              %a."
             Commitment.Hash.pp
             commitment_hash
             lcc.level
             Commitment.Hash.pp
             lcc.commitment

let hash_level_of_l2_block (b : Sc_rollup_block.t) =
  Layer1.{hash = b.header.block_hash; level = b.header.level}

let reconstruct_level_context rollup_ctxt ~predecessor
    (node_ctxt : _ Node_context.t) level =
  let open Lwt_result_syntax in
  let* block = Node_context.get_l2_block_by_level node_ctxt level in
  let* inbox = Node_context.get_inbox node_ctxt block.header.inbox_hash
  and* messages = Messages.get node_ctxt block.header.inbox_witness in
  let* (module Plugin) =
    Protocol_plugins.proto_plugin_for_level node_ctxt level
  in
  let* ctxt, _num_messages, _num_ticks, _initial_tick =
    Interpreter.process_head
      (module Plugin)
      node_ctxt
      rollup_ctxt
      ~predecessor:(hash_level_of_l2_block predecessor)
      (hash_level_of_l2_block block)
      (inbox, messages)
  in
  let*! context_hash = Context.commit ctxt in
  assert (
    Smart_rollup_context_hash.(
      of_context_hash context_hash = block.header.context)) ;
  return (block, ctxt)

let reconstruct_context_from_first_available_level
    (node_ctxt : _ Node_context.t) ~(head : Sc_rollup_block.t) =
  let open Lwt_result_syntax in
  let* {first_available_level = first_level; _} =
    Node_context.get_gc_levels node_ctxt
  in
  let total = Int32.sub head.header.level first_level in
  let progress_bar =
    Progress_bar.progress_bar
      ~counter:`Int
      ~message:"Reconstructing context"
      ~color:(Terminal.Color.rgb 219 146 21)
      (Int32.to_int total)
  in
  Progress_bar.Lwt.with_reporter progress_bar @@ fun count_progress ->
  let* first_block = Node_context.get_l2_block_by_level node_ctxt first_level in
  let* first_ctxt =
    Node_context.checkout_context node_ctxt first_block.header.block_hash
  in
  let rec reconstruct_chain_from (block : Sc_rollup_block.t) rollup_ctxt =
    if block.header.level >= head.header.level then return_unit
    else
      let level = Int32.succ block.header.level in
      let* block, rollup_ctxt =
        reconstruct_level_context rollup_ctxt ~predecessor:block node_ctxt level
      in
      let*! () = count_progress 1 in
      reconstruct_chain_from block rollup_ctxt
  in
  reconstruct_chain_from first_block first_ctxt

let with_modify_data_dir cctxt ~data_dir
    ?(skip_condition = fun _ _ ~head:_ -> Lwt_result.return false) f =
  let open Lwt_result_syntax in
  let store_dir = Configuration.default_storage_dir data_dir in
  let context_dir = Configuration.default_context_dir data_dir in
  let* () = check_store_version store_dir in
  let* store =
    Store.load
      Read_write
      ~index_buffer_size:1000
      ~l2_blocks_cache_size:100
      store_dir
  in
  let* head = get_head store in
  let* (module Plugin) =
    Protocol_plugins.proto_plugin_for_level_with_store store head.header.level
  in
  let* metadata = Metadata.read_metadata_file ~dir:data_dir in
  let*? metadata =
    match metadata with
    | None -> error_with "No rollup node metadata in %S." data_dir
    | Some m -> Ok m
  in
  let (module C) = Plugin.Pvm.context metadata.kind in
  let* context =
    Context.load (module C) ~cache_size:100 Read_write context_dir
  in
  let* skip = skip_condition store context ~head in
  unless skip @@ fun () ->
  let* current_protocol =
    Node_context.protocol_of_level_with_store store head.header.level
  in
  let*? (module Plugin) =
    Protocol_plugins.proto_plugin_for_protocol current_protocol.protocol
  in
  let* constants =
    Plugin.Layer1_helpers.retrieve_constants
      cctxt
      ~block:(`Level head.header.level)
  in
  let current_protocol =
    {
      Node_context.hash = current_protocol.protocol;
      proto_level = current_protocol.proto_level;
      constants;
    }
  in
  let* node_ctxt =
    Node_context_loader.For_snapshots.create_node_context
      cctxt
      current_protocol
      store
      context
      ~data_dir
  in
  let* () = f node_ctxt ~head in
  let*! () = Context.close context in
  let* () = Store.close store in
  return_unit

let maybe_reconstruct_context cctxt ~data_dir =
  with_modify_data_dir
    cctxt
    ~data_dir
    ~skip_condition:(fun _store context ~head ->
      let open Lwt_result_syntax in
      let*! head_ctxt =
        Context.checkout
          context
          (Smart_rollup_context_hash.to_context_hash head.header.context)
      in
      return (Option.is_some head_ctxt))
    reconstruct_context_from_first_available_level

let post_checks ~action ~message snapshot_metadata ~dest =
  let open Lwt_result_syntax in
  let store_dir = Configuration.default_storage_dir dest in
  let context_dir = Configuration.default_context_dir dest in
  (* Load context and stores in read-only to run checks. *)
  let* () = check_store_version store_dir in
  let* store =
    Store.load
      Read_only
      ~index_buffer_size:1000
      ~l2_blocks_cache_size:100
      store_dir
  in
  let* head = get_head store in
  let* (module Plugin) =
    Protocol_plugins.proto_plugin_for_level_with_store store head.header.level
  in
  let* metadata = Metadata.read_metadata_file ~dir:dest in
  let*? metadata =
    match metadata with
    | None -> error_with "No rollup node metadata in %S." dest
    | Some m -> Ok m
  in
  let (module C) = Plugin.Pvm.context metadata.kind in
  let* context =
    Context.load (module C) ~cache_size:100 Read_only context_dir
  in
  let* head = check_head head context in
  let* check_block_data =
    match action with
    | `Export -> return check_block_data
    | `Import cctxt -> (
        let* metadata = Metadata.read_metadata_file ~dir:dest in
        match metadata with
        | None ->
            (* We need the kind of the rollup to run the consistency checks in
               order to verify state hashes. *)
            failwith "No metadata (needs rollup kind)."
        | Some metadata ->
            let*? () = check_last_commitment head snapshot_metadata in
            let* () = check_lcc metadata cctxt store head (module Plugin) in
            return (check_block_data_consistency cctxt dest metadata))
  in
  let* () =
    check_l2_chain ~message ~data_dir:dest store context head check_block_data
  in
  let*! () = Context.close context in
  let* () = Store.close store in
  return_unit

(* Magic bytes for gzip files is 1f8b. *)
let is_compressed_snapshot snapshot_file =
  let ic = open_in snapshot_file in
  try
    let ok = input_byte ic = 0x1f && input_byte ic = 0x8b in
    close_in ic ;
    ok
  with
  | End_of_file ->
      close_in ic ;
      false
  | e ->
      close_in ic ;
      raise e

let post_export_checks ~snapshot_file =
  let open Lwt_result_syntax in
  Lwt_utils_unix.with_tempdir "snapshot_checks_" @@ fun dest ->
  let reader =
    if is_compressed_snapshot snapshot_file then gzip_reader else stdlib_reader
  in
  let* snapshot_metadata =
    extract reader stdlib_writer (fun _ -> return_unit) ~snapshot_file ~dest
  in
  post_checks
    ~action:`Export
    ~message:"Checking snapshot   "
    snapshot_metadata
    ~dest

let operator_local_file_regexp = Re.Str.regexp "^storage/lpc$"

let snapshotable_files_regexp =
  Re.Str.regexp
    "^\\(storage/.*\\|context/.*\\|wasm_2_0_0/.*\\|arith/.*\\|riscv/.*\\|context/.*\\|metadata$\\)"

let with_locks ~data_dir f =
  Format.eprintf "Acquiring GC lock@." ;
  (* Take GC lock first in order to not prevent progression of rollup node. *)
  Utils.with_lockfile (Node_context.gc_lockfile_path ~data_dir) @@ fun () ->
  Format.eprintf "Acquiring process lock@." ;
  Utils.with_lockfile (Node_context.processing_lockfile_path ~data_dir) f

let export_dir metadata ~take_locks ~compression ~data_dir ~dest ~filename =
  let open Lwt_result_syntax in
  let* snapshot_file =
    let with_locks =
      if take_locks then with_locks ~data_dir else fun f -> f ()
    in
    with_locks @@ fun () ->
    let dest_file_name =
      match filename with
      | Some f ->
          let suffix =
            match compression with
            | No | On_the_fly -> ""
            | After -> ".uncompressed"
          in
          f ^ suffix
      | None ->
          let suffix =
            match compression with
            | On_the_fly -> ""
            | No | After -> ".uncompressed"
          in
          Format.asprintf
            "snapshot-%a-%ld.%s%s"
            Address.pp_short
            metadata.address
            metadata.head_level
            (Configuration.string_of_history_mode metadata.history_mode)
            suffix
    in
    let dest_file =
      match dest with
      | Some dest -> Filename.concat dest dest_file_name
      | None -> dest_file_name
    in
    let*! () =
      let open Lwt_syntax in
      let* () = Option.iter_s Lwt_utils_unix.create_dir dest in
      let include_file ~relative_path =
        Re.Str.string_match snapshotable_files_regexp relative_path 0
        && not (Re.Str.string_match operator_local_file_regexp relative_path 0)
      in
      let writer =
        match compression with
        | On_the_fly -> gzip_writer
        | No | After -> stdlib_writer
      in
      create
        stdlib_reader
        writer
        metadata
        ~dir:data_dir
        ~include_file
        ~dest:dest_file ;
      return_unit
    in
    return dest_file
  in
  let snapshot_file =
    match compression with
    | No | On_the_fly -> snapshot_file
    | After -> compress ~snapshot_file
  in
  return snapshot_file

let export ~no_checks ~compression ~data_dir ~dest ~filename =
  let open Lwt_result_syntax in
  let* metadata = pre_export_checks_and_get_snapshot_metadata ~data_dir in
  let* snapshot_file =
    export_dir metadata ~take_locks:true ~compression ~data_dir ~dest ~filename
  in
  let* () = unless no_checks @@ fun () -> post_export_checks ~snapshot_file in
  return snapshot_file

let export_compact ~compression ~data_dir ~dest ~filename =
  let open Lwt_result_syntax in
  let* snapshot_metadata =
    pre_export_checks_and_get_snapshot_metadata ~data_dir
  in
  Lwt_utils_unix.with_tempdir "snapshot_temp_" @@ fun tmp_dir ->
  let tmp_context_dir = Configuration.default_context_dir tmp_dir in
  let tmp_store_dir = Configuration.default_storage_dir tmp_dir in
  let*! () = Lwt_utils_unix.create_dir tmp_context_dir in
  let*! () = Lwt_utils_unix.create_dir tmp_store_dir in
  let store_dir = Configuration.default_storage_dir data_dir in
  let context_dir = Configuration.default_context_dir data_dir in
  let* store =
    Store.load Read_only ~index_buffer_size:0 ~l2_blocks_cache_size:1 store_dir
  in
  let* metadata = Metadata.read_metadata_file ~dir:data_dir in
  let*? metadata =
    match metadata with
    | None -> error_with "No rollup node metadata in %S." data_dir
    | Some m -> Ok m
  in
  let* head = get_head store in
  let level = head.Sc_rollup_block.header.level in
  let* (module Plugin) =
    Protocol_plugins.proto_plugin_for_level_with_store store level
  in
  let (module C) = Plugin.Pvm.context metadata.kind in
  let* context = Context.load (module C) ~cache_size:1 Read_only context_dir in
  let* first_level = first_available_level ~data_dir store in
  let* first_block_hash =
    Store.Levels_to_hashes.find store.levels_to_hashes first_level
  in
  let first_block_hash =
    WithExceptions.Option.get first_block_hash ~loc:__LOC__
  in
  let* first_block = Store.L2_blocks.read store.l2_blocks first_block_hash in
  let _, first_block = WithExceptions.Option.get first_block ~loc:__LOC__ in
  Format.eprintf "Exporting context snapshot with first level %ld@." first_level ;
  let* () =
    Context.export_snapshot
      context
      (Smart_rollup_context_hash.to_context_hash first_block.context)
      ~path:tmp_context_dir
  in
  let ( // ) = Filename.concat in
  let copy_dir a =
    let dir = data_dir // a in
    if Sys.file_exists dir && Sys.is_directory dir then
      copy_dir dir (tmp_dir // a)
  in
  let copy_file a =
    let path = data_dir // a in
    if Sys.file_exists path then copy_file ~src:path ~dst:(tmp_dir // a)
  in
  Format.eprintf "Acquiring process lock@." ;
  let* () =
    Utils.with_lockfile (Node_context.processing_lockfile_path ~data_dir)
    @@ fun () ->
    Format.eprintf "Copying data@." ;
    Snapshot_utils.copy_dir store_dir tmp_store_dir ;
    copy_file "metadata" ;
    return_unit
  in
  copy_dir "wasm_2_0_0" ;
  copy_dir "arith" ;
  copy_dir "riscv" ;
  let compression =
    match compression with
    | After ->
        (* We've already copied data *)
        On_the_fly
    | _ -> compression
  in
  export_dir
    snapshot_metadata
    ~take_locks:false
    ~compression
    ~data_dir:tmp_dir
    ~dest
    ~filename

let pre_import_checks cctxt ~no_checks ~data_dir snapshot_metadata =
  let open Lwt_result_syntax in
  let store_dir = Configuration.default_storage_dir data_dir in
  (* Load stores in read-only to make simple checks. *)
  let* store =
    Store.load
      Read_write
      ~index_buffer_size:1000
      ~l2_blocks_cache_size:100
      store_dir
  in
  let* metadata = Metadata.read_metadata_file ~dir:data_dir
  and* history_mode = Store.History_mode.read store.history_mode
  and* head = Store.L2_head.read store.l2_head in
  let* () = Store.close store in
  let*? () =
    let open Result_syntax in
    match (metadata, history_mode) with
    | None, _ | _, None ->
        (* The rollup node data dir was never initialized, i.e. the rollup node
           wasn't run yet. *)
        return_unit
    | Some {rollup_address; _}, Some history_mode ->
        let* () =
          error_unless Address.(rollup_address = snapshot_metadata.address)
          @@ error_of_fmt
               "The existing rollup node is for %a, but the snapshot is for \
                rollup %a."
               Address.pp
               rollup_address
               Address.pp
               snapshot_metadata.address
        in
        let a_history_str = function
          | Configuration.Archive -> "an archive"
          | Configuration.Full -> "a full"
        in
        error_unless (history_mode = snapshot_metadata.history_mode)
        @@ error_of_fmt
             "Cannot import %s snapshot into %s rollup node."
             (a_history_str snapshot_metadata.history_mode)
             (a_history_str history_mode)
  in
  let*? () =
    let open Result_syntax in
    match head with
    | None ->
        (* The rollup node has no L2 chain. *)
        return_unit
    | Some head ->
        error_when (snapshot_metadata.head_level <= head.header.level)
        @@ error_of_fmt
             "The rollup node is already at level %ld but the snapshot is only \
              for level %ld."
             head.header.level
             snapshot_metadata.head_level
  in
  let* () =
    unless no_checks @@ fun () ->
    check_last_commitment_published cctxt snapshot_metadata
  in
  return_unit

let check_data_dir_unpopulated data_dir () =
  let open Lwt_result_syntax in
  let store_dir = Configuration.default_storage_dir data_dir in
  let context_dir = Configuration.default_context_dir data_dir in
  let*! store_exists = Lwt_utils_unix.dir_exists store_dir in
  let*! context_exists = Lwt_utils_unix.dir_exists context_dir in
  if store_exists || context_exists then
    failwith
      "The rollup node data dir %s is already populated. If you want to \
       overwrite its non-local content use the --force option."
      data_dir
  else return_unit

let import ~no_checks ~force cctxt ~data_dir ~snapshot_file =
  let open Lwt_result_syntax in
  let* () = unless force (check_data_dir_unpopulated data_dir) in
  let*! () = Lwt_utils_unix.create_dir data_dir in
  let*! () = Event.acquiring_lock () in
  Utils.with_lockfile
    ~when_locked:`Fail
    (Node_context.global_lockfile_path ~data_dir)
  @@ fun () ->
  let reader =
    if is_compressed_snapshot snapshot_file then gzip_reader else stdlib_reader
  in
  let* snapshot_metadata =
    extract
      reader
      stdlib_writer
      (pre_import_checks cctxt ~no_checks ~data_dir)
      ~snapshot_file
      ~dest:data_dir
  in
  let* () = maybe_reconstruct_context cctxt ~data_dir in
  unless no_checks @@ fun () ->
  post_checks
    ~action:(`Import cctxt)
    ~message:"Checking imported data"
    snapshot_metadata
    ~dest:data_dir

let info ~snapshot_file =
  let compressed = is_compressed_snapshot snapshot_file in
  let reader = if compressed then gzip_reader else stdlib_reader in
  let metadata = read_metadata reader ~snapshot_file in
  (metadata, if compressed then `Compressed else `Uncompressed)