Source file ws.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
(*********************************************************************************)
(*                Stog                                                           *)
(*                                                                               *)
(*    Copyright (C) 2012-2024 INRIA All rights reserved.                         *)
(*    Author: Maxence Guesdon, INRIA Saclay                                      *)
(*                                                                               *)
(*    This program is free software; you can redistribute it and/or modify       *)
(*    it under the terms of the GNU General Public License as                    *)
(*    published by the Free Software Foundation, version 3 of the License.       *)
(*                                                                               *)
(*    This program is distributed in the hope that it will be useful,            *)
(*    but WITHOUT ANY WARRANTY; without even the implied warranty of             *)
(*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *)
(*    GNU General Public License for more details.                               *)
(*                                                                               *)
(*    You should have received a copy of the GNU General Public                  *)
(*    License along with this program; if not, write to the Free Software        *)
(*    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA                   *)
(*    02111-1307  USA                                                            *)
(*                                                                               *)
(*    As a special exception, you have permission to link this program           *)
(*    with the OCaml compiler and distribute executables, as long as you         *)
(*    follow the requirements of the GNU GPL in regard to all of the             *)
(*    software in the executable aside from the OCaml compiler.                  *)
(*                                                                               *)
(*    Contact: Maxence.Guesdon@inria.fr                                          *)
(*                                                                               *)
(*********************************************************************************)

(** *)

open Stog.Types
open Stog.Url
open Types

module J = Yojson.Safe

let (>>=) = Lwt.bind

let rec wait_forever () = Lwt_unix.sleep 1000.0 >>= wait_forever

let push_message active_cons msg (stream, push) =
  Lwt.catch
    (fun () ->
       push msg >>= fun () ->
       prerr_endline "msg pushed";
       active_cons := (stream, push) :: !active_cons ;
       Lwt.return_unit
    )
    (fun e -> prerr_endline ("push_message: "^(Printexc.to_string e)) ; Lwt.return_unit)
;;

let push_message active_cons ?push (msg : Stog_server_types.Types.server_message) =
  (*prerr_endline ("Number of active connections: "^(string_of_int (List.length !active_cons)));*)
  prerr_endline
    ("pushing message: "^
     (match msg with
        Stog_server_types.Types.Update _ -> "Update"
      | Errors (errs,warns) ->
          String.concat "\n" ("Errors:" :: errs @ ["Warnings:"] @ warns)
     ));
  let marshalled = Marshal.to_string  msg [] in
  let hex = Stog_server_types.Types.to_hex marshalled in
  let msg = Websocket.Frame.create ~content: hex () in
  match push with
    None ->
      let l = !active_cons in
      active_cons := [] ;
      Lwt_list.iter_s (push_message active_cons msg) l
  | Some push ->
        push msg

let send_errors active_cons ~errors ~warnings =
  let msg = Stog_server_types.Types.Errors (errors, warnings) in
  push_message active_cons msg

let send_update_message active_cons path op =
  let msg = Stog_server_types.Types.Update (path, op) in
  push_message active_cons msg

let handle_message read_stog current_state active_cons base_path stream push msg =
  match !current_state with
    None -> Lwt.fail (Failure "No state yet!")
  | Some state ->
      match msg with
      | `Refresh ->
          begin
            let send_doc doc =
              match doc.doc_out with
                None
              | Some [] -> Lwt.return_unit
              | Some (xml :: _) ->
                  let path = Stog.Path.to_string doc.doc_path in
                  send_update_message active_cons path
                    (Stog_server_types.Types.Update_all xml)
            in
            try Run.refresh read_stog current_state send_doc
              (fun errors -> send_errors active_cons ~errors ~warnings: [])
            with _ -> Lwt.return_unit
          end
      | `Get path ->
          try
            let (_, doc) = Stog.Types.doc_by_path
              state.Run.stog (Stog.Path.of_string path)
            in
            match doc.Stog.Types.doc_out with
              None
            | Some [] -> Lwt.return_unit
            | Some (tree :: _) ->
                let msg = Stog_server_types.Types.(
                   Update (path, (Update_all tree)))
                in
                push_message active_cons ~push msg
          with e ->
              Lwt.return_unit

