# 1 "src/lib/eliom_bus.client.ml"
(* Ocsigen
* http://www.ocsigen.org
* Copyright (C) 2010-2011
* Raphaël Proust
* Pierre Chambart
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, with linking exception;
* either version 2.1 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*)

open Eliom_lib

(* Log section used when this module reports errors. *)
let section = Lwt_log.Section.make "eliom:bus"

module Ecb = Eliom_comet_base

(* Client-side handle on a bus.
   'a is the type of the values written to the bus, 'b the type of the
   values read from its stream. *)
type ('a, 'b) t =
  { channel : 'b Ecb.wrapped_channel
    (* channel handed to Eliom_comet.register and Eliom_comet.close *)
  ; stream : 'b Lwt_stream.t Lazy.t
    (* stream obtained from the channel; registered on first force *)
  ; queue : 'a Queue.t
    (* values written but not yet sent *)
  ; mutable max_size : int
    (* queue length from which a write flushes immediately *)
  ; write : 'a list -> unit Lwt.t
    (* sends one batch of values through the bus service *)
  ; mutable waiter : unit -> unit Lwt.t
    (* builds the delay awaited before a deferred flush *)
  ; mutable last_wait : unit Lwt.t
    (* delay of the latest deferred flush; cancelled by the next write *)
  ; mutable original_stream_available : bool
    (* true until Eliom_client.wait_load_end resolves *)
  ; error_h : 'b option Lwt.t * exn Lwt.u
    (* promise and resolver used to share a stream failure between all
       the clones of the stream *)
  }

(* Clone streams such that each clone of the original stream raises the
   same exceptions. *)

(* [consume (t, u) s] reads [s] to the end, discarding every element.
   If the iteration fails with [e] while [t] is still pending, [u] is
   woken up with [e]; the failure is then propagated. The result is
   resolved by whichever of [t] and the iteration finishes first. *)
