Source file websocket_handler.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
(** GraphQL-over-websocket message handling, parameterised over an IO
    implementation [IO] and a websocket connection implementation [Ws] whose
    [IO.t] type is the same as that of [IO]. *)
module Make
(IO : Graphql_intf.IO)
(Ws : Graphql_websocket.Connection.S with type 'a IO.t = 'a IO.t) =
struct
(* Short alias for the Yojson accessor functions ([member], [to_string], ...)
   used when decoding client messages. *)
module Json = Yojson.Basic.Util
(* Infix bind of the [IO] parameter, used to sequence IO actions. *)
let ( >>= ) = IO.bind
(** State kept for one websocket connection: the connection itself and a
    table mapping a subscription id to the function that closes the stream
    backing that subscription. *)
type t = { conn : Ws.t; subscriptions : (string, unit -> unit) Hashtbl.t }
(** Messages the client can send, as decoded from the ["type"] field of the
    incoming JSON: ["connection_init"], ["start"], ["stop"] and
    ["connection_terminate"] respectively. *)
type client_message =
| Gql_connection_init
| Gql_start of {
id : string;
query : string;
variables : (string * Graphql_parser.const_value) list option;
operation_name : string option;
}
(* [id] names the operation; [query], [variables] and [operation_name] are
   read from the message's ["payload"] object. *)
| Gql_stop of { id : string }
| Gql_connection_terminate
(** Kinds of message the server can send. Each constructor is rendered into
    the ["type"] field of the outgoing JSON as, in order: ["connection_error"],
    ["connection_ack"], ["data"], ["error"] and ["complete"]. *)
type server_message =
| Gql_connection_error
| Gql_connection_ack
| Gql_data
| Gql_error
| Gql_complete
(** [client_message_of_payload msg] decodes the JSON value [msg] sent by the
    client into a {!client_message}.

    The message kind is selected by the string in the ["type"] field. A
    ["start"] message additionally needs a string ["id"] and a ["payload"]
    object holding a string ["query"] and, optionally, ["variables"] and
    ["operationName"]. A ["stop"] message needs a string ["id"].

    @return [Ok message] on success, or [Error reason] when the type is
    missing or unknown, when [msg] is not a JSON object, or when a required
    field is absent or has an unexpected JSON type. *)
let client_message_of_payload msg =
  let decode () =
    match msg |> Json.member "type" |> Json.to_string_option with
    | Some "connection_init" -> Ok Gql_connection_init
    | Some "start" ->
        let id = Json.(msg |> member "id" |> to_string) in
        let payload = Json.member "payload" msg in
        let query = Json.(payload |> member "query" |> to_string) in
        let variables =
          Json.(payload |> member "variables" |> to_option to_assoc)
        in
        let variables =
          (variables :> (string * Graphql_parser.const_value) list option)
        in
        let operation_name =
          Json.(payload |> member "operationName" |> to_string_option)
        in
        Ok (Gql_start { id; query; variables; operation_name })
    | Some "stop" ->
        let id = Json.(member "id" msg |> to_string) in
        Ok (Gql_stop { id })
    | Some "connection_terminate" -> Ok Gql_connection_terminate
    | Some typ -> Error (Format.sprintf "Unknown message type `%s`" typ)
    | None -> Error (Format.sprintf "Missing message type")
  in
  (* The [Json] accessors raise [Type_error] on a missing or ill-typed field.
     The input comes from the client, so report it through the [result]
     instead of letting the exception escape to the caller. *)
  match decode () with
  | decoded -> decoded
  | exception Json.Type_error (reason, _) -> Error reason
(** [server_message_to_string message] is the string placed in the ["type"]
    field of an outgoing message of kind [message]. *)
let server_message_to_string message =
  match message with
  | Gql_connection_ack -> "connection_ack"
  | Gql_connection_error -> "connection_error"
  | Gql_complete -> "complete"
  | Gql_data -> "data"
  | Gql_error -> "error"
(** [create_message ?opcode ?id ?payload typ] builds the websocket frame for
    a server message of kind [typ].

    The frame content is a JSON object with the fields ["type"], ["id"] and
    ["payload"], in that order. ["id"] is [null] when [id] is not given, and
    [payload] defaults to [null]. [opcode] defaults to a text frame. *)
let create_message ?(opcode = Graphql_websocket.Frame.Opcode.Text) ?id
    ?(payload = `Null) typ =
  let id_field = match id with None -> `Null | Some id -> `String id in
  let type_field = `String (server_message_to_string typ) in
  let content =
    Yojson.Basic.to_string
      (`Assoc [ ("type", type_field); ("id", id_field); ("payload", payload) ])
  in
  Graphql_websocket.Frame.create ~opcode ~content ()
(** [handle_frame t ~execute_query frame] processes one websocket frame
    received on [t.conn] and sends whatever reply it calls for.

    - Ping, pong, close and other control / non-control frames are ignored.
    - Continuation, text and binary frames are parsed as one JSON client
      message. Content that is not valid JSON is answered with a
      ["connection_error"] message carrying the parser's explanation.
    - ["connection_init"] is answered with ["connection_ack"].
    - ["start"] calls [execute_query variables operation_name query]. An
      [Error] is sent back as an ["error"] message, a [`Response] as a single
      ["data"] message, and a [`Stream] as one ["data"] message per element
      followed by ["complete"]. While a stream is running, its close function
      is registered in [t.subscriptions] under the operation id.
    - ["stop"] closes the stream registered under the given id, if any.
    - ["connection_terminate"] closes every registered stream and sends a
      close frame.
    - A message that could not be decoded is answered with an ["error"]
      message, tagged with the message's id when it has a string one.

    @param t connection state
    @param execute_query callback used to run a ["start"] operation
    @param frame the frame to process *)
let handle_frame t ~execute_query frame =
  match frame.Graphql_websocket.Frame.opcode with
  | Ping | Pong | Close | Ctrl _ | Nonctrl _ -> IO.return ()
  | Continuation | Text | Binary -> (
      match
        Yojson.Basic.from_string frame.Graphql_websocket.Frame.content
      with
      | exception Yojson.Json_error reason ->
          (* The content comes from the client and may not be JSON at all.
             Report it instead of letting the parser exception escape. *)
          let payload = `Assoc [ ("message", `String reason) ] in
          Ws.send t.conn (create_message ~payload Gql_connection_error)
      | json -> (
          match client_message_of_payload json with
          | Ok Gql_connection_init ->
              Ws.send t.conn (create_message Gql_connection_ack)
          | Ok (Gql_start { id; query; variables; operation_name }) -> (
              execute_query variables operation_name query >>= function
              | Error message ->
                  let payload = `Assoc [ ("message", message) ] in
                  Ws.send t.conn (create_message ~payload ~id Gql_error)
              | Ok (`Response payload) ->
                  Ws.send t.conn (create_message ~id ~payload Gql_data)
              | Ok (`Stream stream) ->
                  let close () = IO.Stream.close stream in
                  Hashtbl.add t.subscriptions id close;
                  (* NOTE(review): this branch only resolves once the whole
                     stream has been iterated; confirm whether the caller is
                     expected to keep receiving frames in the meantime. *)
                  IO.Stream.iter stream (fun response ->
                      (* Results and errors are both forwarded as data. *)
                      let (Ok payload | Error payload) = response in
                      Ws.send t.conn (create_message ~id ~payload Gql_data))
                  >>= fun () ->
                  (* The stream is finished: forget its close function so the
                     table does not grow for the lifetime of the connection. *)
                  Hashtbl.remove t.subscriptions id;
                  Ws.send t.conn (create_message ~id Gql_complete))
          | Ok (Gql_stop { id }) ->
              (* Stopping an unknown id is not an error. *)
              (match Hashtbl.find t.subscriptions id with
              | close -> close ()
              | exception Not_found -> ());
              IO.return ()
          | Ok Gql_connection_terminate ->
              Hashtbl.iter (fun _id close -> close ()) t.subscriptions;
              Ws.send t.conn
                (create_message ~opcode:Graphql_websocket.Frame.Opcode.Close
                   Gql_connection_error)
          | Error msg ->
              (* The rejected message may have no usable id (or may not even
                 be an object), so extract it without raising. *)
              let id =
                match json with
                | `Assoc _ -> (
                    match Json.member "id" json with
                    | `String id -> Some id
                    | _ -> None)
                | _ -> None
              in
              let payload = `Assoc [ ("message", `String msg) ] in
              Ws.send t.conn (create_message ?id ~payload Gql_error)))
(** [handle execute_query conn] serves the websocket connection [conn]: it
    repeatedly receives a frame and passes it to [handle_frame], using
    [execute_query] to run operations. Subscription state starts out empty
    and is private to this connection. The receive loop has no exit
    condition of its own. *)
let handle execute_query conn =
  let t = { conn; subscriptions = Hashtbl.create 8 } in
  let rec serve () =
    Ws.recv t.conn >>= fun frame ->
    handle_frame t ~execute_query frame >>= serve
  in
  serve ()
end