1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
let (>>=) = Lwt.bind
module J = Yojson.Safe
module SMap = Map.Make(String)
module Ws = Websocket_lwt_unix
let mk_msg_of_wsdata client_msg_of_yojson =
fun s ->
try
let json = J.from_string s in
match client_msg_of_yojson json with
Error s -> raise (Yojson.Json_error s)
| Ok msg -> Some msg
with
Yojson.Json_error s ->
prerr_endline s;
None
| e ->
prerr_endline (Printexc.to_string e);
None
let mk_send_msg wsdata_of_msg push =
fun msg ->
let wsdata = wsdata_of_msg msg in
let frame = Websocket.Frame.create ~content: wsdata () in
push frame
let mk_msg_stream msg_of_wsdata =
let f frame =
match Websocket.Frame.(frame.content, frame.opcode) with
| content, Websocket.Frame.Opcode.Text -> msg_of_wsdata content
| _ -> None
in
Lwt_stream.filter_map f
let handle_messages msg_of_wsdata wsdata_of_msg handle_message stream push =
let push_msg = mk_send_msg wsdata_of_msg push in
let f msg =
try handle_message push_msg
with e ->
Lwt.return (prerr_endline (Printexc.to_string e))
in
Lwt.catch
(fun _ -> Lwt_stream.iter_s f (mk_msg_stream msg_of_wsdata stream))
(fun _ -> Lwt.return_unit)
module type P =
sig
include Ojs_base.Rpc.B
val msg_of_wsdata : string -> app_client_msg option
val wsdata_of_msg : app_server_msg -> string
end
module type S = sig
module Rpc : Ojs_base.Rpc.S
class connection_group :
object
val mutable connections :
((Rpc.app_server_msg -> unit Lwt.t) * Rpc.t) list
val mutable handle_message :
(Rpc.app_server_msg -> unit Lwt.t) ->
Rpc.t -> Rpc.app_client_msg -> unit Lwt.t
method add_connection :
Websocket.Frame.t Lwt_stream.t ->
(Websocket.Frame.t -> unit Lwt.t) -> unit Lwt.t
method broadcall :
Rpc.app_server_msg ->
(Rpc.app_client_msg -> unit Lwt.t) -> unit Lwt.t
method broadcast : Rpc.app_server_msg -> unit Lwt.t
method handle_message :
(Rpc.app_server_msg -> unit Lwt.t) ->
Rpc.t -> Rpc.app_client_msg -> unit Lwt.t
method remove_connection :
(Rpc.app_server_msg -> unit Lwt.t) -> unit
method set_handle_message :
((Rpc.app_server_msg -> unit Lwt.t) ->
Rpc.t -> Rpc.app_client_msg -> unit Lwt.t) ->
unit
end
end
module Make(P:P) =
struct
module Rpc = Ojs_base.Rpc.Make_server(P)
class connection_group =
object(self)
val mutable handle_message = (fun _ _ _ -> Lwt.return_unit)
val mutable connections =
([] : ((P.app_server_msg -> unit Lwt.t) * Rpc.t) list)
method remove_connection send =
let pred (send2, _) = send <> send2 in
connections <- List.filter pred connections
method add_connection stream push =
let send_msg = mk_send_msg P.wsdata_of_msg push in
let rpc = Rpc.rpc_handler send_msg in
connections <- (send_msg, rpc) :: connections;
let stream = mk_msg_stream P.msg_of_wsdata stream in
Lwt.catch
(fun _ -> Lwt_stream.iter_s
(fun msg ->
Lwt.catch
(fun () -> self#handle_message send_msg rpc msg)
(fun e ->
prerr_endline (Printexc.to_string e);
Lwt.return_unit)
)
stream
)
(fun e ->
prerr_endline (Printexc.to_string e);
Lwt.return_unit)
method broadcast msg =
let f (send, _) =
Lwt.catch
(fun _ -> send msg)
(fun _ -> self#remove_connection send; Lwt.return_unit)
in
Lwt_list.iter_s f connections
method broadcall (msg : 'srv) (cb : 'clt -> unit Lwt.t) =
let f (send, rpc) =
Lwt.catch
(fun _ -> Rpc.call rpc msg cb)
(fun _ -> self#remove_connection send; Lwt.return_unit)
in
Lwt_list.iter_s f connections
method handle_message :
(P.app_server_msg -> unit Lwt.t) -> Rpc.t -> P.app_client_msg -> unit Lwt.t =
handle_message
method set_handle_message f = handle_message <- f
end
end