Source file forge_worker.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
open Baking_state
open Protocol
open Alpha_context
module Events = struct
include Baking_events.Forge_worker
include Baking_events.Actions
end
module Delegate_signing_queue = struct
type t = {
delegate : consensus_key_and_delegate;
task_stream : (unit -> unit Lwt.t) Lwt_stream.t;
push : (unit -> unit Lwt.t) option -> unit;
worker : unit Lwt.t;
}
let start_delegate_worker_queue task_stream =
let open Lwt_syntax in
let rec loop () =
let* task = Lwt_stream.get task_stream in
match task with
| None -> return_unit
| Some task ->
let* () = task () in
loop ()
in
loop ()
let create delegate =
let task_stream, push = Lwt_stream.create () in
let worker = start_delegate_worker_queue task_stream in
{delegate; task_stream; push; worker}
let cancel_pending_tasks state = Lwt_stream.junk_old state.task_stream
let wait_all_tasks_and_close state =
state.push None ;
state.worker
let cancel_all_tasks_and_close state =
let open Lwt_syntax in
let* () = cancel_pending_tasks state in
wait_all_tasks_and_close state
let push_task ~(on_error : tztrace -> unit Lwt.t)
(f : unit -> unit tzresult Lwt.t) state =
let open Lwt_result_syntax in
let task () =
let*! r =
protect
~on_error:(fun trace ->
let*! () = on_error trace in
return_unit)
(fun () -> f ())
in
match r with Error _err -> assert false | Ok () -> Lwt.return_unit
in
state.push (Some task)
end
type worker = {
push_task : forge_request option -> unit;
push_event : forge_event option -> unit;
event_stream : forge_event Lwt_stream.t;
delegate_signing_queues :
Delegate_signing_queue.t Signature.Public_key_hash.Table.t;
}
type t = worker
let push_request state request = state.push_task (Some request)
let get_event_stream state = state.event_stream
let cancel_all_pending_tasks {delegate_signing_queues; _} =
Lwt.dont_wait
(fun () ->
Signature.Public_key_hash.Table.iter_p
(fun _ queue -> Delegate_signing_queue.cancel_pending_tasks queue)
delegate_signing_queues)
(fun _exn -> ())
let shutdown state =
let open Lwt_syntax in
let* () =
Signature.Public_key_hash.Table.iter_p
(fun _ queue -> Delegate_signing_queue.cancel_all_tasks_and_close queue)
state.delegate_signing_queues
in
state.push_task None ;
return_unit
let get_or_create_queue worker delegate =
match
Signature.Public_key_hash.Table.find_opt
worker.delegate_signing_queues
(fst delegate).public_key_hash
with
| None ->
let queue = Delegate_signing_queue.create delegate in
Signature.Public_key_hash.Table.add
worker.delegate_signing_queues
(fst delegate).public_key_hash
queue ;
queue
| Some queue -> queue
let handle_forge_block worker baking_state (block_to_bake : block_to_bake) =
let open Lwt_result_syntax in
let task () =
let* prepared_block =
Baking_actions.prepare_block baking_state block_to_bake
in
worker.push_event (Some (Block_ready prepared_block)) ;
return_unit
in
let queue = get_or_create_queue worker block_to_bake.delegate in
Delegate_signing_queue.push_task
~on_error:(fun err ->
let*! () =
Events.(emit failed_to_forge_block (block_to_bake.delegate, err))
in
Lwt.return_unit)
task
queue
let handle_forge_consensus_votes worker baking_state
(unsigned_consensus_votes : unsigned_consensus_vote_batch) =
let open Lwt_result_syntax in
let batch_branch = unsigned_consensus_votes.batch_branch in
let task
({vote_consensus_content; vote_kind; delegate; dal_content = _} as
unsigned_consensus_vote) =
let*! signed_consensus_vote_r =
Baking_actions.forge_and_sign_consensus_vote
baking_state
~branch:batch_branch
unsigned_consensus_vote
in
match signed_consensus_vote_r with
| Error err ->
let level, round =
( Raw_level.to_int32 vote_consensus_content.level,
vote_consensus_content.round )
in
let*! () =
Events.(
emit skipping_consensus_vote (vote_kind, delegate, level, round, err))
in
fail err
| Ok signed_consensus_vote -> (
match vote_kind with
| Preattestation ->
worker.push_event
(Some (Preattestation_ready signed_consensus_vote)) ;
return_unit
| Attestation ->
worker.push_event (Some (Attestation_ready signed_consensus_vote)) ;
return_unit)
in
let* (authorized_consensus_votes : unsigned_consensus_vote list) =
protect
~on_error:(fun err ->
let*! () = Events.(emit error_while_authorizing_consensus_votes err) in
return_nil)
(fun () ->
Baking_actions.authorized_consensus_votes
baking_state
unsigned_consensus_votes)
in
List.iter
(fun unsigned_preattestation ->
let queue = get_or_create_queue worker unsigned_preattestation.delegate in
Delegate_signing_queue.push_task
~on_error:(fun _err -> Lwt.return_unit)
(fun () -> task unsigned_preattestation)
queue)
authorized_consensus_votes ;
return_unit
let start (baking_state : Baking_state.global_state) =
let open Lwt_result_syntax in
let task_stream, push_task = Lwt_stream.create () in
let event_stream, push_event = Lwt_stream.create () in
let delegate_signing_queues = Signature.Public_key_hash.Table.create 13 in
let state : worker =
{push_task; push_event; event_stream; delegate_signing_queues}
in
let rec worker_loop () =
let*! (forge_request_opt : forge_request option) =
Lwt_stream.get task_stream
in
let process_request = function
| Forge_and_sign_block block_to_bake ->
handle_forge_block state baking_state block_to_bake ;
return_unit
| Forge_and_sign_preattestations {unsigned_preattestations} ->
handle_forge_consensus_votes
state
baking_state
unsigned_preattestations
| Forge_and_sign_attestations {unsigned_attestations} ->
handle_forge_consensus_votes state baking_state unsigned_attestations
in
match forge_request_opt with
| None -> return_unit
| Some request ->
let*! result = process_request request in
let*! () =
match result with
| Ok () -> Lwt.return_unit
| Error errs ->
let*! () =
Events.(emit error_while_processing_forge_request errs)
in
Lwt.return_unit
in
worker_loop ()
in
Lwt.dont_wait
(fun () ->
Lwt.finalize
(fun () ->
let*! _r = worker_loop () in
Lwt.return_unit)
(fun () ->
let () = Lwt.dont_wait (fun () -> shutdown state) (fun _exn -> ()) in
Lwt.return_unit))
(fun _exn -> ()) ;
state