1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
open Lwt.Infix
open Protocol_j
(** Decoded content of a message, one constructor per supported message kind.
    Constructors without an argument carry no decoded payload. The payload
    record types are presumably the ones exposed by [Protocol_j] (opened
    above) -- TODO confirm. *)
type content =
(* Requests. *)
| Connect_request
| Comm_info_request of comm_info_request
| Kernel_info_request
| Shutdown_request of shutdown
| Interrupt_request
| Execute_request of execute_request
| Inspect_request of inspect_request
| Complete_request of complete_request
| Is_complete_request of is_complete_request
| History_request of history_request
(* Replies. *)
| Connect_reply of connect_reply
| Comm_info_reply
| Kernel_info_reply of kernel_info_reply
| Shutdown_reply of shutdown
| Interrupt_reply of interrupt_reply
| Execute_reply of execute_reply
| Inspect_reply of inspect_reply
| Complete_reply of complete_reply
| Is_complete_reply of is_complete_reply
| History_reply of history_reply
(* Remaining kinds (neither a request nor a reply). *)
| Status of status
| Execute_input of pyin
| Execute_result of pyout
| Execute_error of execute_error
| Stream of stream
| Clear of clear_output
| Display_data of display_data
| Comm_open
(** [content_of_json hdr c] decodes the JSON string [c] into a {!content}
    value, choosing the decoder from [hdr.msg_type].

    Message kinds without a payload ignore [c] entirely. Every message type
    produced by [msg_type_of_content] is accepted here, including
    ["is_complete_reply"], ["execute_result"] and ["error"].

    @raise Failure if [hdr.msg_type] is not a known message type. The
    individual [*_of_string] decoders may raise their own exceptions on
    malformed JSON. *)
let content_of_json hdr c =
  match hdr.msg_type with
  (* Requests. *)
  | "connect_request" -> Connect_request
  | "comm_info_request" -> Comm_info_request (comm_info_request_of_string c)
  | "kernel_info_request" -> Kernel_info_request
  | "shutdown_request" -> Shutdown_request (shutdown_of_string c)
  | "execute_request" -> Execute_request (execute_request_of_string c)
  | "interrupt_request" -> Interrupt_request
  | "inspect_request" -> Inspect_request (inspect_request_of_string c)
  | "complete_request" -> Complete_request (complete_request_of_string c)
  | "is_complete_request" ->
    Is_complete_request (is_complete_request_of_string c)
  | "history_request" -> History_request (history_request_of_string c)
  (* Replies. *)
  | "connect_reply" -> Connect_reply (connect_reply_of_string c)
  | "comm_info_reply" -> Comm_info_reply
  | "kernel_info_reply" -> Kernel_info_reply (kernel_info_reply_of_string c)
  | "shutdown_reply" -> Shutdown_reply (shutdown_of_string c)
  | "interrupt_reply" -> Interrupt_reply (interrupt_reply_of_string c)
  | "execute_reply" -> Execute_reply (execute_reply_of_string c)
  | "inspect_reply" -> Inspect_reply (inspect_reply_of_string c)
  | "complete_reply" -> Complete_reply (complete_reply_of_string c)
  | "is_complete_reply" -> Is_complete_reply (is_complete_reply_of_string c)
  | "history_reply" -> History_reply (history_reply_of_string c)
  (* Remaining kinds. *)
  | "status" -> Status (status_of_string c)
  | "execute_input" -> Execute_input (pyin_of_string c)
  | "execute_result" -> Execute_result (pyout_of_string c)
  (* [Execute_error] is published under the bare type name "error". *)
  | "error" -> Execute_error (execute_error_of_string c)
  | "stream" -> Stream (stream_of_string c)
  | "display_data" -> Display_data (display_data_of_string c)
  | "clear_output" -> Clear (clear_output_of_string c)
  | "comm_open" -> Comm_open
  | _ -> failwith ("content_of_json: " ^ hdr.msg_type)
(** [json_of_content content] serializes [content] to a JSON string.

    Constructors that carry a payload are encoded with the matching
    [string_of_*] function; payload-less constructors become a fixed
    literal. *)