let handle_messages read_stog current_state active_cons base_path stream push =
  let f frame =
    match frame.Websocket.Frame.opcode with
    | Websocket.Frame.Opcode.Close ->
        prerr_endline (Printf.sprintf "A Close frame came when there were %d connections" (List.length !active_cons));
        active_cons := List.filter (fun (_,p) -> p != push) !active_cons ;
        prerr_endline (Printf.sprintf "Now I have only %d." (List.length !active_cons));
        Lwt.return_unit
    | _ ->
        let s = frame.Websocket.Frame.content in
        match Stog_server_types.Types.client_msg_of_wsdata s with
          None -> Lwt.return_unit
        | Some (`Stog_msg msg) ->
            handle_message read_stog current_state active_cons
              base_path stream push msg
  in
  Lwt.catch
    (fun _ -> Lwt_stream.iter_s f stream)
    (fun _ -> Lwt.return_unit)

let handle_con read_stog current_state active_cons base_path client =
  let recv () = Websocket_lwt_unix.Connected_client.recv client in
  let push = Websocket_lwt_unix.Connected_client.send client in
  prerr_endline "new connection";
  let stream = Websocket_lwt_unix.mk_frame_stream recv in
  active_cons := (stream, push) :: !active_cons ;
  handle_messages read_stog current_state active_cons base_path stream push
;;

let run_server read_stog current_state active_cons ws_url base_path =
  let host = Stog.Url.host ws_url.priv in
  let port = Stog.Url.port ws_url.priv in
  prerr_endline ("Setting up websocket server on host="^host^", port="^(string_of_int port));
  (* set scheme to http to be resolved correctly *)
  let uri =
    let u = Uri.of_string (Stog.Url.to_string ws_url.priv) in
    Uri.with_scheme u (Some "http")
  in
  Resolver_lwt.resolve_uri ~uri Resolver_lwt_unix.system >>= fun endp ->
  let ctx = Lazy.force Conduit_lwt_unix.default_ctx in
  Conduit_lwt_unix.endp_to_server ~ctx endp >>= fun server ->
    Websocket_lwt_unix.establish_standard_server ~ctx
    ~check_request:(fun _ -> true)
    ~mode: server
    (handle_con read_stog current_state active_cons base_path)
;;

let diff_cut =
  let set = List.fold_left
    (fun set tag -> Stog.Types.Str_set.add tag set)
    Stog.Types.Str_set.empty
    [ "pre" ; "ul" ; "p" ; "svg"]
  in
  fun (_,tag) _ _ ->
    Stog.Types.Str_set.mem (String.lowercase_ascii tag) set

let send_patch active_cons old_stog stog doc_id =
  let old_doc = Stog.Types.doc old_stog doc_id in
  let new_doc = Stog.Types.doc stog doc_id in
  let path = Stog.Path.to_string new_doc.doc_path in

  prerr_endline ("sending patch (if needed) for path="^path) ;

  match old_doc.doc_out, new_doc.doc_out
    (*Some [Xtmpl.xml_of_string ~add_main: false "<html><body>coucou</body></html>"] *)
  with
  | None, None -> Lwt.return_unit
  | None, Some (t :: _) ->
      (*let s = Marshal.to_string t [] in*)
      let op = Stog_server_types.Types.Update_all t in
      send_update_message active_cons path op
  | Some _, None ->
      let op = Stog_server_types.Types.Patch [] in
      send_update_message active_cons path op
  | Some xtmpl1, Some xtmpl2 when xtmpl1 = xtmpl2 ->
      Lwt.return_unit
  | Some xtmpl1, Some xtmpl2 -> (* xml1 <> xml2 *)
      let xml1 = Xmldiff.xml_of_string (Xtmpl.Rewrite.to_string xtmpl1) in
      let xml2 = Xmldiff.xml_of_string (Xtmpl.Rewrite.to_string xtmpl2) in
      Lwt_preemptive.detach
       (fun xml2 ->
           try `Patch (Xmldiff.diff ~cut: diff_cut xml1 xml2)
           with e -> `Error (Printexc.to_string e)
        ) xml2 >>=
        (function
         | `Error msg ->
             send_errors active_cons ~errors: [msg] ~warnings: []
         | `Patch patch ->
             prerr_endline "patch computed";
             let op = Stog_server_types.Types.Patch patch in
             send_update_message active_cons path op;
             (*let s = Marshal.to_string (List.hd xtmpl2) [] in
                Ocsigen_messages.warning (Printf.sprintf "sending marshalled operation, size=%d" (String.length s));
                send_update_message path
                (Stog.Server_common.Update_all s);
                *)
          )
    | _, Some [] ->
        Lwt.return_unit
;;