(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2018-2022 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.
*)
(*                                                                           *)
(*****************************************************************************)

include Prevalidator_internal_common
open Prevalidator_worker_state

(** Maps keyed by a [(chain id, protocol hash)] pair.

    Keys are ordered by protocol hash first; the chain id is only consulted
    to break ties between equal protocol hashes. *)
module ChainProto_registry = Map.Make (struct
  type t = Chain_id.t * Protocol_hash.t

  let compare (chain_a, proto_a) (chain_b, proto_b) =
    match Protocol_hash.compare proto_a proto_b with
    | 0 -> Chain_id.compare chain_a chain_b
    | by_proto -> by_proto
end)

(** Module-wide mutable table of the prevalidators created through {!create}
    and not yet removed by {!shutdown}. Starts out empty. *)
let chain_proto_registry : t ChainProto_registry.t ref =
  ref ChainProto_registry.empty

(** [create limits filter chain_db] returns the prevalidator registered for
    the chain of [chain_db] and the protocol hash of [filter].

    If the registry already holds an entry for that pair it is returned
    unchanged. Otherwise a new prevalidator is built with
    [Prevalidator_internal.make] (for a [Recent] filter) or
    [Legacy_prevalidator_internal.make] (for a [Legacy] filter), stored in
    the registry, and returned. *)
let create limits (filter : Shell_plugin.filter_t) chain_db =
  let open Lwt_result_syntax in
  let chain_id = Distributed_db.chain_store chain_db |> Store.Chain.chain_id in
  (* The two arms cannot be merged: each variant packs a module of a
     different signature. *)
  let proto_hash =
    match filter with
    | Recent (module Filter) -> Filter.Proto.hash
    | Legacy (module Filter) -> Filter.Proto.hash
  in
  let key = (chain_id, proto_hash) in
  match ChainProto_registry.find key !chain_proto_registry with
  | Some existing -> return existing
  | None ->
      let fresh =
        match filter with
        | Recent (module Filter) ->
            Prevalidator_internal.make limits chain_db chain_id (module Filter)
        | Legacy (module Filter) ->
            Legacy_prevalidator_internal.make
              limits
              chain_db
              chain_id
              (module Filter)
      in
      let (module Fresh : T) = fresh in
      (* NOTE(review): the entry is stored under [Fresh.name] while the lookup
         above uses [(chain_id, proto_hash)]; presumably both denote the same
         pair -- confirm against the [make] functions. *)
      chain_proto_registry :=
        ChainProto_registry.add Fresh.name fresh !chain_proto_registry ;
      return fresh

(** [shutdown t] forces the worker of [t], removes [t]'s name from the
    registry, then asks the worker to shut down. *)
let shutdown (t : t) =
  let (module Prevalidator : T) = t in
  (* Force the worker before touching the registry, as the original does. *)
  let worker = Lazy.force Prevalidator.worker in
  chain_proto_registry :=
    ChainProto_registry.remove Prevalidator.name !chain_proto_registry ;
  Prevalidator.Worker.shutdown worker

(** [flush t event head live_blocks live_operations] pushes a
    [Request.Flush] to the worker of [t] and waits for its completion.

    Note that the request carries [head] before [event], the reverse of this
    function's parameter order.

    Worker-level failures are converted into an error trace: a queue closed
    without errors becomes [[Worker_types.Terminated]], a queue closed with
    errors or a request error yields those errors, and an exception is
    wrapped in [Exn]. *)
let flush (t : t) event head live_blocks live_operations =
  let open Lwt_result_syntax in
  let (module Prevalidator : T) = t in
  let worker = Lazy.force Prevalidator.worker in
  let*! outcome =
    Prevalidator.Worker.Queue.push_request_and_wait
      worker
      (Request.Flush (head, event, live_blocks, live_operations))
  in
  match outcome with
  | Ok result -> return result
  | Error (Request_error error_trace) -> fail error_trace
  | Error (Closed (Some errs)) -> fail errs
  | Error (Closed None) -> fail [Worker_types.Terminated]
  | Error (Any exn) -> fail [Exn exn]

(** [notify_operations t peer mempool] pushes a [Request.Notify] to the
    worker of [t] without waiting for it to be processed. The boolean
    returned by the push is deliberately ignored. *)
