(* Source file: node_context_loader.ml *)
open Node_context
(** [lock ~data_dir] creates [data_dir] if needed, opens (creating it if
    absent) the lock file whose path is [global_lockfile_path ~data_dir], and
    takes a [lockf] lock on it with [Unix.F_LOCK].

    @return the descriptor of the locked file.

    Failures to create the directory, to open the file or to take the lock
    are reported as errors of the result monad, under the trace
    [Rollup_node_errors.Could_not_acquire_lock]. *)
let lock ~data_dir =
  let lockfile_path = global_lockfile_path ~data_dir in
  let lock_aux ~data_dir =
    let open Lwt_result_syntax in
    let*! () = Event.acquiring_lock () in
    (* Run the directory creation under [protect] as well: otherwise a
       failure here would escape as a rejected promise and bypass both the
       result monad and the [Could_not_acquire_lock] trace added below. *)
    let* () =
      protect @@ fun () ->
      let*! () = Lwt_utils_unix.create_dir data_dir in
      return_unit
    in
    let* lockfile =
      protect @@ fun () ->
      Lwt_unix.openfile
        lockfile_path
        [Unix.O_CREAT; O_RDWR; O_CLOEXEC; O_SYNC]
        0o644
      |> Lwt_result.ok
    in
    let* () =
      protect ~on_error:(fun err ->
          (* Do not leak the descriptor when the lock cannot be taken. *)
          let*! () = Lwt_unix.close lockfile in
          fail err)
      @@ fun () ->
      let*! () = Lwt_unix.lockf lockfile Unix.F_LOCK 0 in
      return_unit
    in
    return lockfile
  in
  trace (Rollup_node_errors.Could_not_acquire_lock lockfile_path)
  @@ lock_aux ~data_dir
(** [unlock node_ctxt] releases the [lockf] lock held on the lock file of
    [node_ctxt] and then closes its descriptor. The descriptor is closed
    whether or not releasing the lock succeeds. *)
let unlock node_ctxt =
  let fd = node_ctxt.lockfile in
  let release_lock () = Lwt_unix.lockf fd Unix.F_ULOCK 0 in
  let close_descriptor () = Lwt_unix.close fd in
  Lwt.finalize release_lock close_descriptor
(** [update_metadata metadata ~data_dir] reconciles [metadata] with the
    metadata file found in [data_dir]:

    - no file on disk: [metadata] is written;
    - a [V0] file: its context version and rollup address are checked, then
      the file is overwritten with [metadata];
    - a [V1] file: its context version and rollup address are checked and the
      file is left untouched.

    Fails with [Rollup_node_errors.Unexpected_rollup] when the address stored
    on disk differs from the one in [metadata]. *)
let update_metadata ({Metadata.rollup_address; _} as metadata) ~data_dir =
  let open Lwt_result_syntax in
  (* Shared by both on-disk versions: the stored address must be ours. *)
  let check_address saved_address =
    error_unless Address.(rollup_address = saved_address)
    @@ Rollup_node_errors.Unexpected_rollup {rollup_address; saved_address}
  in
  let write () = Metadata.write_metadata_file ~dir:data_dir metadata in
  let* on_disk = Metadata.Versioned.read_metadata_file ~dir:data_dir in
  match on_disk with
  | None -> write ()
  | Some (V0 {rollup_address = saved_address; context_version}) ->
      let*? () = Context.Version.check context_version in
      let*? () = check_address saved_address in
      write ()
  | Some (V1 {rollup_address = saved_address; context_version; _}) ->
      let*? () = Context.Version.check context_version in
      let*? () = check_address saved_address in
      return_unit
(** [create_sync_info ()] returns a fresh synchronisation record: a new
    condition, a new watcher input, and a processed level of [0l]. *)
let create_sync_info () =
  let on_synchronized = Lwt_condition.create () in
  let sync_level_input = Lwt_watcher.create_input () in
  {on_synchronized; sync_level_input; processed_level = 0l}
