(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Batcher_worker_types

(* Queue of L2 messages waiting to be batched, indexed by message id. *)
module Message_queue = Hash_queue.Make (L2_message.Id) (L2_message)

module Batcher_events = Batcher_events.Declare (struct
  let worker_name = "batcher"
end)

(* A message that was already put in a batch, together with the id of the
   injector operation that carries it. *)
module L2_batched_message = struct
  type t = {content : string; l1_id : Injector.Inj_operation.id}
end

(* Queue of already batched messages, indexed by message id. *)
module Batched_messages =
  Hash_queue.Make (L2_message.Id) (L2_batched_message)

(** Status of a message known to the batcher: still waiting in the message
    queue, or already batched in the given injector operation. *)
type status = Pending_batch | Batched of Injector.Inj_operation.id

(** Internal state of the batcher worker. *)
type state = {
  node_ctxt : Node_context.ro;
  messages : Message_queue.t;
  batched : Batched_messages.t;
  mutable plugin : (module Protocol_plugin_sig.S);
}

(** [message_size s] is the size accounted for the message [s] in a batch:
    the length of [s] plus a 4 bytes length prefix. *)
let message_size s =
  (* Encoded as length of s on 4 bytes + s *)
  4 + String.length s

(** [inject_batch state l2_messages] builds a single [Add_messages] L1
    operation from the contents of [l2_messages], hands it to
    {!Injector.check_and_add_pending_operation}, and then records every
    message of the batch in [state.batched] together with the id of the
    operation. *)
let inject_batch state (l2_messages : L2_message.t list) =
  let open Lwt_result_syntax in
  let messages = List.map L2_message.content l2_messages in
  let operation = L1_operation.Add_messages {messages} in
  let* l1_id =
    Injector.check_and_add_pending_operation
      state.node_ctxt.config.mode
      operation
  in
  let+ l1_id =
    match l1_id with
    | Some l1_id -> return l1_id
    | None ->
        (* The injector did not give back an id: compute the id the
           operation would have by building it locally. *)
        let op = Injector.Inj_operation.make operation in
        return op.id
  in
  List.iter
    (fun msg ->
      let content = L2_message.content msg in
      let id = L2_message.id msg in
      Batched_messages.replace state.batched id {content; l1_id})
    l2_messages

(** [inject_batches state batches] injects [batches] sequentially, in order,
    stopping at the first error. *)
let inject_batches state = List.iter_es (inject_batch state)

(** [max_batch_size state] is the maximum size of a batch: the value from the
    node configuration if there is one, and the protocol maximum declared by
    the plugin otherwise. *)
let max_batch_size {node_ctxt; plugin; _} =
  let module Plugin = (val plugin) in
  Option.value
    node_ctxt.config.batcher.max_batch_size
    ~default:Plugin.Batcher_constants.protocol_max_batch_size

(** [get_batches state ~only_full] splits the messages of [state.messages]
    into batches that respect both the maximum batch size and the maximum
    number of elements per batch.

    The last (non full) batch is kept only if [only_full] is [false] or if
    it reaches both the configured minimum size and minimum number of
    elements.

    Returns [(batches, to_remove)] where [batches] is the list of non-empty
    batches and [to_remove] the ids of all the messages they contain. The
    queue itself is not modified. *)
let get_batches state ~only_full =
  let ( current_rev_batch,
        current_batch_size,
        current_batch_elements,
        full_batches ) =
    Message_queue.fold
      (fun msg_id
           message
           ( current_rev_batch,
             current_batch_size,
             current_batch_elements,
             full_batches )
         ->
        let size = message_size (L2_message.content message) in
        let new_batch_size = current_batch_size + size in
        let new_batch_elements = current_batch_elements + 1 in
        if
          new_batch_size <= max_batch_size state
          && new_batch_elements
             <= state.node_ctxt.config.batcher.max_batch_elements
        then
          (* We can add the message to the current batch because we are still
             within the bounds. *)
          ( (msg_id, message) :: current_rev_batch,
            new_batch_size,
            new_batch_elements,
            full_batches )
        else
          (* The batch augmented with the message would be too big but it is
             below the limit without it. We finalize the current batch and
             create a new one for the message. NOTE: Messages in the queue
             are never bigger than [max_batch_size state] because
             {!on_register} only accepts those. *)
          let batch = List.rev current_rev_batch in
          ([(msg_id, message)], size, 1, batch :: full_batches))
      state.messages
      ([], 0, 0, [])
  in
  let batches =
    if
      (not only_full)
      || current_batch_size >= state.node_ctxt.config.batcher.min_batch_size
         && current_batch_elements
            >= state.node_ctxt.config.batcher.min_batch_elements
    then
      (* We have enough to make a batch with the last non-full batch. *)
      List.rev current_rev_batch :: full_batches
    else full_batches
  in
  (* [batches] is built most recent first: folding with a cons puts it back
     in queue order, and drops the empty batches on the way. *)
  List.fold_left
    (fun (batches, to_remove) -> function
      | [] -> (batches, to_remove)
      | batch ->
          let msg_hashes, batch = List.split batch in
          let to_remove = List.rev_append msg_hashes to_remove in
          (batch :: batches, to_remove))
    ([], [])
    batches

(** [produce_batches state ~only_full] computes the batches with
    {!get_batches}, injects them, emits the [batched] event and finally
    removes the batched messages from [state.messages]. Messages are removed
    from the queue only if the injection of all batches succeeded. *)
let produce_batches state ~only_full =
  let open Lwt_result_syntax in
  let batches, to_remove = get_batches state ~only_full in
  match batches with
  | [] -> return_unit
  | _ ->
      let* () = inject_batches state batches in
      let*! () =
        Batcher_events.(emit batched)
          (List.length batches, List.length to_remove)
      in
      List.iter
        (fun tr_hash -> Message_queue.remove state.messages tr_hash)
        to_remove ;
      return_unit

(** [on_register state messages] handles a [Register] request. It fails if
    one of the [messages] is too large, otherwise it adds them all to the
    message queue, produces the batches that are full and returns the ids of
    the registered messages (in the same order as [messages]). *)
let on_register state (messages : string list) =
  let open Lwt_result_syntax in
  let module Plugin = (val state.plugin) in
  let max_size_msg =
    min
      (Plugin.Batcher_constants.message_size_limit
      + 4 (* We add 4 because [message_size] adds 4. *))
      (max_batch_size state)
  in
  let*? messages =
    List.mapi_e
      (fun i message ->
        if message_size message > max_size_msg then
          error_with "Message %d is too large (max size is %d)" i max_size_msg
        else Ok (L2_message.make message))
      messages
  in
  let*! () = Batcher_events.(emit queue) (List.length messages) in
  let ids =
    List.map
      (fun message ->
        let msg_id = L2_message.id message in
        Message_queue.replace state.messages msg_id message ;
        msg_id)
      messages
  in
  let+ () = produce_batches state ~only_full:true in
  ids

(** [on_new_head state] handles a [Produce_batches] request: it produces
    batches with everything in the queue, including a last non full batch. *)
let on_new_head state = produce_batches state ~only_full:false

(** [init_batcher_state plugin node_ctxt] is a fresh batcher state with empty
    queues created with a capacity argument of [100_000]. *)
let init_batcher_state plugin node_ctxt =
  {
    node_ctxt;
    messages = Message_queue.create 100_000 (* ~ 400MB *);
    batched = Batched_messages.create 100_000 (* ~ 400MB *);
    plugin;
  }

module Types = struct
  type nonrec state = state

  type parameters = {
    node_ctxt : Node_context.ro;
    plugin : (module Protocol_plugin_sig.S);
  }
end

module Name = struct
  (* We only have a single batcher in the node *)
  type t = unit

  let encoding = Data_encoding.unit

  let base = Batcher_events.Worker.section @ ["worker"]

  let pp _ _ = ()

  let equal () () = true
end

module Worker = Worker.MakeSingle (Name) (Request) (Types)

type worker = Worker.infinite Worker.queue Worker.t

(* Callbacks given to the worker when it is launched. *)
module Handlers = struct
  type self = worker

  (* Dispatch a request to the corresponding handler. *)
  let on_request :
      type r request_error.
      worker ->
      (r, request_error) Request.t ->
      (r, request_error) result Lwt.t =
   fun w request ->
    let state = Worker.state w in
    match request with
    | Request.Register messages ->
        protect @@ fun () -> on_register state messages
    | Request.Produce_batches -> protect @@ fun () -> on_new_head state

  type launch_error = error trace

  let on_launch _w () Types.{node_ctxt; plugin} =
    let open Lwt_result_syntax in
    let state = init_batcher_state plugin node_ctxt in
    return state

  (* Errors of requests are only reported through the [request_failed]
     event, for both kinds of requests. *)
  let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) :
      unit tzresult Lwt.t =
    let open Lwt_result_syntax in
    let request_view = Request.view r in
    let emit_and_return_errors errs =
      let*! () =
        Batcher_events.(emit Worker.request_failed) (request_view, st, errs)
      in
      return_unit
    in
    match r with
    | Request.Register _ -> emit_and_return_errors errs
    | Request.Produce_batches -> emit_and_return_errors errs

  let on_completion _w r _ st =
    match Request.view r with
    | Request.View (Register _ | Produce_batches) ->
        Batcher_events.(emit Worker.request_completed_debug)
          (Request.view r, st)

  let on_no_request _ = Lwt.return_unit

  let on_close _w = Lwt.return_unit
