1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
(** *)
open Stog.Types
open Stog.Url
open Types
module J = Yojson.Safe
let (>>=) = Lwt.bind
let rec wait_forever () = Lwt_unix.sleep 1000.0 >>= wait_forever
let push_message active_cons msg (stream, push) =
Lwt.catch
(fun () ->
push msg >>= fun () ->
prerr_endline "msg pushed";
active_cons := (stream, push) :: !active_cons ;
Lwt.return_unit
)
(fun e -> prerr_endline ("push_message: "^(Printexc.to_string e)) ; Lwt.return_unit)
;;
let push_message active_cons ?push (msg : Stog_server_types.Types.server_message) =
prerr_endline
("pushing message: "^
(match msg with
Stog_server_types.Types.Update _ -> "Update"
| Errors (errs,warns) ->
String.concat "\n" ("Errors:" :: errs @ ["Warnings:"] @ warns)
));
let marshalled = Marshal.to_string msg [] in
let hex = Stog_server_types.Types.to_hex marshalled in
let msg = Websocket.Frame.create ~content: hex () in
match push with
None ->
let l = !active_cons in
active_cons := [] ;
Lwt_list.iter_s (push_message active_cons msg) l
| Some push ->
push msg
let send_errors active_cons ~errors ~warnings =
let msg = Stog_server_types.Types.Errors (errors, warnings) in
push_message active_cons msg
let send_update_message active_cons path op =
let msg = Stog_server_types.Types.Update (path, op) in
push_message active_cons msg
let handle_message read_stog current_state active_cons base_path stream push msg =
match !current_state with
None -> Lwt.fail (Failure "No state yet!")
| Some state ->
match msg with
| `Refresh ->
begin
let send_doc doc =
match doc.doc_out with
None
| Some [] -> Lwt.return_unit
| Some (xml :: _) ->
let path = Stog.Path.to_string doc.doc_path in
send_update_message active_cons path
(Stog_server_types.Types.Update_all xml)
in
try Run.refresh read_stog current_state send_doc
(fun errors -> send_errors active_cons ~errors ~warnings: [])
with _ -> Lwt.return_unit
end
| `Get path ->
try
let (_, doc) = Stog.Types.doc_by_path
state.Run.stog (Stog.Path.of_string path)
in
match doc.Stog.Types.doc_out with
None
| Some [] -> Lwt.return_unit
| Some (tree :: _) ->
let msg = Stog_server_types.Types.(
Update (path, (Update_all tree)))
in
push_message active_cons ~push msg
with e ->
Lwt.return_unit
let handle_messages read_stog current_state active_cons base_path stream push =
let f frame =
match frame.Websocket.Frame.opcode with
| Websocket.Frame.Opcode.Close ->
prerr_endline (Printf.sprintf "A Close frame came when there were %d connections" (List.length !active_cons));
active_cons := List.filter (fun (_,p) -> p != push) !active_cons ;
prerr_endline (Printf.sprintf "Now I have only %d." (List.length !active_cons));
Lwt.return_unit
| _ ->
let s = frame.Websocket.Frame.content in
match Stog_server_types.Types.client_msg_of_wsdata s with
None -> Lwt.return_unit
| Some (`Stog_msg msg) ->
handle_message read_stog current_state active_cons
base_path stream push msg
in
Lwt.catch
(fun _ -> Lwt_stream.iter_s f stream)
(fun _ -> Lwt.return_unit)
let handle_con read_stog current_state active_cons base_path client =
let recv () = Websocket_lwt_unix.Connected_client.recv client in
let push = Websocket_lwt_unix.Connected_client.send client in
prerr_endline "new connection";
let stream = Websocket_lwt_unix.mk_frame_stream recv in
active_cons := (stream, push) :: !active_cons ;
handle_messages read_stog current_state active_cons base_path stream push
;;
let run_server read_stog current_state active_cons ws_url base_path =
let host = Stog.Url.host ws_url.priv in
let port = Stog.Url.port ws_url.priv in
prerr_endline ("Setting up websocket server on host="^host^", port="^(string_of_int port));
let uri =
let u = Uri.of_string (Stog.Url.to_string ws_url.priv) in
Uri.with_scheme u (Some "http")
in
Resolver_lwt.resolve_uri ~uri Resolver_lwt_unix.system >>= fun endp ->
let ctx = Lazy.force Conduit_lwt_unix.default_ctx in
Conduit_lwt_unix.endp_to_server ~ctx endp >>= fun server ->
Websocket_lwt_unix.establish_standard_server ~ctx
~check_request:(fun _ -> true)
~mode: server
(handle_con read_stog current_state active_cons base_path)
;;
let diff_cut =
let set = List.fold_left
(fun set tag -> Stog.Types.Str_set.add tag set)
Stog.Types.Str_set.empty
[ "pre" ; "ul" ; "p" ; "svg"]
in
fun (_,tag) _ _ ->
Stog.Types.Str_set.mem (String.lowercase_ascii tag) set
let send_patch active_cons old_stog stog doc_id =
let old_doc = Stog.Types.doc old_stog doc_id in
let new_doc = Stog.Types.doc stog doc_id in
let path = Stog.Path.to_string new_doc.doc_path in
prerr_endline ("sending patch (if needed) for path="^path) ;
match old_doc.doc_out, new_doc.doc_out
with
| None, None -> Lwt.return_unit
| None, Some (t :: _) ->
let op = Stog_server_types.Types.Update_all t in
send_update_message active_cons path op
| Some _, None ->
let op = Stog_server_types.Types.Patch [] in
send_update_message active_cons path op
| Some xtmpl1, Some xtmpl2 when xtmpl1 = xtmpl2 ->
Lwt.return_unit
| Some xtmpl1, Some xtmpl2 ->
let xml1 = Xmldiff.xml_of_string (Xtmpl.Rewrite.to_string xtmpl1) in
let xml2 = Xmldiff.xml_of_string (Xtmpl.Rewrite.to_string xtmpl2) in
Lwt_preemptive.detach
(fun xml2 ->
try `Patch (Xmldiff.diff ~cut: diff_cut xml1 xml2)
with e -> `Error (Printexc.to_string e)
) xml2 >>=
(function
| `Error msg ->
send_errors active_cons ~errors: [msg] ~warnings: []
| `Patch patch ->
prerr_endline "patch computed";
let op = Stog_server_types.Types.Patch patch in
send_update_message active_cons path op;
)
| _, Some [] ->
Lwt.return_unit
;;