Source file websocket_handler.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
(** GraphQL-over-WebSocket message handling, parameterised over an IO monad
    [IO] and a websocket connection implementation [Ws] sharing that monad.

    Incoming text frames are decoded as JSON messages whose [type] field is
    one of the strings matched in [client_message_of_payload]; outgoing
    messages use the strings produced by [server_message_to_string]. *)
module Make
    (IO : Graphql_intf.IO)
    (Ws : Graphql_websocket.Connection.S with type 'a IO.t = 'a IO.t) =
struct
  module Json = Yojson.Basic.Util

  let ( >>= ) = IO.bind

  (** Per-connection state: the websocket connection and, for every
      subscription that is currently streaming, a function closing its
      stream, keyed by the operation id supplied by the client. *)
  type t = { conn : Ws.t; subscriptions : (string, unit -> unit) Hashtbl.t }

  (** Messages the client may send, as decoded by
      [client_message_of_payload]. *)
  type client_message =
    | Gql_connection_init
    | Gql_start of {
        id : string;
        query : string;
        variables : (string * Graphql_parser.const_value) list option;
        operation_name : string option;
      }
    | Gql_stop of { id : string }
    | Gql_connection_terminate

  (** Kinds of message the server sends; see [server_message_to_string] for
      their wire names. *)
  type server_message =
    | Gql_connection_error
    | Gql_connection_ack
    | Gql_data
    | Gql_error
    | Gql_complete

  (* | Gql_connection_keep_alive *)

  (** [client_message_of_payload msg] decodes the JSON value [msg] into a
      [client_message].

      Returns [Error reason] when the [type] field is missing, is not a
      recognised message type, or when the message is structurally invalid
      (not an object, missing or non-string [id] / [query], non-object
      [variables], ...). Structural problems are reported by Yojson as
      [Type_error]; they are converted to [Error] here so that this function
      never raises on malformed client input. *)
  let client_message_of_payload msg =
    try
      ( match msg |> Json.member "type" |> Json.to_string_option with
      | Some "connection_init" -> Ok Gql_connection_init
      | Some "start" ->
          let id = Json.(msg |> member "id" |> to_string) in
          let payload = Json.member "payload" msg in
          let query = Json.(payload |> member "query" |> to_string) in
          let variables =
            Json.(payload |> member "variables" |> to_option to_assoc)
          in
          (* Widen the JSON value type to the parser's constant value type. *)
          let variables =
            (variables :> (string * Graphql_parser.const_value) list option)
          in
          let operation_name =
            Json.(payload |> member "operationName" |> to_string_option)
          in
          Ok (Gql_start { id; query; variables; operation_name })
      | Some "stop" ->
          let id = Json.(member "id" msg |> to_string) in
          Ok (Gql_stop { id })
      | Some "connection_terminate" -> Ok Gql_connection_terminate
      | Some typ -> Error (Format.sprintf "Unknown message type `%s`" typ)
      | None -> Error "Missing message type" )
    with Json.Type_error (reason, _) ->
      Error (Format.sprintf "Malformed message: %s" reason)

  (** Wire name used in the [type] field of an outgoing message. *)
  let server_message_to_string = function
    | Gql_connection_error -> "connection_error"
    | Gql_connection_ack -> "connection_ack"
    | Gql_data -> "data"
    | Gql_error -> "error"
    | Gql_complete -> "complete"

  (* | Gql_connection_keep_alive -> "ka" *)

  (** [create_message ?opcode ?id ?payload typ] builds a websocket frame
      whose content is the JSON object with fields [type], [id] and
      [payload].

      [opcode] defaults to [Text]. [id] and [payload] are serialised as JSON
      null when omitted; the fields are always present in the object. *)
  let create_message ?(opcode = Graphql_websocket.Frame.Opcode.Text) ?id
      ?(payload = `Null) typ =
    let frame_payload =
      `Assoc
        [
          ("type", `String (server_message_to_string typ));
          ("id", match id with Some id -> `String id | None -> `Null);
          ("payload", payload);
        ]
    in
    let content = Yojson.Basic.to_string frame_payload in
    Graphql_websocket.Frame.create ~opcode ~content ()

  (** [handle_frame t ~execute_query frame] processes one incoming frame.

      Frames with a control or unknown opcode are ignored. The content of
      any other frame is parsed as JSON and dispatched on its message type:
      - connection init: replies with a connection ack;
      - start: runs [execute_query variables operation_name query] and sends
        either an error message, a single data message, or one data message
        per stream element followed by a complete message;
      - stop: closes the stream registered under the given id, if any;
      - connection terminate: closes every registered stream and sends a
        frame with the [Close] opcode;
      - anything that cannot be decoded: replies with an error message.

      Content that is not valid JSON is answered with a connection error
      message instead of raising. *)
  let handle_frame t ~execute_query frame =
    match frame.Graphql_websocket.Frame.opcode with
    | Ping | Pong | Close | Ctrl _ | Nonctrl _ -> IO.return ()
    | Continuation | Text | Binary -> (
        match
          Yojson.Basic.from_string frame.Graphql_websocket.Frame.content
        with
        | exception Yojson.Json_error reason ->
            (* There is no id to correlate with, so report the parse failure
               at connection level rather than as an operation error. *)
            let payload = `Assoc [ ("message", `String reason) ] in
            Ws.send t.conn (create_message ~payload Gql_connection_error)
        | json -> (
            match client_message_of_payload json with
            | Ok Gql_connection_init ->
                Ws.send t.conn (create_message Gql_connection_ack)
            | Ok (Gql_start { id; query; variables; operation_name }) -> (
                execute_query variables operation_name query >>= function
                | Error message ->
                    let payload = `Assoc [ ("message", message) ] in
                    Ws.send t.conn (create_message ~payload ~id Gql_error)
                | Ok (`Response payload) ->
                    Ws.send t.conn (create_message ~id ~payload Gql_data)
                | Ok (`Stream stream) ->
                    let close () = IO.Stream.close stream in
                    Hashtbl.add t.subscriptions id close;
                    (* NOTE(review): the iteration is sequenced before this
                       function returns, so whether further frames are read
                       while the stream is live depends on the IO
                       implementation - confirm. *)
                    IO.Stream.iter stream (fun response ->
                        let (Ok payload | Error payload) = response in
                        Ws.send t.conn (create_message ~id ~payload Gql_data))
                    >>= fun () ->
                    (* The stream is finished: forget its close function so
                       the table does not grow and a later terminate does
                       not close it a second time. *)
                    Hashtbl.remove t.subscriptions id;
                    Ws.send t.conn (create_message ~id Gql_complete) )
            | Ok (Gql_stop { id }) ->
                (* Only the lookup is guarded; an exception raised by
                   [close] itself is not swallowed. The entry is removed
                   when the stream iteration completes. *)
                ( match Hashtbl.find t.subscriptions id with
                | close -> close ()
                | exception Not_found -> () );
                IO.return ()
            | Ok Gql_connection_terminate ->
                (* Snapshot and empty the table before closing, so that
                   nothing mutates it while it is being traversed. *)
                let closers =
                  Hashtbl.fold
                    (fun _id close acc -> close :: acc)
                    t.subscriptions []
                in
                Hashtbl.reset t.subscriptions;
                List.iter (fun close -> close ()) closers;
                Ws.send t.conn
                  (create_message ~opcode:Graphql_websocket.Frame.Opcode.Close
                     Gql_connection_error)
            | Error msg ->
                (* The message is already known to be invalid, so its id may
                   be absent or of the wrong type; fall back to a null id. *)
                let id =
                  match Json.(json |> member "id" |> to_string_option) with
                  | id -> id
                  | exception Json.Type_error _ -> None
                in
                let payload = `Assoc [ ("message", `String msg) ] in
                Ws.send t.conn (create_message ?id ~payload Gql_error) ) )

  (** [handle execute_query conn] serves the connection [conn]: it creates
      an empty subscription table and then repeatedly receives a frame with
      [Ws.recv] and processes it with [handle_frame].

      The loop has no exit condition of its own; presumably it ends only
      when the underlying IO computation stops, e.g. when [Ws.recv] fails -
      confirm against the [Ws] implementation. *)
  let handle execute_query conn =
    let subscriptions = Hashtbl.create 8 in
    let t = { conn; subscriptions } in
    let rec loop () =
      Ws.recv conn >>= fun frame ->
      handle_frame t ~execute_query frame >>= fun () -> loop ()
    in
    loop ()
end