(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** Parameters given to [Daemon.Make] to describe a DAL node daemon. *)
module Parameters = struct
  (** State that is kept for the whole life of the value, whether or not a
      process is currently running. *)
  type persistent_state = {
    data_dir : string;
    rpc_host : string;
    rpc_port : int;
    listen_addr : string;
        (** The TCP address and port at which this instance can be reached. *)
    node : Node.t;
    client : Client.t;
    mutable pending_ready : unit option Lwt.u list;
        (** Resolvers registered by [wait_for_ready]; they are woken up and
            removed by [trigger_ready]. *)
  }

  (** State attached to one run of the process. *)
  type session_state = {mutable ready : bool}

  let base_default_name = "octez-dal-node"

  let default_colors = Log.Color.[|FG.gray; FG.magenta; FG.yellow; FG.green|]
end

open Parameters
include Daemon.Make (Parameters)

(** Wait for the process of [dal_node] to terminate.

    Fails the test if the DAL node is not running. *)
let wait dal_node =
  match dal_node.status with
  | Not_running ->
      Test.fail
        "DAL node %s is not running, cannot wait for it to terminate"
        (name dal_node)
  | Running {process; _} -> Process.wait process

(** [true] if and only if [dal_node] is running and its session is not yet
    marked as ready. *)
let is_running_not_ready dal_node =
  match dal_node.status with
  | Running {session_state = {ready = false}; _} -> true
  | Running {session_state = {ready = true}; _} | Not_running -> false

(** Name of the daemon. *)
let name dal_node = dal_node.name

(** Host of the RPC server of the DAL node. *)
let rpc_host dal_node = dal_node.persistent_state.rpc_host

(** Port of the RPC server of the DAL node. *)
let rpc_port dal_node = dal_node.persistent_state.rpc_port

(** Address (with port) given to the DAL node as [--net-addr]. *)
let listen_addr dal_node = dal_node.persistent_state.listen_addr

(** RPC host of the layer 1 node this DAL node is attached to. *)
let layer1_addr dal_node = Node.rpc_host dal_node.persistent_state.node

(** RPC port of the layer 1 node this DAL node is attached to. *)
let layer1_port dal_node = Node.rpc_port dal_node.persistent_state.node

(** HTTP endpoint of the RPC server of the DAL node, built from [rpc_host]
    and [rpc_port]. *)
let endpoint dal_node =
  Printf.sprintf "http://%s:%d" (rpc_host dal_node) (rpc_port dal_node)

(** Data directory of the DAL node. *)
let data_dir dal_node = dal_node.persistent_state.data_dir

(** Spawn the DAL node executable with the name and color of [dal_node].
    The remaining argument of [Process.spawn] (the command-line arguments) is
    left to the caller. *)
let spawn_command dal_node =
  Process.spawn ~name:dal_node.name ~color:dal_node.color dal_node.path

(** Spawn [init-config] for [dal_node] and return the process.

    [--use-unsafe-srs-for-tests] is appended when [use_unsafe_srs] is [true],
    which is the default. *)
let spawn_config_init ?(use_unsafe_srs = true) dal_node =
  let base_args =
    [
      "init-config";
      "--data-dir";
      data_dir dal_node;
      "--rpc-port";
      string_of_int (rpc_port dal_node);
      "--rpc-addr";
      rpc_host dal_node;
      "--net-addr";
      listen_addr dal_node;
    ]
  in
  let srs_args =
    if use_unsafe_srs then ["--use-unsafe-srs-for-tests"] else []
  in
  spawn_command dal_node (base_args @ srs_args)

(** Run [init-config], check that it succeeds, and return the text captured
    after the configuration-written message on its standard output.

    Raises [Failure] if that message is not found in the output. *)
let init_config ?use_unsafe_srs dal_node =
  let process = spawn_config_init ?use_unsafe_srs dal_node in
  let* output = Process.check_and_read_stdout process in
  match output =~* rex "DAL node configuration written in ([^\n]*)" with
  | None -> failwith "DAL node configuration initialization failed"
  | Some filename -> return filename

(** Helpers to read and modify the configuration file of a DAL node. *)
module Config_file = struct
  (** Path of [config.json] inside the data directory of [dal_node]. *)
  let filename dal_node = sf "%s/config.json" @@ data_dir dal_node

  (** Parse the configuration file as JSON. *)
  let read dal_node = JSON.parse_file (filename dal_node)

  (** Overwrite the configuration file with [config]. *)
  let write dal_node config = JSON.encode_to_file (filename dal_node) config

  (** Read the configuration, apply [update] to it and write the result
      back. *)
  let update dal_node update = read dal_node |> update |> write dal_node
end

(** [check_event ?timeout ?where dal_node name promise] waits for [promise]
    and returns the value it carries.

    If [timeout] is given, the promise is raced (with [Lwt.pick]) against
    [Lwt_unix.sleep timeout].

    Raises [Terminated_before_event] when the result is [None]. Note that this
    is also what happens when the timeout elapses first: the sleeping branch
    resolves to [None], so a timeout is reported with the same exception. *)
