123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2020-2021 Nomadic Labs, <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)openError_monadtypeaddr=|Unixofstring|Tcpofstring*string*Unix.getaddrinfo_optionlistlethandle_literal_ipv6host=(* To strip '[' and ']' when a literal IPv6 is provided *)matchIpaddr.of_stringhostwith|Error(`Msg_)->host|Okipaddr->Ipaddr.to_stringipaddrletconnect?(timeout=!Lwt_utils_unix.default_net_timeout)=letopenLwt_result_syntaxinfunction|Unixpath->letaddr=Lwt_unix.ADDR_UNIXpathinletsock=Lwt_unix.socketPF_UNIXSOCK_STREAM0inLwt_unix.set_close_on_execsock;let*!()=Lwt_unix.connectsockaddrinreturnsock|Tcp(host,service,opts)->(lethost=handle_literal_ipv6hostinlet*!addrs=Lwt_unix.getaddrinfohostserviceoptsinmatchaddrswith|[]->failwith"could not resolve host '%s'"host|addrs->letrectry_connectacc=function|[]->Lwt.return(Error(leterr=error_of_fmt"could not connect to '%s'"hostinmatchaccwith|None->TzTrace.makeerr|Sometr->TzTrace.conserrtr))|{Unix.ai_family;ai_socktype;ai_protocol;ai_addr;_}::addrs->(letsock=Lwt_unix.socketai_familyai_socktypeai_protocolinLwt_unix.set_close_on_execsock;let*!r=protect~on_error:(fune->let*!()=Lwt_unix.closesockinLwt.return_errore)(fun()->Lwt_unix.with_timeout(Ptime.Span.to_float_stimeout)(fun()->let*!()=Lwt_unix.connectsockai_addrinreturnsock))inmatchrwith|Oksock->returnsock|Error(e:errortrace)->letacc=matchaccwith|None->Somee|Sometr->Some(TzTrace.conpetr)intry_connectaccaddrs)intry_connectNoneaddrs)letwith_connection?timeoutaddrf=letopenLwt_result_syntaxinlet*conn=connect?timeoutaddrinprotect(fun()->let*a=fconninlet*()=Lwt_utils_unix.safe_closeconninreturna)~on_error:(fune->let*()=Lwt_utils_unix.safe_closeconninLwt.return(Errore))letbind?(backlog=10)=letopenLwt_syntaxinfunction|Unixpath->letaddr=Lwt_unix.ADDR_UNIXpathinletsock=Lwt_unix.socketPF_UNIXSOCK_STREAM0inLwt_unix.set_close_on_execsock;let*()=Lwt_unix.bindsockaddrinLwt_unix.listensockbacklog;return_ok[sock]|Tcp(host,service,opts)->(let*addrs=Lwt_unix.getaddrinfo(handle_literal_ipv6host)service(AI_PASSIVE::opts)inmatchaddrswith|[]->failwith"could not resolve host '%s'"host|addrs->letdo_bind{Unix.ai_family;ai_socktype;ai_protocol;ai_addr;_}=letsock=Lwt_unix.socketai_familyai_socktypeai_protocolinLwt_unix.set_close_on_execsock;Lwt_unix.setsockoptsockSO_REUSEADDRtrue;let*()=Lwt_unix.bindsockai_addrinLwt_unix.listensockbacklog;return_oksockinTezos_error_monad.TzLwtreslib.List.map_esdo_bindaddrs)(* To get the encoding/decoding errors into scope. *)openData_encoding_wrapper(* length information is encoded as an [int16] which has a size of [2] bytes *)letsize_of_length_of_message_payload=2(* some messages may be too long for their length to be encoded *)letmaximum_length_of_message_payload=(* or [0b1111_1111_1111_1111] *)1lsl(size_of_length_of_message_payload*8)letsendfdencodingmessage=letopenLwt_result_syntaxinletlength_of_message_payload=Data_encoding.Binary.lengthencodingmessageinassert(length_of_message_payload>=0);let*()=fail_unless(length_of_message_payload<maximum_length_of_message_payload)Unexpected_size_of_encoded_valueinlettotal_length_of_message=size_of_length_of_message_payload+length_of_message_payloadinletmessage_serialisation_buffer=Bytes.createtotal_length_of_messageinletserialisation_state=Data_encoding.Binary.make_writer_statemessage_serialisation_buffer~offset:size_of_length_of_message_payload~allowed_bytes:length_of_message_payloadin(* By construction, the length of the serialisation buffer is the state's
offset + the state's allowed_length. As a result, we are within the range
of valid parameter for [make_writer_state]. *)assert(Option.is_someserialisation_state);letserialisation_state=Stdlib.Option.getserialisation_stateinmatchData_encoding.Binary.writeencodingmessageserialisation_statewith|Errorwe->tzfail(Encoding_errorwe)|Oklast->let*()=fail_unless(last=total_length_of_message)Unexpected_size_of_encoded_valuein(* we set the beginning of the buf with the length of what is next *)Tezos_stdlib.TzEndian.set_int16message_serialisation_buffer0length_of_message_payload;protect(fun()->Lwt_result.ok@@Lwt_utils_unix.write_bytesfdmessage_serialisation_buffer)letrecv?timeoutfdencoding=letopenLwt_result_syntaxinletheader_buf=Bytes.createsize_of_length_of_message_payloadinlet*()=protect(fun()->Lwt_result.ok@@Lwt_utils_unix.read_bytes_with_timeout?timeout~len:size_of_length_of_message_payloadfdheader_buf)inletlen=Tezos_stdlib.TzEndian.get_uint16header_buf0inletbuf=Bytes.createleninlet*()=protect(fun()->Lwt_result.ok@@Lwt_utils_unix.read_bytes_with_timeout?timeout~lenfdbuf)inletbuf=Bytes.unsafe_to_stringbufinmatchData_encoding.Binary.readencodingbuf0lenwith|Errorre->tzfail(Decoding_errorre)|Ok(read_len,message)->ifread_len<>lenthentzfail(Decoding_errorExtra_bytes)elsereturnmessage