Source file injection_directory.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
let read_chain_id validator chain =
let open Lwt_syntax in
let distributed_db = Validator.distributed_db validator in
let store = Distributed_db.store distributed_db in
match chain with
| None -> Lwt.return_none
| Some chain ->
let* v = Chain_directory.get_chain_id store chain in
Lwt.return_some v
let inject_block validator ?force ?chain bytes operations =
let open Lwt_result_syntax in
let*! chain_id = read_chain_id validator chain in
let* hash, block =
Validator.validate_block validator ?force ?chain_id bytes operations
in
return
( hash,
let* _ = block in
return_unit )
let inject_operation validator ~force ?chain bytes =
let open Lwt_result_syntax in
let*! chain_id = read_chain_id validator chain in
match Data_encoding.Binary.of_bytes_opt Operation.encoding bytes with
| None -> failwith "Can't parse the operation"
| Some op ->
let t = Validator.inject_operation validator ~force ?chain_id op in
let hash = Operation.hash op in
return (hash, t)
let inject_operations validator ~force ?chain bytes_list =
let open Lwt_result_syntax in
let* rev_hashes, rev_promises =
List.fold_left_es
(fun (hashes, promises) bytes ->
let* hash, promise = inject_operation validator ~force ?chain bytes in
return (hash :: hashes, promise :: promises))
([], [])
bytes_list
in
let hashes = List.rev rev_hashes in
let promises = List.rev rev_promises in
let fold_errors (has_failed, result) promise_result oph =
match promise_result with
| Ok () ->
( has_failed,
Injection_services.Injection_operation_succeed_case oph :: result )
| Error trace ->
( true,
List.rev_append trace
@@ (Injection_services.Injection_operation_error_case oph :: result)
)
in
let join_results l =
let has_failed, result =
WithExceptions.Result.get_ok ~loc:__LOC__
@@ List.fold_left2
~when_different_lengths:()
fold_errors
(false, [])
l
hashes
in
if not has_failed then Ok ()
else
Result_syntax.fail
(Injection_services.Injection_operations_error :: List.rev result)
in
let t = Lwt.map join_results (Lwt.all promises) in
return (hashes, t)
let inject_protocol store proto =
let open Lwt_result_syntax in
let proto_bytes = Data_encoding.Binary.to_bytes_exn Protocol.encoding proto in
let hash = Protocol_hash.hash_bytes [proto_bytes] in
let validation =
let*! b = Updater.compile hash proto in
match b with
| false -> failwith "Compilation failed (%a)" Protocol_hash.pp_short hash
| true -> (
let*! o = Store.Protocol.store store hash proto in
match o with
| None ->
failwith
"Previously registered protocol (%a)"
Protocol_hash.pp_short
hash
| Some _ -> return_unit)
in
Lwt.return (hash, validation)
let build_rpc_directory validator =
let open Lwt_result_syntax in
let distributed_db = Validator.distributed_db validator in
let state = Distributed_db.store distributed_db in
let dir : unit Tezos_rpc.Directory.t ref = ref Tezos_rpc.Directory.empty in
let register0 s f =
dir := Tezos_rpc.Directory.register !dir s (fun () p q -> f p q)
in
let inject_operation ~force q contents =
let* hash, wait =
inject_operation validator ~force ?chain:q#chain contents
in
let* () = if q#async then return_unit else wait in
return hash
in
let inject_operations q contents =
let* hashes, wait =
inject_operations validator ~force:q#force ?chain:q#chain contents
in
let* () = if q#async then return_unit else wait in
return hashes
in
register0 Injection_services.S.block (fun q (raw, operations) ->
let* hash, wait =
inject_block validator ?chain:q#chain ~force:q#force raw operations
in
let* () = if q#async then return_unit else wait in
return hash) ;
register0 Injection_services.S.operation (inject_operation ~force:false) ;
register0
Injection_services.S.private_operation
(inject_operation ~force:true) ;
register0 Injection_services.S.private_operations inject_operations ;
register0 Injection_services.S.protocol (fun q protocol ->
let*! hash, wait = inject_protocol state protocol in
let* () = if q#async then return_unit else wait in
return hash) ;
!dir