let check_event ?timeout ?where dal_node name promise =
  let* result =
    match timeout with
    | None -> promise
    | Some timeout ->
        Lwt.pick
          [
            promise;
            (let* () = Lwt_unix.sleep timeout in
             Lwt.return None);
          ]
  in
  match result with
  | None ->
      raise
        (Terminated_before_event {daemon = dal_node.name; event = name; where})
  | Some x -> return x

(** Wake up every resolver registered in [pending_ready] with [value].

    The list is emptied before the resolvers are woken up. *)
let trigger_ready dal_node value =
  let pending = dal_node.persistent_state.pending_ready in
  dal_node.persistent_state.pending_ready <- [] ;
  List.iter (fun resolver -> Lwt.wakeup_later resolver value) pending

(** Mark the current session (if any) as ready, then wake up all pending
    waiters with [Some ()]. *)
let set_ready dal_node =
  (match dal_node.status with
  | Not_running -> ()
  | Running status -> status.session_state.ready <- true) ;
  trigger_ready dal_node (Some ())

(** Wait until [dal_node] is ready.

    Returns immediately if the running session is already marked as ready.
    Otherwise (not running, or running but not ready) a new resolver is
    registered in [pending_ready] and the resulting promise is awaited through
    [check_event], which raises [Terminated_before_event] if the waiter is
    woken up with [None]. *)
let wait_for_ready dal_node =
  match dal_node.status with
  | Running {session_state = {ready = true; _}; _} -> unit
  | Not_running | Running {session_state = {ready = false; _}; _} ->
      let promise, resolver = Lwt.task () in
      dal_node.persistent_state.pending_ready <-
        resolver :: dal_node.persistent_state.pending_ready ;
      check_event dal_node "dal_node_is_ready.v0" promise

(** Event handler installed by [create]: the ["dal_node_is_ready.v0"] event
    marks the node as ready, every other event is ignored. *)
let handle_event dal_node {name; value = _; timestamp = _} =
  match name with "dal_node_is_ready.v0" -> set_ready dal_node | _ -> ()

(** Create a DAL node value attached to the layer 1 [node] and to [client].
    This does not start any process.

    Defaults:
    - [path]: [Constant.dal_node];
    - [name]: [fresh_name ()];
    - [data_dir]: [Temp.dir name];
    - [rpc_host]: ["127.0.0.1"];
    - [rpc_port]: [Port.fresh ()];
    - [listen_addr]: ["127.0.0.1"] with a port obtained from [Port.fresh ()].

    The returned value has [handle_event] registered through [on_event]. *)
let create ?(path = Constant.dal_node) ?name ?color ?data_dir ?event_pipe
    ?(rpc_host = "127.0.0.1") ?rpc_port ?listen_addr ~node ~client () =
  let name = match name with None -> fresh_name () | Some name -> name in
  let data_dir =
    match data_dir with None -> Temp.dir name | Some dir -> dir
  in
  let rpc_port =
    match rpc_port with None -> Port.fresh () | Some port -> port
  in
  let listen_addr =
    match listen_addr with
    | None -> Format.sprintf "127.0.0.1:%d" @@ Port.fresh ()
    | Some addr -> addr
  in
  let dal_node =
    create
      ~path
      ~name
      ?color
      ?event_pipe
      {
        data_dir;
        rpc_host;
        rpc_port;
        listen_addr;
        pending_ready = [];
        node;
        client;
      }
  in
  on_event dal_node (handle_event dal_node) ;
  dal_node

(** Arguments placed before the sub-command arguments by [do_runlike_command]:
    [--base-dir] (base directory of the client) followed by [--endpoint]
    (HTTP address of the layer 1 node). *)
let make_arguments node =
  let base_dir_args =
    ["--base-dir"; Client.base_dir node.persistent_state.client]
  in
  let endpoint_args =
    [
      "--endpoint";
      Printf.sprintf "http://%s:%d" (layer1_addr node) (layer1_port node);
    ]
  in
  base_dir_args @ endpoint_args

(** Start the DAL node process with [make_arguments node @ arguments].

    Fails the test if the node is already running. The new session starts
    with [ready = false]; when the process terminates, pending waiters are
    woken up with [None]. *)
let do_runlike_command ?env node arguments =
  (* Pattern matching rather than structural comparison: a [Running] status
     carries a process handle, which is not meant to be compared. *)
  (match node.status with
  | Not_running -> ()
  | Running _ -> Test.fail "DAL node %s is already running" node.name) ;
  let on_terminate _status =
    trigger_ready node None ;
    unit
  in
  let arguments = make_arguments node @ arguments in
  run ?env node {ready = false} arguments ~on_terminate

(** Start the DAL node with the [run] command on its data directory, without
    waiting for it to be ready. Shadowed by the definition below. *)
let run ?env node =
  do_runlike_command
    ?env
    node
    ["run"; "--data-dir"; node.persistent_state.data_dir]

(** Start the DAL node and, unless [wait_ready] is [false], wait until it is
    ready (see [wait_for_ready]). [wait_ready] defaults to [true]. *)
let run ?(wait_ready = true) ?env node =
  let* () = run ?env node in
  if wait_ready then wait_for_ready node else unit