(*********************************************************************************)
(* Ojs-base *)
(* *)
(* Copyright (C) 2014-2021 INRIA. All rights reserved. *)
(* *)
(* This program is free software; you can redistribute it and/or modify *)
(* it under the terms of the GNU General Public License as *)
(* published by the Free Software Foundation, version 3 of the License. *)
(* *)
(* This program is distributed in the hope that it will be useful, *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *)
(* GNU Library General Public License for more details. *)
(* *)
(* You should have received a copy of the GNU General Public *)
(* License along with this program; if not, write to the Free Software *)
(* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA *)
(* 02111-1307 USA *)
(* *)
(* As a special exception, you have permission to link this program *)
(* with the OCaml compiler and distribute executables, as long as you *)
(* follow the requirements of the GNU GPL in regard to all of the *)
(* software in the executable aside from the OCaml compiler.
*)
(* *)
(* Contact: Maxence.Guesdon@inria.fr *)
(* *)
(*********************************************************************************)

let (>>=) = Lwt.bind
module J = Yojson.Safe
module SMap = Map.Make(String)
module Ws = Websocket_lwt_unix

(** [mk_msg_of_wsdata client_msg_of_yojson] builds a decoder from a
    string payload to an optional client message.

    The payload is parsed with [Yojson.Safe.from_string] and the resulting
    JSON value is handed to [client_msg_of_yojson]. The decoder returns
    [Some msg] on [Ok msg]. On [Error s], or on any exception raised while
    parsing or converting, a description is printed on stderr and the
    decoder returns [None]. *)
let mk_msg_of_wsdata client_msg_of_yojson =
  fun s ->
    try
      let json = J.from_string s in
      match client_msg_of_yojson json with
      | Error s ->
          (* Routed through the handler below so that conversion errors are
             reported the same way as JSON syntax errors. *)
          raise (Yojson.Json_error s)
      | Ok msg -> Some msg
    with
    | Yojson.Json_error s ->
        prerr_endline s;
        None
    | e ->
        prerr_endline (Printexc.to_string e);
        None

(** [mk_send_msg wsdata_of_msg push] builds a sending function: the message
    is serialized with [wsdata_of_msg], wrapped in a frame created with
    [Websocket.Frame.create ~content], and the frame is passed to [push]. *)
let mk_send_msg wsdata_of_msg push =
  fun msg ->
    let wsdata = wsdata_of_msg msg in
    let frame = Websocket.Frame.create ~content: wsdata () in
    push frame

(** [mk_msg_stream msg_of_wsdata] is a function turning a stream of
    websocket frames into a stream of decoded messages.

    Only frames whose opcode is [Text] are given to [msg_of_wsdata]; all
    other frames, and text frames for which [msg_of_wsdata] returns [None],
    are dropped from the resulting stream. *)
let mk_msg_stream msg_of_wsdata =
  let f frame =
    match Websocket.Frame.(frame.content, frame.opcode) with
    | content, Websocket.Frame.Opcode.Text -> msg_of_wsdata content
    | _ -> None
  in
  Lwt_stream.filter_map f

(** [handle_messages msg_of_wsdata wsdata_of_msg handle_message stream push]
    runs [handle_message] once for each message decoded from [stream],
    sequentially, giving it a sending function built from [wsdata_of_msg]
    and [push].

    An exception raised by the call to [handle_message] is printed on
    stderr. A failure of the iteration as a whole is discarded and the
    returned promise resolves to unit. *)
let handle_messages msg_of_wsdata wsdata_of_msg handle_message stream push =
  let push_msg = mk_send_msg wsdata_of_msg push in
  (* NOTE(review): the decoded [msg] is not forwarded to [handle_message]
     here; only the sending function is. Forwarding it would change the
     inferred type of [handle_messages], so it is left as found.
     Confirm against callers whether this is intended. *)
  let f msg =
    try handle_message push_msg
    with e -> Lwt.return (prerr_endline (Printexc.to_string e))
  in
  Lwt.catch
    (fun _ -> Lwt_stream.iter_s f (mk_msg_stream msg_of_wsdata stream))
    (fun _ -> Lwt.return_unit)

(** Input of {!Make}: the message types from [Ojs_base.Rpc.B] together
    with the conversions between messages and websocket payloads. *)
module type P =
  sig
    include Ojs_base.Rpc.B

    (** Decode a payload received from a client; [None] when it cannot be
        turned into a message. *)
    val msg_of_wsdata : string -> app_client_msg option

    (** Serialize a message to be sent to a client. *)
    val wsdata_of_msg : app_server_msg -> string
  end