let notify_operations (t : t) peer mempool =
  let open Lwt_result_syntax in
  let (module Prevalidator : T) = t in
  let worker = Lazy.force Prevalidator.worker in
  let*! (_was_pushed : bool) =
    Prevalidator.Worker.Queue.push_request
      worker
      (Request.Notify (peer, mempool))
  in
  Lwt.return_unit

(** [inject_operation t ~force op] pushes an [Inject] request for [op] to the
    worker of [t] and waits for its completion.

    Worker-level failures are converted into an error trace exactly as in
    {!flush}. *)
let inject_operation (t : t) ~force op =
  let open Lwt_result_syntax in
  let (module Prevalidator : T) = t in
  let worker = Lazy.force Prevalidator.worker in
  let*! outcome =
    Prevalidator.Worker.Queue.push_request_and_wait
      worker
      (Request.Inject {op; force})
  in
  match outcome with
  | Ok result -> return result
  | Error (Request_error error_trace) -> fail error_trace
  | Error (Closed (Some errs)) -> fail errs
  | Error (Closed None) -> fail [Worker_types.Terminated]
  | Error (Any exn) -> fail [Exn exn]

(** [status t] is the worker status of [t]. *)
let status (t : t) =
  let (module Prevalidator : T) = t in
  Prevalidator.Worker.status (Lazy.force Prevalidator.worker)

(** [running_workers ()] lists every registry entry as a
    [(chain id, protocol hash, prevalidator)] triple. *)
let running_workers () =
  let collect (chain_id, proto_hash) prevalidator workers =
    (chain_id, proto_hash, prevalidator) :: workers
  in
  ChainProto_registry.fold collect !chain_proto_registry []

(** [pending_requests t] is the list of requests queued on the worker of
    [t]. *)
let pending_requests (t : t) =
  let (module Prevalidator : T) = t in
  Prevalidator.Worker.Queue.pending_requests (Lazy.force Prevalidator.worker)

(** [current_request t] is what the worker of [t] reports as its current
    request. *)
let current_request (t : t) =
  let (module Prevalidator : T) = t in
  Prevalidator.Worker.current_request (Lazy.force Prevalidator.worker)

(** [information t] is the worker information of [t]. *)
let information (t : t) =
  let (module Prevalidator : T) = t in
  Prevalidator.Worker.information (Lazy.force Prevalidator.worker)

(** [pipeline_length t] is the number of requests queued on the worker of
    [t]. *)
let pipeline_length (t : t) =
  let (module Prevalidator : T) = t in
  Prevalidator.Worker.Queue.pending_requests_length
    (Lazy.force Prevalidator.worker)

(** Directory exposing only the [pending_operations] mempool service, which
    always answers with an empty set of operations (dispatched on the
    requested [version]). *)
let empty_rpc_directory : unit Tezos_rpc.Directory.t =
  let pending_operations_service =
    Block_services.Empty.S.Mempool.pending_operations Tezos_rpc.Path.open_root
  in
  Tezos_rpc.Directory.gen_register
    Tezos_rpc.Directory.empty
    pending_operations_service
    (fun _pv params () ->
      let pending_operations =
        {
          Block_services.Empty.Mempool.applied = [];
          refused = Operation_hash.Map.empty;
          outdated = Operation_hash.Map.empty;
          branch_refused = Operation_hash.Map.empty;
          branch_delayed = Operation_hash.Map.empty;
          unprocessed = Operation_hash.Map.empty;
        }
      in
      Block_services.Empty.Mempool.pending_operations_version_dispatcher
        ~version:params#version
        pending_operations)

(** Dynamic directory registered at [Block_services.mempool_path].

    With a prevalidator, it serves that prevalidator's own RPC directory,
    bound to the value currently returned by its worker's [state]. Without
    one, it falls back to {!empty_rpc_directory}. *)
let rpc_directory : t option Tezos_rpc.Directory.t =
  Tezos_rpc.Directory.register_dynamic_directory
    Tezos_rpc.Directory.empty
    (Block_services.mempool_path Tezos_rpc.Path.open_root)
    (function
      | Some t ->
          let module Prevalidator : T = (val t : T) in
          let state =
            Prevalidator.Worker.state (Lazy.force Prevalidator.worker)
          in
          let state_directory =
            Lazy.force (Prevalidator.get_rpc_directory state)
          in
          Lwt.return
            (Tezos_rpc.Directory.map
               (fun _ -> Lwt.return state)
               state_directory)
      | None ->
          Lwt.return
            (Tezos_rpc.Directory.map
               (fun _ -> Lwt.return_unit)
               empty_rpc_directory))