let json_of_content content =
  match content with
  (* Payload-less messages serialize to an empty JSON object. *)
  | Connect_request
  | Kernel_info_request
  | Interrupt_request
  | Comm_open -> "{}"
  (* A comm-info reply always reports an empty comm table. *)
  | Comm_info_reply -> "{\"comms\": {}}"
  | Comm_info_request payload -> string_of_comm_info_request payload
  | Shutdown_request payload
  | Shutdown_reply payload -> string_of_shutdown payload
  | Execute_request payload -> string_of_execute_request payload
  | Inspect_request payload -> string_of_inspect_request payload
  | Complete_request payload -> string_of_complete_request payload
  | Is_complete_request payload -> string_of_is_complete_request payload
  | History_request payload -> string_of_history_request payload
  | Connect_reply payload -> string_of_connect_reply payload
  | Kernel_info_reply payload -> string_of_kernel_info_reply payload
  | Interrupt_reply payload -> string_of_interrupt_reply payload
  | Execute_reply payload -> string_of_execute_reply payload
  | Inspect_reply payload -> string_of_inspect_reply payload
  | Complete_reply payload -> string_of_complete_reply payload
  | Is_complete_reply payload -> string_of_is_complete_reply payload
  | History_reply payload -> string_of_history_reply payload
  | Status payload -> string_of_status payload
  | Execute_input payload -> string_of_pyin payload
  | Execute_result payload -> string_of_pyout payload
  | Execute_error payload -> string_of_execute_error payload
  | Stream payload -> string_of_stream payload
  | Clear payload -> string_of_clear_output payload
  | Display_data payload -> string_of_display_data payload
(** [msg_type_of_content content] is the message-type string associated with
    the constructor of [content]. Any payload is ignored. Note that
    [Execute_error] maps to ["error"], not ["execute_error"]. *)
let msg_type_of_content content =
  match content with
  | Connect_request -> "connect_request"
  | Comm_info_request _ -> "comm_info_request"
  | Kernel_info_request -> "kernel_info_request"
  | Shutdown_request _ -> "shutdown_request"
  | Interrupt_request -> "interrupt_request"
  | Execute_request _ -> "execute_request"
  | Inspect_request _ -> "inspect_request"
  | Complete_request _ -> "complete_request"
  | Is_complete_request _ -> "is_complete_request"
  | History_request _ -> "history_request"
  | Connect_reply _ -> "connect_reply"
  | Comm_info_reply -> "comm_info_reply"
  | Kernel_info_reply _ -> "kernel_info_reply"
  | Shutdown_reply _ -> "shutdown_reply"
  | Interrupt_reply _ -> "interrupt_reply"
  | Execute_reply _ -> "execute_reply"
  | Inspect_reply _ -> "inspect_reply"
  | Complete_reply _ -> "complete_reply"
  | Is_complete_reply _ -> "is_complete_reply"
  | History_reply _ -> "history_reply"
  | Status _ -> "status"
  | Execute_input _ -> "execute_input"
  | Execute_result _ -> "execute_result"
  | Execute_error _ -> "error"
  | Stream _ -> "stream"
  | Clear _ -> "clear_output"
  | Display_data _ -> "display_data"
  | Comm_open -> "comm_open"
(** A complete multipart message as read from / written to a socket. *)
type t = {
(* Frames that precede the "<IDS|MSG>" delimiter. *)
ids : string array;
(* Signature frame (first frame after the delimiter); [send] recomputes it
   when given a key. *)
hmac : string;
(* Header of this message. *)
header : header_info;
(* Header of the message this one responds to. *)
parent : header_info;
(* Metadata frame, kept as an unparsed string. *)
meta : string;
(* Decoded content frame. *)
content : content;
(* Any extra frames following the content frame. *)
raw : string array;
}
(** [wrap_retry f s] runs [f s] and, whenever the resulting promise fails
    with [Lwt_unix.Retry] or [Unix.Unix_error (EAGAIN, _, _)], runs it again.
    Any other exception is propagated as a failed promise. *)
let rec wrap_retry f s =
  let attempt () = f s in
  let on_failure exn =
    match exn with
    | Lwt_unix.Retry | Unix.Unix_error (Unix.EAGAIN, _, _) -> wrap_retry f s
    | other -> Lwt.fail other
  in
  Lwt.catch attempt on_failure
(** [log prefix msg] emits a debug-level dump of [msg]: its ids (joined with
    [";"] and tagged with [prefix]), the delimiter, the HMAC, both headers
    and the JSON-encoded content. *)
let log prefix msg =
  let { ids; hmac; header; parent; content; _ } = msg in
  Log.debug (fun m ->
      m "message %s: [%s]" prefix (String.concat ";" (Array.to_list ids)));
  Log.debug (fun m -> m "<IDS|MSG>");
  Log.debug (fun m -> m " HMAC: %s" hmac);
  Log.debug (fun m -> m " header: %s" (string_of_header_info header));
  Log.debug (fun m -> m " parent: %s" (string_of_header_info parent));
  Log.debug (fun m -> m " content: %s" (json_of_content content))
