123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)(* Copyright (c) 2019-2021 Nomadic Labs, <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)moduleMessage=Distributed_db_messagemoduleRequester_event=Distributed_db_event.Requester_eventtype'arequest_param={p2p:(Message.t,Peer_metadata.t,Connection_metadata.t)P2p.t;data:'a;active:unit->P2p_peer.Set.t;send:P2p_peer.Id.t->Message.t->unit;}moduletypeEXTENDED_REQUESTER=sigincludeRequester.FULL_REQUESTERvalstate_of_t:t->Chain_validator_worker_state.Distributed_db_state.table_schedulerendmoduletypeEXTENDED_REQUESTER_2=sigincludeEXTENDED_REQUESTERvalclear_all:t->Block_hash.t->int->unitendmoduletypeREQUEST_MESSAGE=sigtypeparamtypehashvalmax_length:intvalinitial_delay:Time.System.Span.tvalforge:param->hashlist->Message.tendmoduleMake_raw(Hash:Requester.HASH)(Disk_table:Requester.DISK_TABLEwithtypekey:=Hash.t)(Memory_table:Hashtbl.SeededSwithtypekey:=Hash.t)(Request_message:REQUEST_MESSAGEwithtypehash:=Hash.t)(Probe:Requester.PROBEwithtypekey:=Hash.tandtypevalue:=Disk_table.value):EXTENDED_REQUESTERwithtypekey=Hash.tandtypevalue=Disk_table.valueandtyperequest_param=Request_message.paramrequest_paramandtypestore=Disk_table.storeandtypeparam=Probe.paramandtypenotified_value=Probe.notified_value=structmoduleRequest=structtypeparam=Request_message.paramrequest_paramletactive{active;_}=active()letinitial_delay=Request_message.initial_delayletrecsendstategidkeys=letfirst_keys,keys=List.split_nRequest_message.max_lengthkeysinletmsg=Request_message.forgestate.datafirst_keysinstate.sendgidmsg;letopenPeer_metadatainlet(req:requests_kind)=matchmsgwith|Get_current_branch_->Branch|Get_current_head_->Head|Get_block_headers_->Block_header|Get_operations_->Operations|Get_protocols_->Protocols|Get_operations_for_blocks_->Operations_for_block|_->Otherinletmeta=P2p.get_peer_metadatastate.p2pgidinPeer_metadata.incrmeta@@Scheduled_requestreq;ifkeys<>[]thensendstategidkeysendmoduleMonitored_memory_table=structtype'at={table:'aMemory_table.t;metrics:Shell_metrics.Distributed_db.t;}letcreate~entry_type?randoms={table=Memory_table.create?randoms;metrics=Shell_metrics.Distributed_db.init~kind:Hash.name~entry_type;}letfindtx=Memory_table.findt.tablexletaddtkx=Memory_table.addt.tablekx;Shell_metrics.Distributed_db.updatet.metrics~length:(Memory_table.lengtht.table)letreplacetkx=Memory_table.replacet.tablekx;Shell_metrics.Distributed_db.updatet.metrics~length:(Memory_table.lengtht.table)letremovetk=Memory_table.removet.tablek;Shell_metrics.Distributed_db.updatet.metrics~length:(Memory_table.lengtht.table)letlengtht=Memory_table.lengtht.tableletfoldftx=Memory_table.foldft.tablexendmoduleTable=Requester.Make(Hash)(Disk_table)(Monitored_memory_table)(Request)(Probe)includeTableletstate_of_tt=lettable_length=Table.memory_table_lengthtinletscheduler_length=Table.pending_requeststin{Chain_validator_worker_state.Distributed_db_state.table_length;scheduler_length;}letcreate?random_table?global_inputrequest_paramdisk=Table.create?random_table?global_inputrequest_paramdiskletshutdownt=letopenLwt_syntaxinlet*()=Requester_event.(emitshutting_down_requester)()inTable.shutdowntendmoduleFake_operation_storage=structtypestore=Store.chain_storetypevalue=Operation.tletknown__=Lwt.return_falseletread__=fail_with_exnNot_foundletread_opt__=Lwt.return_noneendmoduleRaw_operation=Make_raw(structincludeOperation_hashletname="operation"end)(Fake_operation_storage)(Operation_hash.Table)(structtypeparam=unitletmax_length=10letinitial_delay=Time.System.Span.of_seconds_exn0.5letforge()keys=Message.Get_operationskeysend)(structtypeparam=unittypenotified_value=Operation.tletprobe__v=Somevend)moduleBlock_header_storage=structtypestore=Store.chain_storetypevalue=Block_header.tletknownchain_storehash=letopenLwt_syntaxinlet*b=Store.Block.is_known_validchain_storehashinmatchbwith|true->Lwt.return_true|false->Store.Block.is_known_validatedchain_storehashletreadchain_storeh=letopenLwt_result_syntaxinlet*b=let*!r=Store.Block.read_blockchain_storehinmatchrwith|Okb->returnb|Error_->Store.Block.read_validated_blockchain_storehinreturn(Store.Block.headerb)letread_optchain_storeh=letopenLwt_syntaxinlet*b=let*o=Store.Block.read_block_optchain_storehinmatchowith|Someb->Lwt.return_someb|None->Store.Block.read_validated_block_optchain_storehinLwt.return(Option.mapStore.Block.headerb)endmoduleRaw_block_header=Make_raw(structincludeBlock_hashletname="block_header"end)(Block_header_storage)(Block_hash.Table)(structtypeparam=unitletmax_length=10letinitial_delay=Time.System.Span.of_seconds_exn0.5letforge()keys=Message.Get_block_headerskeysend)(structtypeparam=unittypenotified_value=Block_header.tletprobe__v=Somevend)moduleOperations_table=Hashtbl.MakeSeeded(structtypet=Block_hash.t*int(* See [src/lib_base/tzPervasives.ml] for an explanation *)[@@@ocaml.warning"-32"]lethash=Hashtbl.seeded_hashletseeded_hash=Hashtbl.seeded_hash[@@@ocaml.warning"+32"]letequal(b1,i1)(b2,i2)=Block_hash.equalb1b2&&i1=i2end)moduleOperations_storage=structtypestore=Store.chain_storetypevalue=Operation.tlistletknownchain_store(h,_)=Store.Block.is_known_validchain_storehletreadchain_store(h,i)=letopenLwt_result_syntaxinlet*b=Store.Block.read_blockchain_storehinletops=List.nth(Store.Block.operationsb)i|>WithExceptions.Option.to_exn~none:Not_foundinreturnopsletread_optchain_store(h,i)=letopenLwt_syntaxinlet*o=Store.Block.read_block_optchain_storehinmatchowith|None->Lwt.return_none|Someb->Lwt.return(List.nth(Store.Block.operationsb)i)endmoduleRaw_operations=structincludeMake_raw(structtypet=Block_hash.t*intletname="operations"letppppf(h,n)=Format.fprintfppf"%a:%d"Block_hash.pphnletencoding=letopenData_encodinginobj2(req"block"Block_hash.encoding)(req"index"uint16)end)(Operations_storage)(Operations_table)(structtypeparam=unitletmax_length=10letinitial_delay=Time.System.Span.of_seconds_exn1.letforge()keys=Message.Get_operations_for_blockskeysend)(structtypeparam=Operation_list_list_hash.ttypenotified_value=Operation.tlist*Operation_list_list_hash.pathletprobe(_block,expected_ofs)expected_hash(ops,path)=letreceived_hash,received_ofs=Operation_list_list_hash.check_pathpath(Operation_list_hash.compute(List.mapOperation.hashops))inifreceived_ofs=expected_ofs&&Operation_list_list_hash.compareexpected_hashreceived_hash=0thenSomeopselseNoneend)letclear_alltablehashn=List.iter(funi->clear_or_canceltable(hash,i))(0--(n-1))endmoduleProtocol_storage=structtypestore=Store.storetypevalue=Protocol.tletknownstoreph=Lwt.return(Store.Protocol.memstoreph)letread_optstoreph=Store.Protocol.readstorephletreadstoreph=letopenLwt_syntaxinlet*o=read_optstorephinmatchowithNone->fail_with_exnNot_found|Somep->return_okpendmoduleRaw_protocol=Make_raw(structincludeProtocol_hashletname="protocol"end)(Protocol_storage)(Protocol_hash.Table)(structtypeparam=unitletinitial_delay=Time.System.Span.of_seconds_exn10.letmax_length=10letforge()keys=Message.Get_protocolskeysend)(structtypeparam=unittypenotified_value=Protocol.tletprobe__v=Somevend)