123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267(**************************************************************************)(* The OUnit library *)(* *)(* Copyright (C) 2002-2008 Maas-Maarten Zeeman. *)(* Copyright (C) 2010 OCamlCore SARL *)(* Copyright (C) 2013 Sylvain Le Gall *)(* *)(* The package OUnit is copyright by Maas-Maarten Zeeman, OCamlCore SARL *)(* and Sylvain Le Gall. *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining *)(* a copy of this document and the OUnit software ("the Software"), to *)(* deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, *)(* sublicense, and/or sell copies of the Software, and to permit persons *)(* to whom the Software is furnished to do so, subject to the following *)(* conditions: *)(* *)(* The above copyright notice and this permission notice shall be *)(* included in all copies or substantial portions of the Software. *)(* *)(* The Software is provided ``as is'', without warranty of any kind, *)(* express or implied, including but not limited to the warranties of *)(* merchantability, fitness for a particular purpose and noninfringement. 
*)(* In no event shall Maas-Maarten Zeeman be liable for any claim, damages *)(* or other liability, whether in an action of contract, tort or *)(* otherwise, arising from, out of or in connection with the Software or *)(* the use or other dealings in the software. *)(* *)(* See LICENSE.txt for details. *)(**************************************************************************)(** Use processes to run several tests in parallel.
*
* Spawn worker processes that run the tests. Each worker reads a test, executes
* it, and sends the resulting log back to the master.
*
* This needs to be done in another process because OCaml threads are not truly
* running in parallel. Moreover we cannot use Unix.fork because it's not
* portable
*)

open Unix
open OUnitRunner.GenericWorker

(* Fork primitive used by [create_worker]. It is stored in a reference, so it
 * can be reassigned before workers are created.
 *)
let unix_fork = ref Unix.fork

(* Create functions to handle sending and receiving data over a file descriptor.
 *
 * [fd_read] is switched to non-blocking mode and both descriptors are marked
 * close-on-exec. Messages are exchanged with [Marshal]: a fixed-size header
 * followed by the payload whose size is announced in the header.
 *
 * The resulting raw channel is passed to [wrap_channel] together with
 * [shard_id] and the two message printers.
 *)
let make_channel
      shard_id
      string_of_read_message
      string_of_written_message
      fd_read
      fd_write =
  let () =
    set_nonblock fd_read;
    set_close_on_exec fd_read;
    set_close_on_exec fd_write
  in

  let chn_write = out_channel_of_descr fd_write in

  (* Block until [fd] is readable, restarting if a signal interrupts us. *)
  let rec wait_readable fd =
    match Unix.select [fd] [] [] (-1.0) with
    | _ -> ()
    | exception Unix_error (EINTR, _, _) -> wait_readable fd
  in

  (* Fill [buf] completely with bytes read from [fd] and return it.
   *
   * [fd] is non-blocking: when no data is available we wait in [select]
   * instead of spinning on EAGAIN. A read of 0 bytes means that the other
   * end has been closed; it is reported as [Failure] so that [receive_data]
   * turns it into a communication error rather than looping forever.
   *)
  let really_read fd buf =
    let len = Bytes.length buf in
    let rec fill off =
      if off < len then begin
        match Unix.read fd buf off (len - off) with
        | 0 ->
            failwith "unexpected end of file on communication channel"
        | n ->
            fill (off + n)
        | exception Unix_error ((EAGAIN | EWOULDBLOCK), _, _) ->
            wait_readable fd;
            fill off
        | exception Unix_error (EINTR, _, _) ->
            fill off
      end
    in
    fill 0;
    buf
  in

  (* Buffer reused for every message header. *)
  let header_str = Bytes.create Marshal.header_size in

  (* Marshal [msg] to the write channel and flush it immediately. *)
  let send_data msg =
    Marshal.to_channel chn_write msg [];
    Stdlib.flush chn_write
  in

  (* Read one marshalled message: first the header, which gives the payload
   * size, then the payload itself. Any [Failure] is re-raised through
   * [OUnitUtils.failwithf] with a communication error prefix.
   *)
  let receive_data () =
    try
      let data_size = Marshal.data_size (really_read fd_read header_str) 0 in
      let data_str = really_read fd_read (Bytes.create data_size) in
      Marshal.from_bytes (Bytes.cat header_str data_str) 0
    with Failure msg ->
      OUnitUtils.failwithf "Communication error with worker processes: %s" msg
  in

  let close () =
    close_out chn_write
  in

  wrap_channel
    shard_id
    string_of_read_message
    string_of_written_message
    {
      send_data = send_data;
      receive_data = receive_data;
      close = close;
    }

let processes_grace_period =
  OUnitConf.make_float
    "processes_grace_period"
    5.0
    "Delay to wait for a process to stop."

let processes_kill_period =
  OUnitConf.make_float
    "processes_kill_period"
    5.0
    "Delay to wait for a process to stop after killing it."

(* [Unix.select] wrapper that restarts when interrupted by a signal.
 *
 * Returns three empty lists when [timeout] is negative. Otherwise a single
 * [select] of 0.1s is attempted; if it is interrupted (EINTR), the call is
 * retried with [timeout] decreased by 0.1s.
 *
 * NOTE(review): the [select] call always uses a 0.1s slice, whatever the
 * value of [timeout]. Callers may rely on this short polling period, so it is
 * kept as is -- confirm whether the full [timeout] was intended.
 *)