end

let table = Worker.create_table Queue

(* [worker_promise] is resolved by {!start} with the launched worker. *)
let worker_promise, worker_waker = Lwt.task ()

(** [check_batcher_config plugin config] returns an error when the
    [max_batch_size] of [config] is set and strictly greater than the
    protocol maximum declared by [plugin]. *)
let check_batcher_config (module Plugin : Protocol_plugin_sig.S)
    Configuration.{max_batch_size; _} =
  match max_batch_size with
  | Some m when m > Plugin.Batcher_constants.protocol_max_batch_size ->
      error_with
        "batcher.max_batch_size must be smaller than %d"
        Plugin.Batcher_constants.protocol_max_batch_size
  | _ -> Ok ()

(** [start plugin node_ctxt] checks the batcher configuration, launches the
    worker with a read-only node context and resolves [worker_promise]. *)
let start plugin node_ctxt =
  let open Lwt_result_syntax in
  let*? () =
    check_batcher_config plugin node_ctxt.Node_context.config.batcher
  in
  let node_ctxt = Node_context.readonly node_ctxt in
  let+ worker = Worker.launch table () {node_ctxt; plugin} (module Handlers) in
  Lwt.wakeup worker_waker worker

(** [start_in_mode mode] tells if the batcher must be started when the node
    runs in [mode]. *)
