123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)(* Copyright (c) 2018-2022 Nomadic Labs, <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)(** Minimal delay between two mempool advertisements *)letadvertisement_delay=0.1(** Argument that will be provided to {!Worker.MakeGroup} to create
the prevalidator worker. *)moduleName=structtypet=Chain_id.t*Protocol_hash.tletencoding=Data_encoding.tup2Chain_id.encodingProtocol_hash.encodingletbase=["prevalidator"]letppfmt(chain_id,proto_hash)=Format.fprintffmt"%a:%a"Chain_id.pp_shortchain_idProtocol_hash.pp_shortproto_hashletequal(c1,p1)(c2,p2)=Chain_id.equalc1c2&&Protocol_hash.equalp1p2endopenPrevalidator_worker_state(** A prevalidator instance, tailored to a specific protocol (even if
it is not visible in this module type). *)moduletypeT=sigtypetypes_statevalget_rpc_directory:types_state->types_stateTezos_rpc.Directory.tlazy_tvalname:Name.tmoduleTypes:Worker_intf.TYPESwithtypestate=types_statemoduleWorker:Worker.Twithtype('a,'b)Request.t=('a,'b)Request.tandtypeRequest.view=Request.viewandtypeTypes.state=types_statetypeworker=Worker.infiniteWorker.queueWorker.tvalworker:workerLazy.tendopenShell_operationmoduleEvents=Prevalidator_eventsmoduleClassification=Prevalidator_classification(** This module encapsulates pending operations to maintain them in two
different data structure and avoid coslty repetitive convertions when
handling batches in [classify_pending_operations]. *)modulePending_ops=Prevalidator_pending_operations(** Module encapsulating some types that are used both in production
and in tests. Having them in a module makes it possible to
[include] this module in {!Internal_for_tests} below and avoid
code duplication.
The raison d'etre of these records of functions is to be able to use
alternative implementations of all functions in tests.
The purpose of the {!Tools.tools} record is to abstract away from {!Store.chain_store}.
Under the hood [Store.chain_store] requires an Irmin store on disk,
which makes it impractical for fast testing: every test would need
to create a temporary folder on disk which doesn't scale well.
The purpose of the {!Tools.worker_tools} record is to abstract away
from the {!Worker} implementation. This implementation is overkill
for testing: we don't need asynchronicity and concurrency in our
pretty basic existing tests. Having this abstraction allows to get
away with a much simpler state machine model of execution and
to have simpler test setup. *)moduleTools=struct(** Functions provided by {!Distributed_db} and {!Store.chain_store}
that are used in various places of the mempool. Gathered here so that we can test
the mempool without requiring a full-fledged [Distributed_db]/[Store.Chain_store]. *)type'prevalidation_ttools={advertise_current_head:mempool:Mempool.t->Store.Block.t->unit;(** [advertise_current_head mempool head] sends a
[Current_head (chain_id, head_header, mempool)] message to all known
active peers for the chain being considered. *)chain_tools:Store.Block.tClassification.chain_tools;(** Lower-level tools provided by {!Prevalidator_classification} *)flush:head:Store.Block.t->timestamp:Time.Protocol.t->'prevalidation_t->'prevalidation_ttzresultLwt.t;(** Create a new empty prevalidation state, recycling some elements
of the provided previous prevalidation state. *)fetch:?peer:P2p_peer.Id.t->?timeout:Time.System.Span.t->Operation_hash.t->Operation.ttzresultLwt.t;(** [fetch ?peer ?timeout oph] returns the value when it is known.
It can fail with [Requester.Timeout] if [timeout] is provided and the value
isn't known before the timeout expires. It can fail with [Requester.Cancel] if
the request is canceled. *)read_block:Block_hash.t->Store.Block.ttzresultLwt.t;(** [read_block bh] tries to read the block [bh] from the chain store. *)send_get_current_head:?peer:P2p_peer_id.t->unit->unit;(** [send_get_current_head ?peer ()] sends a [Get_Current_head]
to a given peer, or to all known active peers for the chain considered.
Expected answer is a [Get_current_head] message *)set_mempool:head:Block_hash.t->Mempool.t->unittzresultLwt.t;(** [set_mempool ~head mempool] sets the [mempool] of
the [chain_store] of the chain considered. Does nothing if [head] differs
from current_head which might happen when a new head concurrently arrives just
before this operation is being called. *)}(** Abstraction over services implemented in production by {!Worker}
but implemented differently in tests.
Also see the enclosing module documentation as to why we have this record. *)typeworker_tools={push_request:(unit,Empty.t)Prevalidator_worker_state.Request.t->boolLwt.t;(** Adds a message to the queue. *)push_request_now:(unit,Empty.t)Prevalidator_worker_state.Request.t->unit;(** Adds a message to the queue immediately. *)}endtype'aparameters={limits:Shell_limits.prevalidator_limits;tools:'aTools.tools;}(** The type needed for the implementation of [Make] below, but
* which is independent from the protocol. *)type('protocol_data,'a)types_state_shell={classification:'protocol_dataClassification.t;parameters:'aparameters;mutablepredecessor:Store.Block.t;mutabletimestamp:Time.System.t;mutablelive_blocks:Block_hash.Set.t;mutablelive_operations:Operation_hash.Set.t;mutablefetching:Operation_hash.Set.t;(** An operation is in [fetching] while the ddb is actively
requesting it from peers. It is removed from it when the
operation arrives or if the request fails (e.g. timeout). *)mutablepending:'protocol_dataPending_ops.t;mutablemempool:Mempool.t;mutableadvertisement:[`PendingofMempool.t|`None];mutablebanned_operations:Operation_hash.Set.t;worker:Tools.worker_tools;}letmetrics=Shell_metrics.Mempool.init["mempool"](** The concrete production instance of {!block_tools} *)letblock_tools:Store.Block.tClassification.block_tools={bhash=Store.Block.hash;operations=Store.Block.operations;all_operation_hashes=Store.Block.all_operation_hashes;}(** How to create an instance of {!chain_tools} from a {!Distributed_db.chain_db}. *)letmk_chain_tools(chain_db:Distributed_db.chain_db):Store.Block.tClassification.chain_tools=letopenLwt_syntaxinletnew_blocks~from_block~to_block=letchain_store=Distributed_db.chain_storechain_dbinStore.Chain_traversal.new_blockschain_store~from_block~to_blockinletread_predecessor_optblock=letchain_store=Distributed_db.chain_storechain_dbinStore.Block.read_predecessor_optchain_storeblockinletinject_operationophop=let*_=Distributed_db.inject_operationchain_dbophopinLwt.return_unitin{clear_or_cancel=Distributed_db.Operation.clear_or_cancelchain_db;inject_operation;new_blocks;read_predecessor_opt;}(** Module type used both in production and in tests. *)moduletypeS=sig(** Type instantiated by {!Prevalidation.T.config}. *)typeconfig(** Similar to the type [operation] from the protocol,
see {!Tezos_protocol_environment.PROTOCOL} *)typeprotocol_operation(** Type instantiated by {!Prevalidation.t} *)typeprevalidation_ttypetypes_state={shell:(protocol_operation,prevalidation_t)types_state_shell;mutablevalidation_state:prevalidation_t;(** Internal prevalidation state. Among others, this contains the
internal states of the protocol mempool and of the plugin. *)mutableoperation_stream:(Classification.classification*protocol_operationoperation)Lwt_watcher.input;mutablerpc_directory:types_stateTezos_rpc.Directory.tlazy_t;mutableconfig:config;lock:Lwt_mutex.t;}(** This function fetches an operation if it is not already handled
as defined by [already_handled] below. The implementation makes
sure to fetch an operation at most once, modulo operations
lost because of bounded buffers becoming full.
This function is an intruder to this module type. It just happens
that it is needed both by internals of the implementation of {!S}
and by the internals of the implementation of {!T}; so it needs
to be exposed here. *)valmay_fetch_operation:(protocol_operation,prevalidation_t)types_state_shell->P2p_peer_id.toption->Operation_hash.t->unit(** The function called after every call to a function of {!API}. *)valhandle_unprocessed:types_state->unitLwt.t(** The inner API of the mempool i.e. functions called by the worker
when an individual request arrives. These functions are the
most high-level ones that we test. All these [on_*] functions
correspond to a single event. Possible
sequences of calls to this API are always of the form:
on_*; handle_unprocessed; on_*; handle_unprocessed; ... *)moduleRequests:sigvalon_advertise:_types_state_shell->unitvalon_arrived:types_state->Operation_hash.t->Operation.t->(unit,Empty.t)resultLwt.tvalon_ban:types_state->Operation_hash.t->unittzresultLwt.tvalon_flush:handle_branch_refused:bool->types_state->Store.Block.t->Block_hash.Set.t->Operation_hash.Set.t->unittzresultLwt.tvalon_inject:types_state->force:bool->Operation.t->unittzresultLwt.tvalon_notify:_types_state_shell->P2p_peer_id.t->Mempool.t->unitendend(** A functor for obtaining the testable part of this file (see
the instantiation of this functor in {!Internal_for_tests} at the
end of this file). Contrary to the production-only functor {!Make} below,
this functor doesn't assume a specific chain store implementation,
which is the crux for having it easily unit-testable. *)moduleMake_s(Proto:Protocol_plugin.T)(Prevalidation_t:Prevalidation.Twithtypeprotocol_operation=Proto.operation):Swithtypeconfig=Prevalidation_t.configandtypeprotocol_operation=Proto.operationandtypeprevalidation_t=Prevalidation_t.t=structtypeconfig=Prevalidation_t.configtypeprotocol_operation=Proto.operationtypeprevalidation_t=Prevalidation_t.ttypetypes_state={shell:(protocol_operation,prevalidation_t)types_state_shell;mutablevalidation_state:prevalidation_t;mutableoperation_stream:(Classification.classification*protocol_operationoperation)Lwt_watcher.input;mutablerpc_directory:types_stateTezos_rpc.Directory.tlazy_t;mutableconfig:config;lock:Lwt_mutex.t;}letalready_handled~originshelloph=ifOperation_hash.Set.memophshell.banned_operationsthen((* In order to avoid data-races (for instance in
[may_fetch_operation]), this event is triggered
asynchronously which may lead to misordered events. *)ignore(Unit.catch_s(fun()->Events.(emitban_operation_encountered)(origin,oph)));true)elseClassification.is_in_mempoolophshell.classification<>None||Operation_hash.Set.memophshell.live_operations||Pending_ops.memophshell.pending||Classification.is_known_unparsableophshell.classificationletadvertise(shell:('operation_data,_)types_state_shell)mempool=letopenLwt_syntaxinmatchshell.advertisementwith|`Pending{Mempool.known_valid;pending}->shell.advertisement<-`Pending{known_valid=Operation_hash.Set.unionknown_validmempool.Mempool.known_valid;pending=Operation_hash.Set.unionpendingmempool.pending;}|`None->shell.advertisement<-`Pendingmempool;Lwt.dont_wait(fun()->let*()=Lwt_unix.sleepadvertisement_delayinshell.worker.push_request_nowAdvertise;Lwt.return_unit)(funexc->Format.eprintf"Uncaught exception: %s\n%!"(Printexc.to_stringexc))(* Each classified operation should be notified exactly ONCE for a
given stream. Operations which cannot be parsed are not notified. *)lethandle_classification~(notifier:Classification.classification->protocol_operationoperation->unit)shell(op,kind)=Classification.addkindopshell.classification;notifierkindopletmk_notifieroperation_streamclassificationop=(* This callback is safe encapsulation-wise, because it depends
on an "harmless" field of [types_state_shell]: [operation_stream] *)Lwt_watcher.notifyoperation_stream(classification,op)letpre_filterpv~notifierparsed_op:[Pending_ops.priority|`Drop]Lwt.t=letopenLwt_syntaxinlet+v=Prevalidation_t.pre_filterpv.validation_statepv.configparsed_opinmatchvwith|(`Branch_delayed_|`Branch_refused_|`Refused_|`Outdated_)aserrs->handle_classification~notifierpv.shell(parsed_op,errs);`Drop|`Passed_prefilterpriority->(priority:>[Pending_ops.priority|`Drop])letset_mempoolshellmempool=shell.mempool<-mempool;shell.parameters.tools.set_mempool~head:(Store.Block.hashshell.predecessor)shell.mempoolletremove_from_advertisementoph=function|`Pendingmempool->`Pending(Mempool.removeophmempool)|`None->`None(* This function retrieves an old/replaced operation and reclassifies it as
[replacement_classification].
The operation is expected to be (a) parsable and (b) in the "validated"
class. So, we softly handle the situations where the operation is
unparsable or not found in any class in case this invariant is broken
for some reason.
*)letreclassify_replaced_manager_opold_hashshell(replacement_classification:[<Classification.error_classification])=shell.advertisement<-remove_from_advertisementold_hashshell.advertisement;matchClassification.removeold_hashshell.classificationwith|Some(op,_class)->Some(op,(replacement_classification:>Classification.classification))|None->(* This case should not happen. *)shell.parameters.tools.chain_tools.clear_or_cancelold_hash;None(* Determine the classification of a given operation in the current
validation state, i.e. whether it could be included in a
block on top of the current head, and if not, why. If yes, the
operation is accumulated in the given [mempool].
The function returns a tuple
[(validation_state, mempool, to_handle)], where:
- [validation_state] is the (possibly) updated validation_state,
- [mempool] is the (possibly) updated mempool,
- [to_handle] contains the given operation and its classification, and all
operations whose classes are changed/impacted by this classification
(eg. in case of operation replacement).
*)letclassify_operationshell~config~validation_statemempoolop:(prevalidation_t*Mempool.t*(protocol_operationoperation*Classification.classification)trace)Lwt.t=letopenLwt_syntaxinlet*v_state,op,classification,replacements=Prevalidation_t.add_operationvalidation_stateconfigopinletto_replace=List.filter_map(fun(replaced_oph,new_classification)->reclassify_replaced_manager_opreplaced_ophshellnew_classification)replacementsinletto_handle=(op,classification)::to_replaceinletmempool=matchclassificationwith|`Validated->Mempool.cons_validop.hashmempool|`Branch_refused_|`Branch_delayed_|`Refused_|`Outdated_->mempoolinreturn(v_state,mempool,to_handle)(* Classify pending operations into either:
[Refused | Outdated | Branch_delayed | Branch_refused | Validated].
To ensure fairness with other worker requests, classification of
operations is done by batch of [operation_batch_size] operations.
This function ensures the following invariants:
- If an operation is classified, it is not part of the [pending]
map
- See {!type-Prevalidator_classification.t} for additional details
and invariants on the classifications themselves.
Moreover, this function ensures that only each newly classified
operations are advertised to the remote peers. However, if a peer
requests our mempool, we advertise all our classified operations and
all our pending operations. *)letclassify_pending_operations~notifiershellconfigstate=letopenLwt_syntaxinlet*r=Pending_ops.fold_es(fun_prioophop(acc_validation_state,acc_mempool,limit)->iflimit<=0then(* Using Error as an early-return mechanism *)Lwt.return_error(acc_validation_state,acc_mempool)else(shell.pending<-Pending_ops.removeophshell.pending;let*new_validation_state,new_mempool,to_handle=classify_operationshell~config~validation_state:acc_validation_stateacc_mempoolopinlet+()=Events.(emitoperation_reclassified)ophinList.iter(handle_classification~notifiershell)to_handle;Ok(new_validation_state,new_mempool,limit-1)))shell.pending(state,Mempool.empty,shell.parameters.limits.operations_batch_size)inmatchrwith|Error(state,advertised_mempool)->(* Early return after iteration limit was reached *)let*(_was_pushed:bool)=shell.worker.push_requestRequest.LeftoverinLwt.return(state,advertised_mempool)|Ok(state,advertised_mempool,_)->Lwt.return(state,advertised_mempool)letupdate_advertised_mempool_fieldspv_shelldelta_mempool=letopenLwt_syntaxinifMempool.is_emptydelta_mempoolthenLwt.return_unitelse(* We only advertise newly classified operations. *)letmempool_to_advertise=Mempool.{delta_mempoolwithknown_valid=delta_mempool.known_valid}inadvertisepv_shellmempool_to_advertise;letour_mempool=letvalidated_hashes=Classification.Sized_map.fold(funx_acc->Operation_hash.Set.addxacc)pv_shell.classification.validatedOperation_hash.Set.emptyin{Mempool.known_valid=validated_hashes;pending=Pending_ops.hashespv_shell.pending;}inlet*_res=set_mempoolpv_shellour_mempoolinLwt.pause()lethandle_unprocessedpv=letopenLwt_syntaxinletnotifier=mk_notifierpv.operation_streaminifPending_ops.is_emptypv.shell.pendingthenLwt.return_unitelselet*()=Events.(emitprocessing_operations)()inlet*validation_state,delta_mempool=classify_pending_operations~notifierpv.shellpv.configpv.validation_stateinpv.validation_state<-validation_state;update_advertised_mempool_fieldspv.shelldelta_mempool(* This function fetches one operation through the
[distributed_db]. On errors, we emit an event and proceed as
usual. *)letfetch_operation~notify_arrival(shell:('operation_data,_)types_state_shell)?peeroph=letopenLwt_syntaxinlet*()=Events.(emitfetching_operation)ophinlet*r=protect@@fun()->shell.parameters.tools.fetch~timeout:shell.parameters.limits.operation_timeout?peerophinmatchrwith|Okop->ifnotify_arrivalthenshell.worker.push_request_now(Arrived(oph,op));Lwt.return_unit|Errorerr->((* Make sure to remove the operation from fetching if the
retrieval fails. This only needs to be done once. *)ifnotify_arrivalthenshell.fetching<-Operation_hash.Set.removeophshell.fetching;matcherrwith|Distributed_db.Operation.Canceled_::_->Events.(emitoperation_included)oph|_->(* This may happen if the peer timed out for example. *)Events.(emitoperation_not_fetched)oph)(* This function fetches an operation if it is not already handled
by the mempool. To ensure we fetch at most a given operation,
we record it in the [pv.fetching] field.
Invariant: This function should be the only one to modify this
field.
Invariant: To ensure, there is no leak, we ensure that when the
promise [p] is terminated, we remove the operation from the
fetching operations. This is to ensure that if an error
happened, we can still fetch this operation in the future. *)letmay_fetch_operation(shell:('operation_data,_)types_state_shell)peeroph=letorigin=matchpeerwithSomepeer->Events.Peerpeer|None->Leftoverinletspawn_fetch_operation~notify_arrival=ignore(Unit.catch_s(fun()->fetch_operation~notify_arrivalshell?peeroph))inifOperation_hash.Set.memophshell.fetchingthen(* If the operation is already being fetched, we notify the DDB
that another peer may also be requested for the resource. In
any case, the initial fetching thread will still be resolved
and push an arrived worker request. *)spawn_fetch_operation~notify_arrival:falseelseifnot(already_handled~originshelloph)then(shell.fetching<-Operation_hash.Set.addophshell.fetching;spawn_fetch_operation~notify_arrival:true)(** Module containing functions that are the internal transitions
of the mempool. These functions are called by the {!Worker} when
an event arrives. *)moduleRequests=structmoduleParser=MakeParser(Proto)leton_arrived(pv:types_state)ophop:(unit,Empty.t)resultLwt.t=letopenLwt_syntaxinpv.shell.fetching<-Operation_hash.Set.removeophpv.shell.fetching;ifalready_handled~origin:Events.Arrivedpv.shellophthenreturn_ok_unitelsematchParser.parseophopwith|Error_->let*()=Events.(emitunparsable_operation)ophinPrevalidator_classification.add_unparsableophpv.shell.classification;return_ok_unit|Okparsed_op->(let*v=pre_filterpv~notifier:(mk_notifierpv.operation_stream)parsed_opinmatchvwith|`Drop->return_ok_unit|(`High|`Medium|`Low_)asprio->ifnot(Block_hash.Set.memop.Operation.shell.branchpv.shell.live_blocks)then(pv.shell.parameters.tools.chain_tools.clear_or_canceloph;return_ok_unit)else((* TODO: https://gitlab.com/tezos/tezos/-/issues/1723
Should this have an influence on the peer's score ? *)pv.shell.pending<-Pending_ops.addparsed_oppriopv.shell.pending;return_ok_unit))leton_inject(pv:types_state)~forceop=letopenLwt_result_syntaxinletoph=Operation.hashopin(* Currently, an injection is always done with the highest priority, because:
- We want to process and propagate the injected operations fast,
- We don't want to call prefilter to get the priority.
But, this may change in the future
*)letprio=`Highinifalready_handled~origin:Events.Injectedpv.shellophthen(* FIXME: https://gitlab.com/tezos/tezos/-/issues/1722
Is this an error? *)return_unitelsematchParser.parseophopwith|Errorerr->failwith"Invalid operation %a: %a."Operation_hash.ppophError_monad.pp_print_traceerr|Okparsed_op->(ifforcethen(let*!()=pv.shell.parameters.tools.chain_tools.inject_operationophopinpv.shell.pending<-Pending_ops.addparsed_oppriopv.shell.pending;let*!()=Events.(emitoperation_injected)ophinreturn_unit)elseifnot(Block_hash.Set.memop.Operation.shell.branchpv.shell.live_blocks)thenfailwith"Operation %a is branched on a block %a which is too old"Operation_hash.ppophBlock_hash.ppop.Operation.shell.branchelseletnotifier=mk_notifierpv.operation_streaminlet*!validation_state,delta_mempool,to_handle=classify_operationpv.shell~config:pv.config~validation_state:pv.validation_stateMempool.emptyparsed_opinletop_status=(* to_handle contains the given operation and its classification, and
all operations whose classes are changed/impacted by this
classification (eg. in case of operation replacement). Here, we
retrieve the classification of our operation. *)List.find_opt(function|({hash;_}:protocol_operationoperation),_->Operation_hash.equalhashoph)to_handleinmatchop_statuswith|Some(_h,`Validated)->(* TODO: https://gitlab.com/tezos/tezos/-/issues/2294
We may want to only do the injection/replacement if a
flag `replace` is set to true in the injection query. *)let*!()=pv.shell.parameters.tools.chain_tools.inject_operationophopin(* Call handle & update_advertised_mempool only if op is accepted *)List.iter(handle_classification~notifierpv.shell)to_handle;pv.validation_state<-validation_state;(* Note that in this case, we may advertise an operation and bypass
the prioritirization strategy. *)let*!v=update_advertised_mempool_fieldspv.shelldelta_mempoolinlet*!()=Events.(emitoperation_injected)ophinreturnv|Some(_h,(`Branch_delayede|`Branch_refusede|`Refusede|`Outdatede))->Lwt.return@@error_with"Error while validating injected operation %a:@ %a"Operation_hash.ppophpp_print_tracee|None->(* This case should not happen *)failwith"Unexpected error while injecting operation %a. Operation \
not found after classifying it."Operation_hash.ppoph)leton_notify(shell:('operation_data,_)types_state_shell)peermempool=letmay_fetch_operation=may_fetch_operationshell(Somepeer)inlet()=Operation_hash.Set.itermay_fetch_operationmempool.Mempool.known_validinSeq.itermay_fetch_operation(Operation_hash.Set.to_seqmempool.Mempool.pending)leton_flush~handle_branch_refusedpvnew_predecessornew_live_blocksnew_live_operations=letopenLwt_result_syntaxinletold_predecessor=pv.shell.predecessorinpv.shell.predecessor<-new_predecessor;pv.shell.live_blocks<-new_live_blocks;pv.shell.live_operations<-new_live_operations;Lwt_watcher.shutdown_inputpv.operation_stream;pv.operation_stream<-Lwt_watcher.create_input();lettimestamp_system=Tezos_base.Time.System.now()inpv.shell.timestamp<-timestamp_system;lettimestamp=Time.System.to_protocoltimestamp_systeminlet*validation_state=pv.shell.parameters.tools.flush~head:new_predecessor~timestamppv.validation_stateinpv.validation_state<-validation_state;let*!new_pending_operations=Classification.recycle_operations~from_branch:old_predecessor~to_branch:new_predecessor~live_blocks:new_live_blocks~parse:(funophop->Result.to_option(Parser.parseophop))~classes:pv.shell.classification~pending:(Pending_ops.operationspv.shell.pending)~block_store:block_tools~chain:pv.shell.parameters.tools.chain_tools~handle_branch_refusedin(* Could be implemented as Operation_hash.Map.filter_s which
does not exist for the moment. *)let*!new_pending_operations,nb_pending=Operation_hash.Map.fold_s(fun_ophop(pending,nb_pending)->let*!v=pre_filterpv~notifier:(mk_notifierpv.operation_stream)opinmatchvwith|`Drop->Lwt.return(pending,nb_pending)|(`High|`Medium|`Low_)asprio->(* Here, an operation injected in this node with `High priority will
now get its approriate priority. *)Lwt.return(Pending_ops.addoppriopending,nb_pending+1))new_pending_operations(Pending_ops.empty,0)inlet*!()=Events.(emitoperations_to_reclassify)nb_pendinginpv.shell.pending<-new_pending_operations;set_mempoolpv.shellMempool.emptyleton_advertise(shell:('protocol_data,_)types_state_shell)=matchshell.advertisementwith|`None->()(* May happen if nothing to advertise since last advertisement. *)|`Pendingmempool->shell.advertisement<-`None;(* In this case, mempool is not empty, but let's avoid advertising
empty mempools in case this invariant is broken. *)ifnot(Mempool.is_emptymempool)thenshell.parameters.tools.advertise_current_head~mempoolshell.predecessor(* If [flush_if_validated] is [true], removing a validated
operation triggers a flush of the mempool. Because flushing may
be costly this should be done only when the action is triggered
locally by the user. This allows a better UX if the user bans a
[validated] operation with the express goal to allow a
[branch_delayed] operation to become [validated] again. *)letremove~flush_if_validatedpvoph=letopenLwt_result_syntaxinpv.shell.parameters.tools.chain_tools.clear_or_canceloph;pv.shell.advertisement<-remove_from_advertisementophpv.shell.advertisement;pv.shell.banned_operations<-Operation_hash.Set.addophpv.shell.banned_operations;matchClassification.removeophpv.shell.classificationwith|None->pv.shell.pending<-Pending_ops.removeophpv.shell.pending;pv.shell.fetching<-Operation_hash.Set.removeophpv.shell.fetching;return_unit|Some(_op,classification)->(match(classification,flush_if_validated)with|`Validated,true->let+()=on_flush~handle_branch_refused:falsepvpv.shell.predecessorpv.shell.live_blockspv.shell.live_operationsinpv.shell.pending<-Pending_ops.removeophpv.shell.pending|`Branch_delayed_,_|`Branch_refused_,_|`Refused_,_|`Outdated_,_|`Validated,false->pv.validation_state<-Prevalidation_t.remove_operationpv.validation_stateoph;return_unit)leton_banpvoph_to_ban=letopenLwt_result_syntaxinpv.shell.banned_operations<-Operation_hash.Set.addoph_to_banpv.shell.banned_operations;let*res=remove~flush_if_validated:truepvoph_to_baninlet*!()=Events.(emitoperation_banned)oph_to_baninreturnresendendmoduletypeARG=sigvallimits:Shell_limits.prevalidator_limitsvalchain_db:Distributed_db.chain_dbvalchain_id:Chain_id.tendmoduleWorkerGroup=Worker.MakeGroup(Name)(Prevalidator_worker_state.Request)(** The functor that is not tested, in other words used only in production.
This functor's code is not tested (contrary to functor {!Make_s} above),
because it hardcodes a dependency to [Store.chain_store] in its instantiation
of type [chain_store]. This is what makes the code of this functor
not testable for the moment, because [Store.chain_store] has poor
testing capabilities.
Note that, because this functor [include]s {!Make_s}, it is a
strict extension of [Make_s]. *)moduleMake(Proto:Protocol_plugin.T)(Arg:ARG)(Prevalidation_t:Prevalidation.Twithtypeprotocol_operation=Proto.operationandtypechain_store=Store.chain_store):T=structmoduleS=Make_s(Proto)(Prevalidation_t)openStypetypes_state=S.types_stateletget_rpc_directorypv=pv.rpc_directoryletname=(Arg.chain_id,Proto.hash)moduleTypes=structtypestate=types_statetypeparameters=Shell_limits.prevalidator_limits*Distributed_db.chain_dbendmoduleWorker:Worker.TwithtypeName.t=Name.tandtype('a,'b)Request.t=('a,'b)Request.tandtypeRequest.view=Request.viewandtypeTypes.state=Types.stateandtypeTypes.parameters=Types.parameters=WorkerGroup.MakeWorker(Types)openTypestypeworker=Worker.infiniteWorker.queueWorker.t(** Return a json describing the prevalidator's [config].
The boolean [include_default] ([true] by default) indicates
whether the json should include the fields which have a value
equal to their default value. *)letget_config_json?(include_default=true)pv=letinclude_default_fields=ifinclude_defaultthen`Alwayselse`NeverinData_encoding.Json.construct~include_default_fieldsPrevalidation_t.config_encodingpv.configletfilter_validation_passesallowed_validation_passes(op:protocol_operation)=matchallowed_validation_passeswith|[]->true|validation_passes->(matchProto.acceptable_passopwith|None->false|Somevalidation_pass->List.mem~equal:Compare.Int.equalvalidation_passvalidation_passes)letbuild_rpc_directoryw=lazy(letopenLwt_result_syntaxinletdir:stateTezos_rpc.Directory.tref=refTezos_rpc.Directory.emptyinletmoduleProto_services=Block_services.Make(Proto)(Proto)indir:=Tezos_rpc.Directory.register!dir(Proto_services.S.Mempool.get_filterTezos_rpc.Path.open_root)(funpvparams()->return(get_config_json~include_default:params#include_defaultpv));dir:=Tezos_rpc.Directory.register!dir(Proto_services.S.Mempool.set_filterTezos_rpc.Path.open_root)(funpv()obj->letopenLwt_syntaxinlet*()=tryletconfig=Data_encoding.Json.destructPrevalidation_t.config_encodingobjinpv.config<-config;Lwt.return_unitwith_->Events.(emitinvalid_mempool_filter_configuration)()in(* We return [get_config_json pv] rather than [obj] in
order to show omitted fields (which have been reset to
their default values), and also in case [obj] is invalid. *)return_ok(get_config_jsonpv));(* Ban an operation (from its given hash): remove it from the
mempool if present. Add it to the set pv.banned_operations
to prevent it from being fetched/processed/injected in the
future.
Note: If the baker has already received the operation, then
it's necessary to restart it manually to flush the operation
from it. *)dir:=Tezos_rpc.Directory.register!dir(Proto_services.S.Mempool.ban_operationTezos_rpc.Path.open_root)(fun_pv()oph->letopenLwt_result_syntaxinlet*!r=Worker.Queue.push_request_and_waitw(Request.Banoph)inmatchrwith|Error(ClosedNone)->fail[Worker_types.Terminated]|Error(Closed(Someerrs))->failerrs|Error(Request_errorerr)->failerr|Error(Anyexn)->fail[Exnexn]|Ok()->return_unit);(* Unban an operation (from its given hash): remove it from the
set pv.banned_operations (nothing happens if it was not banned). *)dir:=Tezos_rpc.Directory.register!dir(Proto_services.S.Mempool.unban_operationTezos_rpc.Path.open_root)(funpv()oph->pv.shell.banned_operations<-Operation_hash.Set.removeophpv.shell.banned_operations;return_unit);(* Unban all operations: clear the set pv.banned_operations. *)dir:=Tezos_rpc.Directory.register!dir(Proto_services.S.Mempool.unban_all_operationsTezos_rpc.Path.open_root)(funpv()()->pv.shell.banned_operations<-Operation_hash.Set.empty;return_unit);dir:=Tezos_rpc.Directory.gen_register!dir(Proto_services.S.Mempool.pending_operationsTezos_rpc.Path.open_root)(funpvparams()->letvalidated=ifparams#validated&&Option.value~default:trueparams#applied(* https://gitlab.com/tezos/tezos/-/issues/5891
applied is deprecated and should be removed in a future
version of Octez *)thenClassification.Sized_map.to_mappv.shell.classification.validated|>Operation_hash.Map.to_seq|>Seq.filter_map(fun(oph,op)->iffilter_validation_passesparams#validation_passesop.protocolthenSome(oph,op.protocol)elseNone)|>List.of_seqelse[]inletprocess_mapmap=letopenOperation_hashinMap.filter_map(fun_oph(op,error)->iffilter_validation_passesparams#validation_passesop.protocolthenSome(op.protocol,error)elseNone)mapinletrefused=ifparams#refusedthenprocess_map(Classification.mappv.shell.classification.refused)elseOperation_hash.Map.emptyinletoutdated=ifparams#outdatedthenprocess_map(Classification.mappv.shell.classification.outdated)elseOperation_hash.Map.emptyinletbranch_refused=ifparams#branch_refusedthenprocess_map(Classification.mappv.shell.classification.branch_refused)elseOperation_hash.Map.emptyinletbranch_delayed=ifparams#branch_delayedthenprocess_map(Classification.mappv.shell.classification.branch_delayed)elseOperation_hash.Map.emptyinletunprocessed=Operation_hash.Map.filter_map(fun_{protocol;_}->iffilter_validation_passesparams#validation_passesprotocolthenSomeprotocolelseNone)(Pending_ops.operationspv.shell.pending)inletpending_operations={Proto_services.Mempool.validated;refused;outdated;branch_refused;branch_delayed;unprocessed;}inTezos_rpc.Answer.return(params#version,pending_operations));dir:=Tezos_rpc.Directory.register!dir(Proto_services.S.Mempool.request_operationsTezos_rpc.Path.open_root)(funpvt()->pv.shell.parameters.tools.send_get_current_head?peer:t#peer_id();return_unit);dir:=Tezos_rpc.Directory.gen_register!dir(Proto_services.S.Mempool.monitor_operationsTezos_rpc.Path.open_root)(funpvparams()->Lwt_mutex.with_lockpv.lock@@fun()->letop_stream,stopper=Lwt_watcher.create_streampv.operation_streamin(* First call : retrieve the current set of op from the mempool *)letvalidated_seq=ifparams#validated&&Option.value~default:trueparams#applied(* https://gitlab.com/tezos/tezos/-/issues/5891
applied is deprecated and should be removed in a future
version of Octez *)thenClassification.Sized_map.to_mappv.shell.classification.validated|>Operation_hash.Map.to_seq|>Seq.map(fun(hash,{protocol;_})->((hash,protocol),None))elseSeq.emptyinletprocess_error_mapmap=letopenOperation_hashinmap|>Map.to_seq|>Seq.map(fun(hash,(op,error))->((hash,op.protocol),Someerror))inletrefused_seq=ifparams#refusedthenprocess_error_map(Classification.mappv.shell.classification.refused)elseSeq.emptyinletbranch_refused_seq=ifparams#branch_refusedthenprocess_error_map(Classification.mappv.shell.classification.branch_refused)elseSeq.emptyinletbranch_delayed_seq=ifparams#branch_delayedthenprocess_error_map(Classification.mappv.shell.classification.branch_delayed)elseSeq.emptyinletoutdated_seq=ifparams#outdatedthenprocess_error_map(Classification.mappv.shell.classification.outdated)elseSeq.emptyinletfilter((_,op),_)=filter_validation_passesparams#validation_passesopinletcurrent_mempool=Seq.appendoutdated_seqbranch_delayed_seq|>Seq.appendbranch_refused_seq|>Seq.appendrefused_seq|>Seq.appendvalidated_seq|>Seq.filterfilter|>List.of_seqinletcurrent_mempool=ref(Somecurrent_mempool)inletfilter_result=function|`Validated->params#validated&&Option.value~default:trueparams#applied|`Refused_->params#refused|`Outdated_->params#outdated|`Branch_refused_->params#branch_refused|`Branch_delayed_->params#branch_delayedinletrecnext()=letopenLwt_syntaxinmatch!current_mempoolwith|Somemempool->current_mempool:=None;Lwt.return_some(params#version,mempool)|None->(let*o=Lwt_stream.getop_streaminmatchowith|Some(kind,op)whenfilter_resultkind&&filter_validation_passesparams#validation_passesop.protocol->leterrors=matchkindwith|`Validated->None|`Branch_delayederrors|`Branch_refusederrors|`Refusederrors|`Outdatederrors->SomeerrorsinLwt.return_some(params#version,[((op.hash,op.protocol),errors)])|Some_->next()|None->Lwt.return_none)inletshutdown()=Lwt_watcher.shutdownstopperinTezos_rpc.Answer.return_stream{next;shutdown});!dir)(** Module implementing the events at the {!Worker} level. Contrary
to {!Requests}, these functions depend on [Worker]. *)moduleHandlers=structtypeself=workerleton_request:typerrequest_error.worker->(r,request_error)Request.t->(r,request_error)resultLwt.t=funwrequest->letopenLwt_result_syntaxinPrometheus.Counter.inc_onemetrics.worker_counters.worker_request_count;letpv=Worker.statewinletpost_processing:(r,request_error)resultLwt.t->(r,request_error)resultLwt.t=funr->letopenLwt_syntaxinlet*()=handle_unprocessedpvinrinpost_processing@@matchrequestwith|Request.Flush(hash,event,live_blocks,live_operations)->Requests.on_advertisepv.shell;(* TODO: https://gitlab.com/tezos/tezos/-/issues/1727
Rebase the advertisement instead. *)let*block=pv.shell.parameters.tools.read_blockhashinlethandle_branch_refused=Chain_validator_worker_state.(matcheventwith|Head_increment|Ignored_head->false|Branch_switch->true)inLwt_mutex.with_lockpv.lock@@fun():(r,errortrace)resultLwt.t->Requests.on_flush~handle_branch_refusedpvblocklive_blockslive_operations|Request.Notify(peer,mempool)->Requests.on_notifypv.shellpeermempool;return_unit|Request.Leftover->(* unprocessed ops are handled just below *)return_unit|Request.Inject{op;force}->Requests.on_injectpv~forceop|Request.Arrived(oph,op)->Requests.on_arrivedpvophop|Request.Advertise->Requests.on_advertisepv.shell;return_unit|Request.Banoph->Requests.on_banpvophleton_closew=letpv=Worker.statewinLwt_watcher.shutdown_inputpv.operation_stream;Operation_hash.Set.iterpv.shell.parameters.tools.chain_tools.clear_or_cancelpv.shell.fetching;Lwt.return_unitletmk_tools(chain_db:Distributed_db.chain_db):_Tools.tools=letadvertise_current_head~mempoolbh=Distributed_db.Advertise.current_headchain_db~mempoolbhinletchain_tools=mk_chain_toolschain_dbinletflush=Prevalidation_t.flush(Distributed_db.chain_storechain_db)inletfetch?peer?timeoutoph=Distributed_db.Operation.fetchchain_db?timeout?peeroph()inletread_blockbh=letchain_store=Distributed_db.chain_storechain_dbinStore.Block.read_blockchain_storebhinletsend_get_current_head?peer()=matchpeerwith|None->Distributed_db.Request.current_head_from_allchain_db|Somepeer->Distributed_db.Request.current_head_from_peerchain_dbpeerinletset_mempool~headmempool=letchain_store=Distributed_db.chain_storechain_dbinStore.Chain.set_mempoolchain_store~headmempoolin{advertise_current_head;chain_tools;flush;fetch;read_block;send_get_current_head;set_mempool;}letmk_worker_toolsw:Tools.worker_tools=letpush_requestr=Worker.Queue.push_requestwrinletpush_request_nowr=Worker.Queue.push_request_nowwrin{push_request;push_request_now}typelaunch_error=errortraceleton_launchw_(limits,chain_db):(state,launch_error)resultLwt.t=letopenLwt_result_syntaxinletchain_store=Distributed_db.chain_storechain_dbinlet*!head=Store.Chain.current_headchain_storeinlet*!mempool=Store.Chain.mempoolchain_storeinlet*!live_blocks,live_operations=Store.Chain.live_blockschain_storeinlettimestamp_system=Tezos_base.Time.System.now()inlettimestamp=Time.System.to_protocoltimestamp_systeminlet*validation_state=Prevalidation_t.createchain_store~head~timestampinletfetching=mempool.known_validinletclassification_parameters=Classification.{map_size_limit=limits.Shell_limits.max_refused_operations;on_discarded_operation=Distributed_db.Operation.clear_or_cancelchain_db;}inletclassification=Classification.createclassification_parametersinletparameters={limits;tools=mk_toolschain_db}inletshell={classification;parameters;predecessor=head;timestamp=timestamp_system;live_blocks;live_operations;mempool=Mempool.empty;fetching;pending=Pending_ops.empty;advertisement=`None;banned_operations=Operation_hash.Set.empty;worker=mk_worker_toolsw;}inShell_metrics.Mempool.set_validated_collector(fun()->Prevalidator_classification.Sized_map.cardinalshell.classification.validated|>float_of_int);Shell_metrics.Mempool.set_refused_collector(fun()->Prevalidator_classification.cardinalshell.classification.refused|>float_of_int);Shell_metrics.Mempool.set_branch_refused_collector(fun()->Prevalidator_classification.cardinalshell.classification.branch_refused|>float_of_int);Shell_metrics.Mempool.set_branch_delayed_collector(fun()->Prevalidator_classification.cardinalshell.classification.branch_delayed|>float_of_int);Shell_metrics.Mempool.set_outdated_collector(fun()->Prevalidator_classification.cardinalshell.classification.outdated|>float_of_int);Shell_metrics.Mempool.set_unprocessed_collector(fun()->Prevalidator_pending_operations.cardinalshell.pending|>float_of_int);letpv={shell;validation_state;operation_stream=Lwt_watcher.create_input();rpc_directory=build_rpc_directoryw;config=(* TODO: https://gitlab.com/tezos/tezos/-/issues/1725
initialize from config file *)Prevalidation_t.default_config;lock=Lwt_mutex.create();}inSeq.iter(may_fetch_operationpv.shellNone)(Operation_hash.Set.to_seqfetching);returnpvleton_error(typeab)_wst(request:(a,b)Request.t)(errs:b):unittzresultLwt.t=Prometheus.Counter.inc_onemetrics.worker_counters.worker_error_count;letopenLwt_syntaxinmatchrequestwith|Request.(Inject_)asr->let*()=Events.(emitrequest_failed)(Request.viewr,st,errs)inreturn_ok_unit|Request.Notify_->(matcherrswith_->.)|Request.Leftover->(matcherrswith_->.)|Request.Arrived_->(matcherrswith_->.)|Request.Advertise->(matcherrswith_->.)|Request.Flush_->letrequest_view=Request.viewrequestinlet*()=Events.(emitrequest_failed)(request_view,st,errs)inLwt.return_errorerrs|Request.Ban_->letrequest_view=Request.viewrequestinlet*()=Events.(emitrequest_failed)(request_view,st,errs)inLwt.return_errorerrsleton_completion_wr_st=Prometheus.Counter.inc_onemetrics.worker_counters.worker_completion_count;matchRequest.viewrwith|View(Inject_)|View(Ban_)|Request.View(Flush_)->Events.(emitrequest_completed_info)(Request.viewr,st)|View(Notify_)|ViewLeftover|View(Arrived_)|ViewAdvertise->Events.(emitrequest_completed_debug)(Request.viewr,st)leton_no_request_=Lwt.return_unitendlettable=Worker.create_tableQueue(* NOTE: we register a single worker for each instantiation of this Make
* functor (and thus a single worker for the single instantiation of Worker).
* Whilst this is somewhat abusing the intended purpose of worker, it is part
* of a transition plan to a one-worker-per-peer architecture. *)letworker_promise=Worker.launchtablename(Arg.limits,Arg.chain_db)(moduleHandlers)letworker=lazy(matchLwt.stateworker_promisewith|Lwt.Return(Okworker)->worker|Lwt.Return(Error_)|Lwt.Fail_|Lwt.Sleep->assertfalse)endletmakelimitschain_dbchain_id(moduleProto:Protocol_plugin.T)=letmodulePrevalidation_t=Prevalidation.Make(Proto)inletmodulePrevalidator=Make(Proto)(structletlimits=limitsletchain_db=chain_dbletchain_id=chain_idend)(Prevalidation_t)in(modulePrevalidator:T)moduleChainProto_registry=Map.Make(structtypet=Chain_id.t*Protocol_hash.tletcompare(c1,p1)(c2,p2)=letpc=Protocol_hash.comparep1p2inifpc=0thenChain_id.comparec1c2elsepcend)(** {2 Public interface} *)typet=(moduleT)letchain_proto_registry:tChainProto_registry.tref=refChainProto_registry.emptyletcreatelimits(moduleProto:Protocol_plugin.T)chain_db=letopenLwt_result_syntaxinletchain_store=Distributed_db.chain_storechain_dbinletchain_id=Store.Chain.chain_idchain_storeinmatchChainProto_registry.find(chain_id,Proto.hash)!chain_proto_registrywith|None->letprevalidator=makelimitschain_dbchain_id(moduleProto)inlet(modulePrevalidator:T)=prevalidatorinchain_proto_registry:=ChainProto_registry.addPrevalidator.nameprevalidator!chain_proto_registry;returnprevalidator|Somep->returnpletshutdown(t:t)=letmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinchain_proto_registry:=ChainProto_registry.removePrevalidator.name!chain_proto_registry;Prevalidator.Worker.shutdownwletflush(t:t)eventheadlive_blockslive_operations=letopenLwt_result_syntaxinletmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinlet*!r=Prevalidator.Worker.Queue.push_request_and_waitw(Request.Flush(head,event,live_blocks,live_operations))inmatchrwith|Okr->Lwt.return_okr|Error(ClosedNone)->fail[Worker_types.Terminated]|Error(Closed(Someerrs))->failerrs|Error(Anyexn)->fail[Exnexn]|Error(Request_errorerror_trace)->failerror_traceletnotify_operations(t:t)peermempool=letmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinletopenLwt_result_syntaxinlet*!(_was_pushed:bool)=Prevalidator.Worker.Queue.push_requestw(Request.Notify(peer,mempool))inLwt.return_unitletinject_operation(t:t)~forceop=letmodulePrevalidator:T=(valt)inletopenLwt_result_syntaxinletw=Lazy.forcePrevalidator.workerinlet*!r=Prevalidator.Worker.Queue.push_request_and_waitw(Inject{op;force})inmatchrwith|Okr->Lwt.return_okr|Error(ClosedNone)->fail[Worker_types.Terminated]|Error(Closed(Someerrs))->failerrs|Error(Anyexn)->fail[Exnexn]|Error(Request_errorerror_trace)->failerror_traceletstatus(t:t)=letmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinPrevalidator.Worker.statuswletrunning_workers()=ChainProto_registry.fold(fun(id,proto)tacc->(id,proto,t)::acc)!chain_proto_registry[]letpending_requests(t:t)=letmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinPrevalidator.Worker.Queue.pending_requestswletcurrent_request(t:t)=letmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinPrevalidator.Worker.current_requestwletinformation(t:t)=letmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinPrevalidator.Worker.informationwletpipeline_length(t:t)=letmodulePrevalidator:T=(valt)inletw=Lazy.forcePrevalidator.workerinPrevalidator.Worker.Queue.pending_requests_lengthwletempty_rpc_directory:unitTezos_rpc.Directory.t=Tezos_rpc.Directory.gen_registerTezos_rpc.Directory.empty(Block_services.Empty.S.Mempool.pending_operationsTezos_rpc.Path.open_root)(fun_pvparams()->letpending_operations={Block_services.Empty.Mempool.validated=[];refused=Operation_hash.Map.empty;outdated=Operation_hash.Map.empty;branch_refused=Operation_hash.Map.empty;branch_delayed=Operation_hash.Map.empty;unprocessed=Operation_hash.Map.empty;}inTezos_rpc.Answer.return(params#version,pending_operations))letrpc_directory:toptionTezos_rpc.Directory.t=Tezos_rpc.Directory.register_dynamic_directoryTezos_rpc.Directory.empty(Block_services.mempool_pathTezos_rpc.Path.open_root)(function|None->Lwt.return(Tezos_rpc.Directory.map(fun_->Lwt.return_unit)empty_rpc_directory)|Somet->letmodulePrevalidator:T=(valt:T)inletw=Lazy.forcePrevalidator.workerinletpv=Prevalidator.Worker.statewinletpv_rpc_dir=Lazy.force(Prevalidator.get_rpc_directorypv)inLwt.return(Tezos_rpc.Directory.map(fun_->Lwt.returnpv)pv_rpc_dir))