(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2018-2021 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** A worker name made of a [base] component and a [name] component.

    NOTE(review): the previous doc comment on this type described "an error
    returned when trying to communicate with a worker that has been closed",
    which does not match a plain record of two strings. The type is not
    referenced anywhere else in the visible source; confirm whether external
    code still depends on it. *)
type worker_name = {base : string; name : string}

(** Signature of a worker module, as produced by {!Make_internal}. *)
module type T = sig
  module Name : Worker_intf.NAME

  module Request : Worker_intf.REQUEST

  module Types : Worker_intf.TYPES

  (** A handle to a specific worker, parameterized by the type of
      internal message buffer. *)
  type 'kind t

  (** A handle to a table of workers. *)
  type 'kind table

  (** Internal buffer kinds used as parameters to {!t}. *)
  type 'a queue

  and bounded

  and infinite

  type dropbox

  (** Errors reported to a caller waiting for the result of a request:
      - [Closed errs]: the worker buffer was closed; [errs] is the error
        list recorded in the worker status, if any;
      - [Request_error e]: the request handler returned [Error e];
      - [Any exn]: another exception was caught while submitting the
        request. *)
  type 'a message_error =
    | Closed of error list option
    | Request_error of 'a
    | Any of exn

  (** Supported kinds of internal buffers. *)
  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  (** Create a table of workers. *)
  val create_table : 'kind buffer_kind -> 'kind table

  (** The callback handlers specific to each worker instance. *)
  module type HANDLERS = sig
    (** Placeholder replaced with {!t} with the right parameters
        provided by the type of buffer chosen at {!launch}.*)
    type self

    type launch_error

    (** Builds the initial internal state of a worker at launch.
        It is possible to initialize the message queue.
        Of course calling {!state} will fail at that point. *)
    val on_launch :
      self ->
      Name.t ->
      Types.parameters ->
      (Types.state, launch_error) result Lwt.t

    (** The main request processor, i.e. the body of the event loop. *)
    val on_request :
      self ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error) result Lwt.t

    (** Called when no request has been made before the timeout, if
        the parameter has been passed to {!launch}. *)
    val on_no_request : self -> unit Lwt.t

    (** A function called when terminating a worker. *)
    val on_close : self -> unit Lwt.t

    (** A function called at the end of the worker loop in case of an
        abnormal error. This function can handle the error by
        returning [Ok ()], or leave the default unexpected error
        behaviour by returning its parameter. A possibility is to
        handle the error for ad-hoc logging, and still use
        {!trigger_shutdown} to kill the worker. *)
    val on_error :
      self ->
      Worker_types.request_status ->
      ('a, 'request_error) Request.t ->
      'request_error ->
      unit tzresult Lwt.t

    (** A function called at the end of the worker loop in case of a
        successful treatment of the current request. *)
    val on_completion :
      self ->
      ('a, 'request_error) Request.t ->
      'a ->
      Worker_types.request_status ->
      unit Lwt.t
  end

  (** Creates a new worker instance and registers it in the table.

      The kind of message buffer (unbounded queue, bounded queue or
      dropbox) is the one the table was created with. When [timeout] is
      given, [on_no_request] is called each time no request arrives within
      that span.

      If [on_launch] returns an error, that error is returned and the
      worker is removed from the table.

      @raise Invalid_argument if a worker with the same name is already
      registered in the table. *)
  val launch :
    'kind table ->
    ?timeout:Time.System.Span.t ->
    Name.t ->
    Types.parameters ->
    (module HANDLERS
       with type self = 'kind t
        and type launch_error = 'launch_error) ->
    ('kind t, 'launch_error) result Lwt.t

  (** Triggers a worker termination and waits for its completion.
      Cannot be called from within the handlers. *)
  val shutdown : _ t -> unit Lwt.t

  (** Request submission for dropbox-backed workers. *)
  module type BOX = sig
    type t

    val put_request : t -> ('a, 'request_error) Request.t -> unit

    val put_request_and_wait :
      t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t
  end

  (** Request submission and introspection for queue-backed workers. *)
  module type QUEUE = sig
    type 'a t

    val push_request_and_wait :
      'q t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t

    val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox : sig
    include BOX with type t := dropbox t
  end

  module Queue : sig
    include QUEUE with type 'a t := 'a queue t

    (** Adds a message to the queue immediately. *)
    val push_request_now :
      infinite queue t -> ('a, 'request_error) Request.t -> unit
  end

  (** Exports the canceler to allow cancellation of other tasks when this
      worker is shut down or when it dies. *)
  val canceler : _ t -> Lwt_canceler.t

  (** Triggers a worker termination. *)
  val trigger_shutdown : _ t -> unit

  (** Access the internal state, once initialized.

      @raise Invalid_argument if the worker is still launching or has
      already been terminated. *)
  val state : _ t -> Types.state

  (** [with_state w f] applies [f] to the internal state of [w] if it is
      available, and returns [Ok ()] otherwise. *)
  val with_state :
    _ t ->
    (Types.state -> (unit, 'b) result Lwt.t) ->
    (unit, 'b) result Lwt.t

  (** Introspect the message queue, gives the times requests were pushed. *)
  val pending_requests : _ queue t -> (Time.System.t * Request.view) list

  (** Get the running status of a worker. *)
  val status : _ t -> Worker_types.worker_status

  (** Get the request being treated by a worker.
      Gives the time the request was pushed, and the time its
      treatment started. *)
  val current_request :
    _ t -> (Time.System.t * Time.System.t * Request.view) option

  (** Summary of a worker: number of instances in its table, its status and
      the length of its message buffer. *)
  val information : _ t -> Worker_types.worker_information

  (** Lists the running workers in this group. *)
  val list : 'a table -> (Name.t * 'a t) list

  (** [find_opt table n] is [Some worker] if the [worker] is in the [table] and
      has name [n]. *)
  val find_opt : 'a table -> Name.t -> 'a t option
end

(** Implementation of workers, parameterized by the worker name, the request
    type, the state/parameter types and the event sink. *)
module Make_internal
    (Name : Worker_intf.NAME)
    (Request : Worker_intf.REQUEST)
    (Types : Worker_intf.TYPES)
    (Worker_events : Worker_events.S
                       with type view = Request.view
                        and type critical_error = tztrace) =
struct
  module Name = Name
  module Request = Request
  module Types = Types

  (* Table of worker instances, keyed by worker name. *)
  module Nametbl = Hashtbl.MakeSeeded (struct
    type t = Name.t

    (* See [src/lib_base/tzPervasives.ml] for an explanation *)
    [@@@ocaml.warning "-32"]

    let hash = Hashtbl.seeded_hash

    let seeded_hash = Hashtbl.seeded_hash

    [@@@ocaml.warning "+32"]

    let equal = Name.equal
  end)

  (* The components of [Name.base] joined with dashes; used as a prefix in
     worker names and in error messages. *)
  let base_name = String.concat "-" Name.base

  type 'a message_error =
    | Closed of error list option
    | Request_error of 'a
    | Any of exn

  (* A request, together with the optional resolver of the promise on which
     the submitter is waiting for the result. *)
  type message =
    | Message :
        ('a, 'b) Request.t * ('a, 'b message_error) result Lwt.u option
        -> message

  type 'a queue

  and bounded

  and infinite

  type dropbox

  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  (* The concrete message buffer; each message is stored with the time at
     which it was submitted. *)
  and _ buffer =
    | Queue_buffer :
        (Time.System.t * message) Lwt_pipe.Unbounded.t
        -> infinite queue buffer
    | Bounded_buffer :
        (Time.System.t * message) Lwt_pipe.Bounded.t
        -> bounded queue buffer
    | Dropbox_buffer :
        (Time.System.t * message) Lwt_dropbox.t
        -> dropbox buffer

  and 'kind t = {
    timeout : Time.System.Span.t option;
    parameters : Types.parameters;
    mutable (* only for init *) worker : unit Lwt.t;
    mutable (* only for init *) state : Types.state option;
    buffer : 'kind buffer;
    canceler : Lwt_canceler.t;
    name : Name.t;
    id : int;
    mutable status : Worker_types.worker_status;
    mutable current_request :
      (Time.System.t * Time.System.t * Request.view) option;
    table : 'kind table;
  }

  and 'kind table = {
    buffer_kind : 'kind buffer_kind;
    mutable last_id : int;
    instances : 'kind t Nametbl.t;
  }

  (* The errors recorded in the status of a closed worker; [None] for any
     other status. *)
  let extract_status_errors w =
    match w.status with
    | Worker_types.Launching _ | Running _ | Closing _ -> None
    | Closed (_, _, errs) -> errs

  (* Wraps request [r] and the optional resolver [u] into a buffer item
     stamped with the current time. *)
  let queue_item ?u r = (Time.System.now (), Message (r, u))

  (* Submits [request] to a dropbox through the user-provided [merge]
     function. [merge] receives the new request and the request currently in
     the dropbox, if any. When the dropbox is not empty, its content is taken
     out before [merge] is called, and the resolver attached to that content,
     if any, is discarded. If [merge] returns [Some r], [r] is put in the
     dropbox without resolver; if it returns [None], nothing is put. Nothing
     happens if the dropbox is closed. *)
  let drop_request w merge message_box request =
    try
      match
        match Lwt_dropbox.peek message_box with
        | None -> merge w (Any_request request) None
        | Some (_, Message (old, _)) ->
            Lwt.ignore_result (Lwt_dropbox.take message_box) ;
            merge w (Any_request request) (Some (Any_request old))
      with
      | None -> ()
      | Some (Any_request neu) ->
          Lwt_dropbox.put message_box (Time.System.now (), Message (neu, None))
    with Lwt_dropbox.Closed -> ()

  (* Puts [request] in the dropbox (without going through [merge]) and
     returns a promise that is resolved when the resolver stored alongside
     the request is woken up. *)
  let drop_request_and_wait w message_box request =
    let t, u = Lwt.wait () in
    Lwt.catch
      (fun () ->
        Lwt_dropbox.put message_box (queue_item ~u request) ;
        t)
      (function
        | Lwt_dropbox.Closed ->
            Lwt.return_error (Closed (extract_status_errors w))
        | exn ->
            (* [Lwt_dropbox.put] is only expected to raise [Closed], which is
               handled above. The handler of [Lwt.catch] must be total, so
               any other exception is handed back to the caller wrapped in
               [Any] rather than being dropped. *)
            Lwt.return_error (Any exn))

  module type BOX = sig
    type t

    val put_request : t -> ('a, 'request_error) Request.t -> unit

    val put_request_and_wait :
      t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t
  end

  module type QUEUE = sig
    type 'a t

    val push_request_and_wait :
      'q t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t

    val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox = struct
    (* Submits a request through the [merge] function of the table. *)
    let put_request (w : dropbox t) request =
      let (Dropbox {merge}) = w.table.buffer_kind in
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request w merge message_box request

    (* Submits a request and waits for its result. *)
    let put_request_and_wait (w : dropbox t) request =
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request_and_wait w message_box request
  end

  module Queue = struct
    (* Pushes a request without waiting for its result. The returned promise
       holds [false] if the queue was already closed, [true] otherwise. *)
    let push_request (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue ->
          if Lwt_pipe.Unbounded.is_closed message_queue then Lwt.return_false
          else (
            Lwt_pipe.Unbounded.push message_queue (queue_item request) ;
            (* because pushing on an unbounded pipe is immediate, we return within
               Lwt explicitly for compatibility with the other case *)
            Lwt.return_true)
      | Bounded_buffer message_queue ->
          if Lwt_pipe.Bounded.is_closed message_queue then Lwt.return_false
          else
            let open Lwt_syntax in
            let* () =
              Lwt_pipe.Bounded.push message_queue (queue_item request)
            in
            Lwt.return_true

    (* Pushes a request on an unbounded queue; does nothing if the queue is
       closed. *)
    let push_request_now (w : infinite queue t) request =
      let (Queue_buffer message_queue) = w.buffer in
      if Lwt_pipe.Unbounded.is_closed message_queue then ()
      else Lwt_pipe.Unbounded.push message_queue (queue_item request)

    (* Pushes a request and returns the promise of its result. A closed
       queue yields [Error (Closed _)]. *)
    let push_request_and_wait (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue -> (
          try
            let t, u = Lwt.wait () in
            Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ;
            t
          with Lwt_pipe.Closed ->
            Lwt.return_error (Closed (extract_status_errors w)))
      | Bounded_buffer message_queue ->
          let t, u = Lwt.wait () in
          Lwt.try_bind
            (fun () ->
              Lwt_pipe.Bounded.push message_queue (queue_item ~u request))
            (fun () -> t)
            (function
              | Lwt_pipe.Closed ->
                  Lwt.return_error (Closed (extract_status_errors w))
              | exn -> Lwt.return_error (Any exn))

    (* Views of the requests currently in the queue, with the time each one
       was pushed. A closed queue yields the empty list. *)
    let pending_requests (type a) (w : a queue t) =
      let peeked =
        try
          match w.buffer with
          | Queue_buffer message_queue ->
              Lwt_pipe.Unbounded.peek_all_now message_queue
          | Bounded_buffer message_queue ->
              Lwt_pipe.Bounded.peek_all_now message_queue
        with Lwt_pipe.Closed -> []
      in
      List.map
        (function t, Message (req, _) -> (t, Request.view req))
        peeked

    (* Length of the underlying pipe. *)
    let pending_requests_length (type a) (w : a queue t) =
      (* [pipe_length] is written for any kind of buffer, hence the dropbox
         case, although the buffer of a queue worker is never a dropbox. *)
      let pipe_length (type a) (q : a buffer) =
        match q with
        | Queue_buffer queue -> Lwt_pipe.Unbounded.length queue
        | Bounded_buffer queue -> Lwt_pipe.Bounded.length queue
        | Dropbox_buffer _ -> 1
      in
      pipe_length w.buffer
  end

  (* Closes the message buffer of [w]. Every pending message that carries a
     resolver has it woken up with [Error (Closed _)]. For queues the pending
     messages are popped before the pipe is closed; for a dropbox the content
     is only peeked at before the dropbox is closed. *)
  let close (type a) (w : a t) =
    let wakeup = function
      | _, Message (_, Some u) ->
          Lwt.wakeup_later u (Error (Closed (extract_status_errors w)))
      | _, Message (_, None) -> ()
    in
    let close_queue message_queue =
      let messages = Lwt_pipe.Bounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Bounded.close message_queue
    in
    let close_unbounded_queue message_queue =
      let messages = Lwt_pipe.Unbounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Unbounded.close message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> close_unbounded_queue message_queue
    | Bounded_buffer message_queue -> close_queue message_queue
    | Dropbox_buffer message_box ->
        (try Option.iter wakeup (Lwt_dropbox.peek message_box)
         with Lwt_dropbox.Closed -> ()) ;
        Lwt_dropbox.close message_box

  (* Fetches the next message from the buffer of [w]. Without a timeout the
     result is always [Some _]; with a timeout the [pop_with_timeout] /
     [take_with_timeout] variant of the buffer is used, presumably yielding
     [None] when the timeout elapses first (see [on_no_request]). *)
  let pop (type a) (w : a t) =
    let open Lwt_syntax in
    let pop_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Bounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Bounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    let pop_unbounded_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Unbounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Unbounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> pop_unbounded_queue message_queue
    | Bounded_buffer message_queue -> pop_queue message_queue
    | Dropbox_buffer message_box -> (
        match w.timeout with
        | None ->
            let* m = Lwt_dropbox.take message_box in
            return_some m
        | Some timeout ->
            Lwt_dropbox.take_with_timeout
              (Systime_os.sleep timeout)
              message_box)

  (* Cancels the canceler of [w] without waiting for the cancellation to
     complete. *)
  let trigger_shutdown w = Lwt.ignore_result (Lwt_canceler.cancel w.canceler)

  let canceler {canceler; _} = canceler

  (* Same signature as [T.HANDLERS]. *)
  module type HANDLERS = sig
    type self

    type launch_error

    val on_launch :
      self ->
      Name.t ->
      Types.parameters ->
      (Types.state, launch_error) result Lwt.t

    val on_request :
      self ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error) result Lwt.t

    val on_no_request : self -> unit Lwt.t

    val on_close : self -> unit Lwt.t

    val on_error :
      self ->
      Worker_types.request_status ->
      ('a, 'request_error) Request.t ->
      'request_error ->
      unit tzresult Lwt.t

    val on_completion :
      self ->
      ('a, 'request_error) Request.t ->
      'a ->
      Worker_types.request_status ->
      unit Lwt.t
  end

  let create_table buffer_kind =
    {
      buffer_kind;
      last_id = 0;
      instances = Nametbl.create ~random:true 10;
    }

  (* Terminates a running worker: closes its buffer, cancels its canceler,
     records [errs] in its status, calls [on_close], removes the worker from
     its table and drops its state. Does nothing unless the worker is
     [Running]. This definition shadows the buffer-level [close] above, which
     is the one called in its body. *)
  let close (type kind) handlers (w : kind t) errs =
    (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3264
       close should be called only once in a worker lifetime *)
    let (module Handlers : HANDLERS with type self = kind t) = handlers in
    let open Lwt_syntax in
    match w.status with
    (* [Launching] is not accessible from here, as its only occurrence
       is in [launch] and happens before the call to [worker_loop]. *)
    | Closed _ | Closing _ | Launching _ -> Lwt.return_unit
    | Running t0 ->
        w.status <- Closing (t0, Time.System.now ()) ;
        close w ;
        let* () = Error_monad.cancel_with_exceptions w.canceler in
        w.status <- Closed (t0, Time.System.now (), errs) ;
        let* () = Handlers.on_close w in
        Nametbl.remove w.table.instances w.name ;
        w.state <- None ;
        return_unit

  (* The event loop of a worker: pops messages one at a time, runs the
     handlers on them and terminates the worker when the loop stops. *)
  let worker_loop (type kind) handlers (w : kind t) =
    let (module Handlers : HANDLERS with type self = kind t) = handlers in
    let open Lwt_syntax in
    let rec loop () =
      (* The call to [protect] here allows the call to [pop] (responsible
         for fetching the next request) to be canceled by the use of the
         [canceler].
         These cancellations cannot affect the processing of ongoing requests.
         This is due to the limited scope of the argument of [protect]. As a
         result, ongoing requests are never canceled by this mechanism.
         In the case when the [canceler] is canceled whilst a request is being
         processed, the processing eventually resolves and a recursive call
         to this [loop] is made, at which point this call to [protect]
         fails immediately with [Canceled]. *)
      let* popped = protect_result (fun () -> pop w) in
      match popped with
      | Error exn -> raise exn
      | Ok None ->
          let* () = Handlers.on_no_request w in
          loop ()
      | Ok (Some (pushed, Message (request, u))) -> (
          let current_request = Request.view request in
          let treated = Time.System.now () in
          w.current_request <- Some (pushed, treated, current_request) ;
          let* r =
            match u with
            | None -> (
                (* Nobody is waiting for the result of this request. *)
                let open Lwt_result_syntax in
                let*! res = Handlers.on_request w request in
                match res with
                | Error err -> Lwt.return_error err
                | Ok res ->
                    let completed = Time.System.now () in
                    w.current_request <- None ;
                    let status = Worker_types.{pushed; treated; completed} in
                    let*! () = Handlers.on_completion w request res status in
                    let*! () =
                      Worker_events.(emit request_no_errors)
                        (current_request, status)
                    in
                    return_unit)
            | Some u -> (
                (* [res] is a result. But the side effect [wakeup]
                   needs to happen regardless of success (Ok) or failure
                   (Error). To that end, we treat it locally like a regular
                   promise (which happens to carry a [result]) within the Lwt
                   monad. *)
                let* res = Handlers.on_request w request in
                match res with
                | Error err ->
                    Lwt.wakeup_later u (Error (Request_error err)) ;
                    Lwt.return (Error err)
                | Ok res ->
                    Lwt.wakeup_later u (Ok res) ;
                    let completed = Time.System.now () in
                    let status = Worker_types.{pushed; treated; completed} in
                    w.current_request <- None ;
                    let* () = Handlers.on_completion w request res status in
                    let* () =
                      Worker_events.(emit request_no_errors)
                        (current_request, status)
                    in
                    return (Ok ()))
          in
          match r with
          | Ok () -> loop ()
          | Error err -> (
              let* r =
                (* On the error paths above [w.current_request] is left
                   untouched, so it still holds the value set when the
                   request was popped. *)
                match w.current_request with
                | Some (pushed, treated, _request_view) ->
                    let completed = Time.System.now () in
                    w.current_request <- None ;
                    Handlers.on_error
                      w
                      Worker_types.{pushed; treated; completed}
                      request
                      err
                | None -> assert false
              in
              match r with
              | Ok () -> loop ()
              | Error errs ->
                  let* () = Worker_events.(emit crashed) errs in
                  close handlers w (Some errs)))
    in
    let* r = protect_result ~canceler:w.canceler (fun () -> loop ()) in
    match r with
    | Ok () -> Lwt.return_unit
    | Error Lwt.Canceled | Error Lwt_pipe.Closed | Error Lwt_dropbox.Closed ->
        (* Expected ways for the loop to stop: cancellation, or a buffer that
           has been closed. *)
        let* () = Worker_events.(emit terminated) () in
        close handlers w None
    | Error exn ->
        let* () = Worker_events.(emit crashed) [Exn exn] in
        raise exn

  (* Creates a worker named [name] in [table], runs [Handlers.on_launch] and,
     on success, starts the worker loop.

     The worker is registered in the table before [on_launch] is called (the
     handler receives the worker handle). If [on_launch] returns an error, the
     worker is unregistered again and the error is returned.

     Raises [Invalid_argument] if [name] is already registered in [table]. *)
  let launch :
      type kind launch_error.
      kind table ->
      ?timeout:Time.System.Span.t ->
      Name.t ->
      Types.parameters ->
      (module HANDLERS
         with type self = kind t
          and type launch_error = launch_error) ->
      (kind t, launch_error) result Lwt.t =
   fun table ?timeout name parameters (module Handlers) ->
    let name_s = Format.asprintf "%a" Name.pp name in
    let full_name =
      if name_s = "" then base_name
      else Format.asprintf "%s_%s" base_name name_s
    in
    if Nametbl.mem table.instances name then
      invalid_arg
        (Format.asprintf "Worker.launch: duplicate worker %s" full_name)
    else
      let id =
        table.last_id <- table.last_id + 1 ;
        table.last_id
      in
      let id_name =
        if name_s = "" then base_name
        else Format.asprintf "%s_%d" base_name id
      in
      let canceler = Lwt_canceler.create () in
      let buffer : kind buffer =
        match table.buffer_kind with
        | Queue -> Queue_buffer (Lwt_pipe.Unbounded.create ())
        | Bounded {size} ->
            Bounded_buffer
              (Lwt_pipe.Bounded.create
                 ~max_size:size
                 ~compute_size:(fun _ -> 1)
                 ())
        | Dropbox _ -> Dropbox_buffer (Lwt_dropbox.create ())
      in
      let w =
        {
          parameters;
          name;
          canceler;
          table;
          buffer;
          state = None;
          id;
          worker = Lwt.return_unit;
          timeout;
          current_request = None;
          status = Launching (Time.System.now ());
        }
      in
      Nametbl.add table.instances name w ;
      let open Lwt_result_syntax in
      let*! () =
        if id_name = base_name then Worker_events.(emit started) ()
        else Worker_events.(emit started_for) name_s
      in
      let*! launched = Handlers.on_launch w name parameters in
      match launched with
      | Error err ->
          (* The worker never started: unregister it, otherwise it would stay
             in the table in the [Launching] state forever and any later
             [launch] with the same name would raise [Invalid_argument]. No
             other worker can have taken this name in the meantime because of
             the [Nametbl.mem] check above. *)
          Nametbl.remove table.instances name ;
          Lwt.return_error err
      | Ok state ->
          w.status <- Running (Time.System.now ()) ;
          w.state <- Some state ;
          w.worker <-
            Lwt_utils.worker
              full_name
              ~on_event:Internal_event.Lwt_worker_logger.on_event
              ~run:(fun () -> worker_loop (module Handlers) w)
              ~cancel:(fun () -> Error_monad.cancel_with_exceptions w.canceler) ;
          return w

  let shutdown w =
    (* The actual cancellation ([Lwt_canceler.cancel w.canceler]) resolves
       immediately because no hooks are registered on the canceler. However, the
       worker ([w.worker]) resolves only once the ongoing request has resolved
       (if any) and some clean-up operations have completed. *)
    let open Lwt_syntax in
    let* () = Worker_events.(emit triggering_shutdown) () in
    let* () = Error_monad.cancel_with_exceptions w.canceler in
    w.worker

  (* Returns the internal state of [w]. Raises [Invalid_argument] if the
     state is not available, i.e. while the worker is launching or once it is
     closing or closed. *)
  let state w =
    match (w.state, w.status) with
    | None, Launching _ ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called before worker was initialized"
             base_name
             Name.pp
             w.name)
    | None, (Closing _ | Closed _) ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called after worker was terminated"
             base_name
             Name.pp
             w.name)
    | None, _ -> assert false
    | Some state, _ -> state

  (* Applies [f] to the state of [w] when it is available; returns [Ok ()]
     without calling [f] otherwise. *)
  let with_state :
      _ t ->
      (Types.state -> (unit, 'b) result Lwt.t) ->
      (unit, 'b) result Lwt.t =
   fun w f ->
    match w.state with
    | Some state -> f state
    | None -> Lwt_result_syntax.return_unit

  let pending_requests q = Queue.pending_requests q

  let status {status; _} = status

  let current_request {current_request; _} = current_request

  (* Summary of [w]. The queue length of a dropbox worker is always reported
     as 1, whatever the dropbox contains. *)
  let information (type a) (w : a t) =
    {
      Worker_types.instances_number = Nametbl.length w.table.instances;
      wstatus = w.status;
      queue_length =
        (match w.buffer with
        | Queue_buffer pipe -> Lwt_pipe.Unbounded.length pipe
        | Bounded_buffer pipe -> Lwt_pipe.Bounded.length pipe
        | Dropbox_buffer _ -> 1);
    }

  let list {instances; _} =
    Nametbl.fold (fun n w acc -> (n, w) :: acc) instances []

  (* NOTE(review): relies on [Nametbl.find] returning an option, as required
     by the signature of [find_opt] in [T]; presumably [Hashtbl] is the
     project's own variant here, not the OCaml standard library one. *)
  let find_opt {instances; _} = Nametbl.find instances

  (* Registers the event section derived from [Name.base] when the functor is
     applied. *)
  let () =
    Internal_event.register_section
      (Internal_event.Section.make_sanitized Name.base)
end

(** Builds the events shared by a group of workers that have the same name
    and request types, and provides [MakeWorker] to instantiate workers of
    that group for given [Types]. *)
module MakeGroup (Name : Worker_intf.NAME) (Request : Worker_intf.REQUEST) =
struct
  module Events =
    Worker_events.Make (Name) (Request)
      (struct
        type t = tztrace

        let encoding = Error_monad.trace_encoding

        let pp = Error_monad.pp_print_trace
      end)

  module MakeWorker (Types : Worker_intf.TYPES) = struct
    include Make_internal (Name) (Request) (Types) (Events)
  end
end

(** Shorthand for a group that contains a single kind of worker. *)
module MakeSingle
    (Name : Worker_intf.NAME)
    (Request : Worker_intf.REQUEST)
    (Types : Worker_intf.TYPES) =
struct
  module WG = MakeGroup (Name) (Request)
  include WG.MakeWorker (Types)
end