let start_in_mode mode =
  let open Configuration in
  match mode with
  | Batcher | Operator -> true
  | Observer | Accuser | Bailout | Maintenance -> false
  | Custom ops -> purpose_matches_mode (Custom ops) Batching

(** [init plugin node_ctxt] starts the batcher worker if it was never started
    and if the mode of the node requires one. It does nothing if the worker
    is already started. *)
let init plugin (node_ctxt : _ Node_context.t) =
  let open Lwt_result_syntax in
  match Lwt.state worker_promise with
  | Lwt.Return _ ->
      (* Worker already started, nothing to do. *)
      return_unit
  | Lwt.Fail exn ->
      (* Worker crashed, not recoverable. *)
      fail [Rollup_node_errors.No_batcher; Exn exn]
  | Lwt.Sleep ->
      (* Never started, start it. *)
      if start_in_mode node_ctxt.config.mode then start plugin node_ctxt
      else return_unit

(* This is a batcher worker for a single scoru *)
(* NOTE(review): the value is lazy, so the state of [worker_promise] is read
   only the first time it is forced. If it is forced while the promise is
   still [Lwt.Sleep], [Error No_batcher] stays cached even once the worker is
   started. Confirm that no caller can force it before {!init}. *)
let worker =
  lazy
    (match Lwt.state worker_promise with
    | Lwt.Return worker -> Ok worker
    | Lwt.Fail exn -> Error (Error_monad.error_of_exn exn)
    | Lwt.Sleep -> Error Rollup_node_errors.No_batcher)

