(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2018-2021 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** An error returned when trying to communicate with a worker that
    has been closed. *)
type worker_name = {base : string; name : string}

type Error_monad.error += Closed of worker_name

let () =
  register_error_kind
    `Permanent
    ~id:"worker.closed"
    ~title:"Worker closed"
    ~description:
      "An operation on a worker could not complete before it was shut down."
    ~pp:(fun ppf w ->
      Format.fprintf ppf "Worker %s[%s] has been shut down." w.base w.name)
    Data_encoding.(
      conv
        (fun {base; name} -> (base, name))
        (* The decoder must destructure the pair in the same [(base, name)]
           order the encoder produces; otherwise a round-trip through the
           encoding swaps the two fields. *)
        (fun (base, name) -> {base; name})
        (obj1 (req "worker" (tup2 string string))))
    (function Closed w -> Some w | _ -> None)
    (fun w -> Closed w)

module type T = sig
  module Name : Worker_intf.NAME

  module Event : Worker_intf.EVENT

  module Request : Worker_intf.REQUEST

  module Types : Worker_intf.TYPES

  (** A handle to a specific worker, parameterized by the type of
      internal message buffer. *)
  type 'kind t

  (** A handle to a table of workers. *)
  type 'kind table

  (** Internal buffer kinds used as parameters to {!t}. *)
  type 'a queue

  and bounded

  and infinite

  type dropbox

  (** Supported kinds of internal buffers. *)
  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  (** Create a table of workers. *)
  val create_table : 'kind buffer_kind -> 'kind table

  (** The callback handlers specific to each worker instance. *)
  module type HANDLERS = sig
    (** Placeholder replaced with {!t} with the right parameters
        provided by the type of buffer chosen at {!launch}. *)
    type self

    (** Builds the initial internal state of a worker at launch.
        It is possible to initialize the message queue.
        Of course calling {!state} will fail at that point. *)
    val on_launch :
      self -> Name.t -> Types.parameters -> Types.state tzresult Lwt.t

    (** The main request processor, i.e. the body of the event loop. *)
    val on_request : self -> 'a Request.t -> 'a tzresult Lwt.t

    (** Called when no request has been made before the timeout, if
        the parameter has been passed to {!launch}. *)
    val on_no_request : self -> unit tzresult Lwt.t

    (** A function called when terminating a worker. *)
    val on_close : self -> unit Lwt.t

    (** A function called at the end of the worker loop in case of an
        abnormal error. This function can handle the error by
        returning [Ok ()], or leave the default unexpected error
        behaviour by returning its parameter. A possibility is to
        handle the error for ad-hoc logging, and still use
        {!trigger_shutdown} to kill the worker. *)
    val on_error :
      self ->
      Request.view ->
      Worker_types.request_status ->
      error list ->
      unit tzresult Lwt.t

    (** A function called at the end of the worker loop in case of a
        successful treatment of the current request. *)
    val on_completion :
      self -> 'a Request.t -> 'a -> Worker_types.request_status -> unit Lwt.t
  end

  (** Creates a new worker instance and registers it in the given table.
      The kind of internal buffer is the one the table was created with
      (see {!create_table}). When [timeout] is given,
      {!HANDLERS.on_no_request} is called each time no request could be
      fetched before the timeout.

      @raise Invalid_argument if the table already holds a worker with the
      same name. *)
  val launch :
    'kind table ->
    ?timeout:Time.System.Span.t ->
    Name.t ->
    Types.parameters ->
    (module HANDLERS with type self = 'kind t) ->
    'kind t tzresult Lwt.t

  (** Triggers a worker termination and waits for its completion.
      Cannot be called from within the handlers. *)
  val shutdown : _ t -> unit Lwt.t

  module type BOX = sig
    type t

    val put_request : t -> 'a Request.t -> unit

    val put_request_and_wait : t -> 'a Request.t -> 'a tzresult Lwt.t
  end

  module type QUEUE = sig
    type 'a t

    val push_request_and_wait : 'q t -> 'a Request.t -> 'a tzresult Lwt.t

    val push_request : 'q t -> 'a Request.t -> unit Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox : sig
    include BOX with type t := dropbox t
  end

  module Queue : sig
    include QUEUE with type 'a t := 'a queue t

    (** Adds a message to the queue immediately. *)
    val push_request_now : infinite queue t -> 'a Request.t -> unit
  end

  (** Detects cancellation from within the request handler to stop
      asynchronous operations. *)
  val protect :
    _ t ->
    ?on_error:(error list -> 'b tzresult Lwt.t) ->
    (unit -> 'b tzresult Lwt.t) ->
    'b tzresult Lwt.t

  (** Exports the canceler to allow cancellation of other tasks when this
      worker is shut down or when it dies. *)
  val canceler : _ t -> Lwt_canceler.t

  (** Triggers a worker termination. *)
  val trigger_shutdown : _ t -> unit

  (** Record an event in the backlog. *)
  val record_event : _ t -> Event.t -> unit

  (** Record an event and make sure it is logged. *)
  val log_event : _ t -> Event.t -> unit Lwt.t

  (** Access the internal state, once initialized. *)
  val state : _ t -> Types.state

  (** Introspect the message queue, gives the times requests were pushed. *)
  val pending_requests : _ queue t -> (Time.System.t * Request.view) list

  (** Get the running status of a worker. *)
  val status : _ t -> Worker_types.worker_status

  (** Get the request being treated by a worker.
      Gives the time the request was pushed, and the time its
      treatment started. *)
  val current_request :
    _ t -> (Time.System.t * Time.System.t * Request.view) option

  (** Summary of a worker: number of instances in its table, its status and
      the length of its buffer. *)
  val information : _ t -> Worker_types.worker_information

  (** Lists the running workers in this group. *)
  val list : 'a table -> (Name.t * 'a t) list

  (** [find_opt table n] is [Some worker] if the [worker] is in the [table] and
      has name [n]. *)
  val find_opt : 'a table -> Name.t -> 'a t option
end

module Make
    (Name : Worker_intf.NAME)
    (Event : Worker_intf.EVENT)
    (Request : Worker_intf.REQUEST)
    (Types : Worker_intf.TYPES)
    (Logger : Worker_intf.LOGGER
                with module Event = Event
                 and type Request.view = Request.view) =
struct
  module Name = Name
  module Event = Event
  module Request = Request
  module Types = Types
  module Logger = Logger

  module Nametbl = Hashtbl.MakeSeeded (struct
    type t = Name.t

    let hash = Hashtbl.seeded_hash

    let equal = Name.equal
  end)

  (* Printable name of the worker group, used in error and log messages. *)
  let base_name = String.concat "-" Name.base

  (* A request together with the optional resolver of the promise on which
     the sender waits for the result. *)
  type message = Message : 'a Request.t * 'a tzresult Lwt.u option -> message

  type 'a queue

  and bounded

  and infinite

  type dropbox

  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  (* The concrete buffer backing a worker; each message is stored with the
     time at which it was pushed. *)
  and _ buffer =
    | Queue_buffer :
        (Time.System.t * message) Lwt_pipe.Unbounded.t
        -> infinite queue buffer
    | Bounded_buffer :
        (Time.System.t * message) Lwt_pipe.Bounded.t
        -> bounded queue buffer
    | Dropbox_buffer :
        (Time.System.t * message) Lwt_dropbox.t
        -> dropbox buffer

  and 'kind t = {
    timeout : Time.System.Span.t option;
    parameters : Types.parameters;
    mutable (* only for init *) worker : unit Lwt.t;
    mutable (* only for init *) state : Types.state option;
    buffer : 'kind buffer;
    canceler : Lwt_canceler.t;
    name : Name.t;
    id : int;
    mutable status : Worker_types.worker_status;
    mutable current_request :
      (Time.System.t * Time.System.t * Request.view) option;
    logEvent : (module Internal_event.EVENT with type t = Logger.t);
    table : 'kind table;
  }

  and 'kind table = {
    buffer_kind : 'kind buffer_kind;
    mutable last_id : int;
    instances : 'kind t Nametbl.t;
  }

  (* Timestamps request [r] with the current time and attaches the optional
     resolver [u]. *)
  let queue_item ?u r = (Systime_os.now (), Message (r, u))

  (* Puts [request] in the dropbox of [w]. If the dropbox already holds a
     message, that message is taken out and combined with [request] through
     [merge]. When [merge] returns a request, it is put in the dropbox with a
     fresh timestamp and no resolver. Does nothing if the dropbox is closed.

     NOTE(review): the resolver carried by the replaced message, if any, is
     discarded here and never woken up by this function. *)
  let drop_request w merge message_box request =
    try
      match
        match Lwt_dropbox.peek message_box with
        | None -> merge w (Any_request request) None
        | Some (_, Message (old, _)) ->
            Lwt.ignore_result (Lwt_dropbox.take message_box) ;
            merge w (Any_request request) (Some (Any_request old))
      with
      | None -> ()
      | Some (Any_request neu) ->
          Lwt_dropbox.put message_box (Systime_os.now (), Message (neu, None))
    with Lwt_dropbox.Closed -> ()

  (* Puts [request] in the dropbox of [w] and returns the promise that is
     resolved with the result of the request. Fails with [Closed] if the
     dropbox is closed. *)
  let drop_request_and_wait w message_box request =
    let (t, u) = Lwt.wait () in
    Lwt.catch
      (fun () ->
        Lwt_dropbox.put message_box (queue_item ~u request) ;
        t)
      (function
        | Lwt_dropbox.Closed ->
            let name = Format.asprintf "%a" Name.pp w.name in
            fail (Closed {base = base_name; name})
        | exn ->
            (* [Lwt_dropbox.put] can only raise [Closed] which is caught above.
               We don't want to catch any other exception but we cannot use an
               incomplete pattern like we would in a [try]-[with] construct so
               we must explicitly match and re-raise [exn]. *)
            raise exn)

  module type BOX = sig
    type t

    val put_request : t -> 'a Request.t -> unit

    val put_request_and_wait : t -> 'a Request.t -> 'a tzresult Lwt.t
  end

  module type QUEUE = sig
    type 'a t

    val push_request_and_wait : 'q t -> 'a Request.t -> 'a tzresult Lwt.t

    val push_request : 'q t -> 'a Request.t -> unit Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox = struct
    let put_request (w : dropbox t) request =
      let (Dropbox {merge}) = w.table.buffer_kind in
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request w merge message_box request

    let put_request_and_wait (w : dropbox t) request =
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request_and_wait w message_box request
  end

  module Queue = struct
    let push_request (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue ->
          Lwt_pipe.Unbounded.push message_queue (queue_item request) ;
          (* because pushing on an unbounded pipe is immediate, we return within
             Lwt explicitly for compatibility with the other case *)
          Lwt.return_unit
      | Bounded_buffer message_queue ->
          Lwt_pipe.Bounded.push message_queue (queue_item request)

    (* Pushes immediately; silently ignored when the pipe is already
       closed. *)
    let push_request_now (w : infinite queue t) request =
      let (Queue_buffer message_queue) = w.buffer in
      if Lwt_pipe.Unbounded.is_closed message_queue then ()
      else Lwt_pipe.Unbounded.push message_queue (queue_item request)

    (* Pushes [request] and returns the promise of its result. A closed pipe
       is reported as the [Closed] error rather than as an exception. *)
    let push_request_and_wait (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue -> (
          try
            let (t, u) = Lwt.wait () in
            Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ;
            t
          with Lwt_pipe.Closed ->
            let name = Format.asprintf "%a" Name.pp w.name in
            fail (Closed {base = base_name; name}))
      | Bounded_buffer message_queue ->
          let (t, u) = Lwt.wait () in
          Lwt.try_bind
            (fun () ->
              Lwt_pipe.Bounded.push message_queue (queue_item ~u request))
            (fun () -> t)
            (function
              | Lwt_pipe.Closed ->
                  let name = Format.asprintf "%a" Name.pp w.name in
                  fail (Closed {base = base_name; name})
              | exn -> raise exn)

    (* Views of the requests currently in the pipe, with their push times.
       A closed pipe yields the empty list. *)
    let pending_requests (type a) (w : a queue t) =
      let peeked =
        try
          match w.buffer with
          | Queue_buffer message_queue ->
              Lwt_pipe.Unbounded.peek_all_now message_queue
          | Bounded_buffer message_queue ->
              Lwt_pipe.Bounded.peek_all_now message_queue
        with Lwt_pipe.Closed -> []
      in
      List.map
        (function (t, Message (req, _)) -> (t, Request.view req))
        peeked

    let pending_requests_length (type a) (w : a queue t) =
      let pipe_length (type a) (q : a buffer) =
        match q with
        | Queue_buffer queue -> Lwt_pipe.Unbounded.length queue
        | Bounded_buffer queue -> Lwt_pipe.Bounded.length queue
        | Dropbox_buffer _ -> 1
      in
      pipe_length w.buffer
  end

  (* Closes the buffer of [w]. Every message still in the buffer that
     carries a resolver is woken up with the [Closed] error. *)
  let close (type a) (w : a t) =
    let wakeup = function
      | (_, Message (_, Some u)) ->
          let name = Format.asprintf "%a" Name.pp w.name in
          Lwt.wakeup_later u (error (Closed {base = base_name; name}))
      | (_, Message (_, None)) -> ()
    in
    let close_queue message_queue =
      let messages = Lwt_pipe.Bounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Bounded.close message_queue
    in
    let close_unbounded_queue message_queue =
      let messages = Lwt_pipe.Unbounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Unbounded.close message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> close_unbounded_queue message_queue
    | Bounded_buffer message_queue -> close_queue message_queue
    | Dropbox_buffer message_box ->
        (try Option.iter wakeup (Lwt_dropbox.peek message_box)
         with Lwt_dropbox.Closed -> ()) ;
        Lwt_dropbox.close message_box

  (* Fetches the next message from the buffer of [w]. Without a timeout the
     result is always [Some _]; with a timeout the result is whatever the
     [*_with_timeout] primitive of the buffer returns. *)
  let pop (type a) (w : a t) =
    let open Lwt_syntax in
    let pop_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Bounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Bounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    let pop_unbounded_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Unbounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Unbounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> pop_unbounded_queue message_queue
    | Bounded_buffer message_queue -> pop_queue message_queue
    | Dropbox_buffer message_box -> (
        match w.timeout with
        | None ->
            let* m = Lwt_dropbox.take message_box in
            return_some m
        | Some timeout ->
            Lwt_dropbox.take_with_timeout
              (Systime_os.sleep timeout)
              message_box)

  let trigger_shutdown w = Lwt.ignore_result (Lwt_canceler.cancel w.canceler)

  let canceler {canceler; _} = canceler

  (* Emits [status], stamped with the current time, through the logging
     event of [w]. An emission error is turned into a failed promise. *)
  let lwt_emit w (status : Logger.status) =
    let (module LogEvent) = w.logEvent in
    let time = Systime_os.now () in
    Lwt.bind
      (LogEvent.emit
         ~section:(Internal_event.Section.make_sanitized Name.base)
         (fun () -> Time.System.stamp ~time status))
      (function
        | Ok () -> Lwt.return_unit
        | Error el ->
            Format.kasprintf
              Lwt.fail_with
              "Worker_event.emit: %a"
              pp_print_trace
              el)

  let log_event w evt = lwt_emit w (Logger.WorkerEvent (evt, Event.level evt))

  let record_event w evt = Lwt.ignore_result (log_event w evt)

  module type HANDLERS = sig
    type self

    val on_launch :
      self -> Name.t -> Types.parameters -> Types.state tzresult Lwt.t

    val on_request : self -> 'a Request.t -> 'a tzresult Lwt.t

    val on_no_request : self -> unit tzresult Lwt.t

    val on_close : self -> unit Lwt.t

    val on_error :
      self ->
      Request.view ->
      Worker_types.request_status ->
      error list ->
      unit tzresult Lwt.t

    val on_completion :
      self -> 'a Request.t -> 'a -> Worker_types.request_status -> unit Lwt.t
  end

  let create_table buffer_kind =
    {buffer_kind; last_id = 0; instances = Nametbl.create ~random:true 10}

  (* The event loop of a worker: repeatedly pops a message, runs the
     handlers on it and emits the corresponding events, until the worker is
     canceled, its buffer is closed, or an error is not handled by
     [Handlers.on_error]. *)
  let worker_loop (type kind) handlers (w : kind t) =
    let (module Handlers : HANDLERS with type self = kind t) = handlers in
    (* Closes the buffer, cancels the canceler, runs [on_close] and removes
       the worker from its table. Only valid while the worker is
       [Running]. *)
    let do_close errs =
      let open Lwt_syntax in
      let t0 =
        match w.status with
        | Running t0 -> t0
        | Launching _ | Closing _ | Closed _ -> assert false
      in
      w.status <- Closing (t0, Systime_os.now ()) ;
      close w ;
      let* () = Error_monad.cancel_with_exceptions w.canceler in
      w.status <- Closed (t0, Systime_os.now (), errs) ;
      let* () = Handlers.on_close w in
      Nametbl.remove w.table.instances w.name ;
      w.state <- None ;
      return_unit
    in
    let rec loop () =
      (* The call to [protect] here allows the call to [pop] (responsible
         for fetching the next request) to be canceled by the use of the
         [canceler].

         These cancellations cannot affect the processing of ongoing requests.
         This is due to the limited scope of the argument of [protect]. As a
         result, ongoing requests are never canceled by this mechanism.

         In the case when the [canceler] is canceled whilst a request is being
         processed, the processing eventually resolves, at which point a
         recursive call to this [loop] is made and this call to [protect]
         fails immediately with [Canceled]. *)
      Lwt.bind
        Lwt_tzresult_syntax.(
          let* popped =
            protect ~canceler:w.canceler (fun () -> Lwt_result.ok @@ pop w)
          in
          match popped with
          | None -> Handlers.on_no_request w
          | Some (pushed, Message (request, u)) -> (
              let current_request = Request.view request in
              let treated_time = Systime_os.now () in
              w.current_request <- Some (pushed, treated_time, current_request) ;
              match u with
              | None ->
                  let* res = Handlers.on_request w request in
                  let completed_time = Systime_os.now () in
                  let treated = Ptime.diff treated_time pushed in
                  let completed = Ptime.diff completed_time treated_time in
                  w.current_request <- None ;
                  let status = Worker_types.{pushed; treated; completed} in
                  let*! () = Handlers.on_completion w request res status in
                  let*! () =
                    lwt_emit w (Request (current_request, status, None))
                  in
                  return_unit
              | Some u ->
                  (* [res] is a result. But the side effect [wakeup]
                     needs to happen regardless of success (Ok) or failure
                     (Error). To that end, we treat it locally like a regular
                     promise (which happens to carry a [result]) within the Lwt
                     monad. *)
                  let*! res = Handlers.on_request w request in
                  Lwt.wakeup_later u res ;
                  let*? res = res in
                  let completed_time = Systime_os.now () in
                  let treated = Ptime.diff treated_time pushed in
                  let completed = Ptime.diff completed_time treated_time in
                  let status = Worker_types.{pushed; treated; completed} in
                  w.current_request <- None ;
                  let*! () = Handlers.on_completion w request res status in
                  let*! () =
                    lwt_emit w (Request (current_request, status, None))
                  in
                  return_unit))
        Lwt_syntax.(
          function
          | Ok () -> loop ()
          | Error (Canceled :: _)
          | Error (Exn Lwt.Canceled :: _)
          | Error (Exn Lwt_pipe.Closed :: _)
          | Error (Exn Lwt_dropbox.Closed :: _) ->
              let* () = lwt_emit w Terminated in
              do_close None
          | Error errs -> (
              let* r =
                match w.current_request with
                | Some (pushed, treated_time, request) ->
                    let completed_time = Systime_os.now () in
                    let treated = Ptime.diff treated_time pushed in
                    let completed = Ptime.diff completed_time treated_time in
                    w.current_request <- None ;
                    Handlers.on_error
                      w
                      request
                      Worker_types.{pushed; treated; completed}
                      errs
                | None ->
                    (* NOTE(review): this branch is reached when the error
                       comes from [Handlers.on_no_request], since no current
                       request is recorded in that case. *)
                    assert false
              in
              match r with
              | Ok () -> loop ()
              | Error (Timeout :: _ as errs) ->
                  let* () = lwt_emit w Terminated in
                  do_close (Some errs)
              | Error errs ->
                  let* () = lwt_emit w (Crashed errs) in
                  do_close (Some errs)))
    in
    loop ()

  (* Creates the worker record with a buffer matching the table's buffer
     kind, registers it in the table, runs [Handlers.on_launch] to build its
     state and then starts its event loop.

     Raises [Invalid_argument] if a worker with the same name is already
     registered in the table. *)
  let launch :
      type kind.
      kind table ->
      ?timeout:Time.System.Span.t ->
      Name.t ->
      Types.parameters ->
      (module HANDLERS with type self = kind t) ->
      kind t tzresult Lwt.t =
   fun table ?timeout name parameters (module Handlers) ->
    let name_s = Format.asprintf "%a" Name.pp name in
    let full_name =
      if name_s = "" then base_name
      else Format.asprintf "%s_%s" base_name name_s
    in
    if Nametbl.mem table.instances name then
      invalid_arg
        (Format.asprintf "Worker.launch: duplicate worker %s" full_name)
    else
      let id =
        table.last_id <- table.last_id + 1 ;
        table.last_id
      in
      let id_name =
        if name_s = "" then base_name
        else Format.asprintf "%s_%d" base_name id
      in
      let canceler = Lwt_canceler.create () in
      let buffer : kind buffer =
        match table.buffer_kind with
        | Queue -> Queue_buffer (Lwt_pipe.Unbounded.create ())
        | Bounded {size} ->
            Bounded_buffer
              (Lwt_pipe.Bounded.create
                 ~max_size:size
                 ~compute_size:(fun _ -> 1)
                 ())
        | Dropbox _ -> Dropbox_buffer (Lwt_dropbox.create ())
      in
      let w =
        {
          parameters;
          name;
          canceler;
          table;
          buffer;
          state = None;
          id;
          worker = Lwt.return_unit;
          timeout;
          current_request = None;
          logEvent = (module Logger.LogEvent);
          status = Launching (Systime_os.now ());
        }
      in
      Nametbl.add table.instances name w ;
      let open Lwt_tzresult_syntax in
      let started = if id_name = base_name then None else Some name_s in
      let*! () = lwt_emit w (Started started) in
      (* NOTE(review): if [on_launch] returns an error, [w] stays registered
         in [table.instances] with status [Launching], so a later [launch]
         with the same name raises [Invalid_argument]. *)
      let* state = Handlers.on_launch w name parameters in
      w.status <- Running (Systime_os.now ()) ;
      w.state <- Some state ;
      w.worker <-
        Lwt_utils.worker
          full_name
          ~on_event:Internal_event.Lwt_worker_event.on_event
          ~run:(fun () -> worker_loop (module Handlers) w)
          ~cancel:(fun () -> Error_monad.cancel_with_exceptions w.canceler) ;
      return w

  let shutdown w =
    (* The actual cancellation ([Lwt_canceler.cancel w.canceler]) resolves
       immediately because no hooks are registered on the canceler. However, the
       worker ([w.worker]) resolves only once the ongoing request has resolved
       (if any) and some clean-up operations have completed. *)
    let open Lwt_syntax in
    let* () = lwt_emit w Triggering_shutdown in
    let* () = Error_monad.cancel_with_exceptions w.canceler in
    w.worker

  (* Returns the internal state of [w]. Raises [Invalid_argument] when the
     state is not available, i.e. before initialization or after
     termination. *)
  let state w =
    match (w.state, w.status) with
    | (None, Launching _) ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called before worker was initialized"
             base_name
             Name.pp
             w.name)
    | (None, (Closing _ | Closed _)) ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called after worker was terminated"
             base_name
             Name.pp
             w.name)
    | (None, _) -> assert false
    | (Some state, _) -> state

  let pending_requests q = Queue.pending_requests q

  let status {status; _} = status

  let current_request {current_request; _} = current_request

  let information (type a) (w : a t) =
    {
      Worker_types.instances_number = Nametbl.length w.table.instances;
      wstatus = w.status;
      queue_length =
        (match w.buffer with
        | Queue_buffer pipe -> Lwt_pipe.Unbounded.length pipe
        | Bounded_buffer pipe -> Lwt_pipe.Bounded.length pipe
        | Dropbox_buffer _ -> 1);
    }

  let list {instances; _} =
    Nametbl.fold (fun n w acc -> (n, w) :: acc) instances []

  let find_opt {instances; _} = Nametbl.find instances

  (* TODO? add a list of cancelers for nested protection ? *)
  let protect {canceler; _} ?on_error f = protect ?on_error ~canceler f

  let () =
    Internal_event.register_section
      (Internal_event.Section.make_sanitized Name.base)
end