let rec select_no_interrupt read_descrs write_descrs except_descrs timeout =
  if timeout < 0.0 then
    ([], [], [])
  else begin
    try
      Unix.select read_descrs write_descrs except_descrs 0.1
    with Unix.Unix_error (Unix.EINTR, "select", "") ->
      select_no_interrupt
        read_descrs write_descrs except_descrs (timeout -. 0.1)
  end

(* Fork a worker process connected to the master through two pipes.
 *
 * In the child, [main_worker_loop] is run on a channel built with [shard_id];
 * the child then closes its descriptors and exits with status 0.
 *
 * In the master, a worker record is returned. Its [close_worker] function
 * closes the channel, waits up to [processes_grace_period] for the process to
 * stop, then sends SIGTERM and SIGKILL, waiting [processes_kill_period] after
 * each. It returns [None] when the process exited with status 0, and
 * [Some message] otherwise (including when the process cannot be killed).
 *
 * If creating the second pipe or forking raises, the descriptors already
 * opened are closed before the exception is re-raised.
 *)
let create_worker ~shard_id ~master_id ~worker_log_file conf map_test_cases =
  let safe_close fd = try close fd with Unix_error _ -> () in
  let pipe_read_from_worker, pipe_write_to_master = Unix.pipe () in
  let pipe_read_from_master, pipe_write_to_worker =
    try
      Unix.pipe ()
    with exn ->
      safe_close pipe_read_from_worker;
      safe_close pipe_write_to_master;
      raise exn
  in
  let fork_result =
    try
      !unix_fork ()
    with exn ->
      (* No worker was created: do not leak the four pipe descriptors. *)
      List.iter safe_close
        [pipe_read_from_worker; pipe_write_to_master;
         pipe_read_from_master; pipe_write_to_worker];
      raise exn
  in
  match fork_result with
  | 0 ->
      (* Child process. *)
      let () =
        (* Close the master's ends of the pipes. *)
        safe_close pipe_read_from_worker;
        safe_close pipe_write_to_worker;
        (* stdin/stdout/stderr remain open and shared with master. *)
        ()
      in
      let channel =
        make_channel
          shard_id
          string_of_message_to_worker
          string_of_message_from_worker
          pipe_read_from_master
          pipe_write_to_master
      in
      main_worker_loop
        conf ~yield:ignore channel ~shard_id map_test_cases ~worker_log_file;
      channel.close ();
      safe_close pipe_read_from_master;
      safe_close pipe_write_to_master;
      exit 0

  | pid ->
      (* Master process.
       * NOTE(review): the worker's ends of the pipes ([pipe_read_from_master]
       * and [pipe_write_to_master]) are not closed here; closing them would
       * make the master see an end of file when a worker dies, so this is
       * left unchanged -- confirm whether it is intentional. *)
      let channel =
        make_channel
          master_id
          string_of_message_from_worker
          string_of_message_to_worker
          pipe_read_from_worker
          pipe_write_to_worker
      in

      (* Exit status of the worker, once it has been collected. *)
      let rstatus = ref None in

      let msg_of_process_status status =
        if status = WEXITED 0 then
          None
        else
          Some (OUnitUtils.string_of_process_status status)
      in

      (* Non-blocking check; the status is cached as soon as it is known. *)
      let is_running () =
        match !rstatus with
        | Some _ ->
            false
        | None ->
            let pid, status = waitpid [WNOHANG] pid in
            if pid <> 0 then begin
              rstatus := Some status;
              false
            end else
              true
      in

      let close_worker () =
        (* Poll every 0.1s until the worker stops or [timeout] is exhausted.
         * Returns whether it ended, and an optional error message. *)
        let rec wait_end timeout =
          if timeout < 0.0 then
            (false, None)
          else if is_running () then begin
            let _, _, _ = select_no_interrupt [] [] [] 0.1 in
            wait_end (timeout -. 0.1)
          end else begin
            match !rstatus with
            | Some status -> (true, msg_of_process_status status)
            | None -> (true, None)
          end
        in
        let ended, msg_opt =
          channel.close ();
          safe_close pipe_read_from_worker;
          safe_close pipe_write_to_worker;
          (* Recovery for worker going wild and not dying. *)
          List.fold_left
            (fun (ended, msg_opt) signal ->
               if ended then
                 (ended, msg_opt)
               else begin
                 kill pid signal;
                 wait_end (processes_kill_period conf)
               end)
            (wait_end (processes_grace_period conf))
            [Sys.sigterm; Sys.sigkill]
        in
        if ended then
          msg_opt
        else
          Some (Printf.sprintf "unable to kill process %d" pid)
      in
      {
        channel = channel;
        close_worker = close_worker;
        select_fd = pipe_read_from_worker;
        shard_id = shard_id;
        is_running = is_running;
      }

(* Filter running workers waiting data.
*)
(* [workers_waiting ~timeout workers] passes the [select_fd] of every worker
 * to [select_no_interrupt] and keeps the workers whose descriptor is reported
 * as readable. The order of [workers] is preserved.
 *)
let workers_waiting ~timeout workers =
  let readable_fds, _, _ =
    let all_fds = List.rev_map (fun w -> w.select_fd) workers in
    select_no_interrupt all_fds [] [] timeout
  in
  let has_pending_data w = List.memq w.select_fd readable_fds in
  List.filter has_pending_data workers

(* Register the "processes" runner, but only on Unix and only for the native
 * and bytecode backends; do nothing otherwise.
 *)
let init () =
  match Sys.os_type, Sys.backend_type with
  | "Unix", (Native | Bytecode) ->
      OUnitRunner.register "processes" 100
        (runner create_worker workers_waiting)
  | _ ->
      ()