(** [active ()] is [true] if and only if the worker promise is resolved. *)
let active () =
  match Lwt.state worker_promise with
  | Lwt.Return _ -> true
  | Lwt.Fail _ | Lwt.Sleep -> false

(** [find_message id] looks up the message [id] in the queue of messages
    that are pending batching. Fails if there is no batcher worker. *)
let find_message id =
  let open Result_syntax in
  let+ w = Result.map_error TzTrace.make (Lazy.force worker) in
  let state = Worker.state w in
  Message_queue.find_opt state.messages id

(** [get_queue ()] is the content of the queue of messages that are pending
    batching. Fails if there is no batcher worker. *)
let get_queue () =
  let open Result_syntax in
  let+ w = Result.map_error TzTrace.make (Lazy.force worker) in
  let state = Worker.state w in
  Message_queue.bindings state.messages

(** [handle_request_error rq] waits for the result of the worker request
    [rq] and converts worker level errors into an error trace. *)
let handle_request_error rq =
  let open Lwt_syntax in
  let* rq in
  match rq with
  | Ok res -> return_ok res
  | Error (Worker.Request_error errs) -> Lwt.return_error errs
  | Error (Closed None) -> Lwt.return_error [Worker_types.Terminated]
  | Error (Closed (Some errs)) -> Lwt.return_error errs
  | Error (Any exn) -> Lwt.return_error [Exn exn]

(** [register_messages messages] pushes a [Register] request to the worker
    and waits for its result. Fails if there is no batcher worker. *)
let register_messages messages =
  let open Lwt_result_syntax in
  let* w = lwt_map_error TzTrace.make (Lwt.return (Lazy.force worker)) in
  Worker.Queue.push_request_and_wait w (Request.Register messages)
  |> handle_request_error

(** [produce_batches ()] pushes a [Produce_batches] request to the worker and
    waits for its result. It does nothing when there is no batcher. *)
let produce_batches () =
  let open Lwt_result_syntax in
  match Lazy.force worker with
  | Error Rollup_node_errors.No_batcher ->
      (* There is no batcher, nothing to do *)
      return_unit
  | Error e -> tzfail e
  | Ok w ->
      Worker.Queue.push_request_and_wait w Request.Produce_batches
      |> handle_request_error

(** [shutdown ()] shuts the worker down, if there is one. *)
let shutdown () =
  match Lazy.force worker with
  | Error _ ->
      (* There is no batcher, nothing to do *)
      Lwt.return_unit
  | Ok w -> Worker.shutdown w

(** [message_status state msg_id] is the status and content of the message
    [msg_id]. The queue of pending messages is searched first, then the
    queue of batched messages. [None] if the message is in neither. *)
let message_status state msg_id =
  match Message_queue.find_opt state.messages msg_id with
  | Some msg -> Some (Pending_batch, L2_message.content msg)
  | None -> (
      match Batched_messages.find_opt state.batched msg_id with
      | Some {content; l1_id} -> Some (Batched l1_id, content)
      | None -> None)

(** [message_status msg_id] is the status and content of the message
    [msg_id] in the state of the batcher worker. Fails if there is no
    batcher worker. This definition shadows the one above, which takes the
    state explicitly. *)
let message_status msg_id =
  let open Result_syntax in
  let+ w = Result.map_error TzTrace.make (Lazy.force worker) in
  let state = Worker.state w in
  message_status state msg_id