(** Output signature of {!Make}. *)
module type S =
  sig
    module Rpc : Ojs_base.Rpc.S

    class connection_group :
      object
        val mutable connections :
          ((Rpc.app_server_msg -> unit Lwt.t) * Rpc.t) list
        val mutable handle_message :
          (Rpc.app_server_msg -> unit Lwt.t) ->
          Rpc.t -> Rpc.app_client_msg -> unit Lwt.t
        method add_connection :
          Websocket.Frame.t Lwt_stream.t ->
          (Websocket.Frame.t -> unit Lwt.t) -> unit Lwt.t
        method broadcall :
          Rpc.app_server_msg ->
          (Rpc.app_client_msg -> unit Lwt.t) -> unit Lwt.t
        method broadcast : Rpc.app_server_msg -> unit Lwt.t
        method handle_message :
          (Rpc.app_server_msg -> unit Lwt.t) ->
          Rpc.t -> Rpc.app_client_msg -> unit Lwt.t
        method remove_connection :
          (Rpc.app_server_msg -> unit Lwt.t) -> unit
        method set_handle_message :
          ((Rpc.app_server_msg -> unit Lwt.t) ->
           Rpc.t -> Rpc.app_client_msg -> unit Lwt.t) ->
          unit
      end
  end

module Make (P : P) =
  struct
    module Rpc = Ojs_base.Rpc.Make_server(P)

    (** A set of client connections sharing one message handler. Each
        connection is stored as a pair of its sending function and the
        RPC handler created for it. *)
    class connection_group =
      object(self)
        (* Current message handler; does nothing until replaced with
           [set_handle_message]. *)
        val mutable handle_message = (fun _ _ _ -> Lwt.return_unit)

        val mutable connections =
          ([] : ((P.app_server_msg -> unit Lwt.t) * Rpc.t) list)

        (** Remove the connection(s) whose sending function is [send].

            Sending functions are closures, so they are compared by
            physical identity: structural comparison ([<>]) on functional
            values raises [Invalid_argument]. The closure registered by
            [add_connection] is the very one handed back by [broadcast]
            and [broadcall], so identity is the right notion here. *)
        method remove_connection send =
          let pred (send2, _) = send != send2 in
          connections <- List.filter pred connections

        (** Register a new connection and process its incoming messages.

            A sending function is built from [push], an RPC handler is
            created for it, and the pair is added to [connections]. Each
            message decoded from [stream] is then given, in sequence, to
            the current handler. A failure while handling one message is
            printed on stderr and does not stop the processing of the
            following ones; a failure of the iteration itself is printed
            as well. The connection is not removed from [connections] by
            this method. *)
        method add_connection stream push =
          let send_msg = mk_send_msg P.wsdata_of_msg push in
          let rpc = Rpc.rpc_handler send_msg in
          connections <- (send_msg, rpc) :: connections;
          let stream = mk_msg_stream P.msg_of_wsdata stream in
          let report e =
            prerr_endline (Printexc.to_string e);
            Lwt.return_unit
          in
          Lwt.catch
            (fun _ ->
               Lwt_stream.iter_s
                 (fun msg ->
                    Lwt.catch
                      (fun () -> self#handle_message send_msg rpc msg)
                      report)
                 stream)
            report

        (** Send [msg] to every registered connection, one after the
            other. A connection for which sending fails is removed. *)
        method broadcast msg =
          let f (send, _) =
            Lwt.catch
              (fun _ -> send msg)
              (fun _ -> self#remove_connection send; Lwt.return_unit)
          in
          Lwt_list.iter_s f connections

        (** Perform [Rpc.call] with [msg] and callback [cb] on every
            registered connection, one after the other. A connection for
            which the call fails is removed. *)
        method broadcall (msg : 'srv) (cb : 'clt -> unit Lwt.t) =
          let f (send, rpc) =
            Lwt.catch
              (fun _ -> Rpc.call rpc msg cb)
              (fun _ -> self#remove_connection send; Lwt.return_unit)
          in
          Lwt_list.iter_s f connections

        (** The current message handler. *)
        method handle_message :
          (P.app_server_msg -> unit Lwt.t) ->
          Rpc.t -> P.app_client_msg -> unit Lwt.t =
          handle_message

        (** Replace the message handler. *)
        method set_handle_message f = handle_message <- f
      end
  end