(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2020 Nomadic Labs <contact@nomadic-labs.com>                *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** Functor for the common parts of all Tezos daemons: node, baker,
endorser and accuser. Handles event handling in particular. *)

(** Static parameters provided by each concrete daemon kind.

    - [persistent_state] is stored in the daemon value itself;
    - [session_state] is stored for the duration of one run;
    - [base_default_name] is the prefix of automatically generated names;
    - [default_colors] is the palette cycled through for automatically
      chosen log colors. *)
module type PARAMETERS = sig
  type persistent_state

  type session_state

  val base_default_name : string

  val default_colors : Log.Color.t array
end

(** Event verbosity levels and their textual form, as used to build the
    [TEZOS_EVENTS_CONFIG] value in [Make.run]. *)
module Level = struct
  type default_level = [`Debug | `Info | `Notice]

  type level = [default_level | `Warning | `Error | `Fatal]

  (** Lowercase name of a level. *)
  let to_string = function
    | `Debug -> "debug"
    | `Info -> "info"
    | `Notice -> "notice"
    | `Warning -> "warning"
    | `Error -> "error"
    | `Fatal -> "fatal"
end

module Make (X : PARAMETERS) = struct
  (** Raised by [wait_for_full] (and thus [wait_for]) when the daemon
      terminates while an event is still being waited for. *)
  exception
    Terminated_before_event of {
      daemon : string;
      event : string;
      where : string option;
    }

  let () =
    Printexc.register_printer @@ function
    | Terminated_before_event {daemon; event; where = None} ->
        Some (sf "%s terminated before event occurred: %s" daemon event)
    | Terminated_before_event {daemon; event; where = Some where} ->
        Some
          (sf
             "%s terminated before event occurred: %s where %s"
             daemon
             event
             where)
    | _ -> None

  (** State of one run of the daemon. [event_loop_promise] is [None]
      until [run] has finished setting up the event loop. *)
  type session_status = {
    process : Process.t;
    stdin : Lwt_io.output_channel;
    session_state : X.session_state;
    mutable event_loop_promise : unit Lwt.t option;
  }

  type status = Not_running | Running of session_status

  (** A one-shot handler: [filter] is applied to the full JSON event and,
      when it returns [Some v], [resolver] is woken up with [Some v].
      [resolver] is woken up with [None] if the daemon terminates first. *)
  type event_handler =
    | Event_handler : {
        filter : JSON.t -> 'a option;
        resolver : 'a option Lwt.u;
      }
        -> event_handler

  type event = {name : string; value : JSON.t; timestamp : float}

  type t = {
    name : string;
    color : Tezt.Log.Color.t;
    path : string;
    persistent_state : X.persistent_state;
    mutable status : status;
    event_pipe : string;
    mutable stdout_handlers : (string -> unit) list;
    mutable stderr_handlers : (string -> unit) list;
    mutable persistent_event_handlers : (event -> unit) list;
    mutable one_shot_event_handlers : event_handler list String_map.t;
  }

  (** Name of the daemon. *)
  let name daemon = daemon.name

  (* Having to wait more than 3 seconds after hitting Ctrl+C is already
     unreasonable. We choose a timeout one order of magnitude larger to
     reduce flakiness in case the CPU happens to be slower etc. *)

  (** Call [Process.terminate ~timeout] on the daemon process and return
      the event loop promise. Does nothing if the daemon is not running.

      @raise Invalid_argument if [run] has not returned yet. *)
  let terminate ?(timeout = 30.) daemon =
    match daemon.status with
    | Not_running -> unit
    | Running {event_loop_promise = None; _} ->
        invalid_arg
          "you cannot call Daemon.terminate before Daemon.run returns"
    | Running {process; event_loop_promise = Some event_loop_promise; _} ->
        Process.terminate ~timeout process ;
        event_loop_promise

  (** Send [SIGSTOP] to the daemon process. Does nothing if the daemon is
      not running.

      @raise Invalid_argument if [run] has not returned yet. *)
  let stop daemon =
    match daemon.status with
    | Not_running -> unit
    | Running {event_loop_promise = None; _} ->
        invalid_arg "you cannot call Daemon.stop before Daemon.run returns"
    | Running {process; _} ->
        let pid = Process.pid process in
        Unix.kill pid Sys.sigstop ;
        unit

  (** Send [SIGCONT] to the daemon process. Does nothing if the daemon is
      not running.

      @raise Invalid_argument if [run] has not returned yet. *)
  let continue daemon =
    match daemon.status with
    | Not_running -> unit
    | Running {event_loop_promise = None; _} ->
        invalid_arg
          "you cannot call Daemon.continue before Daemon.run returns"
    | Running {process; _} ->
        let pid = Process.pid process in
        Unix.kill pid Sys.sigcont ;
        unit

  (** Call [Process.kill] on the daemon process and return the event loop
      promise. Does nothing if the daemon is not running.

      @raise Invalid_argument if [run] has not returned yet. *)
  let kill daemon =
    match daemon.status with
    | Not_running -> unit
    | Running {event_loop_promise = None; _} ->
        (* The message used to mention [Daemon.terminate] (copy-paste). *)
        invalid_arg "you cannot call Daemon.kill before Daemon.run returns"
    | Running {process; event_loop_promise = Some event_loop_promise; _} ->
        Process.kill process ;
        event_loop_promise

  (* Counter used to generate default names; starts at 1. *)
  let next_name = ref 1

  (** [X.base_default_name] followed by a fresh index. *)
  let fresh_name () =
    let index = !next_name in
    incr next_name ;
    X.base_default_name ^ string_of_int index

  (* Counter used to pick default colors; starts at 0. *)
  let next_color = ref 0

  (** Next color of [X.default_colors], cycling once the end is reached. *)
  let get_next_color () =
    let color =
      X.default_colors.(!next_color mod Array.length X.default_colors)
    in
    incr next_color ;
    color

  (* Both counters are restored to their initial values by the reset
     function registered with [Test.declare_reset_function]. *)
  let () =
    Test.declare_reset_function @@ fun () ->
    next_name := 1 ;
    next_color := 0

  (** Build a daemon value in the [Not_running] state, with no handlers.

      [name] defaults to [fresh_name ()], [color] to [get_next_color ()]
      and [event_pipe] to a temporary file named after the daemon
      ([Temp.file ?runner (name ^ "-event-pipe")]). *)
  let create ~path ?runner ?name ?color ?event_pipe persistent_state =
    let name = match name with None -> fresh_name () | Some name -> name in
    let color =
      match color with None -> get_next_color () | Some color -> color
    in
    let event_pipe =
      match event_pipe with
      | None -> Temp.file ?runner (name ^ "-event-pipe")
      | Some file -> file
    in
    {
      name;
      color;
      path;
      persistent_state;
      status = Not_running;
      event_pipe;
      stdout_handlers = [];
      stderr_handlers = [];
      persistent_event_handlers = [];
      one_shot_event_handlers = String_map.empty;
    }

  (** Takes the given JSON full event of the following form
      and evaluates to an event using [<name>] and
      [<value>]:

      {[{
        "fd-sink-item.v0": {
          "time_stamp":<timestamp>
          [...]
          "event": { <name>:<value> }
        }
      }]}

      If the given JSON does not match the right structure,
      and in particular if the value of the field ["event"]
      is not a one-field object, the function evaluates to
      None.

      NOTE(review): the timestamp is read with [JSON.as_float] before the
      shape of ["event"] is checked; presumably a missing ["time_stamp"]
      makes this fail instead of evaluating to [None] -- confirm. *)
  let get_event_from_full_event json =
    let event = JSON.(json |-> "fd-sink-item.v0" |-> "event") in
    let timestamp =
      JSON.(json |-> "fd-sink-item.v0" |-> "time_stamp" |> as_float)
    in
    match JSON.as_object_opt event with
    | None | Some ([] | _ :: _ :: _) -> None
    | Some [(name, value)] -> Some {name; value; timestamp}

  (** Read lines from [event_input], accumulating them until the
      accumulated text parses as JSON.

      Returns [None] when the channel reaches end of file, even if some
      lines were already accumulated.

      @raise Failure if the accumulated text reaches 1MB and still does
      not parse. *)
  let read_json_event daemon event_input =
    let max_event_size = 1024 * 1024 (* 1MB *) in
    let origin = "event from " ^ daemon.name in
    let buff = Buffer.create 256 in
    let rec loop () =
      let* line = Lwt_io.read_line_opt event_input in
      match line with
      | None -> return None
      | Some line -> (
          Buffer.add_string buff line ;
          match JSON.parse_opt ~origin (Buffer.contents buff) with
          | None when Buffer.length buff >= max_event_size ->
              Format.ksprintf
                failwith
                "Could not parse daemon %s event after %d bytes."
                daemon.name
                max_event_size
          | None -> loop ()
          | Some json -> return (Some json))
    in
    loop ()

  (** Dispatch one full JSON event: call every persistent handler, then
      try the one-shot handlers registered under the event name.

      One-shot handlers whose filter returns [Some _] are resolved and
      dropped; the others are kept, in their original order. JSON that
      [get_event_from_full_event] rejects is ignored. An exception raised
      by a filter fails the test. *)
  let handle_raw_event daemon json =
    match get_event_from_full_event json with
    | None -> ()
    | Some (raw_event : event) -> (
        let name = raw_event.name in
        List.iter
          (fun handler -> handler raw_event)
          daemon.persistent_event_handlers ;
        (* Trigger one-shot events. *)
        match String_map.find_opt name daemon.one_shot_event_handlers with
        | None -> ()
        | Some events ->
            (* Trigger matching events and accumulate others in [acc]. *)
            let rec loop acc = function
              | [] ->
                  daemon.one_shot_event_handlers <-
                    String_map.add
                      name
                      (List.rev acc)
                      daemon.one_shot_event_handlers
              | (Event_handler {filter; resolver} as head) :: tail ->
                  let acc =
                    (* Filters receive the full JSON, not only the value. *)
                    match filter json with
                    | exception exn ->
                        Test.fail
                          "uncaught exception in filter for event %s of daemon \
                           %s: %s"
                          name
                          daemon.name
                          (Printexc.to_string exn)
                    | None -> head :: acc
                    | Some value ->
                        Lwt.wakeup_later resolver (Some value) ;
                        acc
                  in
                  loop acc tail
            in
            loop [] events)

  (** Spawn the daemon executable ([daemon.path]) with [arguments] and
      start the background loops that read its events and outputs.

      - [env]: environment passed to the process; [TEZOS_EVENTS_CONFIG]
        is added to it so that events are written to [daemon.event_pipe];
      - [runner]: when given, the pipe is followed through a [tail]
        process spawned with that runner instead of being opened directly;
      - [on_terminate]: called with the process status once
        [Process.wait] returns;
      - [event_level]: level for the empty section prefix;
      - [event_sections_levels]: [(prefix, level)] pairs for sections;
      - [capture_stderr]: whether stderr lines are fed to the handlers
        registered with [on_stderr].

      Fails the test if the daemon is already running. *)
  let run ?(env = String_map.empty) ?runner ?(on_terminate = fun _ -> unit)
      ?(event_level = `Info) ?(event_sections_levels = [])
      ?(capture_stderr = false) daemon session_state arguments =
    (match daemon.status with
    | Not_running -> ()
    | Running _ -> Test.fail "daemon %s is already running" daemon.name) ;
    (* Create the named pipe where the daemon will send its internal events
       in JSON. *)
    if Runner.Sys.file_exists ?runner daemon.event_pipe then
      Runner.Sys.remove ?runner daemon.event_pipe ;
    Runner.Sys.mkfifo ?runner ~perms:0o640 daemon.event_pipe ;
    (* Note: in the CI, it seems that if the daemon tries to open the
       FIFO for writing before we opened it for reading, the
       [Lwt.openfile] call (of the daemon, for writing) blocks
       forever. So we need to make sure that we open the file before we
       spawn the daemon. *)
    let event_process =
      match runner with
      | None -> None
      | Some runner ->
          let cmd = "tail" in
          let arguments = ["--follow"; "--retry"; daemon.event_pipe] in
          let name = Filename.basename daemon.event_pipe in
          let process =
            Process.spawn ~name ~runner ~log_output:false cmd arguments
          in
          Some process
    in
    (* The input is either the local pipe or the remote pipe. *)
    let* event_input =
      match event_process with
      | None -> Lwt_io.(open_file ~mode:input) daemon.event_pipe
      | Some process -> Lwt.return @@ Process.stdout process
    in
    let env =
      let args =
        List.fold_right
          (fun (prefix, level) args ->
            sf "section-prefix=%s:%s" prefix (Level.to_string level) :: args)
          (("", (event_level :> Level.level)) :: event_sections_levels)
          []
      in
      (* [List.rev] puts the rule for the empty prefix last.
         NOTE(review): presumably because rules are matched in order and
         the empty prefix matches every section -- confirm. *)
      let args_str = "?" ^ String.concat "&" (List.rev args) in
      String_map.add
        "TEZOS_EVENTS_CONFIG"
        ("file-descriptor-path://" ^ daemon.event_pipe ^ args_str)
        env
    in
    let process, stdin =
      Process.spawn_with_stdin
        ?runner
        ~name:daemon.name
        ~color:daemon.color
        ~env
        daemon.path
        arguments
    in
    (* Make sure the daemon status is [Running], otherwise
       [event_loop_promise] would stop immediately thinking the daemon
       has been terminated. *)
    let running_status =
      {process; session_state; stdin; event_loop_promise = None}
    in
    daemon.status <- Running running_status ;
    let event_loop_promise =
      let rec event_loop () =
        let* json = read_json_event daemon event_input in
        match json with
        | Some json ->
            handle_raw_event daemon json ;
            event_loop ()
        | None -> (
            match daemon.status with
            | Not_running -> (
                match event_process with
                | None -> Lwt_io.close event_input
                | Some process -> Lwt.return @@ Process.kill process)
            | Running _ ->
                (* It can take a little while before the pipe is opened by
                   the daemon, and before that, reading from it yields end
                   of file for some reason. *)
                let* () = Lwt_unix.sleep 0.01 in
                event_loop ())
      in
      let rec channel_loop get_channel get_handlers () =
        let* channel_line = Lwt_io.read_line_opt (get_channel process) in
        match channel_line with
        | Some line ->
            (* Handlers are looked up on every line, so handlers registered
               after [run] are taken into account. *)
            List.iter (fun handler -> handler line) (get_handlers daemon) ;
            channel_loop get_channel get_handlers ()
        | None -> (
            match daemon.status with
            | Not_running -> Lwt.return_unit
            | Running _ ->
                (* TODO: is the sleep necessary here? *)
                let* () = Lwt_unix.sleep 0.01 in
                channel_loop get_channel get_handlers ())
      in
      let ( and*!! ) = lwt_both_fail_early in
      let* () = event_loop ()
      and*!! () =
        channel_loop Process.stdout (fun daemon -> daemon.stdout_handlers) ()
      and*!! () =
        if capture_stderr then
          channel_loop Process.stderr (fun daemon -> daemon.stderr_handlers) ()
        else unit
      and*!! () =
        let* process_status = Process.wait process in
        (* Setting [daemon.status] to [Not_running] stops the event loop
           cleanly. *)
        daemon.status <- Not_running ;
        (* Cancel one-shot event handlers. *)
        let pending = daemon.one_shot_event_handlers in
        daemon.one_shot_event_handlers <- String_map.empty ;
        String_map.iter
          (fun _ ->
            List.iter (fun (Event_handler {resolver; _}) ->
                Lwt.wakeup_later resolver None))
          pending ;
        on_terminate process_status
      in
      unit
    in
    running_status.event_loop_promise <- Some event_loop_promise ;
    Background.register event_loop_promise ;
    unit

  (** Register a one-shot handler for events named [name] and wait until
      [filter], applied to the full JSON event, returns [Some x]; then
      return [x]. [where] is only used in the debug log and in the
      exception.

      @raise Terminated_before_event if the daemon terminates first. *)
  let wait_for_full ?where daemon name filter =
    let promise, resolver = Lwt.task () in
    let current_events =
      String_map.find_opt name daemon.one_shot_event_handlers
      |> Option.value ~default:[]
    in
    Log.debug
      "Waiting for event [%s]%s"
      name
      (match where with None -> "" | Some where -> " where " ^ where) ;
    daemon.one_shot_event_handlers <-
      String_map.add
        name
        (Event_handler {filter; resolver} :: current_events)
        daemon.one_shot_event_handlers ;
    let* result = promise in
    match result with
    | None ->
        raise
          (Terminated_before_event {daemon = daemon.name; event = name; where})
    | Some x -> return x

  (** Turn a filter on event values into a filter on full JSON events. *)
  let event_from_full_event_filter filter json =
    let raw = get_event_from_full_event json in
    (* If [json] does not match the correct JSON structure, it
       will be filtered out, which will result in ignoring
       the current event.
       See [get_event_from_full_event]. *)
    Option.bind raw (fun {value; _} -> filter value)

  (** Same as [wait_for_full], but [filter] is applied to the value of the
      event instead of the full JSON. *)
  let wait_for ?where daemon name filter =
    wait_for_full ?where daemon name (event_from_full_event_filter filter)

  (** Add a handler called for every event received from the daemon. *)
  let on_event daemon handler =
    daemon.persistent_event_handlers <-
      handler :: daemon.persistent_event_handlers

  (** Add a handler called for every line read from the daemon's stdout. *)
  let on_stdout daemon handler =
    daemon.stdout_handlers <- handler :: daemon.stdout_handlers

  (** Add a handler called for every line read from the daemon's stderr.
      Lines are only read when [run] is given [~capture_stderr:true]. *)
  let on_stderr daemon handler =
    daemon.stderr_handlers <- handler :: daemon.stderr_handlers

  (** Log every received event with [Log.info]. *)
  let log_events daemon =
    on_event daemon @@ fun event ->
    Log.info
      "[%s] Received event: %s = %s"
      daemon.name
      event.name
      (JSON.encode event.value)

  type observe_memory_consumption = Observe of (unit -> int option Lwt.t)

  (** Set up a measurement of the daemon's peak heap memory consumption
      using [perf] and [heaptrack].

      The returned [Observe] function evaluates to [None] when the daemon
      is not running, when [perf] or [heaptrack_print] cannot be found, or
      when no trace could be obtained. Otherwise it evaluates to the peak
      reported by [heaptrack_print], scaled by 1024 per unit step (K, M, G)
      -- presumably a number of bytes; confirm against heaptrack's output
      format. Note that calling the [Observe] function kills the daemon
      process. *)
  let memory_consumption daemon =
    (* The command is spawned as soon as the pattern [r] is given; the
       resulting function waits for it and extracts the first group. *)
    let from_command ~cmd ~args ~expect_failure r =
      let p = Process.spawn ~log_output:true cmd args in
      fun () ->
        let* output = Process.check_and_read_stdout ~expect_failure p in
        return (output =~* rex r)
    in
    let cannot_observe = return @@ Observe (fun () -> return None) in
    match daemon.status with
    | Not_running -> cannot_observe
    | Running {process; _} -> (
        let* perf = Process.program_path "perf" in
        let* heaptrack_print = Process.program_path "heaptrack_print" in
        match (perf, heaptrack_print) with
        | None, _ | _, None -> cannot_observe
        | Some perf, Some heaptrack_print -> (
            try
              let pid = Process.pid process |> string_of_int in
              let get_trace =
                from_command
                  ~cmd:perf
                  ~args:["stat"; "-r"; "5"; "heaptrack"; "-p"; pid]
                  ~expect_failure:true
                  ".* heaptrack --analyze \"(.*)\""
              in
              return
              @@ Observe
                   (fun () ->
                     Process.kill process ;
                     let* dump = get_trace () in
                     match dump with
                     | None ->
                         (*
                           [perf] may fail if [kernel.perf_event_paranoid] is
                           not set to a permissive enough value. In this case,
                           we cannot observe memory consumption. We do not
                           consider this situation as an error because that
                           would be too strong a requirement on CI workers.
                         *)
                         Log.warn
                           "kernel.perf_event_paranoid is not permissive \
                            enough. Aborting memory observation." ;
                         return None
                     | Some dump -> (
                         let* peak =
                           from_command
                             ~cmd:heaptrack_print
                             ~args:[dump]
                             ~expect_failure:false
                             "peak heap memory consumption: (\\d+\\.?\\d*\\w)"
                             ()
                         in
                         (* [Option.bind] rather than [Option.get]: a missing
                            peak line must reach the [Test.fail] below and be
                            reported as "(empty)" instead of raising
                            [Invalid_argument]. *)
                         match
                           Option.bind peak (fun peak ->
                               peak =~** rex "(\\d+\\.?\\d*)(\\w)")
                         with
                         | None ->
                             Test.fail
                               "Invalid memory consumption format: %s\n"
                               (match peak with
                               | None -> "(empty)"
                               | Some s -> s)
                         | Some (size, unit_suffix) ->
                             let factor_of_unit =
                               match unit_suffix with
                               | "K" -> 1024
                               | "M" -> 1024 * 1024
                               | "G" -> 1024 * 1024 * 1024
                               | _ -> 1
                             in
                             let size =
                               int_of_float
                               @@ float_of_string size
                                  *. float_of_int factor_of_unit
                             in
                             return @@ Some size))
            with exn ->
              Test.fail
                "failed to set up memory consumption measurement: %s"
                (Printexc.to_string exn)))
end

(** [n_events_rev n filter] is a stateful filter that accumulates the
    values accepted by [filter] and evaluates to [None] until [n] values
    have been accepted; it then evaluates to the accumulated list, most
    recent value first.

    @raise Invalid_argument if [n <= 0]. *)
let n_events_rev n filter =
  if n <= 0 then invalid_arg "Base.n_events_rev: n must be > 0." ;
  let acc = ref [] in
  let size = ref 0 in
  let accumulation_threshold value =
    acc := value :: !acc ;
    incr size ;
    if !size >= n then Some !acc else None
  in
  let accumulating_filter json =
    Option.bind (filter json) accumulation_threshold
  in
  accumulating_filter

(** Same as [n_events_rev], but the resulting list is in order of
    acceptance (oldest value first). *)
let n_events n filter =
  let accumulating_filter = n_events_rev n filter in
  let inverting_filter json = Option.map List.rev @@ accumulating_filter json in
  inverting_filter

(** Same as [n_events_rev], but evaluates to the head of the accumulated
    list only, i.e. the most recently accepted value once [n] values have
    been accepted. *)
let nth_event n filter =
  let accumulating_filter = n_events_rev n filter in
  let nth_filter json = Option.map List.hd @@ accumulating_filter json in
  nth_filter