1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# 1 "src/lib/eliom_bus.server.ml"
module Ecb = Eliom_comet_base
(* A bus: values of type ['a] are written to it, values of type ['b] are read
   from its stream (see [create_filtered], where the filter maps each written
   value to the value actually pushed on the stream). *)
type ('a, 'b) t =
(* Stream on which every (filtered) written value is pushed. *)
{ stream : 'b Lwt_stream.t
(* Scope used for the comet channel and for registering the send service. *)
; scope : Eliom_comet.Channel.comet_scope
(* Optional name, forwarded to the channel and to the send service. *)
; name : string option
(* [Some c] when the channel was built at creation time (site scope in
   [create_filtered]); [None] otherwise, in which case [internal_wrap]
   creates a channel on a clone of [stream] each time it runs. *)
; channel : 'b Eliom_comet.Channel.t option
(* Write function: in [create_filtered] it applies the filter, then pushes
   the result on [stream]. *)
; write : 'a -> unit Lwt.t
(* Service through which lists of values are posted to the bus. *)
; service : 'a Ecb.bus_send_service
(* [None] when the sender was registered at creation time (site scope);
   [Some table] when registration is deferred to [internal_wrap], the table
   holding [true] once the sender has been registered. *)
; service_registered : bool Eliom_state.volatile_table option
(* Optional size, forwarded as [?size] to [Eliom_comet.Channel.create]. *)
; size : int option
(* Wrapper built by [bus_mark ()], i.e. from [internal_wrap]. *)
; bus_mark : ('a, 'b) t Eliom_common.wrapper }
(* [register_sender scope service write] registers, for [scope], an action
   handler on [service] that feeds each posted value to [write], one after
   the other ([Lwt_list.iter_s]). The action is registered with the
   [`NoReload] option. *)
let register_sender scope service write =
  (* The GET parameter is unit; the POST parameter is the batch of values. *)
  let handle_batch () batch = Lwt_list.iter_s write batch in
  Eliom_registration.Action.register ~scope ~options:`NoReload ~service
    handle_batch
(* [internal_wrap bus] computes the value that stands for [bus] when it is
   wrapped: the wrapped comet channel paired with the send service, together
   with an unwrapper built from [Eliom_common.bus_unwrap_id].

   Side effects:
   - if [bus.channel] is [None], a new channel is created over a clone of
     [bus.stream];
   - if [bus.service_registered] is [Some table] and the table does not hold
     [Data true], the sender is registered for [bus.scope] and the table is
     set to [true]. *)
let internal_wrap (bus : ('a, 'b) t)
    : ('a, 'b) Ecb.wrapped_bus * Eliom_common.unwrapper
  =
  let channel =
    match bus.channel with
    | Some existing -> existing
    | None ->
        Eliom_comet.Channel.create ~scope:bus.scope ?name:bus.name
          ?size:bus.size
          (Lwt_stream.clone bus.stream)
  in
  (* Registers the sender unless the table already records that it was. *)
  let register_once table =
    match Eliom_state.get_volatile_data ~table () with
    | Eliom_state.Data true -> ()
    | _ ->
        let (Ecb.Bus_send_service srv) = bus.service in
        let coerced =
          (srv
            :> ( _
               , _ list
               , _
               , _
               , _
               , Eliom_service.non_ext
               , _
               , _
               , _
               , _
               , _ )
               Eliom_service.t)
        in
        register_sender bus.scope coerced bus.write;
        Eliom_state.set_volatile_data ~table true
  in
  (match bus.service_registered with
  | Some table -> register_once table
  | None -> ());
  let wrapped = Eliom_comet.Channel.get_wrapped channel, bus.service in
  wrapped, Eliom_common.make_unwrapper Eliom_common.bus_unwrap_id
(* [bus_mark ()] builds a wrapper from [internal_wrap] with
   [Eliom_common.make_wrapper]; [create_filtered] stores one in the
   [bus_mark] field of every bus it creates. *)
let bus_mark () = Eliom_common.make_wrapper internal_wrap
(* [deriving_to_list typ] turns the [Deriving_Json.t] description of an
   element type into the description of lists of that element, by applying
   [Deriving_Json.Json_list] to a module built from [typ] with
   [Deriving_Json.Defaults'']. *)
let deriving_to_list (type elt) (elt_json : elt Deriving_Json.t)
    : elt list Deriving_Json.t
  =
  let module Elt = Deriving_Json.Defaults'' (struct
    type a = elt

    let t = elt_json
  end)
  in
  let module Elt_list = Deriving_Json.Json_list (Elt) in
  Elt_list.t
(* [create_filtered ?scope ?name ?size ~filter typ] creates a bus carrying
   values described by [typ]. Every written value first goes through
   [filter]; the filter's result is what gets pushed on the bus stream.

   [scope] defaults to [`Site].
   - [`Site]: the comet channel is created right away (over a clone of the
     stream) and the send service is registered immediately.
   - [`Client_process _]: no channel is created here; a volatile table is
     created instead so that [internal_wrap] can register the sender later
     and remember that it did. *)
let create_filtered ?scope ?name ?size ~filter typ =
  let stream, enqueue = Lwt_stream.create () in
  (* Writing = filter, then push the filtered value on the stream. *)
  let write msg =
    let%lwt filtered = filter msg in
    enqueue (Some filtered);
    Lwt.return_unit
  in
  let scope =
    match scope with
    | Some (`Client_process n) -> `Client_process n
    | Some `Site | None -> `Site
  in
  let channel =
    match scope with
    | `Client_process _ -> None
    | `Site ->
        Some
          (Eliom_comet.Channel.create ~scope ?name ?size
             (Lwt_stream.clone stream))
  in
  (* Values are posted in batches, hence the list description. *)
  let post_params =
    (Eliom_parameter.ocaml "bus_write" (deriving_to_list typ)
      : ('a, 'aa, 'aaa) Eliom_parameter.params_type)
  in
  let send_service =
    Eliom_service.create ?name
      ~meth:(Eliom_service.Post (Eliom_parameter.unit, post_params))
      ~path:Eliom_service.No_path ()
  in
  let service_registered =
    match scope with
    | `Client_process _ as scope ->
        Some (Eliom_state.create_volatile_table ~scope ())
    | `Site ->
        register_sender scope send_service write;
        None
  in
  { stream
  ; scope
  ; name
  ; size
  ; channel
  ; write
  ; service = Eliom_comet_base.Bus_send_service send_service
  ; service_registered
  ; bus_mark = bus_mark () }
(* [create ?scope ?name ?size typ] is [create_filtered] with a filter that
   returns each written value unchanged. *)
let create ?scope ?name ?size typ =
  let keep_as_is value = Lwt.return value in
  create_filtered ?scope ?name ?size ~filter:keep_as_is typ
(* [stream bus] is the stream stored in [bus]; the same stream is returned
   whatever the scope of the bus. *)
let stream bus = bus.stream
(* [write bus x] sends [x] through the write function stored in [bus]. *)
let write { write = send; _ } value = send value