123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2023 Nomadic Labs, <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. 
*)
(*                                                                           *)
(*****************************************************************************)

(* Events emitted while supervising the external RPC process. All of them
   are declared under the ["node"; "main"] section. *)
module Event = struct
  include Internal_event.Simple

  let section = ["node"; "main"]

  (* Emitted right before the RPC process is asked to terminate. *)
  let shutting_down_rpc_process =
    declare_0
      ~section
      ~name:"shutting_down_rpc_process"
      ~msg:"shutting down the RPC process"
      ~level:Notice
      ()

  (* Emitted once the RPC process is up; carries its pid. *)
  let rpc_process_started =
    declare_1
      ~section
      ~name:"rpc_process_started"
      ~msg:"RPC process was started on pid {pid}"
      ~level:Notice
      ("pid", Data_encoding.int31)

  (* Emitted when the RPC process is found dead; carries its pid and the
     [Unix.process_status] it ended with. *)
  let rpc_process_exited_abnormally =
    (* Encoding of [Unix.process_status]: one tagged [int31] case per
       constructor. *)
    let process_status_encoding =
      let open Data_encoding in
      let status_case tag title project inject =
        case (Tag tag) ~title int31 project inject
      in
      union
        [
          status_case
            0
            "wexited"
            (function Unix.WEXITED code -> Some code | _ -> None)
            (fun code -> Unix.WEXITED code);
          status_case
            1
            "wsignaled"
            (function Unix.WSIGNALED signal -> Some signal | _ -> None)
            (fun signal -> Unix.WSIGNALED signal);
          status_case
            2
            "wstopped"
            (function Unix.WSTOPPED signal -> Some signal | _ -> None)
            (fun signal -> Unix.WSTOPPED signal);
        ]
    in
    declare_2
      ~section
      ~name:"rpc_process_exited_status"
      ~msg:"rpc process (pid {pid}) {status_msg}"
      ~level:Error
      ("pid", Data_encoding.int31)
      ~pp2:(fun fmt -> function
        | Unix.WEXITED code ->
            Format.fprintf fmt "terminated abnormally with exit code %i" code
        | Unix.WSIGNALED signal ->
            Format.fprintf
              fmt
              "was killed by signal %s"
              (Lwt_exit.signal_name signal)
        | Unix.WSTOPPED signal ->
            Format.fprintf
              fmt
              "was stopped by signal %s"
              (Lwt_exit.signal_name signal))
      ("status_msg", process_status_encoding)

  (* Emitted when an attempt to start the RPC process fails; carries the
     printed error trace. *)
  let cannot_start_rpc_process =
    declare_1
      ~section
      ~name:"cannot_start_rpc_process"
      ~msg:"cannot start rpc process: {trace}"
      ~level:Error
      ("trace", Data_encoding.string)

  (* Emitted before sleeping ahead of a new start attempt; carries the
     delay. *)
  let waiting_for_rpc_process_restart =
    declare_1
      ~section
      ~name:"waiting_for_rpc_process_restart"
      ~msg:"restarting RPC process in {sleep} seconds"
      ~level:Error
      ("sleep", Data_encoding.float)
end

(* State of the worker.
*)
type t = {
  mutable server : Lwt_process.process_none option;
      (** Handle on the spawned RPC process. [None] until [run_server]
          succeeds, and reset to [None] by the watchdog when the process
          is found dead. *)
  stop : (int * Unix.process_status) Lwt.t;
      (** Promise resolved by [stop]; the watchdog waits on it to learn
          that a shutdown was requested. *)
  stopper : (int * Unix.process_status) Lwt.u;
      (** Resolver of the [stop] promise. *)
  external_process_parameters : Parameters.t;
      (** Parameters sent to the RPC process over its init socket. *)
}

(* [create ~comm_socket_path config node_version events_config] builds a
   worker state with no registered process and a pending stop promise. The
   arguments are only stored, to be sent later to the RPC process. *)
let create ~comm_socket_path (config : Config_file.t) node_version events_config
    =
  let stop, stopper = Lwt.wait () in
  {
    server = None;
    stop;
    stopper;
    external_process_parameters =
      {
        internal_events = events_config;
        config;
        rpc_comm_socket_path = comm_socket_path;
        node_version;
      };
  }

(* [shutdown t] emits the shutdown event and asks the registered process to
   terminate. It does nothing when no process is registered. It neither
   waits for the process to exit nor resets [t.server]. *)
let shutdown t =
  let open Lwt_syntax in
  match t.server with
  | None -> return_unit
  | Some process ->
      let* () = Event.(emit shutting_down_rpc_process) () in
      process#terminate ;
      return_unit

(* [stop t] resolves the stop promise, so that the watchdog gives up
   restarting the process, and then shuts the process down. *)
let stop t =
  (* [Lwt.wakeup] raises [Invalid_argument] when the promise is already
     resolved. Only resolve it while it is still pending, so that calling
     [stop] more than once is harmless. *)
  if Lwt.is_sleeping t.stop then Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ;
  shutdown t

(* [run_server t ()] spawns the RPC process (the current executable invoked
   as "octez-rpc-process"), waits for it to connect to the init socket,
   performs the handshake, sends it [t.external_process_parameters] and
   waits for its acknowledgement. On success the process is registered in
   [t.server] and [t] is returned.

   If any step after the spawn fails, by error or by exception, the sockets
   opened here are closed and the child is terminated before the failure is
   propagated. Otherwise every failed attempt would leave an unregistered
   child process behind. *)
