(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

module Events = P2p_events.Discovery

(** Existential wrapper around a peer pool: it hides the three type
    parameters of [P2p_pool.t] so that the workers below can store a pool
    without being parameterised themselves. *)
type pool = Pool : ('msg, 'meta, 'meta_conn) P2p_pool.t -> pool

(** Datagram exchanged by the discovery workers: a 10-character magic
    string, the identity of the announcing peer and the port it announces. *)
module Message = struct
  (* NOTE(review): the port is encoded with [int16]; this looks like a signed
     16-bit encoding, in which case [make] would raise for ports above
     32767 -- confirm against the Data_encoding documentation. *)
  let encoding =
    Data_encoding.tup3
      (Data_encoding.Fixed.string 10)
      P2p_peer.Id.encoding
      Data_encoding.int16

  (** Size in bytes of an encoded message. The encoding is expected to have
      a fixed length; [WithExceptions.Option.get] is applied to the result of
      [fixed_length], so a non-fixed encoding fails at module
      initialisation. *)
  let length =
    Data_encoding.Binary.fixed_length encoding
    |> WithExceptions.Option.get ~loc:__LOC__

  (** Magic string placed at the head of every message; the answer worker
      ignores datagrams whose first field differs from it. *)
  let key = "DISCOMAGIC"

  (** [make peer_id port] serialises the announcement [(key, peer_id, port)]
      to bytes. Uses [to_bytes_exn], so an encoding failure raises. *)
  let make peer_id port =
    (key, peer_id, port) |> Data_encoding.Binary.to_bytes_exn encoding
end

(** Worker listening for discovery datagrams and registering the announced
    points in the pool. *)
module Answer = struct
  type t = {
    my_peer_id : P2p_peer.Id.t;
    pool : pool;
    discovery_port : int;
    canceler : Lwt_canceler.t;
    trust_discovered_peers : bool;
    mutable worker : unit Lwt.t;
  }

  (** [create_socket st] opens an IPv4 datagram socket, marks it
      close-on-exec, enables [SO_BROADCAST] and [SO_REUSEADDR] and binds it to
      [st.discovery_port] on any local address. A callback closing the socket
      is registered on [st.canceler] before the options are set.

      If anything raises, the [create_socket_error] event is emitted and the
      exception is re-raised through [Lwt.fail]. *)
  let create_socket st =
    let open Lwt_syntax in
    let open_and_bind () =
      let sock = Lwt_unix.socket PF_INET SOCK_DGRAM 0 in
      Lwt_unix.set_close_on_exec sock ;
      let close_on_cancel () =
        let* closed = Lwt_utils_unix.safe_close sock in
        Result.iter_error
          (fun trace ->
            Format.eprintf "Uncaught error: %a\n%!" pp_print_trace trace)
          closed ;
        return_unit
      in
      Lwt_canceler.on_cancel st.canceler close_on_cancel ;
      Lwt_unix.setsockopt sock SO_BROADCAST true ;
      Lwt_unix.setsockopt sock SO_REUSEADDR true ;
      let* () =
        Lwt_unix.bind
          sock
          (Lwt_unix.ADDR_INET (Unix.inet_addr_any, st.discovery_port))
      in
      return sock
    in
    let report_failure exn =
      let* () = Events.(emit create_socket_error) () in
      Lwt.fail exn
    in
    Lwt.catch open_and_bind report_failure

  (** [loop st] creates the listening socket and then receives datagrams
      forever. It only returns with an error: either the socket could not be
      created, a receive failed, or [st.canceler] was triggered (both steps
      run under [protect ~canceler]).

      A datagram is acted upon only when it has exactly [Message.length]
      bytes, comes from an [ADDR_INET] address, decodes with
      [Message.encoding], carries [Message.key] and was not sent by
      [st.my_peer_id]. Every other datagram is silently dropped. *)
  let loop st =
    let open Lwt_result_syntax in
    let* socket =
      protect ~canceler:st.canceler (fun () ->
          Lwt_result.ok (create_socket st))
    in
    (* Waits for one datagram; [message_received] is emitted for every
       datagram, before any validation. *)
    let receive buf =
      protect ~canceler:st.canceler (fun () ->
          let*! datagram = Lwt_unix.recvfrom socket buf 0 Message.length [] in
          let*! () = Events.(emit message_received) () in
          return datagram)
    in
    (* Decodes a full-size datagram and, when it is a valid announcement from
       another peer, registers the point made of the sender address and the
       announced port. The port of the datagram source itself is not used. *)
    let handle_datagram remote_addr buf =
      let open Lwt_syntax in
      match Data_encoding.Binary.of_bytes_opt Message.encoding buf with
      | Some (key, remote_peer_id, remote_port)
        when Compare.String.equal key Message.key
             && not (P2p_peer.Id.equal remote_peer_id st.my_peer_id) -> (
          let s_addr = Unix.string_of_inet_addr remote_addr in
          match P2p_addr.of_string_opt s_addr with
          | None -> Events.(emit parse_error) s_addr
          | Some addr ->
              let (Pool pool) = st.pool in
              let* () = Events.(emit register_new) (addr, remote_port) in
              (* The value returned by the registration is deliberately
                 discarded. *)
              ignore
                (P2p_pool.register_new_point
                   ~trusted:st.trust_discovered_peers
                   pool
                   (addr, remote_port)) ;
              return_unit)
      | _ -> return_unit
    in
    (* Infinite loop, should never exit. *)
    let rec listen () =
      (* A fresh buffer is allocated for every datagram. *)
      let buf = Bytes.create Message.length in
      let* received = receive buf in
      let*! () =
        match received with
        | len, Lwt_unix.ADDR_INET (remote_addr, _)
          when Compare.Int.equal len Message.length ->
            handle_datagram remote_addr buf
        | _ -> Lwt.return_unit
      in
      listen ()
    in
    listen ()

  (** [worker_loop st] runs [loop st] and interprets its outcome: a trace
      headed by [Canceled] is a normal stop; any other error, or an
      unexpected normal return, is reported through an event and then
      triggers [st.canceler]. *)
  let worker_loop st =
    let open Lwt_syntax in
    let* outcome = loop st in
    let abort_after report =
      let* () = report in
      Error_monad.cancel_with_exceptions st.canceler
    in
    match outcome with
    | Error (Canceled :: _) -> return_unit
    | Error trace ->
        abort_after (Events.(emit unexpected_error) ("answer", trace))
    | Ok () -> abort_after (Events.(emit unexpected_exit) ())

  (** [create my_peer_id pool ~trust_discovered_peers ~discovery_port] builds
      an inactive answer worker with a fresh canceler; nothing runs until
      [activate] is called. *)
  let create my_peer_id pool ~trust_discovered_peers ~discovery_port =
    let canceler = Lwt_canceler.create () in
    {
      my_peer_id;
      pool = Pool pool;
      discovery_port;
      canceler;
      trust_discovered_peers;
      worker = Lwt.return_unit;
    }

  (** [activate st] starts the worker named [discovery_answer] and stores its
      promise in [st.worker]. *)
  let activate st =
    let run () = worker_loop st in
    let cancel () = Error_monad.cancel_with_exceptions st.canceler in
    st.worker <-
      Lwt_utils.worker
        "discovery_answer"
        ~on_event:Internal_event.Lwt_worker_logger.on_event
        ~run
        ~cancel
end

(* ************************************************************ *)
(* Sender
*)

(** Worker periodically broadcasting this node announcement on the local
    network. *)
module Sender = struct
  type t = {
    canceler : Lwt_canceler.t;
    my_peer_id : P2p_peer.Id.t;
    listening_port : int;
    discovery_port : int;
    discovery_addr : Ipaddr.V4.t;
    pool : pool;
    restart_discovery : unit Lwt_condition.t;
    mutable worker : unit Lwt.t;
  }

  (** Pacing of the broadcasts: [delay] is the pause, in seconds, before the
      next broadcast and [loop] counts the pauses that ran to completion. *)
  module Config = struct
    type t = {delay : float; loop : int}

    let initial = {delay = 0.1; loop = 0}

    (** Doubles the pause, leaving the counter untouched. *)
    let increase_delay config = {config with delay = 2.0 *. config.delay}

    (** Counter value at which the worker stops doubling the pause. *)
    let max_loop = 10
  end

  (** [broadcast_message st] sends one announcement
      ([st.my_peer_id], [st.listening_port]) as a datagram to
      [st.discovery_addr] on [st.discovery_port], using a socket created for
      this single send.

      The returned promise never fails: any exception is swallowed and
      reported through the [broadcast_error] event.

      The socket is closed in a [Lwt.finalize] handler, so it is released on
      the failure paths too (a failing [connect] or [sendto] used to leave the
      descriptor open until the canceler fired). The closer registered on
      [st.canceler] skips sockets that are already closed, so cancelling after
      many broadcasts no longer attempts to close each past socket again. *)
  let broadcast_message st =
    let open Lwt_syntax in
    let msg = Message.make st.my_peer_id st.listening_port in
    (* Closes [socket] unless that was already done; close errors are printed
       on stderr and otherwise ignored. *)
    let close_once socket =
      match Lwt_unix.state socket with
      | Lwt_unix.Closed -> return_unit
      | Lwt_unix.Opened | Lwt_unix.Aborted _ ->
          let* r = Lwt_utils_unix.safe_close socket in
          Result.iter_error
            (fun trace ->
              Format.eprintf "Uncaught error: %a\n%!" pp_print_trace trace)
            r ;
          return_unit
    in
    Lwt.catch
      (fun () ->
        let socket = Lwt_unix.(socket PF_INET SOCK_DGRAM 0) in
        (* Still needed so that a cancellation arriving while the send is
           pending closes the socket. The callback cannot be unregistered, so
           one closure per broadcast stays attached to the canceler; it is a
           no-op once the socket is closed. *)
        Lwt_canceler.on_cancel st.canceler (fun () -> close_once socket) ;
        Lwt.finalize
          (fun () ->
            Lwt_unix.setsockopt socket Lwt_unix.SO_BROADCAST true ;
            let broadcast_ipv4 =
              Ipaddr_unix.V4.to_inet_addr st.discovery_addr
            in
            let addr =
              Lwt_unix.ADDR_INET (broadcast_ipv4, st.discovery_port)
            in
            let* () = Lwt_unix.connect socket addr in
            let* () = Events.(emit broadcast_message) () in
            let* _len =
              Lwt_unix.sendto socket msg 0 Message.length [] addr
            in
            return_unit)
          (fun () -> close_once socket))
      (fun _exn -> Events.(emit broadcast_error) ())

  (** [worker_loop sender_config st] broadcasts once, then waits for
      whichever comes first: a signal on [st.restart_discovery], which resets
      the pacing to [Config.initial], or the end of a pause of
      [sender_config.delay] seconds, which increments the counter.

      While the counter is below [Config.max_loop] the pause is doubled
      before the next round; once it reaches [Config.max_loop] the counter is
      stepped back by one and the pause is left as is, so the pause stops
      growing. Note that the configuration returned after a restart signal
      also goes through the doubling step before being used.

      A trace headed by [Canceled] ends the loop normally; any other error is
      reported through [unexpected_error] and triggers [st.canceler]. *)
  let rec worker_loop sender_config st =
    let open Lwt_syntax in
    let* r =
      Lwt_result.bind
        (protect ~canceler:st.canceler (fun () ->
             Lwt_result.ok @@ broadcast_message st))
      @@ fun () ->
      protect ~canceler:st.canceler (fun () ->
          Lwt_result.ok
          @@ Lwt.pick
               [
                 (let* () = Lwt_condition.wait st.restart_discovery in
                  return Config.initial);
                 (let* () = Lwt_unix.sleep sender_config.Config.delay in
                  return
                    {
                      sender_config with
                      Config.loop = succ sender_config.loop;
                    });
               ])
    in
    match r with
    | Ok config when config.Config.loop = Config.max_loop ->
        let new_sender_config = {config with Config.loop = pred config.loop} in
        worker_loop new_sender_config st
    | Ok config ->
        let new_sender_config = Config.increase_delay config in
        worker_loop new_sender_config st
    | Error (Canceled :: _) -> return_unit
    | Error err ->
        let* () = Events.(emit unexpected_error) ("sender", err) in
        Error_monad.cancel_with_exceptions st.canceler

  (** [create my_peer_id pool ~listening_port ~discovery_port ~discovery_addr]
      builds an inactive sender with a fresh canceler and a fresh restart
      condition; nothing is sent until [activate] is called. *)
  let create my_peer_id pool ~listening_port ~discovery_port ~discovery_addr =
    {
      canceler = Lwt_canceler.create ();
      my_peer_id;
      listening_port;
      discovery_port;
      discovery_addr;
      restart_discovery = Lwt_condition.create ();
      pool = Pool pool;
      worker = Lwt.return_unit;
    }

  (** [activate st] starts the worker named [discovery_sender], beginning
      with [Config.initial], and stores its promise in [st.worker]. *)
  let activate st =
    st.worker <-
      Lwt_utils.worker
        "discovery_sender"
        ~on_event:Internal_event.Lwt_worker_logger.on_event
        ~run:(fun () -> worker_loop Config.initial st)
        ~cancel:(fun () -> Error_monad.cancel_with_exceptions st.canceler)
end

(* ********************************************************************** *)

(** A discovery service: one answer worker and one sender worker. *)
type t = {answer : Answer.t; sender : Sender.t}

(** [create ~listening_port ~discovery_port ~discovery_addr
    ~trust_discovered_peers pool my_peer_id] builds both workers over the same
    pool and peer identity. Neither is started. *)
let create ~listening_port ~discovery_port ~discovery_addr
    ~trust_discovered_peers pool my_peer_id =
  let answer =
    Answer.create my_peer_id pool ~discovery_port ~trust_discovered_peers
  in
  let sender =
    Sender.create
      my_peer_id
      pool
      ~listening_port
      ~discovery_port
      ~discovery_addr
  in
  {answer; sender}

(** [activate t] starts the answer worker, then the sender worker. *)
let activate {answer; sender} =
  Answer.activate answer ;
  Sender.activate sender

(** [wakeup t] signals the sender restart condition, which makes a sender
    currently pausing broadcast again with its pacing reset. *)
let wakeup t = Lwt_condition.signal t.sender.restart_discovery ()

(** [shutdown t] triggers the cancelers of both workers and resolves once
    both cancellations have completed. *)
let shutdown t =
  Lwt.join
    [
      Error_monad.cancel_with_exceptions t.answer.canceler;
      Error_monad.cancel_with_exceptions t.sender.canceler;
    ]