Source file websocket_handler.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
module Make (IO : Graphql_intf.IO) (Ws : Websocket.Connection.S with type 'a IO.t = 'a IO.t) = struct
(* Short alias for the Yojson accessor helpers ([member], [to_string], ...)
   used to pick apart incoming messages. *)
module Json = Yojson.Basic.Util
(* Infix form of [IO.bind], used to sequence IO actions throughout this
   module. *)
let (>>=) = IO.bind
(** State kept for one websocket connection. *)
type t = {
(* The websocket connection that replies are sent on. *)
conn : Ws.t;
(* Closers keyed by message id: [handle_frame] registers one for every
   [start] message whose query yields a stream, and calls them on [stop] and
   [connection_terminate]. *)
subscriptions : (string, unit -> unit) Hashtbl.t;
}
(** Messages received from the client, as produced by
    [client_message_of_payload]. *)
type client_message =
(* Message whose "type" member is "connection_init". *)
| Gql_connection_init
(* Message whose "type" member is "start": a request to run an operation. *)
| Gql_start of {
(* Top-level "id" member of the message. *)
id : string;
(* "query" member of the message's "payload". *)
query : string;
(* "variables" member of the "payload"; [None] when it is null or absent. *)
variables : (string * Graphql_parser.const_value) list option;
(* "operationName" member of the "payload"; [None] when it is null or
   absent. *)
operation_name : string option;
}
(* Message whose "type" member is "stop". *)
| Gql_stop of {
(* Top-level "id" member of the message. *)
id : string;
}
(* Message whose "type" member is "connection_terminate". *)
| Gql_connection_terminate
(** Kinds of message sent to the client. [server_message_to_string] gives the
    string placed in the "type" member of the outgoing JSON for each one. *)
type server_message =
(* Serialized as "connection_error". *)
| Gql_connection_error
(* Serialized as "connection_ack". *)
| Gql_connection_ack
(* Serialized as "data". *)
| Gql_data
(* Serialized as "error". *)
| Gql_error
(* Serialized as "complete". *)
| Gql_complete
(** [client_message_of_payload msg] decodes the JSON value [msg] into a
    {!client_message}, dispatching on its ["type"] member.

    Returns [Ok message] on success and [Error reason] when:
    - the ["type"] member is missing or null,
    - the ["type"] member names an unknown message type, or
    - [msg] is not a JSON object, or a required member (["type"], ["id"],
      ["query"], ...) has the wrong JSON type.

    The last case used to escape as a [Yojson.Basic.Util.Type_error]
    exception; it is now reported through the [Error] case like every other
    decoding failure, so this function no longer raises on malformed input. *)
let client_message_of_payload msg =
  let decode () =
    match msg |> Json.member "type" |> Json.to_string_option with
    | Some "connection_init" ->
      Ok Gql_connection_init
    | Some "start" ->
      let id = Json.(msg |> member "id" |> to_string) in
      let payload = Json.member "payload" msg in
      let query = Json.(payload |> member "query" |> to_string) in
      let variables =
        Json.(payload |> member "variables" |> to_option to_assoc)
      in
      (* Widen the Yojson values to the parser's constant type. *)
      let variables =
        (variables :> (string * Graphql_parser.const_value) list option)
      in
      let operation_name =
        Json.(payload |> member "operationName" |> to_string_option)
      in
      Ok (Gql_start { id; query; variables; operation_name })
    | Some "stop" ->
      let id = Json.(member "id" msg |> to_string) in
      Ok (Gql_stop { id })
    | Some "connection_terminate" ->
      Ok Gql_connection_terminate
    | Some typ ->
      Error (Format.sprintf "Unknown message type `%s`" typ)
    | None ->
      Error "Missing message type"
  in
  (* The Yojson accessors signal a shape mismatch with [Type_error]; turn it
     into the [Error] case so callers have a single failure path. *)
  try decode () with
  | Json.Type_error (reason, _) ->
    Error (Format.sprintf "Malformed message: %s" reason)
(** [server_message_to_string kind] is the string written to the ["type"]
    member of an outgoing message of the given kind. *)
let server_message_to_string kind =
  match kind with
  | Gql_connection_ack -> "connection_ack"
  | Gql_connection_error -> "connection_error"
  | Gql_complete -> "complete"
  | Gql_data -> "data"
  | Gql_error -> "error"
(** [create_message ?opcode ?id ?payload typ] builds the websocket frame for
    a server message of kind [typ].

    The frame content is a JSON object with the members ["type"], ["id"] and
    ["payload"], in that order. ["id"] is [null] when [id] is omitted,
    [payload] defaults to [`Null], and [opcode] defaults to [Text]. *)
let create_message ?(opcode=Websocket.Frame.Opcode.Text) ?id ?(payload=`Null) typ =
  let id_json =
    match id with
    | None -> `Null
    | Some id -> `String id
  in
  let content =
    `Assoc [
      "type", `String (server_message_to_string typ);
      "id", id_json;
      "payload", payload;
    ]
    |> Yojson.Basic.to_string
  in
  Websocket.Frame.create ~opcode ~content ()
(** [handle_frame t ~execute_query frame] processes one incoming websocket
    frame and returns once the corresponding replies have been sent.

    Frames with opcode [Ping], [Pong], [Close], [Ctrl _] or [Nonctrl _] are
    ignored. The content of any other frame is parsed as JSON, decoded with
    [client_message_of_payload] and dispatched:
    - [connection_init] is answered with a [connection_ack] message;
    - [start] runs [execute_query variables operation_name query] and sends
      back either an [error] message, a single [data] message, or one [data]
      message per stream element followed by [complete]. While a stream is
      being iterated, a closer for it is registered in [t.subscriptions]
      under the message id; it is dropped again once iteration finishes;
    - [stop] calls the closer registered under the given id, if any;
    - [connection_terminate] calls every registered closer, empties the
      table and sends a [Close] frame.

    Content that is not valid JSON, and messages that cannot be decoded, are
    answered with an [error] message (carrying the message id when one can be
    read) instead of raising. *)
let handle_frame t ~execute_query frame =
  (* Best-effort read of the message id: [None] when [json] is not an object
     or its "id" member is not a string. *)
  let id_of json =
    match Json.(json |> member "id" |> to_string_option) with
    | id -> id
    | exception Json.Type_error _ -> None
  in
  let send_error ?id reason =
    let payload = `Assoc ["message", `String reason] in
    Ws.send t.conn (create_message ?id ~payload Gql_error)
  in
  match frame.Websocket.Frame.opcode with
  | Ping
  | Pong
  | Close
  | Ctrl _
  | Nonctrl _ ->
    IO.return ()
  | Continuation
  | Text
  | Binary ->
    begin match Yojson.Basic.from_string frame.Websocket.Frame.content with
    | exception Yojson.Json_error reason ->
      (* Not JSON at all: there is no id to echo back. *)
      send_error reason
    | json ->
      begin match client_message_of_payload json with
      | exception Json.Type_error (reason, _) ->
        (* Guard against a decoding failure escaping as an exception. *)
        send_error ?id:(id_of json) reason
      | Ok Gql_connection_init ->
        Ws.send t.conn (create_message Gql_connection_ack)
      | Ok (Gql_start { id; query; variables; operation_name }) ->
        execute_query variables operation_name query
        >>= (function
          | Error message ->
            let payload = `Assoc ["message", message] in
            Ws.send t.conn (create_message ~payload ~id Gql_error)
          | Ok (`Response payload) ->
            Ws.send t.conn (create_message ~id ~payload Gql_data)
          | Ok (`Stream stream) ->
            let close () = IO.Stream.close stream in
            (* Drop our entry once the stream is done, but only if the
               binding currently visible under [id] is ours, so that another
               subscription registered with the same id is never removed by
               mistake. *)
            let forget () =
              match Hashtbl.find t.subscriptions id with
              | registered ->
                if registered == close then Hashtbl.remove t.subscriptions id
              | exception Not_found -> ()
            in
            Hashtbl.add t.subscriptions id close;
            (* NOTE(review): this branch only resolves after
               [IO.Stream.iter] does; whether further frames can be received
               in the meantime depends on [IO.bind] and on the caller's
               receive loop -- confirm. *)
            IO.Stream.iter stream (fun response ->
              let (Ok payload | Error payload) = response in
              Ws.send t.conn (create_message ~id ~payload Gql_data))
            >>= fun () ->
            forget ();
            Ws.send t.conn (create_message ~id Gql_complete))
      | Ok (Gql_stop { id }) ->
        (* Only a failed lookup is ignored; the closer itself runs outside
           the handler so its own failures are not swallowed. *)
        begin match Hashtbl.find t.subscriptions id with
        | close -> close ()
        | exception Not_found -> ()
        end;
        IO.return ()
      | Ok Gql_connection_terminate ->
        (* Snapshot the closers and empty the table before calling them, so
           the table is never modified while it is being traversed. *)
        let closers =
          Hashtbl.fold (fun _id close acc -> close :: acc) t.subscriptions []
        in
        Hashtbl.reset t.subscriptions;
        List.iter (fun close -> close ()) closers;
        Ws.send t.conn
          (create_message ~opcode:Websocket.Frame.Opcode.Close
             Gql_connection_error)
      | Error reason ->
        (* The offending message may lack a usable id (e.g. when its type is
           missing), so the id is optional here. *)
        send_error ?id:(id_of json) reason
      end
    end
(** [handle execute_query conn] serves the websocket connection [conn]: it
    creates a fresh connection state with an empty subscription table, then
    repeatedly receives a frame with [Ws.recv] and passes it to
    [handle_frame]. The loop has no exit condition of its own. *)
let handle execute_query conn =
  let t = { conn; subscriptions = Hashtbl.create 8 } in
  let rec serve () =
    Ws.recv conn >>= fun frame ->
    handle_frame t ~execute_query frame >>= serve
  in
  serve ()
end