let run_server t () =
  let open Lwt_result_syntax in
  let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in
  let socket_dir_arg = ["--socket-dir"; socket_dir] in
  let args = "octez-rpc-process" :: socket_dir_arg in
  let process =
    Lwt_process.open_process_none
      ~stdout:(`FD_copy Unix.stdout)
      ~stderr:(`FD_copy Unix.stderr)
      (Sys.executable_name, Array.of_list args)
  in
  let pid = process#pid in
  let init_socket_path = Main.get_init_socket_path socket_dir pid in
  (* Binds the init socket and waits for the child to connect to it. The
     listening socket is closed whether or not [accept] succeeds.
     NOTE(review): nothing bounds this wait; if the child exits before
     connecting, [accept] presumably never completes -- to be confirmed. *)
  let accept_init_connection () =
    let* fds = Tezos_base_unix.Socket.bind (Unix init_socket_path) in
    match fds with
    | [fd] ->
        let*! init_socket_fd, _ =
          Lwt.finalize
            (fun () -> Lwt_unix.accept ~cloexec:true fd)
            (fun () -> Lwt_unix.close fd)
        in
        return init_socket_fd
    | _ ->
        (* This assertion holds as long as
           Tezos_base_unix.Socket.bind returns a single list element
           when binding Unix sockets. *)
        assert false
  in
  (* Runs the whole init protocol. The init socket is closed on every
     path, not only on success. *)
  let initialise () =
    let* init_socket_fd = accept_init_connection () in
    Lwt.finalize
      (fun () ->
        let* () =
          Tezos_base_unix.Socket.handshake init_socket_fd Main.socket_magic
        in
        let* () =
          Socket.send
            init_socket_fd
            Parameters.parameters_encoding
            t.external_process_parameters
        in
        Socket.recv init_socket_fd Data_encoding.unit)
      (fun () -> Lwt_unix.close init_socket_fd)
  in
  let*! outcome =
    Lwt.catch initialise (fun exn ->
        (* Do not leave the child behind when initialisation raises. *)
        process#terminate ;
        Lwt.fail exn)
  in
  match outcome with
  | Ok () ->
      let*! () = Event.(emit rpc_process_started) pid in
      t.server <- Some process ;
      return t
  | Error trace ->
      (* Same clean-up when initialisation ends with an error trace. *)
      process#terminate ;
      Lwt.return (Error trace)

(* Evaluates [f]. If [f] fails, the error is caught, printed as an
error event, and [f] is re-evaluated after a [backoff] delay. The
   delay increases at each failing try. *)
(* [backoff] is a pair [(timestamp, sleep)]: [f] is evaluated only when
   more than [sleep] seconds have elapsed since [timestamp]. Otherwise the
   function sleeps for [sleep] seconds and checks again. After a failure,
   the timestamp is reset to the current time and the delay is multiplied
   by 1.2. *)
let rec may_start backoff f =
  let open Lwt_result_syntax in
  let last_attempt, delay = backoff in
  let elapsed =
    Ptime.Span.to_float_s (Ptime.diff (Time.System.now ()) last_attempt)
  in
  (* The delay has not elapsed yet: report it, sleep, then check again with
     the same backoff. *)
  let wait_then_retry () =
    let*! () = Event.(emit waiting_for_rpc_process_restart delay) in
    let*! () = Lwt_unix.sleep delay in
    may_start (last_attempt, delay) f
  in
  (* The delay has elapsed: evaluate [f]; on failure, report the trace and
     start over with a longer delay. *)
  let attempt () =
    let on_failure errs =
      let*! () =
        Event.(
          emit
            cannot_start_rpc_process
            (Format.asprintf "%a" pp_print_trace errs))
      in
      may_start (Time.System.now (), delay *. 1.2) f
    in
    protect f ~on_error:on_failure
  in
  if elapsed > delay then attempt () else wait_then_retry ()

(* Watch_dog makes sure that the RPC process is restarted as soon as it
dies. *)
(* [watch_dog run_server] returns a loop over the worker state. The loop
   ends when the stop promise of the state is resolved, or when the process
   dies while [Lwt_exit.clean_up_starts] is no longer pending. In every
   other case a dead or missing process is started again through
   [may_start]. *)
let watch_dog run_server =
  let open Lwt_result_syntax in
  (* Starts a server, beginning with a 0.5 second backoff delay. *)
  let respawn () = may_start (Time.System.epoch, 0.5) run_server in
  let rec loop t =
    match t.server with
    | None ->
        let* fresh = respawn () in
        loop fresh
    | Some process -> (
        let process_exited =
          let*! _pid, status = Lwt_unix.waitpid [] process#pid in
          (* This sleep is needed so that waitpid does not win the race
             against the Lwt_exit stack: without it, clean_up_starts could
             still be pending while the node is properly shutting down. *)
          let*! () = Lwt_unix.sleep 1. in
          Lwt.return (`Wait_pid status)
        in
        let stop_requested =
          let*! _ = t.stop in
          Lwt.return `Stopped
        in
        let*! outcome = Lwt.choose [process_exited; stop_requested] in
        match outcome with
        | `Stopped -> return_unit
        | `Wait_pid status ->
            if Lwt.is_sleeping Lwt_exit.clean_up_starts then (
              (* The clean-up has not started: the process died
                 unexpectedly, so forget it, report it and restart it. *)
              t.server <- None ;
              let*! () =
                Event.(emit rpc_process_exited_abnormally)
                  (process#pid, status)
              in
              let* fresh = respawn () in
              loop fresh)
            else
              (* The clean-up is already under way: do not restart. *)
              return_unit)
  in
  loop

(* [start server] runs the RPC process once, then launches the watchdog in
   the background: its promise is deliberately not awaited. *)
let start server =
  let open Lwt_result_syntax in
  let* running = run_server server () in
  let _watchdog = watch_dog (run_server server) running in
  return_unit