Source file websocket_handler.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
module Make (IO : Graphql_intf.IO) (Ws : Websocket.Connection.S with type 'a IO.t = 'a IO.t) = struct
  module Json = Yojson.Basic.Util

  let (>>=) = IO.bind

  type t = {
    conn : Ws.t;
    subscriptions : (string, unit -> unit) Hashtbl.t;
  }

  type client_message =
    | Gql_connection_init
    | Gql_start of {
        id : string;
        query : string;
        variables : (string * Graphql_parser.const_value) list option;
        operation_name : string option;
      }
    | Gql_stop of {
        id : string;
      }
    | Gql_connection_terminate

  type server_message =
    | Gql_connection_error
    | Gql_connection_ack
    | Gql_data
    | Gql_error
    | Gql_complete
 (* | Gql_connection_keep_alive *)

    let client_message_of_payload msg =
      match msg |> Json.member "type" |> Json.to_string_option with
      | Some "connection_init" ->
          Ok Gql_connection_init
      | Some "start" ->
          let id = Json.(msg |> member "id" |> to_string) in
          let payload = Json.member "payload" msg in
          let query = Json.(payload |> member "query" |> to_string) in
          let variables = Json.(payload |> member "variables" |> to_option to_assoc) in
          let variables = (variables :> (string * Graphql_parser.const_value) list option) in
          let operation_name = Json.(payload |> member "operationName" |> to_string_option)
          in
          Ok (Gql_start { id; query; variables; operation_name })
      | Some "stop" ->
          let id = Json.(member "id" msg |> to_string) in
          Ok (Gql_stop { id })
      | Some "connection_terminate" ->
          Ok Gql_connection_terminate
      | Some typ ->
          Error (Format.sprintf "Unknown message type `%s`" typ)
      | None ->
          Error (Format.sprintf "Missing message type")

    let server_message_to_string = function
      | Gql_connection_error -> "connection_error"
      | Gql_connection_ack -> "connection_ack"
      | Gql_data -> "data"
      | Gql_error -> "error"
      | Gql_complete -> "complete"
    (* | Gql_connection_keep_alive -> "ka" *)

    let create_message ?(opcode=Websocket.Frame.Opcode.Text) ?id ?(payload=`Null) typ =
      let frame_payload = `Assoc [
          "type", `String (server_message_to_string typ);
          "id", begin match id with
            | Some id -> `String id
            | None -> `Null end;
          "payload", payload
        ] in
      let content = Yojson.Basic.to_string frame_payload in
      Websocket.Frame.create ~opcode ~content ()

    let handle_frame t ~execute_query frame =
      match frame.Websocket.Frame.opcode with
      | Ping
      | Pong
      | Close
      | Ctrl _
      | Nonctrl _ ->
        IO.return ()
      | Continuation
      | Text
      | Binary ->
        let json = Yojson.Basic.from_string frame.Websocket.Frame.content in
        match client_message_of_payload json with
        | Ok Gql_connection_init ->
          Ws.send t.conn (create_message Gql_connection_ack);
        | Ok (Gql_start { id; query; variables; operation_name }) ->
          execute_query
            variables
            operation_name
            query
          >>= (function
          | Error message ->
            let payload = `Assoc ["message", message] in
            Ws.send t.conn (create_message ~payload ~id Gql_error)
          | Ok (`Response payload) ->
            Ws.send t.conn (create_message ~id ~payload Gql_data)
          | Ok (`Stream stream) ->
            let close () = IO.Stream.close stream in
            Hashtbl.add t.subscriptions id close;
            IO.Stream.iter stream (fun response ->
              let Ok payload | Error payload = response in
              Ws.send t.conn (create_message ~id ~payload Gql_data)
            ) >>= fun () ->
            Ws.send t.conn (create_message ~id Gql_complete)
          )
        | Ok (Gql_stop { id }) ->
          begin try
            let close = Hashtbl.find t.subscriptions id in
            close ()
          with Not_found -> ()
          end;
          IO.return ()
        | Ok Gql_connection_terminate ->
          Hashtbl.iter (fun _id close -> close ()) t.subscriptions;
          Ws.send t.conn (create_message ~opcode:Websocket.Frame.Opcode.Close Gql_connection_error)
        | Error msg ->
          let id = Json.(json |> member "id" |> to_string) in
          let payload = `Assoc ["message", `String msg] in
          Ws.send t.conn (create_message ~id ~payload Gql_error)

    let handle execute_query conn =
      let subscriptions = Hashtbl.create 8 in
      let t = { conn; subscriptions } in
      let rec loop () =
        Ws.recv conn >>= fun frame ->
        handle_frame t ~execute_query frame >>= fun () ->
        loop ()
      in
      loop ()
end