(** [init cctxt ~data_dir ... kind current_protocol configuration] builds the
    node context of the rollup node.

    In order, it: takes the data directory lock ([lock]); reconciles the
    on-disk metadata ([update_metadata]); calls
    [Store_migration.maybe_run_migration]; loads the store and the PVM
    context; checks and sets the history mode; emits the start-up events;
    initialises the optional DAC client and the kernel debug logger; builds
    the context record; and finally sets [sync.processed_level] from the last
    processed L2 head ([0l] when there is none).

    NOTE(review): the lock taken first is not released here when a later
    step fails — confirm that callers terminate the process on error. *)
let init (cctxt : #Client_context.full) ~data_dir ~irmin_cache_size
    ~index_buffer_size ?log_kernel_debug_file ?last_whitelist_update mode
    l1_ctxt genesis_info ~lcc ~lpc kind current_protocol
    Configuration.(
      {
        sc_rollup_address = rollup_address;
        l2_blocks_cache_size;
        dal_node_endpoint;
        _;
      } as configuration) =
  let open Lwt_result_syntax in
  let* lockfile = lock ~data_dir in
  let metadata =
    {
      Metadata.genesis_info;
      kind;
      context_version = Context.Version.version;
      rollup_address;
    }
  in
  let* () = update_metadata metadata ~data_dir in
  let storage_dir = Configuration.default_storage_dir data_dir in
  (* The migration uses the default index buffer size, not the
     [~index_buffer_size] argument. *)
  let* () =
    Store_migration.maybe_run_migration
      metadata
      ~storage_dir
      ~index_buffer_size:Configuration.default_index_buffer_size
  in
  let dal_cctxt =
    match dal_node_endpoint with
    | None -> None
    | Some endpoint -> Some (Dal_node_client.make_unix_cctxt endpoint)
  in
  let* store =
    Node_context.Node_store.load
      mode
      ~index_buffer_size
      ~l2_blocks_cache_size
      storage_dir
  in
  let*? (module Plugin : Protocol_plugin_sig.S) =
    Protocol_plugins.proto_plugin_for_protocol current_protocol.hash
  in
  let (module Pvm_context) = Plugin.Pvm.context kind in
  let* context =
    Context.load
      (module Pvm_context)
      ~cache_size:irmin_cache_size
      mode
      (Configuration.default_context_dir data_dir)
  in
  let* () =
    Node_context.Node_store.check_and_set_history_mode
      mode
      store
      configuration.history_mode
  in
  let*! () = Event.rollup_exists ~addr:rollup_address ~kind in
  (* Warn when the protocol has the DAL enabled but no DAL node is
     configured. *)
  let*! () =
    match (dal_cctxt, current_protocol.constants.dal.feature_enable) with
    | None, true -> Event.warn_dal_enabled_no_node ()
    | None, false | Some _, _ -> Lwt.return_unit
  in
  let* dac_client =
    let init_observer observer_endpoint =
      Dac_observer_client.init
        {
          observer_endpoint;
          reveal_data_dir = Filename.concat data_dir (Kind.to_string kind);
          timeout_seconds = configuration.dac_timeout;
        }
    in
    Option.map_es init_observer configuration.dac_observer_endpoint
  in
  let*! kernel_debug_logger, finaliser =
    if configuration.log_kernel_debug then
      make_kernel_logger Event.kernel_debug ?log_kernel_debug_file data_dir
    else Lwt.return (Event.kernel_debug, fun () -> Lwt.return_unit)
  in
  let global_block_watcher = Lwt_watcher.create_input () in
  let private_info =
    match last_whitelist_update with
    | None -> None
    | Some (message_index, outbox_level) ->
        Some
          {
            last_whitelist_update =
              {outbox_level; message_index = Z.to_int message_index};
            last_outbox_level_searched = outbox_level;
          }
  in
  let*? unsafe_patches =
    Pvm_patches.make kind rollup_address configuration.unsafe_pvm_patches
  in
  let sync = create_sync_info () in
  let node_ctxt =
    {
      config = configuration;
      cctxt = (cctxt :> Client_context.full);
      data_dir;
      kind;
      genesis_info;
      l1_ctxt;
      dal_cctxt;
      dac_client;
      store;
      context;
      lockfile;
      lcc = Reference.new_ lcc;
      lpc = Reference.new_ lpc;
      private_info = Reference.new_ private_info;
      current_protocol = Reference.new_ current_protocol;
      unsafe_patches;
      injector_retention_period = 0;
      block_finality_time = 2;
      kernel_debug_logger;
      finaliser;
      global_block_watcher;
      sync;
    }
  in
  let* last_head = Node_context.last_processed_head_opt node_ctxt in
  (match last_head with
  | None -> sync.processed_level <- 0l
  | Some {header = {level; _}; _} -> sync.processed_level <- level) ;
  return node_ctxt
(** [close node_ctxt] shuts the node context down, printing a message through
    [cctxt#message] before each step: it runs the finaliser, shuts down the
    L1 context, closes the PVM context, closes the store, and releases the
    data directory lock.

    The lock is released even when closing the store fails; the error
    returned by the store, if any, is then the result of [close]. *)
let close ({cctxt; store; context; l1_ctxt; finaliser; _} as node_ctxt) =
  let open Lwt_result_syntax in
  let message = cctxt#message in
  let*! () = message "Running finaliser@." in
  let*! () = finaliser () in
  let*! () = message "Shutting down L1@." in
  let*! () = Layer1.shutdown l1_ctxt in
  let*! () = message "Closing context@." in
  let*! () = Context.close context in
  let*! () = message "Closing store@." in
  (* Keep the outcome of closing the store instead of short-circuiting on
     error: the lock file must be unlocked and closed on every path. *)
  let*! store_closed = Node_context.Node_store.close store in
  let*! () = message "Releasing lock@." in
  let*! () = unlock node_ctxt in
  Lwt.return store_closed
module For_snapshots = struct
  (** [create_node_context cctxt current_protocol store context ~data_dir]
      builds a node context around an already opened [store] and [context].

      The rollup address, kind and genesis information are read from the
      metadata file in [data_dir]; the call fails when that file is missing.
      The configuration is an observer-mode one made of default values, with
      no DAL node, no DAC observer and no kernel debug logging. The LCC is
      read from [store] and defaults to the genesis commitment.

      The [lockfile] field is a descriptor on a fresh temporary file rather
      than on the lock file of [data_dir].
      NOTE(review): that temporary file is not removed here — confirm whether
      the leak is acceptable. *)
  let create_node_context cctxt current_protocol store context ~data_dir =
    let open Lwt_result_syntax in
    let* metadata_opt = Metadata.read_metadata_file ~dir:data_dir in
    let*? metadata =
      match metadata_opt with
      | Some found -> Ok found
      | None -> error_with "Missing metadata file for snapshot node context"
    in
    let mode = Configuration.Observer in
    let loser_mode = Loser_mode.no_failures in
    let*? operators =
      Purpose.make_operator
        ~needed_purposes:(Configuration.purposes_of_mode mode)
        []
    in
    let config =
      Configuration.
        {
          (* Identity and mode. *)
          sc_rollup_address = metadata.rollup_address;
          mode;
          loser_mode;
          operators;
          boot_sector_file = None;
          history_mode = None;
          (* RPC server. *)
          rpc_addr = Configuration.default_rpc_addr;
          rpc_port = Configuration.default_rpc_port;
          acl = Configuration.default_acl;
          cors = Resto_cohttp.Cors.default;
          metrics_addr = None;
          (* L1 connection. *)
          reconnection_delay = 1.;
          l1_rpc_timeout = Configuration.default_l1_rpc_timeout;
          loop_retry_delay = 10.;
          prefetch_blocks = None;
          (* Operations. *)
          fee_parameters = Configuration.default_fee_parameters;
          batcher = Configuration.default_batcher;
          injector = Configuration.default_injector;
          (* PVM patches. *)
          apply_unsafe_patches = true;
          unsafe_pvm_patches = [];
          (* External data sources. *)
          dal_node_endpoint = None;
          dac_observer_endpoint = None;
          dac_timeout = None;
          pre_images_endpoint = None;
          (* Caches and buffers. *)
          l1_blocks_cache_size = Configuration.default_l1_blocks_cache_size;
          l2_blocks_cache_size = Configuration.default_l2_blocks_cache_size;
          index_buffer_size = Some Configuration.default_index_buffer_size;
          irmin_cache_size = Some Configuration.default_irmin_cache_size;
          (* Miscellaneous. *)
          log_kernel_debug = false;
          no_degraded = false;
          gc_parameters = Configuration.default_gc_parameters;
        }
    in
    let*? l1_ctxt =
      Layer1.create
        ~name:"smart_rollup_node.snapshot"
        ~reconnection_delay:config.reconnection_delay
        ~l1_blocks_cache_size:config.l1_blocks_cache_size
        cctxt
    in
    let* stored_lcc = Store.Lcc.read store.Store.lcc in
    let lcc =
      match stored_lcc with
      | Some lcc -> lcc
      | None ->
          (* Nothing cemented yet: fall back to the genesis commitment. *)
          {
            commitment = metadata.genesis_info.commitment_hash;
            level = metadata.genesis_info.level;
          }
    in
    let* lpc = Store.Lpc.read store.Store.lpc in
    let*! lockfile =
      let placeholder_path = Filename.temp_file "lock" "" in
      Lwt_unix.openfile placeholder_path [] 0o644
    in
    let global_block_watcher = Lwt_watcher.create_input () in
    let sync = create_sync_info () in
    let*? unsafe_patches =
      Pvm_patches.make metadata.kind metadata.rollup_address []
    in
    return
      {
        config;
        cctxt = (cctxt :> Client_context.full);
        data_dir;
        kind = metadata.kind;
        genesis_info = metadata.genesis_info;
        l1_ctxt;
        dal_cctxt = None;
        dac_client = None;
        store = Node_context.Node_store.of_store store;
        context;
        lockfile;
        lcc = Reference.new_ lcc;
        lpc = Reference.new_ lpc;
        private_info = Reference.new_ None;
        current_protocol = Reference.new_ current_protocol;
        unsafe_patches;
        injector_retention_period = 0;
        block_finality_time = 2;
        kernel_debug_logger = (fun _ -> Lwt.return_unit);
        finaliser = (fun () -> Lwt.return_unit);
        global_block_watcher;
        sync;
      }
end
module Internal_for_tests = struct
  (** [create_node_context cctxt current_protocol ~data_dir kind] builds a
      node context for tests.

      It uses [Address.zero] as rollup address, an observer-mode
      configuration made of default values, a dummy L1 context, and a
      genesis and LCC at level [0l] with the zero commitment hash. The data
      directory lock is taken, the store and the PVM context are opened in
      [Read_write] mode under [data_dir], and [current_protocol] is recorded
      in the store as activated at level [0l]. *)
  let create_node_context cctxt (current_protocol : current_protocol) ~data_dir
      kind =
    let open Lwt_result_syntax in
    let rollup_address = Address.zero in
    let mode = Configuration.Observer in
    let loser_mode = Loser_mode.no_failures in
    let l2_blocks_cache_size = Configuration.default_l2_blocks_cache_size in
    let index_buffer_size = Configuration.default_index_buffer_size in
    let irmin_cache_size = Configuration.default_irmin_cache_size in
    let*? operators =
      Purpose.make_operator
        ~needed_purposes:(Configuration.purposes_of_mode mode)
        []
    in
    let config =
      Configuration.
        {
          (* Identity and mode. *)
          sc_rollup_address = rollup_address;
          mode;
          loser_mode;
          operators;
          boot_sector_file = None;
          history_mode = None;
          (* RPC server. *)
          rpc_addr = Configuration.default_rpc_addr;
          rpc_port = Configuration.default_rpc_port;
          acl = Configuration.default_acl;
          cors = Resto_cohttp.Cors.default;
          metrics_addr = None;
          (* L1 connection. *)
          reconnection_delay = 5.;
          l1_rpc_timeout = Configuration.default_l1_rpc_timeout;
          loop_retry_delay = 10.;
          prefetch_blocks = None;
          (* Operations. *)
          fee_parameters = Configuration.default_fee_parameters;
          batcher = Configuration.default_batcher;
          injector = Configuration.default_injector;
          (* PVM patches. *)
          apply_unsafe_patches = false;
          unsafe_pvm_patches = [];
          (* External data sources. *)
          dal_node_endpoint = None;
          dac_observer_endpoint = None;
          dac_timeout = None;
          pre_images_endpoint = None;
          (* Caches and buffers. *)
          l1_blocks_cache_size = Configuration.default_l1_blocks_cache_size;
          l2_blocks_cache_size;
          index_buffer_size = Some index_buffer_size;
          irmin_cache_size = Some irmin_cache_size;
          (* Miscellaneous. *)
          log_kernel_debug = false;
          no_degraded = false;
          gc_parameters = Configuration.default_gc_parameters;
        }
    in
    let* lockfile = lock ~data_dir in
    let* store =
      Node_context.Node_store.load
        Read_write
        ~index_buffer_size
        ~l2_blocks_cache_size
        (Configuration.default_storage_dir data_dir)
    in
    let*? (module Plugin : Protocol_plugin_sig.S) =
      Protocol_plugins.proto_plugin_for_protocol current_protocol.hash
    in
    let (module Pvm_context) = Plugin.Pvm.context kind in
    let* context =
      Context.load
        (module Pvm_context)
        Read_write
        (Configuration.default_context_dir data_dir)
        ~cache_size:irmin_cache_size
    in
    let genesis_info = {level = 0l; commitment_hash = Commitment.Hash.zero} in
    let l1_ctxt = Layer1.Internal_for_tests.dummy cctxt in
    let lcc = Reference.new_ {commitment = Commitment.Hash.zero; level = 0l} in
    let lpc = Reference.new_ None in
    (* Record the test protocol as active from level 0. *)
    let* () =
      Node_context.Internal_for_tests.write_protocols_in_store
        store
        [
          Store.Protocols.
            {
              level = Activation_level 0l;
              proto_level = current_protocol.proto_level;
              protocol = current_protocol.hash;
            };
        ]
    in
    let global_block_watcher = Lwt_watcher.create_input () in
    let sync = create_sync_info () in
    let*? unsafe_patches = Pvm_patches.make kind rollup_address [] in
    return
      {
        config;
        cctxt = (cctxt :> Client_context.full);
        data_dir;
        kind;
        genesis_info;
        l1_ctxt;
        dal_cctxt = None;
        dac_client = None;
        store;
        context;
        lockfile;
        lcc;
        lpc;
        private_info = Reference.new_ None;
        current_protocol = Reference.new_ current_protocol;
        unsafe_patches;
        injector_retention_period = 0;
        block_finality_time = 2;
        kernel_debug_logger = Event.kernel_debug;
        finaliser = (fun () -> Lwt.return_unit);
        global_block_watcher;
        sync;
      }

  (** [openapi_context cctxt protocol] returns a node context built with
      [create_node_context] for the [Wasm_2_0_0] kind, inside
      [Lwt_utils_unix.with_tempdir]. The protocol description uses [protocol]
      as hash, protocol level [0] and placeholder constants (zeros, DAL
      disabled, [dal_attested_slots_validity_lag] set to [Int32.max_int]).

      The context and the store of the returned node context are already
      closed.
      NOTE(review): the lock taken by [create_node_context] is not released
      here — confirm this is intended. *)
  let openapi_context cctxt protocol =
    let current_protocol =
      {
        hash = protocol;
        proto_level = 0;
        constants =
          Rollup_constants.
            {
              minimal_block_delay = 0L;
              delay_increment_per_round = 0L;
              dal =
                {
                  feature_enable = false;
                  attestation_lag = 0;
                  number_of_slots = 0;
                  cryptobox_parameters =
                    {
                      redundancy_factor = 0;
                      page_size = 0;
                      slot_size = 0;
                      number_of_shards = 0;
                    };
                };
              sc_rollup =
                {
                  challenge_window_in_blocks = 0;
                  commitment_period_in_blocks = 0;
                  max_number_of_stored_cemented_commitments = 0;
                  reveal_activation_level =
                    Some
                      {
                        blake2B = 0l;
                        metadata = 0l;
                        dal_page = 0l;
                        dal_parameters = 0l;
                        dal_attested_slots_validity_lag = Int32.max_int;
                      };
                };
            };
      }
    in
    let build_and_close data_dir =
      let open Lwt_result_syntax in
      let* node_ctxt =
        create_node_context cctxt current_protocol ~data_dir Wasm_2_0_0
      in
      let*! () = Context.close node_ctxt.context in
      let* () = Node_context.Node_store.close node_ctxt.store in
      return node_ctxt
    in
    Lwt_utils_unix.with_tempdir "smart-rollup-node-openapi" build_and_close
end