(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2020-2021 Nomadic Labs <contact@nomadic-labs.com>           *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Lwt.Infix
module ConnectionMap = Map.Make (Cohttp.Connection)

(** [v >>? f] feeds the payload of a plain [result] into [f], which returns a
    promise; an [Error] is passed through with [Lwt.return_error]. *)
let ( >>? ) v f = match v with Ok x -> f x | Error err -> Lwt.return_error err

(** Wrap [r] in the [`Response] tag and return it as a resolved [Ok]. *)
let lwt_return_ok_response r = Lwt.return_ok (`Response r)

(** Wrap [r] in the [`Response] tag and return it as a resolved promise. *)
let lwt_return_response r = Lwt.return (`Response r)

(** Write one chunk to [oc]: the length in upper-case hexadecimal, CRLF, then
    [length] bytes of [item] starting at [offset], CRLF, and a flush.

    Nothing at all is written when [length] is zero; the zero-length chunk is
    only ever emitted by [wseq] once the sequence is exhausted. *)
let wchunk oc (item, offset, length) =
  if length = 0 then Lwt.return_unit
  else
    Lwt_io.fprintf oc "%X\r\n" length >>= fun () ->
    Lwt_io.write_from_exactly oc item offset length >>= fun () ->
    Lwt_io.write_from_string_exactly oc "\r\n" 0 2 >>= fun () ->
    Lwt_io.flush oc

(** Force every remaining element of [seq] and discard it, pausing between
    elements. *)
let rec drain seq =
  match seq () with
  | Seq.Nil -> Lwt.return_unit
  | Seq.Cons (_, seq) -> Lwt.pause () >>= fun () -> (drain [@ocaml.tailcall]) seq

(** Write every element of [seq] to [oc] with [wchunk], pausing between
    elements, then write the terminating zero-length chunk and flush.

    If writing a chunk fails, the rest of the sequence is still forced through
    [drain] before the exception is re-raised. [ic] is only threaded through
    here; it is closed by the wrapper defined right below. *)
let rec wseq ic oc seq =
  match seq () with
  | Seq.Nil ->
      Lwt_io.write_from_string_exactly oc "0\r\n\r\n" 0 5 >>= fun () ->
      Lwt_io.flush oc
  | Seq.Cons (chunk, seq) ->
      Lwt.try_bind
        (fun () -> wchunk oc chunk)
        (fun () -> Lwt.pause () >>= fun () -> (wseq [@ocaml.tailcall]) ic oc seq)
        (fun exc -> drain seq >>= fun () -> raise exc)

(** Same as the recursive [wseq] above, but [ic] is closed once the write has
    finished, whether it succeeded or failed. *)
let wseq ic oc seq =
  Lwt.finalize (fun () -> wseq ic oc seq) (fun () -> Lwt_io.close ic)

(** Logging functions expected by the server functors. The first five return
    [unit]; the [lwt_] variants return a [unit Lwt.t]. *)
module type LOGGING = sig
  val debug : ('a, Format.formatter, unit, unit) format4 -> 'a

  val log_info : ('a, Format.formatter, unit, unit) format4 -> 'a

  val log_notice : ('a, Format.formatter, unit, unit) format4 -> 'a

  val warn : ('a, Format.formatter, unit, unit) format4 -> 'a

  val log_error : ('a, Format.formatter, unit, unit) format4 -> 'a

  val lwt_debug : ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a

  val lwt_log_info : ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a

  val lwt_log_notice : ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a

  val lwt_warn : ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a

  val lwt_log_error : ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a
end

let ( >>=? ) = Lwt_result.bind

(** Building blocks of the server (media-type negotiation, default agent name
    and response builders) that do not depend on the [server] record defined
    in [Make]. *)
module Make_selfserver (Encoding : Resto.ENCODING) (Log : LOGGING) = struct
  open Cohttp
  module Service = Resto.MakeService (Encoding)
  module Directory = Resto_directory.Make (Encoding)
  module Media_type = Media_type.Make (Encoding)

  module Media = struct
    type medias = {
      media_types : Media_type.t list;
      default_media_type : string * Media_type.t;
    }

    (** Build the default media type from [Media_type.first_complete_media]:
        the two name components joined with a slash, paired with the media
        type itself.

        Raises [Invalid_argument] when that lookup returns [None]. *)
    let default_media_type media_types =
      match Media_type.first_complete_media media_types with
      | None ->
          invalid_arg "Resto_directory_cohttp.launch(empty media type list)"
      | Some ((l, r), m) -> (l ^ "/" ^ r, m)

    (** Select the media type used to decode the request body from the
        content-type header.

        Falls back to the default media type when there are no headers or no
        content-type header. Returns [`Unsupported_media_type] when the header
        value does not split into exactly two components or is not found in
        [medias.media_types]. *)
    let input_media_type ?headers medias =
      match headers with
      | None -> Ok (snd medias.default_media_type)
      | Some headers -> (
          match Header.get headers "content-type" with
          | None -> Ok (snd medias.default_media_type)
          | Some content_type -> (
              match Resto.Utils.split_path content_type with
              | [x; y] -> (
                  match Media_type.find_media (x, y) medias.media_types with
                  | None -> Error (`Unsupported_media_type content_type)
                  | Some media_type -> Ok media_type)
              | _ -> Error (`Unsupported_media_type content_type)))

    (** Select the content-type string and media type used to encode the
        response from the accept header.

        Falls back to the default media type when there are no headers or no
        accept header. Returns [`Not_acceptable] when
        [Media_type.resolve_accept_header] finds no match. *)
    let output_content_media_type ?headers medias =
      match headers with
      | None -> Ok medias.default_media_type
      | Some headers -> (
          match Header.get headers "accept" with
          | None -> Ok medias.default_media_type
          | Some accepted -> (
              match
                Media_type.resolve_accept_header
                  medias.media_types
                  (Some accepted)
              with
              | None -> Error `Not_acceptable
              | Some media_type -> Ok media_type))
  end

  module Agent = struct
    let default_agent = "OCaml-Resto"
  end

  module Handlers = struct
    (** [true] when [cors] lists at least one allowed origin and
        [Cors.check_host] rejects the request headers. *)
    let invalid_cors (cors : Cors.t) headers =
      cors.allowed_origins <> [] && not (Cors.check_host headers cors)

    (** Empty-bodied 403 response carrying a single header whose name is built
        from [agent]. *)
    let invalid_cors_response agent =
      let headers =
        Cohttp.Header.init_with
          (Format.asprintf "X-%s-CORS-Error" agent)
          "invalid host"
      in
      (Response.make ~headers ~status:`Forbidden (), Cohttp_lwt.Body.empty)

    (** Turn a request-handling error into a response and body.

        The three parse failures produce a 400 with a text/plain explanation,
        [`Method_not_allowed] lists the permitted methods in an allow header,
        [`Not_acceptable] sends [Media_type.acceptable_encoding] as its body,
        and the remaining cases produce an empty body with the matching
        status. *)
    let handle_error medias
        (error :
          [< `Cannot_parse_body of string
          | `Cannot_parse_path of string list * Resto.Arg.descr * string
          | `Cannot_parse_query of string
          | `Method_not_allowed of [< Resto.meth] list
          | `Not_acceptable
          | `Not_found
          | `Not_implemented
          | `Unsupported_media_type of 'a ]) :
        Cohttp.Response.t * Cohttp_lwt.Body.t =
      let open Resto.Arg in
      match error with
      | `Not_implemented ->
          (Response.make ~status:`Not_implemented (), Cohttp_lwt.Body.empty)
      | `Method_not_allowed methods ->
          let headers = Header.init () in
          let headers =
            Header.add_multi
              headers
              "allow"
              (List.map Resto.string_of_meth methods)
          in
          ( Response.make ~status:`Method_not_allowed ~headers (),
            Cohttp_lwt.Body.empty )
      | `Cannot_parse_path (context, arg, value) ->
          let headers = Header.init () in
          let headers = Header.add headers "content-type" "text/plain" in
          ( Response.make ~status:`Bad_request ~headers (),
            Format.kasprintf
              Cohttp_lwt.Body.of_string
              "Failed to parsed an argument in path. After \"%s\", the value \
               \"%s\" is not acceptable for type \"%s\""
              (String.concat "/" context)
              value
              arg.name )
      | `Cannot_parse_body s ->
          let headers = Header.init () in
          let headers = Header.add headers "content-type" "text/plain" in
          ( Response.make ~status:`Bad_request ~headers (),
            Format.kasprintf
              Cohttp_lwt.Body.of_string
              "Failed to parse the request body: %s"
              s )
      | `Cannot_parse_query s ->
          let headers = Header.init () in
          let headers = Header.add headers "content-type" "text/plain" in
          ( Response.make ~status:`Bad_request ~headers (),
            Format.kasprintf
              Cohttp_lwt.Body.of_string
              "Failed to parse the query string: %s"
              s )
      | `Not_acceptable ->
          let accepted_encoding =
            Media_type.acceptable_encoding medias.Media.media_types
          in
          ( Response.make ~status:`Not_acceptable (),
            Cohttp_lwt.Body.of_string accepted_encoding )
      | `Unsupported_media_type _ ->
          ( Response.make ~status:`Unsupported_media_type (),
            Cohttp_lwt.Body.empty )
      | `Not_found -> (Response.make ~status:`Not_found (), Cohttp_lwt.Body.empty)

    (** Build the response for a successful, non-streamed answer.

        [`Ok] serialises the value with [output] and sends it with a fixed
        length. [`No_content] and [`Created] send an empty body.

        NOTE(review): the optional [headers] are only used by the [`Ok]
        branch; [`Created] builds its own header set (holding the location
        when one is given) and [`No_content] sends none. *)
    let handle_rpc_answer con_string ?headers output answer =
      match answer with
      | `Ok o ->
          let body = output o in
          Log.debug "(%s) response code:200" con_string ;
          Log.debug "(%s) response body: %s" con_string body ;
          let encoding = Transfer.Fixed (Int64.of_int (String.length body)) in
          ( Response.make ~status:`OK ~encoding ?headers (),
            Cohttp_lwt.Body.of_string body )
      | `No_content ->
          Log.debug "(%s) response code:204 (no content)" con_string ;
          (Response.make ~status:`No_content (), Cohttp_lwt.Body.empty)
      | `Created s ->
          let headers = Header.init () in
          let headers =
            match s with
            | None -> headers
            | Some s -> Header.add headers "location" s
          in
          Log.debug "(%s) response code:201 (created)" con_string ;
          (Response.make ~status:`Created ~headers (), Cohttp_lwt.Body.empty)

    (** Build the response for an error answer returned by a service handler.

        [error] turns the optional error payload into a body and its transfer
        encoding; each variant only selects the status code and the logged
        number. *)
    let handle_rpc_answer_error con_string ?headers error answer =
      match answer with
      | `Unauthorized e ->
          Log.log_info "(%s) response code: 401" con_string ;
          let (body, encoding) = error e in
          let status = `Unauthorized in
          (Response.make ~status ~encoding ?headers (), body)
      | `Forbidden e ->
          Log.log_info "(%s) response code: 403" con_string ;
          let (body, encoding) = error e in
          let status = `Forbidden in
          (Response.make ~status ~encoding ?headers (), body)
      | `Gone e ->
          Log.log_info "(%s) response code: 410" con_string ;
          let (body, encoding) = error e in
          let status = `Gone in
          (Response.make ~status ~encoding ?headers (), body)
      | `Not_found e ->
          Log.log_info "(%s) response code: 404" con_string ;
          let (body, encoding) = error e in
          let status = `Not_found in
          (Response.make ~status ~encoding ?headers (), body)
      | `Conflict e ->
          Log.log_info "(%s) response code: 409" con_string ;
          let (body, encoding) = error e in
          let status = `Conflict in
          (Response.make ~status ~encoding ?headers (), body)
      | `Error e ->
          Log.log_info "(%s) response code: 500" con_string ;
          let (body, encoding) = error e in
          let status = `Internal_server_error in
          (Response.make ~status ~encoding ?headers (), body)

    (** Build a chunked 200 response together with the writer function that
        sends the sequence produced by [output_seq] through [wseq]. *)
    let handle_rpc_answer_chunk ?headers output_seq answer =
      match answer with
      | `OkChunk o ->
          let body = output_seq o in
          let encoding = Transfer.Chunked in
          ( Response.make ~status:`OK ~encoding ?headers (),
            fun ic oc -> wseq ic oc body )

    (** Answer an OPTIONS request.

        Without an origin header, or without an
        Access-Control-Request-Method header, every method allowed on [path]
        is listed. Otherwise only the requested method is listed, provided
        that it is a Resto method and that a service is registered for it;
        any other method yields [`Not_found]. *)
    let handle_options root cors headers path =
      let origin_header = Header.get headers "origin" in
      (if
       (* Default OPTIONS handler for CORS preflight *)
       origin_header = None
      then Directory.allowed_methods root () path
      else
        match Header.get headers "Access-Control-Request-Method" with
        | None -> Directory.allowed_methods root () path
        | Some meth -> (
            match Code.method_of_string meth with
            | #Resto.meth as meth ->
                Directory.lookup root () meth path >>=? fun _handler ->
                Lwt.return_ok [meth]
            | _ -> Lwt.return_error `Not_found))
      >>=? fun cors_allowed_meths ->
      let headers = Header.init () in
      let headers =
        Header.add_multi
          headers
          "Access-Control-Allow-Methods"
          (List.map Resto.string_of_meth cors_allowed_meths)
      in
      let headers = Cors.add_headers headers cors origin_header in
      Lwt.return_ok
        ( Response.make ~flush:true ~status:`OK ~headers (),
          Cohttp_lwt.Body.empty )
  end
end

(** Full server: everything from [Make_selfserver] plus the [server] record,
    the request callback and the launch and shutdown entry points. *)
module Make (Encoding : Resto.ENCODING) (Log : LOGGING) = struct
  include Make_selfserver (Encoding) (Log)
  open Cohttp

  type server = {
    root : unit Directory.directory;
    (* Shutdown function of every streamed answer, keyed by connection. *)
    mutable streams : (unit -> unit) ConnectionMap.t;
    cors : Cors.t;
    medias : Media.medias;
    (* Woken up by [shutdown]; its promise is the [stop] given to Cohttp. *)
    stopper : unit Lwt.u;
    mutable acl : Acl.t;
    agent : string;
    (* Promise of the running Cohttp server, set by [launch]. *)
    mutable worker : unit Lwt.t;
  }

  (** Expose the streamed answer [s] as an [Lwt_stream] of strings encoded
      with [to_string].

      A shutdown function is registered in [server.streams] under [con]; once
      called, it makes the stream end, calls [s.shutdown] and removes the
      registration. *)
  let create_stream server con to_string s =
    let con_string = Connection.to_string con in
    let running = ref true in
    let stream =
      Lwt_stream.from (fun () ->
          if not !running then Lwt.return None
          else s.Resto_directory.Answer.next () >|= Option.map to_string)
    in
    let shutdown () =
      Log.log_info "streamed connection closed %s" con_string ;
      running := false ;
      s.shutdown () ;
      server.streams <- ConnectionMap.remove con server.streams
    in
    server.streams <- ConnectionMap.add con shutdown server.streams ;
    stream

  (** Handle one request.

      The whole body is read first. Resto methods then go through the CORS
      check, service lookup, media-type negotiation, query parsing and the
      ACL check before the service handler runs. HEAD and any non-Resto
      method are answered with [`Not_implemented]; OPTIONS is delegated to
      [Handlers.handle_options]. Every [Error] is converted to a response by
      [Handlers.handle_error]. *)
  let callback server ((_io, con) : Cohttp_lwt_unix.Server.conn) req body =
    let con_string = Connection.to_string con in
    let uri = Request.uri req in
    let path_and_query = Uri.path_and_query uri in
    Log.lwt_log_info "(%s) received request %s" con_string path_and_query
    >>= fun () ->
    let req_headers = Request.headers req in
    Log.lwt_debug
      "(%s) request headers: %s"
      con_string
      (Header.to_string req_headers)
    >>= fun () ->
    Cohttp_lwt.Body.to_string body >>= fun body ->
    Log.lwt_debug "(%s) request body: %s" con_string body >>= fun () ->
    let path = Uri.path uri in
    let path = Resto.Utils.decode_split_path path in
    (match Request.meth req with
    | #Resto.meth when Handlers.invalid_cors server.cors req_headers ->
        lwt_return_ok_response @@ Handlers.invalid_cors_response server.agent
    | #Resto.meth as meth -> (
        Directory.lookup server.root () meth path
        >>=? fun (Directory.Service s) ->
        Media.input_media_type ~headers:req_headers server.medias
        >>? fun input_media_type ->
        Log.lwt_debug
          "(%s) input media type %s"
          con_string
          (Media_type.name input_media_type)
        >>= fun () ->
        Media.output_content_media_type ~headers:req_headers server.medias
        >>? fun (output_content_type, output_media_type) ->
        (match
           Resto.Query.parse
             s.types.query
             (List.map (fun (k, l) -> (k, String.concat "," l)) (Uri.query uri))
         with
        | exception Resto.Query.Invalid s ->
            Lwt.return_error (`Cannot_parse_query s)
        | query -> Lwt.return_ok query)
        >>=? fun query ->
        Log.lwt_debug
          "(%s) ouput media type %s"
          con_string
          (Media_type.name output_media_type)
        >>= fun () ->
        let headers = Header.init () in
        let headers = Header.add headers "content-type" output_content_type in
        let headers =
          Cors.add_allow_origin
            headers
            server.cors
            (Header.get req_headers "origin")
        in
        (* A request rejected by the ACL never reaches the handler and is
           answered like a handler-returned [`Unauthorized]. *)
        (if not @@ Acl.allowed server.acl ~meth ~path then
         Lwt.return_ok (`Unauthorized None)
        else
          match s.types.input with
          | Service.No_input -> s.handler query () >>= Lwt.return_ok
          | Service.Input input -> (
              match input_media_type.destruct input body with
              | Error s -> Lwt.return_error (`Cannot_parse_body s)
              | Ok body -> s.handler query body >>= Lwt.return_ok))
        >>=? fun answer ->
        match answer with
        | (`Ok _ | `No_content | `Created _) as a ->
            let output = output_media_type.construct s.types.output in
            let response =
              Handlers.handle_rpc_answer con_string ~headers output a
            in
            lwt_return_ok_response response
        | `OkChunk _ as a ->
            let output_seq = output_media_type.construct_seq s.types.output in
            Log.lwt_debug
              "(%s) response code:200 (with chunk transfer\n\
               \ encoding)"
              con_string
            >>= fun () ->
            Lwt.return_ok
              (`Expert (Handlers.handle_rpc_answer_chunk ~headers output_seq a))
        | `OkStream o ->
            let output = output_media_type.construct s.types.output in
            let body = create_stream server con output o in
            let encoding = Transfer.Chunked in
            Log.lwt_debug "(%s) response code:200 (streamed)" con_string
            >>= fun () ->
            lwt_return_ok_response
              ( Response.make ~status:`OK ~encoding ~headers (),
                Cohttp_lwt.Body.of_stream body )
        | ( `Unauthorized _ | `Forbidden _ | `Gone _ | `Not_found _
          | `Conflict _ | `Error _ ) as a ->
            let error = function
              | None ->
                  Log.log_info "(%s) response body (empty)" con_string ;
                  (Cohttp_lwt.Body.empty, Transfer.Fixed 0L)
              | Some e ->
                  let s = output_media_type.construct s.types.error e in
                  Log.log_info "(%s) response body: %s" con_string s ;
                  ( Cohttp_lwt.Body.of_string s,
                    Transfer.Fixed (Int64.of_int (String.length s)) )
            in
            let response =
              Handlers.handle_rpc_answer_error con_string ~headers error a
            in
            lwt_return_ok_response response)
    | `HEAD ->
        (* TODO ??? *)
        Lwt.return_error `Not_implemented
    | `OPTIONS -> (
        Handlers.handle_options server.root server.cors req_headers path
        >>= fun res ->
        Log.lwt_log_info "(%s) RPC preflight" con_string >>= fun () ->
        match res with
        | Ok res -> lwt_return_ok_response res
        | Error _ as e -> Lwt.return e)
    | _ -> Lwt.return_error `Not_implemented)
    >>= function
    | Ok answer -> Lwt.return answer
    | Error err ->
        lwt_return_response @@ Handlers.handle_error server.medias err

  (** Promise a running RPC server.

      Builds the [server] record, starts the Cohttp server and stores its
      promise in [server.worker], then returns the record.

      Raises [Invalid_argument] through [Media.default_media_type] when no
      default can be derived from [media_types]. *)
  let launch ?(host = "::") ?(cors = Cors.default)
      ?(agent = Agent.default_agent) ?(acl = Acl.Allow_all {except = []})
      ~media_types mode root =
    let default_media_type = Media.default_media_type media_types in
    let (stop, stopper) = Lwt.wait () in
    let medias : Media.medias = {media_types; default_media_type} in
    let server =
      {
        root;
        streams = ConnectionMap.empty;
        cors;
        medias;
        stopper;
        acl;
        agent;
        worker = Lwt.return_unit;
      }
    in
    Conduit_lwt_unix.init ~src:host () >>= fun ctx ->
    let ctx = Cohttp_lwt_unix.Net.init ~ctx () in
    server.worker <-
      (let conn_closed (_, con) =
         let con_string = Connection.to_string con in
         Log.debug "connection closed %s" con_string ;
         (* Run the stream shutdown registered for this connection, if any. *)
         try ConnectionMap.find con server.streams () with Not_found -> ()
       and on_exn = function
         | Unix.Unix_error (Unix.EADDRINUSE, "bind", _) ->
             Log.log_error
               "RPC server port already taken, the node will be shutdown" ;
             exit 1
         | Unix.Unix_error (ECONNRESET, _, _) | Unix.Unix_error (EPIPE, _, _) ->
             ()
         | exn ->
             Format.eprintf
               "@[<v 2>Uncaught (asynchronous) exception:@ %s@ %s@]%!"
               (Printexc.to_string exn)
               (Printexc.get_backtrace ())
       and callback (io, con) req body =
         (* [let ... and] is not recursive, so [callback] in the body below is
            the module-level function defined earlier. *)
         Lwt.catch
           (fun () -> callback server (io, con) req body)
           (function
             | Not_found ->
                 let status = `Not_found in
                 let body = Cohttp_lwt.Body.empty in
                 lwt_return_response (Response.make ~status (), body)
             | exn ->
                 let headers = Header.init () in
                 let headers =
                   Header.add headers "content-type" "text/ocaml.exception"
                 in
                 let status = `Internal_server_error in
                 let body =
                   Cohttp_lwt.Body.of_string (Printexc.to_string exn)
                 in
                 lwt_return_response (Response.make ~status ~headers (), body))
       in
       Cohttp_lwt_unix.Server.create
         ~stop
         ~ctx
         ~mode
         ~on_exn
         (Cohttp_lwt_unix.Server.make_response_action
            ~callback
            ~conn_closed
            ())) ;
    Log.lwt_log_info "Server started (agent: %s)" server.agent >>= fun () ->
    Lwt.return server

  (** Wake up the stopper, wait for the worker promise to resolve, then call
      the shutdown function of every stream still registered. *)
  let shutdown server =
    Lwt.wakeup_later server.stopper () ;
    server.worker >>= fun () ->
    ConnectionMap.iter (fun _ f -> f ()) server.streams ;
    Lwt.return_unit

  (** Replace the access-control list checked by [callback]. *)
  let set_acl server acl = server.acl <- acl
end