let consume (t, u) s =
  let t' =
    try%lwt Lwt_stream.iter (fun _ -> ()) s
    with e ->
      (match Lwt.state t with
       | Lwt.Sleep -> Lwt.wakeup_exn u e
       | _ -> ());
      Lwt.fail e
  in
  Lwt.choose [Lwt.bind t (fun _ -> Lwt.return_unit); t']

(* [clone_exn (t, u) s] returns a new stream backed by a clone of [s].
   Each element is obtained by racing the next element of the clone
   against [t], so a failure of [t] also reaches this stream. A failure
   seen here is forwarded to [u] when [t] is still pending, then
   propagated to the reader. *)
let clone_exn (t, u) s =
  let s' = Lwt_stream.clone s in
  Lwt_stream.from (fun () ->
    try%lwt Lwt.choose [Lwt_stream.get s'; t]
    with e ->
      (match Lwt.state t with
       | Lwt.Sleep -> Lwt.wakeup_exn u e
       | _ -> ());
      Lwt.fail e)

(* Type of the service that [create] calls to send a list of values. *)
type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service =
  ( unit
  , 'a list
  , Eliom_service.post
  , 'att
  , 'co
  , 'ext
  , 'reg
  , [`WithoutSuffix]
  , unit
  , [`One of 'a list Eliom_parameter.ocaml] Eliom_parameter.param_name
  , Eliom_registration.Action.return )
  Eliom_service.t

(* [create service channel waiter] builds a bus handle.
   - [service] is called with the batched values on every flush;
   - [channel] is registered with Eliom_comet the first time the stream
     is forced;
   - [waiter] provides the delay used before a deferred flush.
   The queue starts empty with [max_size] set to 20. *)
let create service channel waiter =
  let write x =
    try%lwt
      let%lwt _ =
        Eliom_client.call_service
          ~service:(service :> ('a, _, _, _, _) callable_bus_service)
          () x
      in
      Lwt.return_unit
    with
    (* a failed request carrying code 204 is treated as a success *)
    | Eliom_request.Failed_request 204 -> Lwt.return_unit
  in
  let error_h =
    let t, u = Lwt.wait () in
    (* [u] only accepts exceptions, so the promise below is expected to
       end by failing; a normal wake-up would hit the assertion. *)
    ( (try%lwt
         let%lwt _ = t in
         assert false
       with e -> Lwt.fail e)
    , u )
  in
  let stream =
    lazy
      (let stream = Eliom_comet.register channel in
       (* iterate on the stream to consume messages: avoid memory leak *)
       let _ = consume error_h stream in
       stream)
  in
  let t =
    { channel
    ; stream
    ; queue = Queue.create ()
    ; max_size = 20
    ; write
    ; waiter
    ; last_wait = Lwt.return_unit
    ; original_stream_available = true
    ; error_h }
  in
  (* The comet channel starts receiving after the load phase, so the
     original channel (i.e. without lost messages) is only available in
     the first loading phase. *)
  let _ =
    let%lwt () = Eliom_client.wait_load_end () in
    t.original_stream_available <- false;
    Lwt.return_unit
  in
  t

(* Unwrapper for bus values: extracts the channel and the send service
   from the wrapped bus and builds the handle with a waiter that sleeps
   for 0.05 (passed to Lwt_js.sleep) before each deferred flush. *)
let internal_unwrap ((wrapped_bus : ('a, 'b) Ecb.wrapped_bus), _unwrapper) =
  let waiter () = Js_of_ocaml_lwt.Lwt_js.sleep 0.05 in
  let channel, Eliom_comet_base.Bus_send_service service = wrapped_bus in
  create service channel waiter

(* Register the unwrapper under the bus unwrap id at module
   initialisation. *)
let () =
  Eliom_unwrap.register_unwrapper Eliom_common.bus_unwrap_id internal_unwrap

(* [stream t] forces the registration of the channel if needed and
   returns a fresh clone of the bus stream. *)
let stream t = clone_exn t.error_h (Lazy.force t.stream)

(* [original_stream t] behaves like [stream t] while the page is in its
   onload phase and the original stream has not been marked unavailable;
   otherwise it reports an error through Lwt_log.raise_error. *)
let original_stream t =
  if Eliom_client_core.in_onload () && t.original_stream_available
  then stream t
  else
    Lwt_log.raise_error ~section
      "original_stream: the original stream is not available anymore"

(* [flush t] empties the queue and sends its content as a single batch.
   The fold accumulates the values in reverse, hence the List.rev to
   restore the order in which the queue was traversed. *)
let flush t =
  let l = List.rev (Queue.fold (fun l v -> v :: l) [] t.queue) in
  Queue.clear t.queue;
  t.write l

(* [try_flush t] cancels the pending deferred flush, then either flushes
   immediately when the queue has reached [max_size], or schedules a new
   flush after [t.waiter ()]. In the deferred case the returned promise
   is already resolved: it does not wait for the flush itself. *)
let try_flush t =
  Lwt.cancel t.last_wait;
  if Queue.length t.queue >= t.max_size
  then flush t
  else
    let th = Lwt.protected (t.waiter ()) in
    t.last_wait <- th;
    let _ = th >>= fun () -> flush t in
    Lwt.return_unit

(* [write t v] enqueues [v] and lets [try_flush] decide when to send. *)
let write t v =
  Queue.add v t.queue;
  try_flush t

(* [close t] closes the comet channel of the bus. *)
let close {channel; _} = Eliom_comet.close channel

(* [set_queue_size b s] sets the queue length that triggers an immediate
   flush. *)
let set_queue_size b s = b.max_size <- s

(* [set_time_before_flush b t] sets the delay awaited before a deferred
   flush: Lwt.pause when [t] is not strictly positive, otherwise a
   Lwt_js.sleep of [t]. *)
let set_time_before_flush b t =
  b.waiter <-
    (if t <= 0.
     then Lwt.pause
     else fun () -> Js_of_ocaml_lwt.Lwt_js.sleep t)

(* NOTE(review): presumably referenced from other modules only to force
   this module to be linked - confirm against callers. *)
let force_link = ()