(** Encoding hook applied to every outgoing frame; currently the identity. *)
let enc_utf8 : 'a -> 'a = fun frame -> frame
(** Decoding hook applied to every incoming frame; currently the identity. *)
let dec_utf8 : 'a -> 'a = fun frame -> frame
(** [recv socket] reads one multipart message from [socket] (retrying via
    [wrap_retry]) and decodes it into a {!t}.

    The frames before the ["<IDS|MSG>"] delimiter become [ids]. After the
    delimiter the frames are, in order: HMAC, header, parent header,
    metadata, content, then any number of extra frames stored in [raw].
    The HMAC is stored but not verified here.

    The returned promise fails with [Failure] if the delimiter is absent,
    with [Assert_failure] if fewer than five frames follow it, and with
    whatever the JSON decoders raise on malformed frames. *)
let recv socket : t Lwt.t =
  wrap_retry Zmq_lwt.Socket.recv_all socket >>= fun msg ->
  let msg = List.map dec_utf8 msg in
  (* Accumulate the leading frames (in reverse) until the delimiter. *)
  let rec split ids = function
    | [] -> failwith "couldn't find <IDS|MSG> marker"
    | "<IDS|MSG>" :: t -> Array.of_list (List.rev ids), Array.of_list t
    | h :: t -> split (h :: ids) t
  in
  let ids, data = split [] msg in
  let len = Array.length data in
  (* Validate the frame count before indexing into [data]. *)
  assert (len >= 5);
  let header = header_info_of_string data.(1) in
  let msg = {
    ids;
    hmac = data.(0);
    header;
    parent = header_info_of_string data.(2);
    meta = data.(3);
    (* The header's msg_type selects the content decoder. *)
    content = content_of_json header data.(4);
    raw = Array.sub data 5 (len - 5);
  } in
  Lwt.return msg
(** [send ?key socket msg] serializes [msg] and writes it to [socket] as one
    multipart message (retrying via [wrap_retry]).

    Frame order: the [ids], the ["<IDS|MSG>"] delimiter, the HMAC, header,
    parent header, metadata, content, then the [raw] frames.

    @param key when given, the HMAC frame is the hex-encoded HMAC-SHA256 of
    the header, parent, metadata and content frames (in that order) under
    [key]; otherwise [msg.hmac] is sent unchanged. *)
let send ?key socket msg : unit Lwt.t =
  let content = enc_utf8 (json_of_content msg.content) in
  let header = enc_utf8 (string_of_header_info msg.header) in
  let parent = enc_utf8 (string_of_header_info msg.parent) in
  let meta = enc_utf8 (msg.meta) in
  let hmac = match key with
    | None -> msg.hmac
    | Some k ->
      (* Feed the four signed frames to the HMAC one after another. *)
      let c = fun yield -> List.iter yield [header; parent; meta; content] in
      Digestif.SHA256.(hmaci_string ~key:k c |> to_hex)
  in
  wrap_retry (Zmq_lwt.Socket.send_all socket) (List.concat [
      Array.to_list (Array.map enc_utf8 msg.ids);
      [enc_utf8 "<IDS|MSG>"];
      [enc_utf8 hmac];
      [header];
      [parent];
      [meta];
      [content];
      Array.to_list (Array.map enc_utf8 msg.raw);
    ])
(** [mk_id ()] is a freshly created version-4 UUID rendered as a string. *)
let mk_id () =
  let uuid = Uuidm.create `V4 in
  Uuidm.to_string uuid
(** [make ~parent ~msg_type content] builds a message derived from the
    message [parent].

    The new header copies [parent]'s header, then overrides the version
    (["5.0"]), the date (current time, RFC 3339), the message type and the
    message id (fresh). [parent]'s header becomes the new message's parent
    header; every other field of [parent] is carried over unchanged. *)
let make ~parent ~msg_type content =
  let header = {
    parent.header with
    version = "5.0";
    date = Ptime.to_rfc3339 ~space:false (Ptime_clock.now ());
    msg_type;
    msg_id = mk_id ();
  } in
  { parent with content; header; parent = parent.header }
(** Header whose string fields are all empty ([username] and [session] are
    [Some ""]). Serves as the base record for [mk_header] and as the parent
    header in [make_first]. *)
let empty_header : header_info = {
  version = "";
  date = "";
  username = Some "";
  msg_type = "";
  msg_id = "";
  session = Some "";
}
(** [mk_header msg_type] is [empty_header] with the version set to ["5.0"],
    the given [msg_type], and a fresh message id from [mk_id]. *)
let mk_header msg_type : header_info = {
  empty_header with
  version = "5.0";
  msg_type;
  msg_id = mk_id ();
}
(** [make_first ~msg_type content] builds a message that has no real parent:
    the parent header is [empty_header], the header comes from
    [mk_header msg_type], the single id is [msg_type] itself, the HMAC is
    empty, the metadata is ["{}"] and there are no extra frames. *)
let make_first ~msg_type content =
  let header = mk_header msg_type in
  {
    ids = [| msg_type |];
    hmac = "";
    header;
    parent = empty_header;
    meta = "{}";
    content;
    raw = [||];
  }