(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Protocol_client_context
open Protocol
open Alpha_context
open Injector_common
open Injector_worker_types
open Injector_sigs
open Injector_errors

(* This is the Tenderbake finality for blocks. *)
(* TODO: https://gitlab.com/tezos/tezos/-/issues/2815
Centralize this and maybe make it configurable. *)
(* [confirmations] is used further down to size the tables of included
   operations and to derive the confirmed level from the level of a new
   head. *)
let confirmations = 2

(** The injection strategy of a worker. When the same signer is registered
    with both strategies, [`Delay_block] takes precedence (see [Make.init]). *)
type injection_strategy = [`Each_block | `Delay_block]

(* TODO/TORU: https://gitlab.com/tezos/tezos/-/issues/2755
Persist injector data on disk *)

(** Builds a client context from another client context but uses logging instead
    of printing on stdout directly. This client context cannot make the injector
    exit. *)
let injector_context (cctxt : #Protocol_client_context.full) =
  (* The channel argument is ignored: every message goes to [Logs_lwt.info]. *)
  let log _channel msg = Logs_lwt.info (fun m -> m "%s" msg) in
  object
    inherit
      Protocol_client_context.wrap_full
        (new Client_context.proxy_context (cctxt :> Client_context.full))

    inherit! Client_context.simple_printer log

    (* Instead of exiting, raise [Failure] carrying the requested exit code. *)
    method! exit code =
      Format.ksprintf Stdlib.failwith "Injector client wants to exit %d" code
  end

(** Functor building an injector from the rollup-specific [Rollup]
    parameters. *)
module Make (Rollup : PARAMETERS) = struct
  (* Sets of rollup tags. *)
  module Tags = Injector_tags.Make (Rollup.Tag)

  (* Hash tables keyed by rollup tag. *)
  module Tags_table = Hashtbl.Make (Rollup.Tag)

  (* Disk-backed queue of pending L1 operations, keyed by operation hash and
     stored under the name "operations_queue". *)
  module Op_queue =
    Disk_persistence.Make_queue
      (struct
        let name = "operations_queue"
      end)
      (L1_operation.Hash)
      (L1_operation)

  (** Information stored about an L1 operation that was injected on a Tezos
node. *)
  type injected_info = {
    op : L1_operation.t;  (** The L1 manager operation. *)
    oph : Operation_hash.t;
        (** The hash of the operation which contains [op] (this can be an L1 batch of
            several manager operations). *)
  }

  (* Disk-backed table of injected operations: L1 manager operation hash ->
     [injected_info]. *)
  module Injected_operations = Disk_persistence.Make_table (struct
    include L1_operation.Hash.Table

    type value = injected_info

    let name = "injected_operations"

    let string_of_key = L1_operation.Hash.to_b58check

    let key_of_string = L1_operation.Hash.of_b58check_opt

    (* The value is encoded as the "oph" field merged with the fields of the
       L1 operation encoding. *)
    let value_encoding =
      let open Data_encoding in
      conv (fun {op; oph} -> (oph, op)) (fun (oph, op) -> {op; oph})
      @@ merge_objs
           (obj1 (req "oph" Operation_hash.encoding))
           L1_operation.encoding
  end)

  (* Disk-backed table of injected L1 batches: hash of the injected operation
     -> hashes of the L1 manager operations it contains. *)
  module Injected_ophs = Disk_persistence.Make_table (struct
    include Operation_hash.Table

    type value = L1_operation.Hash.t list

    let name = "injected_ophs"

    let string_of_key = Operation_hash.to_b58check

    let key_of_string = Operation_hash.of_b58check_opt

    let value_encoding = Data_encoding.list L1_operation.Hash.encoding
  end)

  (** The part of the state which gathers information about injected
operations (but not included). *)
  type injected_state = {
    injected_operations : Injected_operations.t;
        (** A table mapping L1 manager operation hashes to the injection info for that
            operation. *)
    injected_ophs : Injected_ophs.t;
        (** A table mapping the hash of an L1 batch (i.e. an L1 operation) to
            the hashes of all the L1 manager operations it contains. *)
  }

  (** Information stored about an L1 operation that was included in a Tezos
block. *)
  type included_info = {
    op : L1_operation.t;  (** The L1 manager operation. *)
    oph : Operation_hash.t;
        (** The hash of the operation which contains [op] (this can be an L1 batch of
            several manager operations). *)
    l1_block : Block_hash.t;
        (** The hash of the L1 block in which the operation was included. *)
    l1_level : int32;  (** The level of [l1_block]. *)
  }

  (* Disk-backed table of included operations: L1 manager operation hash ->
     [included_info]. *)
  module Included_operations = Disk_persistence.Make_table (struct
    include L1_operation.Hash.Table

    type value = included_info

    let name = "included_operations"

    let string_of_key = L1_operation.Hash.to_b58check

    let key_of_string = L1_operation.Hash.of_b58check_opt

    (* The value is encoded as the fields of the L1 operation encoding merged
       with the "oph", "l1_block" and "l1_level" fields. *)
    let value_encoding =
      let open Data_encoding in
      conv
        (fun {op; oph; l1_block; l1_level} -> (op, (oph, l1_block, l1_level)))
        (fun (op, (oph, l1_block, l1_level)) -> {op; oph; l1_block; l1_level})
      @@ merge_objs
           L1_operation.encoding
           (obj3
              (req "oph" Operation_hash.encoding)
              (req "l1_block" Block_hash.encoding)
              (req "l1_level" int32))
  end)

  (* Disk-backed table of blocks: L1 block hash -> (level of the block, hashes
     of the L1 manager operations of this injector included in it). *)
  module Included_in_blocks = Disk_persistence.Make_table (struct
    include Block_hash.Table

    type value = int32 * L1_operation.Hash.t list

    let name = "included_in_blocks"

    let string_of_key = Block_hash.to_b58check

    let key_of_string = Block_hash.of_b58check_opt

    let value_encoding =
      let open Data_encoding in
      obj2 (req "level" int32) (req "l1_ops" (list L1_operation.Hash.encoding))
  end)

  (** The part of the state which gathers information about
operations which are included in the L1 chain (but not confirmed). *)
  type included_state = {
    included_operations : Included_operations.t;
        (** A table mapping L1 manager operation hashes to the inclusion info
            for that operation. *)
    included_in_blocks : Included_in_blocks.t;
        (** A table mapping an L1 block hash to its level and to the hashes of
            the L1 manager operations included in it. *)
  }

  (** The internal state of each injector worker. *)
  type state = {
    cctxt : Protocol_client_context.full;
        (** The client context which is used to perform the injections. *)
    signer : signer;  (** The signer for this worker. *)
    tags : Tags.t;
        (** The tags of this worker, for both informative and identification
            purposes. *)
    strategy : injection_strategy;
        (** The strategy of this worker for injecting the pending operations. *)
    save_dir : string;
        (** Path to the directory where the persistent state is saved. *)
    queue : Op_queue.t;
        (** The queue of pending operations for this injector. *)
    injected : injected_state;
        (** The information about injected operations. *)
    included : included_state;
        (** The information about included operations. {b Note}: Operations which
            are confirmed are simply removed from the state and do not appear
            anymore. *)
    rollup_node_state : Rollup.rollup_node_state;
        (** The state of the rollup node. *)
  }

  (* Events of the injector. [emit1], [emit2] and [emit3] emit an event whose
     payload starts with the signer's public key hash and the tags of [state],
     followed by one, two or three extra values. *)
  module Event = struct
    include Injector_events.Make (Rollup)

    let emit1 e state x = emit e (state.signer.pkh, state.tags, x)

    let emit2 e state x y = emit e (state.signer.pkh, state.tags, x, y)

    let emit3 e state x y z = emit e (state.signer.pkh, state.tags, x, y, z)
  end

  (** [init_injector cctxt ~data_dir rollup_node_state ~signer strategy tags]
      builds the initial state of a worker. The persistent data lives in the
      "injector" sub-directory of [data_dir], which is created first. The
      queue and the four tables are then loaded from disk, keeping only the
      entries that match [tags]. *)
  let init_injector cctxt ~data_dir rollup_node_state ~signer strategy tags =
    let open Lwt_result_syntax in
    let* signer = get_signer cctxt signer in
    let data_dir = Filename.concat data_dir "injector" in
    let*! () = Lwt_utils_unix.create_dir data_dir in
    (* [filter op_proj x] holds when the L1 operation extracted from [x] by
       [op_proj] has a tag that belongs to [tags]. *)
    let filter op_proj op =
      let {L1_operation.manager_operation = Manager op; _} = op_proj op in
      match Rollup.operation_tag op with
      | None -> false
      | Some t -> Tags.mem t tags
    in
    let warn_unreadable =
      (* Warn of corrupted files but don't fail *)
      Some
        (fun file error ->
          Event.(emit corrupted_operation_on_disk)
            (signer.pkh, tags, file, error))
    in
    let emit_event_loaded kind nb =
      Event.(emit loaded_from_disk) (signer.pkh, tags, nb, kind)
    in
    let* queue =
      Op_queue.load_from_disk
        ~warn_unreadable
        ~capacity:50_000
        ~data_dir
        ~filter:(filter (fun op -> op))
    in
    let*! () = emit_event_loaded "operations_queue" @@ Op_queue.length queue in
    (* Very coarse approximation for the number of operations we expect for each
       block *)
    let n =
      Tags.fold (fun t acc -> acc + Rollup.table_estimated_size t) tags 0
    in
    let* injected_operations =
      Injected_operations.load_from_disk
        ~warn_unreadable
        ~initial_size:n
        ~data_dir
        ~filter:(filter (fun (i : injected_info) -> i.op))
    in
    let*! () =
      emit_event_loaded "injected_operations"
      @@ Injected_operations.length injected_operations
    in
    let* included_operations =
      Included_operations.load_from_disk
        ~warn_unreadable
        ~initial_size:(confirmations * n)
        ~data_dir
        ~filter:(filter (fun (i : included_info) -> i.op))
    in
    let*! () =
      emit_event_loaded "included_operations"
      @@ Included_operations.length included_operations
    in
    (* A batch is kept as soon as one of its operations was kept above. *)
    let* injected_ophs =
      Injected_ophs.load_from_disk
        ~warn_unreadable
        ~initial_size:n
        ~data_dir
        ~filter:(List.exists (Injected_operations.mem injected_operations))
    in
    let*! () =
      emit_event_loaded "injected_ophs" @@ Injected_ophs.length injected_ophs
    in
    (* A block is kept as soon as one of its operations was kept above. *)
    let* included_in_blocks =
      Included_in_blocks.load_from_disk
        ~warn_unreadable
        ~initial_size:(confirmations * n)
        ~data_dir
        ~filter:(fun (_, ops) ->
          List.exists (Included_operations.mem included_operations) ops)
    in
    let*! () =
      emit_event_loaded "included_in_blocks"
      @@ Included_in_blocks.length included_in_blocks
    in
    return
      {
        cctxt = injector_context (cctxt :> #Protocol_client_context.full);
        signer;
        tags;
        strategy;
        save_dir = data_dir;
        queue;
        injected = {injected_operations; injected_ophs};
        included = {included_operations; included_in_blocks};
        rollup_node_state;
      }

  (** Add an operation to the pending queue corresponding to the signer for this
operation. *)
  let add_pending_operation state op =
    let open Lwt_result_syntax in
    let hash = op.L1_operation.hash in
    (* Emit the event first, then insert (or overwrite) the queue entry. *)
    let*! () = Event.(emit1 add_pending) state op in
    Op_queue.replace state.queue hash op

  (** Mark operations as injected (in [oph]). *)
  let add_injected_operations state oph operations =
    let open Lwt_result_syntax in
    let injected = state.injected in
    (* One binding per manager operation, keyed by its hash. *)
    let bindings =
      List.map (fun op -> (op.L1_operation.hash, {op; oph})) operations
    in
    let hashes = List.map fst bindings in
    let* () =
      Injected_operations.replace_seq
        injected.injected_operations
        (List.to_seq bindings)
    in
    (* Remember which manager operations the batch [oph] contains. *)
    Injected_ophs.replace injected.injected_ophs oph hashes

  (** [add_included_operations state oph l1_block l1_level operations] marks the
[operations] as included (in the L1 batch [oph]) in the Tezos block
      [l1_block] of level [l1_level]. *)
  let add_included_operations state oph l1_block l1_level operations =
    let open Lwt_result_syntax in
    let included = state.included in
    let hashes = List.map (fun o -> o.L1_operation.hash) operations in
    let*! () = Event.(emit3 included) state l1_block l1_level hashes in
    (* One binding per manager operation, keyed by its hash. *)
    let bindings =
      List.map
        (fun op -> (op.L1_operation.hash, {op; oph; l1_block; l1_level}))
        operations
    in
    let* () =
      Included_operations.replace_seq
        included.included_operations
        (List.to_seq bindings)
    in
    (* Remember which manager operations were included in [l1_block]. *)
    Included_in_blocks.replace
      included.included_in_blocks
      l1_block
      (l1_level, hashes)

  (** [remove_injected_operation state oph] removes the operations that correspond to the L1 batch
[oph] from the injected operations in the injector state. This function is
      used to move operations from injected to included. *)
  let remove_injected_operation state oph =
    let open Lwt_result_syntax in
    let injected = state.injected in
    match Injected_ophs.find injected.injected_ophs oph with
    | None ->
        (* Unknown batch: nothing removed *)
        return []
    | Some mophs ->
        let* () = Injected_ophs.remove injected.injected_ophs oph in
        (* Drop one manager operation from the table, accumulating the info of
           those that were actually present (in reverse order of [mophs]). *)
        let forget removed moph =
          match Injected_operations.find injected.injected_operations moph with
          | None -> return removed
          | Some info ->
              let+ () =
                Injected_operations.remove injected.injected_operations moph
              in
              info :: removed
        in
        List.fold_left_es forget [] mophs

  (** [remove_included_operation state block] removes the included operations that correspond to all
the L1 batches included in [block]. This function is used when [block] is on
      an alternative chain in the case of a reorganization. *)
  let remove_included_operation state block =
    let open Lwt_result_syntax in
    let included = state.included in
    match Included_in_blocks.find included.included_in_blocks block with
    | None ->
        (* Unknown block: nothing removed *)
        return []
    | Some (_level, mophs) ->
        let* () = Included_in_blocks.remove included.included_in_blocks block in
        (* Drop one manager operation from the table, accumulating the info of
           those that were actually present (in reverse order of [mophs]). *)
        let forget removed moph =
          match Included_operations.find included.included_operations moph with
          | None -> return removed
          | Some info ->
              let+ () =
                Included_operations.remove included.included_operations moph
              in
              info :: removed
        in
        List.fold_left_es forget [] mophs

  (** [fee_parameter_of_operations state ops] combines the fee parameters that
      [Rollup.fee_parameter state] gives for each operation of [ops]: minimal
      fees are the maximum over the operations, [force_low_fee] is their
      disjunction, and the fee and burn caps are summed (the addition raises
      if it fails). *)
  let fee_parameter_of_operations state ops =
    (* Neutral element of the combination. *)
    let neutral =
      Injection.
        {
          minimal_fees = Tez.zero;
          minimal_nanotez_per_byte = Q.zero;
          minimal_nanotez_per_gas_unit = Q.zero;
          force_low_fee = false;
          fee_cap = Tez.zero;
          burn_cap = Tez.zero;
        }
    in
    let combine acc {L1_operation.manager_operation = Manager op; _} =
      let param = Rollup.fee_parameter state op in
      Injection.
        {
          minimal_fees = Tez.max acc.minimal_fees param.minimal_fees;
          minimal_nanotez_per_byte =
            Q.max acc.minimal_nanotez_per_byte param.minimal_nanotez_per_byte;
          minimal_nanotez_per_gas_unit =
            Q.max
              acc.minimal_nanotez_per_gas_unit
              param.minimal_nanotez_per_gas_unit;
          force_low_fee = acc.force_low_fee || param.force_low_fee;
          fee_cap =
            WithExceptions.Result.get_ok
              ~loc:__LOC__
              Tez.(acc.fee_cap +? param.fee_cap);
          burn_cap =
            WithExceptions.Result.get_ok
              ~loc:__LOC__
              Tez.(acc.burn_cap +? param.burn_cap);
        }
    in
    List.fold_left combine neutral ops

  (** Returns the first half of the list [ops] if there are at least two
elements, or [None] otherwise. *)
  let keep_half ops =
    match ops with
    | [] | [_] ->
        (* Zero or one element: there is no shorter non-empty prefix. *)
        None
    | _ ->
        (* The prefix length is rounded down, e.g. one element out of three. *)
        Some (List.take_n (List.length ops / 2) ops)

  (** [simulate_operations ~must_succeed state operations] simulates the
injection of [operations] and returns a triple [(op, ops, results)] where
      [op] is the packed operation with the adjusted limits, [ops] is the prefix
      of [operations] which was considered (because it did not exceed the
      quotas) and [results] are the results of the simulation. See
      {!inject_operations} for the specification of [must_succeed].
      [operations] must not be empty. *)
  let rec simulate_operations ~must_succeed state
      (operations : L1_operation.t list) =
    let open Lwt_result_syntax in
    let open Annotated_manager_operation in
    let force =
      match (operations, must_succeed) with
      | [], _ -> assert false
      | [_], _ ->
          (* If there is only one operation, fail when simulation fails *)
          false
      | _, `All -> false
      | _, `At_least_one ->
          (* We want to see which operation failed in the batch if not all
             must succeed *)
          true
    in
    let*! () = Event.(emit2 simulating_operations) state operations force in
    let fee_parameter =
      fee_parameter_of_operations state.rollup_node_state operations
    in
    (* Every limit is left unknown so that the simulation computes it. *)
    let annotate {L1_operation.manager_operation = Manager operation; _} =
      Annotated_manager_operation
        (Injection.prepare_manager_operation
           ~fee:Limit.unknown
           ~gas_limit:Limit.unknown
           ~storage_limit:Limit.unknown
           operation)
    in
    let (Manager_list annot_op) =
      Annotated_manager_operation.manager_of_list (List.map annotate operations)
    in
    let*! simulation_result =
      Injection.inject_manager_operation
        state.cctxt
        ~simulation:true (* Only simulation here *)
        ~force
        ~chain:state.cctxt#chain
        ~block:(`Head 0)
        ~source:state.signer.pkh
        ~src_pk:state.signer.pk
        ~src_sk:state.signer.sk
        ~successor_level:true
          (* Needed to simulate tx_rollup operations in the next block *)
        ~fee:Limit.unknown
        ~gas_limit:Limit.unknown
        ~storage_limit:Limit.unknown
        ~fee_parameter
        annot_op
    in
    match simulation_result with
    | Ok (_, op, _, result) ->
        return (op, operations, Apply_results.Contents_result_list result)
    | Error trace -> (
        let exceeds_quota =
          TzTrace.fold
            (fun exceeds -> function
              | Environment.Ecoproto_error
                  (Gas.Block_quota_exceeded | Gas.Operation_quota_exceeded) ->
                  true
              | _ -> exceeds)
            false
            trace
        in
        if not exceeds_quota then fail trace
        else
          (* We perform a dichotomy by injecting the first half of the
             operations (we are not looking to maximize the number of operations
             injected because of the cost of simulation). Only the operations
             which are actually injected will be removed from the queue so the
             other half will be reconsidered later. *)
          match keep_half operations with
          | Some first_half ->
              simulate_operations ~must_succeed state first_half
          | None -> fail trace)

  (** [inject_on_node state ~nb op] signs the contents of the packed operation
      [op] with the key of the signer of [state], injects the signed operation
      on the node and returns its hash. [nb] is only reported in the
      [injected] event. *)
  let inject_on_node state ~nb
      {shell; protocol_data = Operation_data {contents; _}} =
    let open Lwt_result_syntax in
    (* The signature is computed over the encoding of the unsigned operation. *)
    let unsigned_op_bytes =
      Data_encoding.Binary.to_bytes_exn
        Operation.unsigned_encoding
        (shell, Contents_list contents)
    in
    let* signature =
      Client_keys_v0.sign
        state.cctxt
        ~watermark:Tezos_crypto.Signature.V0.Generic_operation
        state.signer.sk
        unsigned_op_bytes
    in
    let signed_op : _ Operation.t =
      {shell; protocol_data = {contents; signature = Some signature}}
    in
    let signed_op_bytes =
      Data_encoding.Binary.to_bytes_exn
        Operation.encoding
        (Operation.pack signed_op)
    in
    let* oph =
      Tezos_shell_services.Shell_services.Injection.operation
        state.cctxt
        ~chain:state.cctxt#chain
        signed_op_bytes
    in
    let*! () = Event.(emit2 injected) state nb oph in
    return oph

  (** Inject the given [operations] in an L1 batch. If [must_succeed] is [`All]
then all the operations must succeed in the simulation of injection. If
      [must_succeed] is [`At_least_one] at least one operation in the list
      [operations] must be successful in the simulation. In any case, only
      operations which are known as successful will be included in the injected L1
      batch. {b Note}: [must_succeed = `At_least_one] allows to incrementally build
      "or-batches" by iteratively removing operations that fail from the desired
      batch. When every operation of the batch fails in the simulation, an error
      is returned. [operations] must not be empty. *)
  let rec inject_operations ~must_succeed state
      (operations : L1_operation.t list) =
    let open Lwt_result_syntax in
    let* packed_op, operations, result =
      trace (Step_failed "simulation")
      @@ simulate_operations ~must_succeed state operations
    in
    let results = Apply_results.to_list result in
    (* Set as soon as one operation is reported as failing. *)
    let failure = ref false in
    let* rev_non_failing_operations =
      List.fold_left2_s
        ~when_different_lengths:
          [
            Exn
              (Failure
                 "Unexpected error: length of operations and result differ in \
                  simulation");
          ]
        (fun acc op (Apply_results.Contents_result result) ->
          match result with
          | Apply_results.Manager_operation_result
              {
                operation_result =
                  Failed (_, error) | Backtracked (_, Some error);
                _;
              } ->
              let*! () = Event.(emit2 dropping_operation) state op error in
              failure := true ;
              Lwt.return acc
          | Apply_results.Manager_operation_result
              {
                operation_result = Applied _ | Backtracked (_, None) | Skipped _;
                _;
              } ->
              (* Not known to be failing *)
              Lwt.return (op :: acc)
          | _ ->
              (* Only manager operations *)
              assert false)
        []
        operations
        results
    in
    if !failure then
      (* Invariant: must_succeed = `At_least_one, otherwise the simulation would have
         returned an error. We try to inject without the failing operation. *)
      match List.rev rev_non_failing_operations with
      | [] ->
          (* Every operation of the batch failed. Recursing on the empty list
             would hit the [assert false] of [simulate_operations]; report a
             proper error instead, so that callers inspecting the result still
             get to decide what to do with the operations. *)
          fail
            [
              Exn
                (Failure
                   "All the operations of the batch failed in the simulation");
            ]
      | operations -> inject_operations ~must_succeed state operations
    else
      (* Inject on node for real *)
      let+ oph =
        trace (Step_failed "injection")
        @@ inject_on_node ~nb:(List.length operations) state packed_op
      in
      (oph, operations)

  (** Returns the (upper bound on) the size of an L1 batch of operations composed
of the manager operations [rev_ops]. *)
  let size_l1_batch state rev_ops =
    (* Build the contents of a manager operation using the approximate bounds
       given by the rollup for the fee, counter and limits. *)
    let contents_of_op {L1_operation.manager_operation = Manager operation; _} =
      let {fee; counter; gas_limit; storage_limit} =
        Rollup.approximate_fee_bound state.rollup_node_state operation
      in
      Contents
        (Manager_operation
           {
             source = state.signer.pkh;
             operation;
             fee;
             counter;
             gas_limit;
             storage_limit;
           })
    in
    let (Contents_list contents) =
      match Operation.of_list (List.map contents_of_op rev_ops) with
      | Ok packed_contents_list -> packed_contents_list
      | Error _ ->
          (* Cannot happen: rev_ops is non empty and contains only manager
             operations *)
          assert false
    in
    (* Only the size matters: use placeholder values for the branch and the
       signature. *)
    let operation =
      {
        shell = {branch = Block_hash.zero};
        protocol_data =
          Operation_data
            {contents; signature = Some Tezos_crypto.Signature.V0.zero};
      }
    in
    Data_encoding.Binary.length Operation.encoding operation

  (** Retrieve as many operations from the queue while remaining below the size
limit. *)
  let get_operations_from_queue ~size_limit state =
    (* Raised to leave the fold early; carries the operations selected so far
       (most recent first). *)
    let exception Reached_limit of L1_operation.t list in
    let select _oph op selected =
      let candidate = op :: selected in
      if size_l1_batch state candidate > size_limit then
        raise (Reached_limit selected)
      else candidate
    in
    match Op_queue.fold select state.queue [] with
    | rev_ops -> List.rev rev_ops
    | exception Reached_limit rev_ops -> List.rev rev_ops

  (* Ignore the failures of finalize and remove commitment operations. These
operations fail when there are either no commitment to finalize or to remove
     (which can happen when there are no inbox for instance). *)
  let ignore_ignorable_failing_operations operations = function
    | Ok injected -> Ok (`Injected injected)
    | Error _ as failure ->
        let open Result_syntax in
        (* Ask the rollup what to do with each operation. The original failure
           is returned as soon as one operation must not be ignored. *)
        let classify to_drop op =
          let (Manager operation) = op.L1_operation.manager_operation in
          match Rollup.ignore_failing_operation operation with
          | `Ignore_drop -> Ok (op :: to_drop)
          | `Ignore_keep -> Ok to_drop
          | `Don't_ignore -> failure
        in
        let+ operations_to_drop = List.fold_left_e classify [] operations in
        `Ignored operations_to_drop

  (** [inject_pending_operations ?size_limit state] injects
operations from the pending queue of [state], whose total size does
      not exceed [size_limit]. Upon successful injection, the
      operations are removed from the queue and marked as injected. *)
  let inject_pending_operations
      ?(size_limit = Constants.max_operation_data_length) state =
    let open Lwt_result_syntax in
    let remove_from_queue ops =
      List.iter_es
        (fun op -> Op_queue.remove state.queue op.L1_operation.hash)
        ops
    in
    (* Select the operations to inject; they stay in the queue for now. *)
    match get_operations_from_queue ~size_limit state with
    | [] -> return_unit
    | operations_to_inject -> (
        let*! () =
          Event.(emit1 injecting_pending)
            state
            (List.length operations_to_inject)
        in
        let must_succeed =
          operations_to_inject
          |> List.map (fun op -> op.L1_operation.manager_operation)
          |> Rollup.batch_must_succeed
        in
        let*! injection_result =
          inject_operations ~must_succeed state operations_to_inject
        in
        let*? outcome =
          ignore_ignorable_failing_operations
            operations_to_inject
            injection_result
        in
        match outcome with
        | `Injected (oph, injected_operations) ->
            (* Injection succeeded, remove from pending and add to injected *)
            let* () = remove_from_queue injected_operations in
            add_injected_operations state oph injected_operations
        | `Ignored operations_to_drop ->
            (* Injection failed but we ignore the failure. *)
            remove_from_queue operations_to_drop)

  (** [register_included_operation state block level oph] marks the manager
operations contained in the L1 batch [oph] as being included in the [block]
      of level [level], by moving them from the "injected" state to the "included"
      state. *)
  let register_included_operation state block level oph =
    let open Lwt_result_syntax in
    let* injected_infos = remove_injected_operation state oph in
    match injected_infos with
    | [] ->
        (* [oph] was not injected by this injector: nothing to move. *)
        return_unit
    | _ ->
        injected_infos
        |> List.map (fun (i : injected_info) -> i.op)
        |> add_included_operations state oph block level

  (** [register_included_operations state block] marks the known (by
this injector) manager operations contained in [block] as being included. *)
  let register_included_operations state
      (block : Alpha_block_services.block_info) =
    let block_hash = block.hash in
    let level = block.header.shell.level in
    let register (op : Alpha_block_services.operation) =
      (* TODO/TORU: Handle operations for rollup_id here with
         callback *)
      register_included_operation state block_hash level op.hash
    in
    (* The operations of a block are a list of lists; visit them in order. *)
    List.iter_es (List.iter_es register) block.Alpha_block_services.operations

  (** [revert_included_operations state block] marks the known (by this injector)
manager operations contained in [block] as not being included any more,
      typically in the case of a reorganization where [block] is on an alternative
      chain. The operations for which [Rollup.requeue_reverted_operation]
      answers [true] are put back in the pending queue. *)
  let revert_included_operations state block =
    let open Lwt_result_syntax in
    let* reverted = remove_included_operation state block in
    let*! () =
      Event.(emit1 revert_operations)
        state
        (List.map (fun (info : included_info) -> info.op.hash) reverted)
    in
    (* Ask the rollup whether the reverted operation should be re-injected. *)
    let requeue_if_needed ({op; _} : included_info) =
      let {L1_operation.manager_operation = Manager mop; _} = op in
      let*! requeue =
        Rollup.requeue_reverted_operation state.rollup_node_state mop
      in
      if requeue then add_pending_operation state op else return_unit
    in
    (* TODO/TORU: https://gitlab.com/tezos/tezos/-/issues/2814
       maybe put at the front of the queue for re-injection. *)
    List.iter_es requeue_if_needed reverted

  (** [register_confirmed_level state confirmed_level] is called when the level
[confirmed_level] is known as confirmed. In this case, the operations of
      block which are below this level are also considered as confirmed and are
      removed from the "included" state. These operations cannot be part of a
      reorganization so there will be no need to re-inject them anymore. *)
  let register_confirmed_level state confirmed_level =
    let open Lwt_result_syntax in
    (* Inside [Event.(...)], [confirmed_level] presumably resolves to the event
       of that name declared in [Event] rather than to the parameter, which is
       why the payload is built by hand here — TODO confirm. *)
    let*! () =
      Event.(emit confirmed_level)
        (state.signer.pkh, state.tags, confirmed_level)
    in
    (* NOTE(review): [remove_included_operation] removes [block] from
       [state.included.included_in_blocks], i.e. from the very table that is
       being iterated. Whether this is safe depends on the implementation of
       [Included_in_blocks.iter_es] — confirm. *)
    Included_in_blocks.iter_es
      (fun block (level, _operations) ->
        (* Blocks at exactly [confirmed_level] are confirmed too. *)
        if level <= confirmed_level then
          let* confirmed_ops = remove_included_operation state block in
          let*! () =
            Event.(emit2 confirmed_operations)
              state
              level
              (List.map (fun o -> o.op.hash) confirmed_ops)
          in
          return_unit
        else return_unit)
      state.included.included_in_blocks

  (** [on_new_tezos_head state head reorg] is called when there is a new Tezos
head (with a potential reorganization [reorg]). It first reverts any blocks
      that are in the alternative branch of the reorganization and then registers
      the effect of the new branch (the newly included operation and confirmed
      operations). *)
  let on_new_tezos_head state (head : Alpha_block_services.block_info)
      (reorg : Alpha_block_services.block_info reorg) =
    let open Lwt_result_syntax in
    let*! () = Event.(emit1 new_tezos_head) state head.hash in
    (* The blocks of the old chain are reverted in reverse order. *)
    let revert (removed_block : Alpha_block_services.block_info) =
      revert_included_operations state removed_block.hash
    in
    let* () = List.iter_es revert (List.rev reorg.old_chain) in
    let* () =
      List.iter_es (register_included_operations state) reorg.new_chain
    in
    (* Head is already included in the reorganization, so no need to process it
       separately. *)
    let head_level = head.Alpha_block_services.header.shell.level in
    let confirmed_level = Int32.sub head_level (Int32.of_int confirmations) in
    (* A negative level means no level is confirmed yet. *)
    if confirmed_level >= 0l then
      register_confirmed_level state confirmed_level
    else return_unit

  (* The request {Request.Inject} triggers an injection of the operations
in the pending queue. *)
  let on_inject state = inject_pending_operations state

  (* Types given to the worker functor: the worker state is the injector
     [state] and [parameters] is what a worker is launched with. *)
  module Types = struct
    type nonrec state = state

    type parameters = {
      cctxt : Protocol_client_context.full;
      data_dir : string;
      rollup_node_state : Rollup.rollup_node_state;
      strategy : injection_strategy;
      tags : Tags.t;
    }
  end

  (* The worker for the injector. *)
  module Worker = Worker.MakeSingle (Name) (Request) (Types)

  (* The queue for the requests to the injector worker is infinite. *)
  type worker = Worker.infinite Worker.queue Worker.t

  (* Table of the launched workers, looked up by signer (see
     [worker_of_signer]). *)
  let table = Worker.create_table Queue

  (* Table from tags to workers; looked up by [worker_of_tag] and cleaned up
     in [Handlers.on_close]. *)
  let tags_table = Tags_table.create 7

  (* Handlers of the injector worker. *)
  module Handlers = struct
    type self = worker

    (* Dispatch a request to the corresponding function on the worker state. *)
    let on_request :
        type r request_error.
        worker ->
        (r, request_error) Request.t ->
        (r, request_error) result Lwt.t =
     fun w request ->
      let state = Worker.state w in
      match request with
      | Request.Add_pending op ->
          (* The execution of the request handler is protected to avoid stopping the
             worker in case of an exception. *)
          protect @@ fun () -> add_pending_operation state op
      | Request.New_tezos_head (head, reorg) ->
          protect @@ fun () -> on_new_tezos_head state head reorg
      | Request.Inject -> protect @@ fun () -> on_inject state

    type launch_error = error trace

    (* Build the initial state of the worker from its launch parameters. *)
    let on_launch _w signer
        Types.{cctxt; data_dir; rollup_node_state; strategy; tags} =
      trace (Step_failed "initialization")
      @@ init_injector cctxt ~data_dir rollup_node_state ~signer strategy tags

    (* Every kind of request is handled the same way on error: the failure is
       reported with the [request_failed] event and the handler returns
       successfully. *)
    let on_error (type a b) w st (r : (a, b) Request.t) (errs : b) :
        unit tzresult Lwt.t =
      let open Lwt_result_syntax in
      let state = Worker.state w in
      let request_view = Request.view r in
      let emit_and_return_errors errs =
        (* Errors do not stop the worker but emit an entry in the log. *)
        let*! () = Event.(emit3 request_failed) state request_view st errs in
        return_unit
      in
      match r with
      | Request.Add_pending _ -> emit_and_return_errors errs
      | Request.New_tezos_head _ -> emit_and_return_errors errs
      | Request.Inject -> emit_and_return_errors errs

    (* Completion of [Add_pending] and [New_tezos_head] requests is reported
       with [request_completed_debug], completion of [Inject] requests with
       [request_completed_notice]. *)
    let on_completion w r _ st =
      let state = Worker.state w in
      match Request.view r with
      | Request.View (Add_pending _ | New_tezos_head _) ->
          Event.(emit2 request_completed_debug) state (Request.view r) st
      | View Inject ->
          Event.(emit2 request_completed_notice) state (Request.view r) st

    let on_no_request _ = Lwt.return_unit

    (* Unregister the tags of the worker from [tags_table]. *)
    let on_close w =
      let state = Worker.state w in
      Tags.iter (Tags_table.remove tags_table) state.tags ;
      Lwt.return_unit
  end

  (* TODO/TORU: https://gitlab.com/tezos/tezos/-/issues/2754
Injector worker in a separate process *)

  (** [init cctxt ~data_dir rollup_node_state ~signers] launches one injector
      worker per distinct signer of [signers], a list of triples
      [(signer, strategy, tags)]. When a signer appears several times, its tags
      are merged and [`Delay_block] takes precedence over [`Each_block]. Each
      launched worker is registered in [tags_table] for all of its tags, so
      that {!worker_of_tag} can find it. If a tag is shared by several
      signers, the worker launched last is the one registered for it. *)
  let init (cctxt : #Protocol_client_context.full) ~data_dir rollup_node_state
      ~signers =
    let open Lwt_result_syntax in
    let signers_map =
      List.fold_left
        (fun acc (signer, strategy, tags) ->
          let tags = Tags.of_list tags in
          let strategy, tags =
            match
              Tezos_crypto.Signature.V0.Public_key_hash.Map.find_opt signer acc
            with
            | None -> (strategy, tags)
            | Some (other_strategy, other_tags) ->
                let strategy =
                  match (strategy, other_strategy) with
                  | `Each_block, `Each_block -> `Each_block
                  | `Delay_block, _ | _, `Delay_block ->
                      (* Delay_block strategy takes over because we can always wait a
                         little bit more to inject operation which are to be injected
                         "each block". *)
                      `Delay_block
                in
                (strategy, Tags.union other_tags tags)
          in
          Tezos_crypto.Signature.V0.Public_key_hash.Map.add
            signer
            (strategy, tags)
            acc)
        Tezos_crypto.Signature.V0.Public_key_hash.Map.empty
        signers
    in
    Tezos_crypto.Signature.V0.Public_key_hash.Map.iter_es
      (fun signer (strategy, tags) ->
        let+ worker =
          Worker.launch
            table
            signer
            {
              cctxt = (cctxt :> Protocol_client_context.full);
              data_dir;
              rollup_node_state;
              strategy;
              tags;
            }
            (module Handlers)
        in
        (* Make the worker reachable by tag. Without this, [tags_table] stays
           empty and [worker_of_tag] can never succeed. *)
        Tags.iter (fun tag -> Tags_table.replace tags_table tag worker) tags)
      signers_map

  (** [worker_of_signer signer_pkh] returns the worker launched for the signer
      [signer_pkh], or the error [No_worker_for_source]. *)
  let worker_of_signer signer_pkh =
    match Worker.find_opt table signer_pkh with
    | None ->
        (* TODO: https://gitlab.com/tezos/tezos/-/issues/2818
           maybe lazily start worker here *)
        error (No_worker_for_source signer_pkh)
    | Some worker -> ok worker

  (** [worker_of_tag tag] returns the worker registered for [tag], or the
      error [No_worker_for_tag] carrying the printed tag. *)
  let worker_of_tag tag =
    match Tags_table.find_opt tags_table tag with
    | None ->
        Format.kasprintf
          (fun s -> error (No_worker_for_tag s))
          "%a"
          Rollup.Tag.pp
          tag
    | Some worker -> ok worker

  (** [add_pending_operation ?source op] pushes an [Add_pending] request for
      [op] to the worker of [source] when it is given, and otherwise to the
      worker registered for the tag of [op]. Fails if no such worker exists or
      if [op] has no tag. *)
  let add_pending_operation ?source op =
    let open Lwt_result_syntax in
    let l1_operation = L1_operation.make op in
    let*? w =
      match source with
      | Some source -> worker_of_signer source
      | None -> (
          match Rollup.operation_tag op with
          | None -> error (No_worker_for_operation l1_operation)
          | Some tag -> worker_of_tag tag)
    in
    let*! (_pushed : bool) =
      Worker.Queue.push_request w (Request.Add_pending l1_operation)
    in
    return_unit

  (** [new_tezos_head h reorg] pushes a [New_tezos_head] request to every
      worker. *)
  let new_tezos_head h reorg =
    let open Lwt_syntax in
    let workers = Worker.list table in
    List.iter_p
      (fun (_signer, w) ->
        let* (_pushed : bool) =
          Worker.Queue.push_request w (Request.New_tezos_head (h, reorg))
        in
        return_unit)
      workers

  (* [has_tag_in ~tags state] holds when [tags] is [None] or when the worker
     has at least one tag in common with it. *)
  let has_tag_in ~tags state =
    match tags with
    | None ->
        (* Not filtering on tags *)
        true
    | Some tags -> not (Tags.disjoint state.tags tags)

  (* [has_strategy ~strategy state] holds when [strategy] is [None] or is the
     strategy of the worker. *)
  let has_strategy ~strategy state =
    match strategy with
    | None ->
        (* Not filtering on strategy *)
        true
    | Some strategy -> state.strategy = strategy

  (** [inject ?tags ?strategy ()] pushes an [Inject] request to every worker
      that matches both optional filters. *)
  let inject ?tags ?strategy () =
    let workers = Worker.list table in
    let tags = Option.map Tags.of_list tags in
    List.iter_p
      (fun (_signer, w) ->
        let open Lwt_syntax in
        let worker_state = Worker.state w in
        if has_tag_in ~tags worker_state && has_strategy ~strategy worker_state
        then
          let* _pushed = Worker.Queue.push_request w Request.Inject in
          return_unit
        else Lwt.return_unit)
      workers

  (** [shutdown ()] shuts down every worker. *)
  let shutdown () =
    let workers = Worker.list table in
    List.iter_p (fun (_signer, w) -> Worker.shutdown w) workers
end