(** Small general-purpose helpers used throughout this file. *)
module Potpourri = struct

  (** [get_option v] returns the payload of [Some _]; the [None] case is an
      [assert false], so callers must only pass values known to be [Some]. *)
  let get_option (v : 'a option) : 'a =
    match v with
    | None -> assert false (*BISECT-IGNORE*)
    | Some v' -> v'

  (** [map_default_option f default_value opt] is [f v] for [Some v] and
      [default_value] for [None]. *)
  let map_default_option (f : ('a -> 'b)) (default_value : 'b) (opt : 'a option) : 'b =
    match opt with
    | Some v -> f v
    | None -> default_value

  (** [post_incr v] increments [v] in place and returns the value it held
      before the increment. *)
  let post_incr (v : int ref) : int =
    let res = !v in
    v := !v + 1 ;
    res

  (** [of_option f] runs [f ()] and wraps the result in [Some]; any exception
      raised by [f] is turned into [None]. *)
  let of_option (f : unit -> 'a) : 'a option =
    try Some (f ()) with _ -> None

  (** [pp_list ?first ?last ?sep items string_of_item formatter] prints the
      optional [first] format, then every item (as rendered by
      [string_of_item]) each followed by the optional [sep] format, then the
      optional [last] format. Note that [sep] is also printed after the final
      item. *)
  let pp_list ?first ?last ?sep (items : 'a list) (string_of_item : 'a -> string) (formatter) : unit =
    if first <> None then Format.fprintf formatter (get_option first) else () ;
    List.iter
      (fun i ->
         let s = string_of_item i in
         Format.fprintf formatter "%s" s ;
         if sep <> None then Format.fprintf formatter (get_option sep) else ())
      items ;
    if last <> None then Format.fprintf formatter (get_option last) else ()

end

(** Identity of a node: an optional ip/port pair plus a name. Nodes built with
    [make_local_node] carry neither ip nor port. *)
module Node_id = struct

  type t = { ip   : Unix.inet_addr option ;
             port : int option ;
             name : string ;
           }

  let make_local_node name = {ip = None ; port = None ; name}

  (** [ipStr] is parsed with [Unix.inet_addr_of_string]. *)
  let make_remote_node ipStr port name =
    {ip = Some (Unix.inet_addr_of_string ipStr) ; port = Some port ; name}

  (** Two nodes are considered the same when ip and port agree; the name is
      not compared. *)
  let is_local node local_node =
    node.ip = local_node.ip && node.port = local_node.port

  let get_name {name ; _} = name

  (** Prints the node as [{ip : ... ; port : ... ; name : ...}], using the
      text [None] for a missing ip or port. *)
  let print_string_of_node node formatter =
    let string_of_ip =
      if node.ip = None
      then "None"
      else Unix.string_of_inet_addr @@ Potpourri.get_option node.ip in
    let string_of_port =
      if node.port = None
      then "None"
      else string_of_int @@ Potpourri.get_option node.port in
    Format.fprintf formatter "{ip : %s ; port : %s ; name : %s}" string_of_ip string_of_port node.name

  let get_ip {ip ; _} = ip

  let get_port {port ; _} = port

end

(** Seeded hash/equality over node ids; both use only the (ip, port) pair. *)
module Node_id_seeded_hash_type = struct

  type t = Node_id.t

  let equal (n1 : t) (n2 : t) : bool =
    (Node_id.get_ip n1, Node_id.get_port n1) = (Node_id.get_ip n2, Node_id.get_port n2)

  let hash (seed : int) (n : t) : int =
    Hashtbl.seeded_hash seed (Node_id.get_ip n, Node_id.get_port n)

end

module Node_id_hashtbl = Hashtbl.MakeSeeded(Node_id_seeded_hash_type)

(** Identity of a process: the node it lives on plus an integer id. *)
module Process_id = struct

  type t = { node    : Node_id.t ;
             proc_id : int ;
           }

  let make nid pid = {node = nid ; proc_id = pid}

  (** The new id is taken from (and post-increments) [next_process_id]. *)
  let make_local name next_process_id =
    {node = Node_id.make_local_node name ; proc_id = Potpourri.post_incr next_process_id}

  (** The new id is taken from (and post-increments) [next_process_id]. *)
  let make_remote ipStr port name next_process_id =
    {node = Node_id.make_remote_node ipStr port name ; proc_id = Potpourri.post_incr next_process_id}

  let is_local {node ; _} local_node = Node_id.is_local node local_node

  let get_node {node ; _} = node

  let get_id {proc_id ; _} = proc_id

  let print_string_of_pid p formatter =
    Format.fprintf formatter "{node : " ;
    Node_id.print_string_of_node p.node formatter ;
    Format.fprintf formatter " ; id : %d}" p.proc_id

end

(** Seeded hash/equality over process ids; both use the (ip, port, id)
    triple. *)
module Process_id_seeed_hash_type = struct

  type t = Process_id.t

  let equal (p1 : t) (p2 : t) : bool =
    let p1_ip = Node_id.get_ip @@ Process_id.get_node p1 in
    let p1_port = Node_id.get_port @@ Process_id.get_node p1 in
    let p1_id = Process_id.get_id p1 in
    let p2_ip = Node_id.get_ip @@ Process_id.get_node p2 in
    let p2_port = Node_id.get_port @@ Process_id.get_node p2 in
    let p2_id = Process_id.get_id p2 in
    (p1_ip,p1_port,p1_id) = (p2_ip,p2_port,p2_id)

  let hash (seed : int) (p : t) : int =
    let p_ip = Node_id.get_ip @@ Process_id.get_node p in
    let p_port = Node_id.get_port @@ Process_id.get_node p in
    let p_id = Process_id.get_id p in
    Hashtbl.seeded_hash seed (p_ip,p_port,p_id)

end

module Process_id_hashtbl = Hashtbl.MakeSeeded(Process_id_seeed_hash_type)

(** The non-blocking I/O and threading primitives the [Make] functor is
    parameterised over. *)
module type Nonblock_io = sig

  type 'a t

  type 'a stream

  type input_channel

  type output_channel

  type server

  type level = Debug
             | Info
             | Warning
             | Error

  exception Timeout

  val lib_name : string

  val lib_version : string

  val lib_description : string

  val return : 'a -> 'a t

  val (>>=) : 'a t -> ('a -> 'b t) -> 'b t

  val fail : exn -> 'a t

  val catch : (unit -> 'a t) -> (exn -> 'a t) -> 'a t

  val async : (unit -> unit t) -> unit

  val create_stream : unit -> 'a stream * ('a option -> unit)

  val get : 'a stream -> 'a option t

  val stream_append : 'a stream -> 'a stream -> 'a stream

  val close_input : input_channel -> unit t

  val close_output : output_channel -> unit t

  val read_value : input_channel -> 'a t

  val write_value : output_channel -> ?flags:Marshal.extern_flags list -> 'a -> unit t

  val open_connection : Unix.sockaddr -> (input_channel * output_channel) t

  val establish_server : ?backlog:int -> Unix.sockaddr -> (Unix.sockaddr -> input_channel * output_channel -> unit t) -> server t

  val shutdown_server : server -> unit t

  val log : level -> (unit -> string) -> unit t

  val sleep : float -> unit t

  val timeout : float -> 'a t

  val pick : 'a t list -> 'a t

  val at_exit : (unit -> unit t) -> unit

end

(** The user message type exchanged between processes. *)
module type Message_type = sig

  type t

  val string_of_message : t -> string

end

(** The public interface produced by [Make]. *)
module type Process = sig

  exception Init_more_than_once

  exception InvalidNode of Node_id.t

  exception Local_only_mode

  type 'a io

  type 'a t

  type message_type

  type 'a matcher_list

  type monitor_ref

  type monitor_reason = Normal of Process_id.t
                      | Exception of Process_id.t * exn
                      | UnkownNodeId of Process_id.t * Node_id.t
                      | NoProcess of Process_id.t

  module Remote_config : sig
    type t = { remote_nodes       : (string * int * string) list ;
               local_port         : int ;
               connection_backlog : int ;
               node_name          : string ;
               node_ip            : string ;
             }
  end

  module Local_config : sig
    type t = { node_name : string ; }
  end

  type node_config = Local of Local_config.t
                   | Remote of Remote_config.t

  val return : 'a -> 'a t

  val (>>=) : 'a t -> ('a -> 'b t) -> 'b t

  val fail : exn -> 'a t

  val catch : (unit -> 'a t) -> (exn -> 'a t) -> 'a t

  val spawn : ?monitor:bool -> Node_id.t -> (unit -> unit t) -> (Process_id.t * monitor_ref option) t

  val case : (message_type -> (unit -> 'a t) option) -> 'a matcher_list

  val termination_case : (monitor_reason -> 'a t) -> 'a matcher_list

  val (|.) : 'a matcher_list -> 'a matcher_list -> 'a matcher_list

  val receive : ?timeout_duration:float -> 'a matcher_list -> 'a option t

  val receive_loop : ?timeout_duration:float -> bool matcher_list -> unit t

  val send : Process_id.t -> message_type -> unit t

  val (>!) : Process_id.t -> message_type -> unit t

  val broadcast : Node_id.t -> message_type -> unit t

  val monitor : Process_id.t -> monitor_ref t

  val unmonitor : monitor_ref -> unit t

  val get_self_pid : Process_id.t t

  val get_self_node : Node_id.t t

  val get_remote_node : string -> Node_id.t option t

  val get_remote_nodes : Node_id.t list t

  val add_remote_node : string -> int -> string -> Node_id.t t

  val remove_remote_node : Node_id.t -> unit t

  val lift_io : 'a io -> 'a t

  val run_node : ?process:(unit -> unit t) -> node_config -> unit io

end

module Make (I : Nonblock_io) (M : Message_type) : (Process with type message_type = M.t and type 'a io = 'a I.t) = struct

  exception Init_more_than_once

  exception InvalidNode of Node_id.t

  exception Local_only_mode

  type 'a io = 'a I.t

  type message_type = M.t

  type monitor_ref = Monitor_Ref of int * Process_id.t * Process_id.t (* unique id, the process doing the monitoring and the process being monitored *)

  type monitor_reason = Normal of Process_id.t
                      | Exception of Process_id.t * exn
                      | UnkownNodeId of Process_id.t * Node_id.t
                      | NoProcess of Process_id.t

  module Remote_config = struct
    type t = { remote_nodes       : (string * int * string) list ;
               local_port         : int ;
               connection_backlog : int ;
               node_name          : string ;
               node_ip            : string ;
             }
  end

  module Local_config = struct
    type t = { node_name : string ; }
  end

  type node_config = Local of Local_config.t
                   | Remote of Remote_config.t

  (** Orders monitor references by their unique integer id only. *)
  module Monitor_ref_order_type = struct

    type t = monitor_ref

    (* This binding is not recursive, so [compare] on the right-hand side is
       the standard polymorphic comparison. It replaces a subtraction of the
       two ids, which is not a safe ordering because the difference can
       wrap around. *)
    let compare (Monitor_Ref (id1,_,_) : t) (Monitor_Ref (id2,_,_) : t) : int =
      compare (id1 : int) id2

  end

  module Monitor_ref_set = Set.Make(Monitor_ref_order_type)

  type message = Data of Process_id.t * Process_id.t * message_type                   (* sending process id, receiving process id and the message *)
               | Broadcast of Process_id.t * Node_id.t * message_type                 (* sending process id, receiving node and the message *)
               | Proc of unit t * Process_id.t                                        (* the process to be spawned elsewhere and the process that requested the spawning *)
               | Spawn_monitor of unit t * Process_id.t * Process_id.t                (* the process to be spawned elsewhere, the monitoring process and the process that requested the spawning.*)
               | Node of Node_id.t                                                    (* initial message sent to remote node to identify ourselves *)
               | Exit of Process_id.t * monitor_reason                                (* process that was being monitored and the reason for termination *)
               | Monitor of Process_id.t * Process_id.t * Process_id.t                (* the process doing the monitoring and the id of the process to be monitored and the process that requested the monitoring *)
               | Unmonitor of monitor_ref * Process_id.t                              (* process to unmonitor and the process that requested the unmonitor *)
               | Proc_result of Process_id.t * Process_id.t                           (* result of spawning a process and the receiver process id *)
               | Spawn_monitor_result of message option * monitor_ref * Process_id.t  (* result of spawning and monitoring a process and the receiver process id *)
               | Monitor_result of message option * monitor_ref * Process_id.t        (* result of monitor and the receiving process *)
               | Unmonitor_result of monitor_ref * Process_id.t                       (* monitor ref that was requested to be unmonitored and the receiving process *)

  and node_state = { mailboxes       : (int, message I.stream * (message option -> unit)) Hashtbl.t ;
                     remote_nodes    : I.output_channel Node_id_hashtbl.t ;
                     monitor_table   : Monitor_ref_set.t Process_id_hashtbl.t ;
                     local_node      : Node_id.t ;
                     monitor_ref_id  : int ref ;
                     config          : Remote_config.t option ref ;
                     log_buffer      : Buffer.t ;
                     log_formatter   : Format.formatter ;
                     est_in_ch       : I.input_channel option ref ;  (* input channel from calling I.establish_server, store here so we can close when server exits*)
                     est_out_ch      : I.output_channel option ref ; (* output_channel channel from calling I.establish_server, store here so we can close when server exits*)
                     node_server     : I.server option ref ;         (* the server from calling I.establish_server, store here so we can close listening socket when server exits *)
                     next_process_id : int ref ;
                   }

  (* A process computation: a function from the node state and the pid of the
     running process to an io action yielding the (possibly updated) pair
     together with the result. *)
  and 'a t = (node_state * Process_id.t) -> (node_state * Process_id.t * 'a) io

  type 'a matcher = (message -> (unit -> 'a t) option)

  type 'a matcher_list = Matcher of 'a matcher
                       | Matchers of 'a matcher * 'a matcher_list

  let initalised = ref false

  let dist_lib_version = "0.6.0"

  (** Prints a human readable rendering of a termination reason. *)
  let print_string_of_termination_reason (reason : monitor_reason) (formatter : Format.formatter) : unit =
    match reason with
    | Normal pid ->
      Format.fprintf formatter "{termination reason : normal ; pid : " ;
      Process_id.print_string_of_pid pid formatter ;
      Format.fprintf formatter "}"
    | Exception (pid,e) ->
      Format.fprintf formatter "{termination reason : exception %s ; pid : " (Printexc.to_string e) ;
      Process_id.print_string_of_pid pid formatter ;
      Format.fprintf formatter "}"
    | UnkownNodeId (pid,n) ->
      Format.fprintf formatter "{termination reason : unknown node id " ;
      Node_id.print_string_of_node n formatter ;
      Format.fprintf formatter " ; pid : " ;
      Process_id.print_string_of_pid pid formatter ;
      Format.fprintf formatter "}"
    | NoProcess p ->
      Format.fprintf formatter "{termination reason : unknown process" ;
      Process_id.print_string_of_pid p formatter ;
      Format.fprintf formatter "}"

  (** Prints a monitor reference: its id, the monitoring pid and the
      monitored pid. *)
  let print_string_of_monitor_ref (Monitor_Ref (id,pid,monitee_pid)) (formatter : Format.formatter) : unit =
    Format.fprintf formatter "{id : %d ; monitor process : " id ;
    Process_id.print_string_of_pid pid formatter ;
    Format.fprintf formatter " ; monitee process : " ;
    Process_id.print_string_of_pid monitee_pid formatter ;
    Format.fprintf formatter "}"

  (** Prints a monitor reference together with the termination reason being
      reported for it. *)
  let print_string_of_monitor_notification (Monitor_Ref (id,pid,monitee_pid)) (reason : monitor_reason) (formatter : Format.formatter) : unit =
    Format.fprintf formatter "{id : %d ; monitor process : " id ;
    Process_id.print_string_of_pid pid formatter ;
    Format.fprintf formatter " ; monitee process" ;
    Process_id.print_string_of_pid monitee_pid formatter ;
    Format.fprintf formatter " ; reason : " ;
    print_string_of_termination_reason reason formatter ;
    Format.fprintf formatter "}"

  (** Prints an internal protocol message. Recursive because the two
      monitor result messages can embed another message. *)
  let rec print_string_of_message (m : message) (formatter : Format.formatter) : unit =
    match m with
    | Data (sender,recver,msg) ->
      begin
        Format.fprintf formatter "Data : {sender pid : " ;
        Process_id.print_string_of_pid sender formatter ;
        Format.fprintf formatter " ; receiver pid : " ;
        Process_id.print_string_of_pid recver formatter ;
        Format.fprintf formatter " ; message : %s}" (M.string_of_message msg)
      end
    | Broadcast (sender,recv_node,msg) ->
      begin
        Format.fprintf formatter "Broadcast : {sender pid : " ;
        Process_id.print_string_of_pid sender formatter ;
        Format.fprintf formatter " ; receiver node : " ;
        Node_id.print_string_of_node recv_node formatter ;
        Format.fprintf formatter " ; message : %s}" (M.string_of_message msg)
      end
    | Proc (_,sender_pid) ->
      begin
        Format.fprintf formatter "Proc { <process> ; sender pid : " ;
        Process_id.print_string_of_pid sender_pid formatter ;
        (* close the brace opened above, as every other case does *)
        Format.fprintf formatter "}"
      end
    | Spawn_monitor (_,pid,sender) ->
      begin
        Format.fprintf formatter "Spawn and monitor {<process> ; monitor pid : " ;
        Process_id.print_string_of_pid pid formatter ;
        Format.fprintf formatter " ; sender pid " ;
        Process_id.print_string_of_pid sender formatter ;
        Format.fprintf formatter "}"
      end
    | Node nid ->
      begin
        Format.fprintf formatter "Node " ;
        Node_id.print_string_of_node nid formatter
      end
    | Exit (pid,mreason) ->
      begin
        Format.fprintf formatter "Exit : {exit pid : " ;
        Process_id.print_string_of_pid pid formatter ;
        Format.fprintf formatter " ; reason : " ;
        print_string_of_termination_reason mreason formatter ;
        Format.fprintf formatter "}"
      end
    | Monitor (monitor_pid,monitee_pid,sender) ->
      begin
        Format.fprintf formatter "Monitor : {monitor pid : " ;
        Process_id.print_string_of_pid monitor_pid formatter ;
        Format.fprintf formatter " ; monitee pid : " ;
        Process_id.print_string_of_pid monitee_pid formatter ;
        Format.fprintf formatter " ; sender pid : " ;
        Process_id.print_string_of_pid sender formatter ;
        Format.fprintf formatter "}"
      end
    | Unmonitor (mref,sender) ->
      begin
        Format.fprintf formatter "Unmonitor : {monitor reference to unmonitor : " ;
        print_string_of_monitor_ref mref formatter ;
        Format.fprintf formatter " ; sender pid : " ;
        Process_id.print_string_of_pid sender formatter ;
        Format.fprintf formatter "}"
      end
    | Proc_result (pid,recv_pid) ->
      begin
        Format.fprintf formatter "Proc result {spawned pid : " ;
        Process_id.print_string_of_pid pid formatter ;
        Format.fprintf formatter " ; receiver pid : " ;
        Process_id.print_string_of_pid recv_pid formatter ;
        Format.fprintf formatter "}"
      end
    | Spawn_monitor_result (monitor_msg,monitor_res,receiver) ->
      begin
        Format.fprintf formatter "Spawn and monitor result {monitor message : " ;
        Potpourri.map_default_option (fun m -> print_string_of_message m formatter) () monitor_msg ;
        Format.fprintf formatter " ; monitor result : " ;
        print_string_of_monitor_ref monitor_res formatter ;
        Format.fprintf formatter " : receiver pid : " ;
        Process_id.print_string_of_pid receiver formatter ;
        Format.fprintf formatter "}"
      end
    | Monitor_result (monitor_msg,monitor_res,receiver) ->
      begin
        Format.fprintf formatter "Monitor result {monitor message : " ;
        Potpourri.map_default_option (fun m -> print_string_of_message m formatter) () monitor_msg ;
        Format.fprintf formatter " ; monitor result : " ;
        print_string_of_monitor_ref monitor_res formatter ;
        Format.fprintf formatter " ; receiver pid : " ;
        Process_id.print_string_of_pid receiver formatter ;
        Format.fprintf formatter "}"
      end
    | Unmonitor_result (mref,pid) ->
      begin
        Format.fprintf formatter "Unmonitor result : {monittor reference to unmonitor: " ;
        print_string_of_monitor_ref mref formatter ;
        Format.fprintf formatter " ; receiver pid : " ;
        Process_id.print_string_of_pid pid formatter ;
        Format.fprintf formatter "}"
      end

  (** Prints a node configuration (local or remote). *)
  let print_string_of_config (c : node_config) (formatter : Format.formatter) : unit =
    match c with
    | Local l ->
      Format.fprintf formatter "{node type : local ; node name : %s}" l.Local_config.node_name
    | Remote r ->
      let print_remote_nodes () =
        Potpourri.pp_list
          ~first:"["
          ~last:"]"
          ~sep:";"
          r.Remote_config.remote_nodes
          (fun (ip,port,name) -> Format.sprintf "%s:%d, name : %s" ip port name)
          formatter in
      begin
        Format.fprintf formatter "{node type : remote ; remote nodes : " ;
        print_remote_nodes () ;
        Format.fprintf formatter " ; local port : %d ; connection backlog : %d ; \
                                  node name : %s ; node ip : %s}"
          r.Remote_config.local_port
          r.Remote_config.connection_backlog
          r.Remote_config.node_name
          r.Remote_config.node_ip
      end

  (** [log_msg ns level ?exn action ?pid details] hands [I.log] a thunk that
      renders one log line into [ns.log_formatter] and returns (then resets)
      the contents of [ns.log_buffer]. The line holds a timestamp built from
      [Unix.gmtime], the local node, the optional process id, [action],
      whatever [details ()] prints and, when [exn] is given, the exception
      and the backtrace (empty unless backtrace recording is enabled). *)
  let log_msg (ns : node_state) (level : I.level) ?exn (action : string) ?pid (details : unit -> unit) : unit I.t =
    let time_str () =
      let time_float = Unix.gettimeofday () in
      let time_record = Unix.gmtime time_float in
      Format.fprintf ns.log_formatter "[%d-%02d-%02d-%02d:%02d:%02d:%03.0f]"
        (1900 + time_record.Unix.tm_year)
        (1 + time_record.Unix.tm_mon)
        time_record.Unix.tm_mday
        time_record.Unix.tm_hour
        time_record.Unix.tm_min
        time_record.Unix.tm_sec
        (mod_float (time_float *. 1000.) 1000.) in
    let backtrace_str () =
      if Printexc.backtrace_status ()
      then Printexc.get_backtrace ()
      else "" in
    let str_of_log_bugger () =
      let str_contents = Buffer.contents ns.log_buffer in
      Buffer.reset ns.log_buffer ;
      str_contents in
    let print_log_msg () =
      time_str () ;
      Format.fprintf ns.log_formatter "[Node : " ;
      Node_id.print_string_of_node ns.local_node ns.log_formatter ;
      begin
        match pid with
        | None -> ()
        | Some pid' -> Format.fprintf ns.log_formatter "|Process : %d" pid'
      end ;
      Format.fprintf ns.log_formatter "] [Action : %s] [Details : " action ;
      details () ;
      Format.fprintf ns.log_formatter "]" ;
      begin
        match exn with
        | None -> ()
        | Some exn' ->
          Format.fprintf ns.log_formatter " [Exception : %s] [Backtrace : %s]" (Printexc.to_string exn') (backtrace_str ())
      end in
    I.log level (fun () -> print_log_msg () ; str_of_log_bugger ())

  (** Closes an input or output channel; a failure is logged at [Warning]
      level with the given [action] and [details] instead of propagating. *)
  let safe_close_channel (ns : node_state) (ch : [`Out of I.output_channel | `In of I.input_channel]) (action : string) (details : unit -> unit) : unit I.t =
    let open I in
    catch
      (fun () ->
         match ch with
         | `Out out_ch -> close_output out_ch
         | `In in_ch -> close_input in_ch)
      (fun e -> log_msg ns Warning ~exn:e action details)

  (** Shutdown sequence: shuts down the node server if one is recorded, closes
      the recorded server channels, then closes every output channel in
      [ns.remote_nodes]. Failures are logged and do not abort the sequence. *)
  let at_exit_handler ns () =
    let open I in
    let close_failed () =
      Format.fprintf ns.log_formatter "error while closing remote connection" in
    log_msg ns Info "at exit handler" (fun () -> Format.fprintf ns.log_formatter "at exit handler started") >>= fun () ->
    begin
      match !(ns.node_server) with
      | Some serv ->
        catch
          (fun () -> shutdown_server serv)
          (fun exn ->
             log_msg ns Warning ~exn "node shutting down"
               (fun () -> Format.fprintf ns.log_formatter "error while shutting down server"))
      | None -> return ()
    end >>= fun () ->
    begin
      match !(ns.est_in_ch), !(ns.est_out_ch) with
      | Some est_in, None ->
        safe_close_channel ns (`In est_in) "node shutting down" close_failed
      | None, Some est_out ->
        safe_close_channel ns (`Out est_out) "node shutting down" close_failed
      | Some est_in, Some est_out ->
        safe_close_channel ns (`In est_in) "node shutting down" close_failed >>= fun _ ->
        safe_close_channel ns (`Out est_out) "node shutting down" close_failed
      | _ -> return ()
    end >>= fun () ->
    (* Thread the accumulator so that every close is sequenced after the
       previous one and all of them are part of the returned computation;
       discarding it would leave only the last close being waited on. *)
    Node_id_hashtbl.fold
      (fun _ out_ch acc ->
         acc >>= fun () ->
         safe_close_channel ns (`Out out_ch) "node shutting down" close_failed)
      ns.remote_nodes
      (return ()) >>= fun () ->
    log_msg ns Info "at exit handler" (fun () -> Format.fprintf ns.log_formatter "at exit handler finished")

  (** Process-monad return: yields [v] leaving node state and pid untouched. *)
  let return (v : 'a) : 'a t =
    fun (ns,pid) -> I.return (ns,pid,v)

  (** Process-monad bind: runs [p], then feeds its result, state and pid to
      [f]. *)
  let (>>=) (p : 'a t) (f : 'a -> 'b t) : 'b t =
    fun (ns,pid) -> I.(p (ns,pid) >>= fun (ns',pid',v) -> (f v) (ns',pid'))

  (** A process computation that fails with [e]. *)
  let fail (e : exn) : 'a t =
    fun _ -> I.fail e

  (** Runs [p ()]; on failure runs [handler] with the exception, using the
      state and pid that were current when [catch] was entered. *)
  let catch (p : (unit -> 'a t)) (handler : (exn -> 'a t)) : 'a t =
    fun (ns,pid) -> I.catch (fun () -> (p ()) (ns,pid)) (fun e -> (handler e) (ns,pid))

  (** Lifts an io action into the process monad. *)
  let lift_io (io_comp : 'a io) : 'a t =
    fun (ns,pid) -> I.(io_comp >>= fun res -> return (ns,pid,res))

  (** Notifies every monitor in [monitors] that a process ended with
      [termination_reason]. Local monitors get an [Exit] message pushed on
      their mailbox (silently skipped when the mailbox is gone); remote ones
      get it written on the channel of their node. A failed remote write is
      logged and the node is removed from [ns.remote_nodes]. *)
  let send_monitor_response (ns : node_state) (monitors : Monitor_ref_set.t option) (termination_reason : monitor_reason) : unit io =
    let open I in

    let send_monitor_response_local (Monitor_Ref (_,monitor_pid,monitee_pid)) =
      match (Potpourri.of_option @@ fun () -> Hashtbl.find ns.mailboxes (Process_id.get_id monitor_pid)) with
      | None -> return ()
      | Some (_,push_fn) ->
        (* [Exit] carries the process that was being monitored, exactly as
           the remote path below does. *)
        return @@ push_fn @@ Some (Exit (monitee_pid, termination_reason)) in

    let send_monitor_response_remote (Monitor_Ref (_,monitoring_process,monitored_process) as mref) =
      catch
        (fun () ->
           match (Potpourri.of_option @@ fun () -> Node_id_hashtbl.find ns.remote_nodes (Process_id.get_node monitoring_process)) with
           | None ->
             log_msg ns Info "sending remote monitor notification"
               (fun () ->
                  Format.fprintf ns.log_formatter "monitor reference " ;
                  print_string_of_monitor_ref mref ns.log_formatter ;
                  Format.fprintf ns.log_formatter " remote node " ;
                  Node_id.print_string_of_node (Process_id.get_node monitoring_process) ns.log_formatter ;
                  Format.fprintf ns.log_formatter " is down, skipping sending monitor message")
           | Some out_ch ->
             write_value out_ch (Exit (monitored_process, termination_reason)) >>= fun () ->
             log_msg ns Info "sending remote monitor notification"
               (fun () ->
                  Format.fprintf ns.log_formatter "sent monitor notification for monitor ref " ;
                  print_string_of_monitor_ref mref ns.log_formatter ;
                  Format.fprintf ns.log_formatter " to remote node " ;
                  Node_id.print_string_of_node (Process_id.get_node monitoring_process) ns.log_formatter))
        (fun e ->
           log_msg ns ~exn:e Error "sending remote monitor notification"
             (fun () ->
                Format.fprintf ns.log_formatter "monitor reference " ;
                print_string_of_monitor_ref mref ns.log_formatter ;
                Format.fprintf ns.log_formatter ", error sending monitor message to remote node " ;
                Node_id.print_string_of_node (Process_id.get_node monitoring_process) ns.log_formatter ;
                Format.fprintf ns.log_formatter ", removing node") >>= fun () ->
           return @@ Node_id_hashtbl.remove ns.remote_nodes (Process_id.get_node monitoring_process)) in

    (* [acc] is the notification work accumulated so far; binding on it keeps
       every notification inside the returned computation instead of only the
       one produced for the last element of the set. *)
    let iter_fn (Monitor_Ref (_,pid,_) as mref) acc =
      acc >>= fun () ->
      if Process_id.is_local pid ns.local_node
      then
        begin
          send_monitor_response_local mref >>= fun () ->
          log_msg ns Debug "sent local monitor notification"
            (fun () -> print_string_of_monitor_notification mref termination_reason ns.log_formatter)
        end
      else
        begin
          log_msg ns Debug "start sending remote monitor notification"
            (fun () ->
               Format.fprintf ns.log_formatter "monitor reference : " ;
               print_string_of_monitor_notification mref termination_reason ns.log_formatter) >>= fun () ->
          send_monitor_response_remote mref >>= fun () ->
          log_msg ns Debug "finished sending remote monitor notification"
            (fun () ->
               Format.fprintf ns.log_formatter "monitor reference : " ;
               print_string_of_monitor_notification mref termination_reason ns.log_formatter)
        end in

    match monitors with
    | None -> return ()
    | Some monitors' -> Monitor_ref_set.fold iter_fn monitors' (return ())

  (** Runs process [p] under identity [pid]. When it finishes, normally or
      with an exception, its monitors are notified and its mailbox and
      monitor table entries are removed. An [InvalidNode] failure is reported
      as [UnkownNodeId], any other exception as [Exception]. *)
  let run_process' (ns : node_state) (pid : Process_id.t) (p : unit t) : unit io =
    let open I in
    let monitors_of_pid () =
      Potpourri.of_option @@ fun () -> Process_id_hashtbl.find ns.monitor_table pid in
    catch
      (fun () ->
         log_msg ns Debug "starting process" (fun () -> Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
         p (ns,pid) >>= fun _ ->
         log_msg ns Debug "process terminated successfully" (fun () -> Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
         send_monitor_response ns (monitors_of_pid ()) (Normal pid) >>= fun () ->
         Process_id_hashtbl.remove ns.monitor_table pid ;
         return @@ Hashtbl.remove ns.mailboxes (Process_id.get_id pid))
      (fun e ->
         log_msg ns ~exn:e Error "process failed with error" (fun () -> Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
         Hashtbl.remove ns.mailboxes (Process_id.get_id pid) ;
         begin
           match e with
           | InvalidNode n -> send_monitor_response ns (monitors_of_pid ()) (UnkownNodeId (pid,n))
           | _ -> send_monitor_response ns (monitors_of_pid ()) (Exception (pid,e))
         end >>= fun () ->
         return @@ Process_id_hashtbl.remove ns.monitor_table pid)

  (** Request/response helper for remote operations. It registers a
      temporary mailbox under a fresh pid, writes [msg_create_fn fresh_pid]
      on [out_ch], waits for one message on the temporary mailbox, removes
      the mailbox and passes the message to [response_fn]. [pid] is only used
      to tag log lines. Requires [ns.config] to hold a remote
      configuration. *)
  let sync_send pid ns ?flags out_ch msg_create_fn response_fn =
    let open I in
    let remote_config = Potpourri.get_option !(ns.config) in
    let new_pid =
      Process_id.make_remote
        remote_config.Remote_config.node_ip
        remote_config.Remote_config.local_port
        remote_config.Remote_config.node_name
        ns.next_process_id in
    let new_mailbox,push_fn = I.create_stream () in
    Hashtbl.replace ns.mailboxes (Process_id.get_id new_pid) (new_mailbox,push_fn) ;
    let msg_to_send = msg_create_fn new_pid in
    catch
      (fun () ->
         log_msg ns ~pid Debug "sync send start"
           (fun () ->
              Format.fprintf ns.log_formatter "created new process " ;
              Process_id.print_string_of_pid new_pid ns.log_formatter ;
              Format.fprintf ns.log_formatter " for sync send of " ;
              print_string_of_message msg_to_send ns.log_formatter) >>= fun () ->
         write_value out_ch ?flags msg_to_send >>= fun () ->
         get new_mailbox)
      (fun e ->
         (* do not leave the temporary mailbox registered when the exchange
            fails; the failure itself is passed on unchanged *)
         Hashtbl.remove ns.mailboxes (Process_id.get_id new_pid) ;
         fail e) >>= fun result_pid ->
    Hashtbl.remove ns.mailboxes (Process_id.get_id new_pid) ;
    log_msg ns ~pid Debug "sync send end"
      (fun () ->
         Format.fprintf ns.log_formatter "process " ;
         Process_id.print_string_of_pid new_pid ns.log_formatter ;
         Format.fprintf ns.log_formatter " finished for sync send of " ;
         print_string_of_message msg_to_send ns.log_formatter) >>= fun () ->
    response_fn (Potpourri.get_option result_pid) (* we do not send None on mailboxes *)

  (** Creates a fresh monitor reference for [monitor_pid] watching
      [monitee_pid]. If the monitee has no mailbox, returns an [Exit] message
      with reason [NoProcess]; otherwise records the reference in
      [ns.monitor_table] and returns no message. *)
  let monitor_helper (ns : node_state) (monitor_pid : Process_id.t) (monitee_pid : Process_id.t) : (message option * monitor_ref) =
    let new_monitor_ref = Monitor_Ref (Potpourri.post_incr ns.monitor_ref_id, monitor_pid, monitee_pid) in
    match (Potpourri.of_option @@ fun () -> Hashtbl.find ns.mailboxes (Process_id.get_id monitee_pid)) with
    | None -> (Some (Exit (monitee_pid, NoProcess monitee_pid)), new_monitor_ref)
    | Some _ ->
      begin
        match (Potpourri.of_option @@ fun () -> Process_id_hashtbl.find ns.monitor_table monitee_pid) with
        | None ->
          Process_id_hashtbl.add ns.monitor_table monitee_pid (Monitor_ref_set.of_list [new_monitor_ref])
        | Some curr_monitor_set ->
          Process_id_hashtbl.replace ns.monitor_table monitee_pid (Monitor_ref_set.add new_monitor_ref curr_monitor_set)
      end ;
      (None, new_monitor_ref)

  (** Consumes the result of a monitor request: a returned message is pushed
      onto the mailbox of the monitoring process; otherwise the reference is
      recorded in [ns.monitor_table]. Returns the monitor reference. *)
  let monitor_response_handler (ns : node_state) (res : message option * monitor_ref) : monitor_ref =
    match res with
    | (Some msg, (Monitor_Ref (_,monitor_pid,_) as mref)) ->
      let _,push_fn = Hashtbl.find ns.mailboxes (Process_id.get_id monitor_pid) in (* process is currently running so mailbox must be present *)
      push_fn @@ Some msg ;
      mref
    | (None, (Monitor_Ref (_,_,monitee_pid) as mref)) ->
      begin
        match (Potpourri.of_option @@ fun () -> Process_id_hashtbl.find ns.monitor_table monitee_pid) with
        | None ->
          Process_id_hashtbl.add ns.monitor_table monitee_pid (Monitor_ref_set.of_list [mref])
        | Some curr_monitor_set ->
          Process_id_hashtbl.replace ns.monitor_table monitee_pid (Monitor_ref_set.add mref curr_monitor_set)
      end ;
      mref

  (** Sets up monitoring between two processes of this node. *)
  let monitor_local (ns : node_state) (monitor_pid : Process_id.t) (monitee_pid : Process_id.t) : monitor_ref =
    monitor_response_handler ns @@ monitor_helper ns monitor_pid monitee_pid

  (** Allocates a new pid: a local one when the node has no remote
      configuration, otherwise one carrying the configured ip, port and
      name. *)
  let make_new_pid (node_to_spwan_on : Node_id.t) (ns : node_state) : Process_id.t =
    if !(ns.config) = None
    then Process_id.make_local (Node_id.get_name node_to_spwan_on) ns.next_process_id
    else
      let remote_config = Potpourri.get_option !(ns.config) in
      Process_id.make_remote
        remote_config.Remote_config.node_ip
        remote_config.Remote_config.local_port
        remote_config.Remote_config.node_name
        ns.next_process_id

  (** [spawn ?monitor node_id p] starts [p] on [node_id] and returns its pid,
      plus a monitor reference when [monitor] is true. On the local node the
      process is started with [I.async]; on a known remote node the closure
      is marshalled over the channel of that node and the reply awaited; an
      unknown node fails with [InvalidNode]. *)
  let spawn ?(monitor=false) (node_id : Node_id.t) (p : (unit -> unit t)) : (Process_id.t * monitor_ref option) t =
    let open I in
    fun (ns,pid) ->
      let self_id = Process_id.get_id pid in
      if Node_id.is_local node_id ns.local_node
      then
        begin
          let new_pid = make_new_pid node_id ns in
          Hashtbl.replace ns.mailboxes (Process_id.get_id new_pid) (I.create_stream ()) ;
          if monitor
          then
            begin
              let monitor_res = monitor_local ns pid new_pid in
              async (fun () -> run_process' ns new_pid (p ())) ;
              log_msg ~pid:self_id ns Debug "spawned and monitored local process"
                (fun () ->
                   Format.fprintf ns.log_formatter "result pid " ;
                   Process_id.print_string_of_pid new_pid ns.log_formatter ;
                   Format.fprintf ns.log_formatter ", result monitor reference : " ;
                   print_string_of_monitor_ref monitor_res ns.log_formatter) >>= fun () ->
              return (ns, pid, (new_pid, Some monitor_res))
            end
          else
            begin
              async (fun () -> run_process' ns new_pid (p ())) ;
              log_msg ~pid:self_id ns Debug "spawned local process"
                (fun () ->
                   Format.fprintf ns.log_formatter "result pid " ;
                   Process_id.print_string_of_pid new_pid ns.log_formatter) >>= fun () ->
              return (ns, pid, (new_pid, None))
            end
        end
      else
        match (Potpourri.of_option @@ fun () -> Node_id_hashtbl.find ns.remote_nodes node_id) with
        | Some out_ch ->
          if monitor
          then
            begin
              log_msg ~pid:self_id ns Debug "spawning and monitoring remote process"
                (fun () ->
                   Format.fprintf ns.log_formatter "on remote node " ;
                   Node_id.print_string_of_node node_id ns.log_formatter ;
                   Format.fprintf ns.log_formatter ", local process " ;
                   Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
              sync_send self_id ns ~flags:[Marshal.Closures] out_ch
                (fun receiver_pid -> (Spawn_monitor (p (), pid, receiver_pid)))
                (fun res ->
                   let Monitor_Ref (_,_,monitored_proc) as mref =
                     match res with
                     | Spawn_monitor_result (_,mr,_) -> mr
                     | _ -> assert false in (*BISECT-IGNORE*)
                   log_msg ~pid:self_id ns Debug "spawned and monitored remote process"
                     (fun () ->
                        Format.fprintf ns.log_formatter "spawned on remote node " ;
                        Node_id.print_string_of_node node_id ns.log_formatter ;
                        Format.fprintf ns.log_formatter " : result pid " ;
                        Process_id.print_string_of_pid monitored_proc ns.log_formatter ;
                        Format.fprintf ns.log_formatter ", result monitor reference : " ;
                        print_string_of_monitor_ref mref ns.log_formatter) >>= fun () ->
                   return (ns, pid, (monitored_proc, Some mref)))
            end
          else
            begin
              log_msg ~pid:self_id ns Debug "spawning remote process"
                (fun () ->
                   Format.fprintf ns.log_formatter "on remote node " ;
                   Node_id.print_string_of_node node_id ns.log_formatter ;
                   Format.fprintf ns.log_formatter ", local process " ;
                   Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
              sync_send self_id ns ~flags:[Marshal.Closures] out_ch
                (fun receiver_pid -> (Proc (p (), receiver_pid)))
                (fun res ->
                   let remote_proc_pid =
                     match res with
                     | Proc_result (r,_) -> r
                     | _ -> assert false in (*BISECT-IGNORE*)
                   log_msg ~pid:self_id ns Debug "spawned remote process"
                     (fun () ->
                        Format.fprintf ns.log_formatter "on remote node " ;
                        Node_id.print_string_of_node node_id ns.log_formatter ;
                        Format.fprintf ns.log_formatter " : result pid " ;
                        Process_id.print_string_of_pid remote_proc_pid ns.log_formatter) >>= fun () ->
                   return (ns, pid, (remote_proc_pid, None)))
            end
        | None ->
          begin
            log_msg ~pid:self_id ns Error "failed to spawn process on remote node"
              (fun () ->
                 Format.fprintf ns.log_formatter "remote node " ;
                 Node_id.print_string_of_node node_id ns.log_formatter ;
                 Format.fprintf ns.log_formatter ", is unknown, local process " ;
                 Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
            fail @@ InvalidNode node_id
          end

  (** Builds a single-element matcher list that applies [match_fn] to the
      payload of [Data] messages and rejects every other message. *)
  let case (match_fn : (message_type -> (unit -> 'a t) option)) : 'a matcher_list =
    let matcher = function
      | Data (_,_,msg) -> match_fn msg
      | _ -> None in
    Matcher matcher

  (** Builds a single-element matcher list that accepts [Exit] messages and
      runs [handler_fn] on their reason. *)
  let termination_case (handler_fn : (monitor_reason -> 'a t)) : 'a matcher_list =
    let matcher = function
      | Exit (_,reason) -> Some (fun () -> handler_fn reason)
      | _ -> None in
    Matcher matcher

  (** Concatenates two matcher lists, preserving order. *)
  let rec (|.) (first_matchers : 'a matcher_list) (second_matchers : 'a matcher_list) : 'a matcher_list =
    match first_matchers, second_matchers with
    | Matcher matcher, Matcher matcher' -> Matchers (matcher, Matcher matcher')
    | Matcher matcher, Matchers (matcher',matchers) -> Matchers (matcher, Matchers (matcher',matchers))
    | Matchers (matcher,matchers), matchers' -> Matchers (matcher, matchers |. matchers')

  let receive ?timeout_duration (matchers : 'a matcher_list) : 'a option t =
    let open I in
    let temp_stream,temp_push_fn = create_stream () in
    let result = ref None in
    let mailbox_cleaned_up = ref false in

    let restore_mailbox ns pid =
      mailbox_cleaned_up := true ;
      let mailbox',old_push_fn = Hashtbl.find ns.mailboxes (Process_id.get_id pid) in
      temp_push_fn None ; (* close new stream so we can append new and old *)
      Hashtbl.replace ns.mailboxes (Process_id.get_id pid) (stream_append temp_stream mailbox', old_push_fn) in

    let test_match ns pid matcher candidate_msg no_match_fn =
      match matcher candidate_msg with
      | None -> no_match_fn ()
      | Some fn ->
        begin
          restore_mailbox ns pid ;
          result := Some (fn ()) ;
          true
        end in

    let rec iter_fn ns pid match_fns candidate_msg =
      match match_fns with
      | Matcher matcher ->
        test_match ns pid matcher candidate_msg (fun () -> (temp_push_fn (Some candidate_msg)) ; false)
      | Matchers (matcher,xs) ->
        test_match ns pid matcher candidate_msg (fun () -> iter_fn ns pid xs candidate_msg) in

    let rec iter_stream iter_fn stream =
      get stream >>= fun v ->
      if iter_fn (Potpourri.get_option v)
      then return ()
      else iter_stream iter_fn stream in (* a None is never sent, see send function below. *)

    let do_receive_blocking (ns,pid) =
      let mailbox,_ = Hashtbl.find ns.mailboxes (Process_id.get_id pid) in
      iter_stream (iter_fn ns pid matchers) mailbox >>= fun () ->
      (Potpourri.get_option !result) (ns,pid) >>= fun (ns',pid',result') ->
      return (ns', pid', Some result') in

    fun (ns,pid) ->
      match timeout_duration with
      | None ->
        catch
          (fun () ->
             log_msg ~pid:(Process_id.get_id pid) ns Debug "receiving with no time out"
               (fun () ->
                  Format.fprintf ns.log_formatter "receiver process " ;
                  Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
             do_receive_blocking (ns,pid) >>= fun (ns',pid',res) ->
             log_msg ~pid:(Process_id.get_id pid) ns Debug "successfully received and processed message with no time out"
               (fun () ->
                  Format.fprintf ns.log_formatter "receiver process " ;
                  Process_id.print_string_of_pid pid ns.log_formatter) >>= fun () ->
             return (ns', pid', res))
          (fun e ->
             if not !mailbox_cleaned_up then restore_mailbox ns pid else () ;
             log_msg ~pid:(Process_id.get_id pid) ns ~exn:e Error "receiving with no time out failed"
               (fun () ->
                  Format.fprintf ns.log_formatter "receiver process " ;
                  Process_id.print_string_of_pid pid ns.log_formatter ;
                  Format.fprintf ns.log_formatter ", encountred exception") >>= fun () ->
             fail e)
      | Some timeout_duration' ->
        log_msg ~pid:(Process_id.get_id pid) ns Debug "receiving with time out"
          (fun () ->
             Format.fprintf ns.log_formatter "receiver process 
";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter", time out %f"timeout_duration')>>=fun()->catch(fun()->pick[do_receive_blocking(ns,pid);timeouttimeout_duration']>>=fun(ns',pid',res)->log_msgns~pid:(Process_id.get_idpid)Debug"successfully received and processed a message with time out"(fun()->Format.fprintfns.log_formatter"receiver process ";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter", time out %f"timeout_duration')>>=fun()->return(ns',pid',res))(fune->ifnot!mailbox_cleaned_upthenrestore_mailboxnspidelse();matchewith|Timeout->beginlog_msg~pid:(Process_id.get_idpid)nsDebug"receive timed out"(fun()->Format.fprintfns.log_formatter"receiver process ";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter", time out %f"timeout_duration')>>=fun()->return(ns,pid,None)end|e->log_msg~pid:(Process_id.get_idpid)ns~exn:eError"receiving with time out failed"(fun()->Format.fprintfns.log_formatter"receiver process ";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter", time out %f"timeout_duration')>>=fun()->faile)letrecreceive_loop?timeout_duration(matchers:boolmatcher_list):unitt=letopenIinfun(ns,pid)->(receive?timeout_durationmatchers)(ns,pid)>>=fun(ns',pid',res)->matchreswith|None|Somefalse->return(ns',pid',())|Sometrue->(receive_loop?timeout_durationmatchers)(ns',pid')letsend_to_remote_node_helper(pid:int)(ns:node_state)(node:Node_id.t)(sending_log_action:string)(print_sending_log_msg:unit->unit)(print_unknown_node_msg:unit->unit)(msg:message):unitI.t=letopenIinmatch(Potpourri.of_option@@fun()->Node_id_hashtbl.findns.remote_nodesnode)with|Someremote_output->log_msgns~pidDebugsending_log_action(fun()->print_sending_log_msg())>>=fun()->write_value~flags:[Marshal.Closures]remote_outputmsg(* marshal because the message could be a function 
*)|None->beginlog_msgns~pidErrorsending_log_action(fun()->print_unknown_node_msg())>>=fun()->fail@@InvalidNodenodeendletsend(remote_pid:Process_id.t)(msg:message_type):unitt=letopenIinfun(ns,pid)->ifProcess_id.is_localremote_pidns.local_nodethenmatch(Potpourri.of_option@@fun()->Hashtbl.findns.mailboxes(Process_id.get_idremote_pid))with|None->log_msgns~pid:(Process_id.get_idpid)I.Warning"unable to send message to local process"(fun()->Format.fprintfns.log_formatter"message : %s, to unknown local process: "(M.string_of_messagemsg);Process_id.print_string_of_pidremote_pidns.log_formatter;Format.fprintfns.log_formatter", from local process: ";Process_id.print_string_of_pidpidns.log_formatter)>>=fun()->return(ns,pid,())|Some(_,push_fn)->log_msgns~pid:(Process_id.get_idpid)I.Debug"successfully sent message to local process"(fun()->Format.fprintfns.log_formatter"message : %s, to local process: "(M.string_of_messagemsg);Process_id.print_string_of_pidremote_pidns.log_formatter;Format.fprintfns.log_formatter", from local process: ";Process_id.print_string_of_pidpidns.log_formatter)>>=fun()->return@@(ns,pid,push_fn@@Some(Data(pid,remote_pid,msg)))elseletsending_msg()=Format.fprintfns.log_formatter"message : %s, to remote process: "(M.string_of_messagemsg);Process_id.print_string_of_pidremote_pidns.log_formatter;Format.fprintfns.log_formatter", from local process: ";Process_id.print_string_of_pidpidns.log_formatterinletunknown_node_msg()=Format.fprintfns.log_formatter"message : %s, to unknown remote process: "(M.string_of_messagemsg);Process_id.print_string_of_pidremote_pidns.log_formatter;Format.fprintfns.log_formatter", from local process: ";Process_id.print_string_of_pidpidns.log_formatterinsend_to_remote_node_helper(Process_id.get_idpid)ns(Process_id.get_noderemote_pid)"sending message to remote process"sending_msgunknown_node_msg(Data(pid,remote_pid,msg))>>=fun()->log_msgns~pid:(Process_id.get_idpid)I.Debug"successfully sent message to remote 
process"(fun()->Format.fprintfns.log_formatter"message : %s, to remote process: "(M.string_of_messagemsg);Process_id.print_string_of_pidremote_pidns.log_formatter;Format.fprintfns.log_formatter", from local process: ";Process_id.print_string_of_pidpidns.log_formatter)>>=fun()->return(ns,pid,())let(>!)(pid:Process_id.t)(msg:message_type):unitt=sendpidmsgletbroadcast_local?pid(ns:node_state)(sending_pid:Process_id.t)(m:message_type):unitio=letopenIinHashtbl.fold(funrecev_pid(_,push_fn)_->letrecev_pid'=Process_id.makens.local_noderecev_pidinifrecev_pid'=sending_pidthenreturn()elselog_msg?pidnsI.Debug"broadcast"(fun()->Format.fprintfns.log_formatter"sending message %s to local process "(M.string_of_messagem);Process_id.print_string_of_pidrecev_pid'ns.log_formatter;Format.fprintfns.log_formatter" from process ";Process_id.print_string_of_pidsending_pidns.log_formatter;Format.fprintfns.log_formatter" as result of broadcast request";)>>=fun()->return@@push_fn@@Some(Data(sending_pid,recev_pid',m)))ns.mailboxes(return())letbroadcast(node:Node_id.t)(m:message_type):unitt=letopenIinfun(ns,pid)->ifNode_id.is_localnodens.local_nodethenbeginlog_msg~pid:(Process_id.get_idpid)nsI.Debug"broadcast"(fun()->Format.fprintfns.log_formatter"sending broadcast message %s to local processes running on local node "(M.string_of_messagem);Node_id.print_string_of_nodenodens.log_formatter;Format.fprintfns.log_formatter" from local process ";Process_id.print_string_of_pidpidns.log_formatter)>>=fun()->broadcast_localnspidm>>=fun()->return(ns,pid,())endelseletsending_msg()=Format.fprintfns.log_formatter"Process ";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter" is sending broadcast message %s to remote node "(M.string_of_messagem);Node_id.print_string_of_nodenodens.log_formatterinletunknwon_node_msg()=Format.fprintfns.log_formatter"Process ";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter" failed to send broadcast message %s to 
remote node "(M.string_of_messagem);Node_id.print_string_of_nodenodens.log_formatter;Format.fprintfns.log_formatter", remote node is unknown"insend_to_remote_node_helper(Process_id.get_idpid)nsnode"broadcasting to remote node"sending_msgunknwon_node_msg(Broadcast(pid,node,m))>>=fun()->log_msgns~pid:(Process_id.get_idpid)I.Debug"successfully sent broadcast message to remote node"(fun()->Format.fprintfns.log_formatter"message : %s, to remote node: "(M.string_of_messagem);Node_id.print_string_of_nodenodens.log_formatter)>>=fun()->return(ns,pid,())letlookup_node_and_send(pid:int)(ns:node_state)(receiver_process:Process_id.t)(action:string)(unknown_node_msg:unit->unit)(node_found_fn:I.output_channel->'aI.t):'aI.t=letopenIinmatch(Potpourri.of_option@@fun()->Node_id_hashtbl.findns.remote_nodes(Process_id.get_node@@receiver_process))with|None->beginlog_msg~pidnsErroraction(fun()->unknown_node_msg())>>=fun()->fail@@InvalidNode(Process_id.get_nodereceiver_process)end|Someout_ch->node_found_fnout_chletmonitor(pid_to_monitor:Process_id.t):monitor_reft=fun(ns,pid)->letopenIinifProcess_id.is_localpid_to_monitorns.local_nodethenbeginlog_msg~pid:(Process_id.get_idpid)nsDebug"monitored"(fun()->Format.fprintfns.log_formatter"Creating monitor for local process ";Process_id.print_string_of_pidpid_to_monitorns.log_formatter;Format.fprintfns.log_formatter" to be monitored by local process ";Process_id.print_string_of_pidpidns.log_formatter)>>=fun()->return(ns,pid,monitor_localnspidpid_to_monitor)endelsebeginlog_msg~pid:(Process_id.get_idpid)nsDebug"monitoring"(fun()->Format.fprintfns.log_formatter"Creating monitor for remote process ";Process_id.print_string_of_pidpid_to_monitorns.log_formatter;Format.fprintfns.log_formatter" to be monitored by local process ";Process_id.print_string_of_pidpidns.log_formatter)>>=fun()->letunknown_mode_msg()=Format.fprintfns.log_formatter"Process ";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter" failed to monitor remote 
process ";Process_id.print_string_of_pidpid_to_monitorns.log_formatter;Format.fprintfns.log_formatter" on remote node ";Node_id.print_string_of_node(Process_id.get_nodepid_to_monitor)ns.log_formatter;Format.fprintfns.log_formatter", remote node is unknown"inletnode_found_fnout_ch=sync_send(Process_id.get_idpid)nsout_ch(funreceiver_pid->(Monitor(pid,pid_to_monitor,receiver_pid)))(funres->letres'=matchreswithMonitor_result(mon_msg,mon_res,_)->(mon_msg,mon_res)|_->assertfalsein(*BISECT-IGNORE*)log_msg~pid:(Process_id.get_idpid)nsI.Debug"successfully monitored remote process"(fun()->Format.fprintfns.log_formatter"result: ";print_string_of_messageresns.log_formatter)>>=fun()->return(ns,pid,monitor_response_handlernsres'))inlookup_node_and_send(Process_id.get_idpid)nspid_to_monitor"monitoring"unknown_mode_msgnode_found_fnendletunmonitor_local(ns:node_state)(Monitor_Ref(_,_,process_to_unmonitor)asmref):unit=match(Potpourri.of_option@@fun()->Process_id_hashtbl.findns.monitor_tableprocess_to_unmonitor)with|None->()|Somecurr_set->letcurr_set'=Monitor_ref_set.removemrefcurr_setinifMonitor_ref_set.is_emptycurr_set'thenProcess_id_hashtbl.removens.monitor_tableprocess_to_unmonitorelseProcess_id_hashtbl.replacens.monitor_tableprocess_to_unmonitorcurr_set'letunmonitor(Monitor_Ref(_,_,process_to_unmonitor)asmref):unitt=letopenIinfun(ns,pid)->ifProcess_id.is_localprocess_to_unmonitorns.local_nodethenbeginlog_msgns~pid:(Process_id.get_idpid)Debug"unmonitored"(fun()->Format.fprintfns.log_formatter"Unmonitor local : ";print_string_of_monitor_refmrefns.log_formatter)>>=fun()->return(ns,pid,unmonitor_localnsmref)endelsebeginlog_msgns~pid:(Process_id.get_idpid)Debug"unmonitoring"(fun()->Format.fprintfns.log_formatter"Unmonitor remote : ";print_string_of_monitor_refmrefns.log_formatter)>>=fun()->letunknown_node_msg()=Format.fprintfns.log_formatter"Process ";Process_id.print_string_of_pidpidns.log_formatter;Format.fprintfns.log_formatter" failed to monitor remote process 
";Process_id.print_string_of_pidprocess_to_unmonitorns.log_formatter;Format.fprintfns.log_formatter" on remote node ";Node_id.print_string_of_node(Process_id.get_nodeprocess_to_unmonitor)ns.log_formatter;Format.fprintfns.log_formatter", remote node is unknown"inletnode_found_fnout_ch=sync_send(Process_id.get_idpid)nsout_ch(funrecv_pid->(Unmonitor(mref,recv_pid)))(fun_->log_msg~pid:(Process_id.get_idpid)nsDebug"successfully unmonitored"(fun()->Format.fprintfns.log_formatter"monitor ref : ";print_string_of_monitor_refmrefns.log_formatter)>>=fun()->return(ns,pid,()))inlookup_node_and_send(Process_id.get_idpid)nsprocess_to_unmonitor"unmonitoring"unknown_node_msgnode_found_fnendletget_self_pid:Process_id.tt=fun(ns,proc_id)->I.return(ns,proc_id,proc_id)letget_self_node:Node_id.tt=fun(ns,pid)->I.return(ns,pid,Process_id.get_nodepid)letget_remote_nodenode_name=fun(ns,pid)->letres:Node_id.toptionref=refNoneinletiter_fnnode_=if(Node_id.get_namenode=node_name)&&!res=Nonethenres:=Somenodeelse()inNode_id_hashtbl.iteriter_fnns.remote_nodes;I.return(ns,pid,!res)letget_remote_nodes:Node_id.tlistt=fun(ns,pid)->letres:Node_id.tlist=Node_id_hashtbl.fold(funn_acc->n::acc)ns.remote_nodes[]inI.return(ns,pid,res)letclean_up_node_connectionnsnodein_chout_ch=letopenIinletout_details()=ifnode<>NonethenbeginFormat.fprintfns.log_formatter"encountered error while closing output channel for remote node ";Node_id.print_string_of_node(Potpourri.get_optionnode)ns.log_formatterendelseFormat.fprintfns.log_formatter"encountered error while closing output channel"inletin_details()=ifnode<>NonethenbeginFormat.fprintfns.log_formatter"encountered error while closing input channel for remote node ";Node_id.print_string_of_node(Potpourri.get_optionnode)ns.log_formatterendelseFormat.fprintfns.log_formatter"encountered error while closing input channel"insafe_close_channelns(`Inin_ch)"node connection clean up"out_details>>=fun()->safe_close_channelns(`Outout_ch)"node connection clean 
up"in_details>>=fun()->ifnode=Nonethenreturn()elsereturn@@Node_id_hashtbl.removens.remote_nodes(Potpourri.get_optionnode)letrecwait_for_node_msgnsin_chout_chclient_addrnode_ref=letopenIinletprint_string_of_client_addr()=matchclient_addrwith|Unix.ADDR_UNIXs->Format.fprintfns.log_formatter"%s"s|Unix.ADDR_INET(ip,port)->Format.fprintfns.log_formatter"%s:%d"(Unix.string_of_inet_addrip)portinread_valuein_ch>>=fun(msg:message)->log_msgnsDebug"node process message"(fun()->Format.fprintfns.log_formatter"received message ";print_string_of_messagemsgns.log_formatter;Format.fprintfns.log_formatter" from ";print_string_of_client_addr())>>=fun()->matchmsgwith|Nodenode->beginnode_ref:=Somenode;Node_id_hashtbl.replacens.remote_nodesnodeout_ch;returnnodeend|_->log_msgnsDebug"node process message"(fun()->Format.fprintfns.log_formatter"ignore message ";print_string_of_messagemsgns.log_formatter;Format.fprintfns.log_formatter", waiting for handshake")>>=fun()->wait_for_node_msgnsin_chout_chclient_addrnode_refletserver_handler(ns:node_state)((in_ch,out_ch):I.input_channel*I.output_channel)(node:Node_id.t):unitI.t=letopenIinletremote_config=Potpourri.get_option!(ns.config)inletspawn_preamble()=letnew_pid=Process_id.make_remoteremote_config.Remote_config.node_ipremote_config.Remote_config.local_portremote_config.Remote_config.node_namens.next_process_idinHashtbl.replacens.mailboxes(Process_id.get_idnew_pid)(I.create_stream());new_pidinletput_in_mailboxreceiver_pidmsg=match(Potpourri.of_option@@fun()->Hashtbl.findns.mailboxes(Process_id.get_idreceiver_pid))with|None->beginletreceiver_not_found_err_msg()=Format.fprintfns.log_formatter"remote node ";Node_id.print_string_of_nodenodens.log_formatter;Format.fprintfns.log_formatter", processed message ";print_string_of_messagemsgns.log_formatter;Format.fprintfns.log_formatter", recipient unknown local process ";Process_id.print_string_of_pidreceiver_pidns.log_formatterinlog_msgnsI.Warning"node process 
message"receiver_not_found_err_msgend|Some(_,push_fn)->return@@push_fn(Somemsg)inletrechandler()=letlog_handler_stopmsg=letnode_str()=Node_id.print_string_of_nodenodens.log_formatterinlog_msgnsError"node process message"(fun()->Format.fprintfns.log_formatter"remote node ";node_str();Format.fprintfns.log_formatter" %s, stopping handler for remote node "msg;node_str())inif(Potpourri.of_option@@fun()->Node_id_hashtbl.findns.remote_nodesnode)=Nonethenlog_handler_stop"has been previously removed"elseread_valuein_ch>>=fun(msg:message)->log_msgnsDebug"node process message"(fun()->Format.fprintfns.log_formatter"remote node ";Node_id.print_string_of_nodenodens.log_formatter;Format.fprintfns.log_formatter", message ";print_string_of_messagemsgns.log_formatter)>>=fun()->matchmsgwith|Node_->handler()|Proc(p,sender_pid)->beginletresult_pid=spawn_preamble()inwrite_valueout_ch(Proc_result(result_pid,sender_pid))>>=fun()->async(fun()->run_process'nsresult_pidp);handler()end|Spawn_monitor(p,monitor_pid,sender)->beginletnew_pid=spawn_preamble()inlet(monitor_msg,monitor_res)=monitor_helpernsmonitor_pidnew_pidinwrite_valueout_ch(Spawn_monitor_result(monitor_msg,monitor_res,sender))>>=fun()->async(fun()->run_process'nsnew_pidp);handler()end|Monitor(monitor_pid,to_be_monitored,sender)->beginlet(mon_msg,mon_res)=monitor_helpernsmonitor_pidto_be_monitoredinwrite_valueout_ch(Monitor_result(mon_msg,mon_res,sender))>>=fun()->handler()end|Unmonitor(mref,sender)->beginunmonitor_localnsmref;write_valueout_ch(Unmonitor_result(mref,sender))>>=fun()->handler()end|Broadcast(sender_pid,_,msg)->beginbroadcast_localnssender_pidmsg>>=fun()->handler()end|Data(_,r,_)asdata->beginput_in_mailboxrdata>>=fun()->handler()end|Exit(s,m)->beginmatch(Potpourri.of_option@@fun()->Process_id_hashtbl.findns.monitor_tables)with|None->beginlog_msgnsError"node process message"(fun()->Format.fprintfns.log_formatter"no entry for ";Process_id.print_string_of_pidsns.log_formatter;Format.fprintfns.log_formatter" in monitor 
table when processing ";print_string_of_messagemsgns.log_formatter)>>=fun()->handler()end|Somepids->beginMonitor_ref_set.fold(fun(Monitor_Ref(_,pid,_))_->put_in_mailboxpid(Exit(s,m)))pids(return())>>=fun()->handler()endend|Proc_result(_,receiver_pid)aspres->beginput_in_mailboxreceiver_pidpres>>=fun()->handler()end|Spawn_monitor_result(monitor_msg,mref,receiver)assres->beginignore(monitor_response_handlerns(monitor_msg,mref));put_in_mailboxreceiversres>>=fun()->handler()end|Monitor_result(mon_msg,mref,receiver)asmres->beginignore(monitor_response_handlerns(mon_msg,mref));put_in_mailboxreceivermres>>=fun()->handler()end|Unmonitor_result(mref,receiver_pid)asunmonres->beginunmonitor_localnsmref;put_in_mailboxreceiver_pidunmonres>>=fun()->handler()endinhandler()letnode_server_fn(ns:node_state)(client_addr:Unix.sockaddr)((in_ch,out_ch):I.input_channel*I.output_channel):unitI.t=letopenIinletnode_ref=refNoneincatch(fun()->ns.est_in_ch:=Somein_ch;ns.est_out_ch:=Someout_ch;wait_for_node_msgnsin_chout_chclient_addrnode_ref>>=funnode->write_valueout_ch@@Node(ns.local_node)>>=fun()->log_msgnsDebug"node server loop"(fun()->Format.fprintfns.log_formatter"starting server handler")>>=fun_->server_handlerns(in_ch,out_ch)node)(fune->log_msgns~exn:eError"node process message"(fun()->Format.fprintfns.log_formatter"unexpected exception")>>=fun()->clean_up_node_connectionns!node_refin_chout_ch)letconnect_to_remote_node?pid(ns:node_state)(remote_node:Node_id.t)(ip:string)(port:int)(name:string)(remote_sock_addr:Unix.sockaddr):unitI.t=letopenIinletnode_ref=refNoneinletserver_handler_safeserver_fn(in_ch,out_ch)=catch(fun()->server_fnns(in_ch,out_ch)remote_node)(fune->clean_up_node_connectionns(Someremote_node)in_chout_ch>>=fun()->log_msgns~exn:eError"node client loop"@@(fun()->Format.fprintfns.log_formatter"unexpected exception while processing messages for remote node ";Node_id.print_string_of_noderemote_nodens.log_formatter))inlog_msgns?pidDebug"connecting to remote 
node"(fun()->Format.fprintfns.log_formatter"remote node %s:%d, name %s"ipportname)>>=fun()->open_connectionremote_sock_addr>>=fun(in_ch,out_ch)->write_valueout_ch@@Node(ns.local_node)>>=fun()->log_msgns?pidDebug"connecting to remote node"(fun()->Format.fprintfns.log_formatter"sent message ";print_string_of_message(Node(ns.local_node))ns.log_formatter;Format.fprintfns.log_formatter" remote node %s:%d, name %s"ipportname;)>>=fun()->wait_for_node_msgnsin_chout_chremote_sock_addrnode_ref>>=fun_->log_msgnsDebug"connected to remote node"(fun()->Format.fprintfns.log_formatter"remote node %s:%d, name %s"ipportname)>>=fun()->return@@async(fun()->server_handler_safeserver_handler(in_ch,out_ch))letadd_remote_node(ip:string)(port:int)(name:string):Node_id.tt=letopenIinfun(ns,pid)->if!(ns.config)=Nonethenlog_msg~pid:(Process_id.get_idpid)nsError"add remote node"(fun()->Format.fprintfns.log_formatter"called add remote node when node is running with local only configuration")>>=fun()->failLocal_only_modeelselog_msg~pid:(Process_id.get_idpid)nsDebug"adding remote node"(fun()->Format.fprintfns.log_formatter"%s:%d, name %s"ipportname)>>=fun()->letremote_sock_addr=Unix.ADDR_INET(Unix.inet_addr_of_stringip,port)inletremote_node=Node_id.make_remote_nodeipportnameinifNode_id_hashtbl.memns.remote_nodesremote_nodethenlog_msg~pid:(Process_id.get_idpid)nsWarning"remote node already exists"(fun()->Format.fprintfns.log_formatter"%s:%d, name %s"ipportname)>>=fun()->return(ns,pid,remote_node)elseconnect_to_remote_nodensremote_nodeipportnameremote_sock_addr>>=fun()->return(ns,pid,remote_node)letremove_remote_node(node:Node_id.t):unitt=letopenIinfun(ns,pid)->if!(ns.config)=Nonethenlog_msg~pid:(Process_id.get_idpid)nsError"remote remote node"(fun()->Format.fprintfns.log_formatter"called remove remote node when node is running with local only configuration")>>=fun()->failLocal_only_modeelselog_msg~pid:(Process_id.get_idpid)nsDebug"removing remote node"(fun()->Format.fprintfns.log_formatter"remote 
node : ";Node_id.print_string_of_nodenodens.log_formatter)>>=fun()->Node_id_hashtbl.removens.remote_nodesnode;return(ns,pid,())letrecconnect_to_remote_nodes(ns:node_state)(nodes:(string*int*string)list):unitio=letopenIinmatchnodeswith|[]->return()|(ip,port,name)::rest->letremote_sock_addr=Unix.ADDR_INET(Unix.inet_addr_of_stringip,port)inletremote_node=Node_id.make_remote_nodeipportnameinconnect_to_remote_nodensremote_nodeipportnameremote_sock_addr>>=fun()->connect_to_remote_nodesnsrestletrun_node?process(node_config:node_config):unitio=letopenIinif!initalisedthenfailInit_more_than_onceelsebegininitalised:=true;letbuff=Buffer.create1024inmatchnode_configwith|Locallocal_config->letns={mailboxes=Hashtbl.create1000;remote_nodes=Node_id_hashtbl.create~random:true10;monitor_table=Process_id_hashtbl.create~random:true1000;local_node=Node_id.make_local_nodelocal_config.Local_config.node_name;monitor_ref_id=ref0;config=refNone;log_buffer=buff;log_formatter=Format.formatter_of_bufferbuff;est_in_ch=refNone;est_out_ch=refNone;node_server=refNone;next_process_id=ref0;}inlog_msgnsInfo"node start up"(fun()->Format.fprintfns.log_formatter"{Distributed library version : %s ; Threading implementation : [name : %s ; version : %s ; description : %s]}"dist_lib_versionlib_namelib_versionlib_description)>>=fun()->log_msgnsInfo"node start up"(fun()->Format.fprintfns.log_formatter"local only mode with configuration of 
";print_string_of_confignode_configns.log_formatter)>>=fun()->ifprocess=Nonethenreturn()elsebeginletnew_pid=Process_id.make_locallocal_config.Local_config.node_namens.next_process_idinHashtbl.replacens.mailboxes(Process_id.get_idnew_pid)(I.create_stream());run_process'nsnew_pid((Potpourri.get_optionprocess)())end|Remoteremote_config->letns={mailboxes=Hashtbl.create1000;remote_nodes=Node_id_hashtbl.create~random:true10;monitor_table=Process_id_hashtbl.create~random:true1000;local_node=Node_id.make_remote_noderemote_config.Remote_config.node_ipremote_config.Remote_config.local_portremote_config.Remote_config.node_name;monitor_ref_id=ref0;config=ref(Someremote_config);log_buffer=buff;log_formatter=Format.formatter_of_bufferbuff;est_in_ch=refNone;est_out_ch=refNone;node_server=refNone;(* fill in below *)next_process_id=ref0;}inlog_msgnsInfo"node start up"(fun()->Format.fprintfns.log_formatter"{Distributed library version : %s ; Threading implementation : [name : %s ; version : %s ; description : %s]}"dist_lib_versionlib_namelib_versionlib_description)>>=fun()->log_msgnsInfo"node start up"(fun()->Format.fprintfns.log_formatter"remote mode with configuration of ";print_string_of_confignode_configns.log_formatter)>>=fun()->I.catch(fun()->connect_to_remote_nodesnsremote_config.Remote_config.remote_nodes>>=fun()->letlocal_sock_addr=Unix.ADDR_INET(Unix.inet_addr_any,remote_config.Remote_config.local_port)inI.establish_server~backlog:remote_config.Remote_config.connection_backloglocal_sock_addr(node_server_fnns)>>=funcommand_process_server->ns.node_server:=Somecommand_process_server;at_exit(fun()->log_msgnsInfo"node shutting down"(fun()->Format.fprintfns.log_formatter"start clean up actions for remote mode with configuration of ";print_string_of_confignode_configns.log_formatter)>>=fun()->at_exit_handlerns()>>=fun()->log_msgnsInfo"node shutting down"(fun()->Format.fprintfns.log_formatter"finished clean up actions for remote mode with configuration of 
";print_string_of_confignode_configns.log_formatter));ifprocess=Nonethenreturn()elsebeginletnew_pid=Process_id.make_remoteremote_config.Remote_config.node_ipremote_config.Remote_config.local_portremote_config.Remote_config.node_namens.next_process_idinHashtbl.replacens.mailboxes(Process_id.get_idnew_pid)(I.create_stream());run_process'nsnew_pid((Potpourri.get_optionprocess)())end)(fune->log_msgnsError~exn:e"node start up"(fun()->Format.fprintfns.log_formatter"encountered exception during node startup, shutting down server")>>=fun()->at_exit_handlerns()>>=fun()->(* reraise exception so os level process stops*)faile)endend