1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
(* Byte buffer type used for all payloads in this file: a one-dimensional
   Bigarray of chars with unsigned 8-bit elements and C layout. *)
type buffer =
(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
(* Shorthand for an Lwt promise. *)
type 'a promise =
'a Lwt.t
(* A read request, expressed as one labeled callback per possible outcome:
   - data: receives a buffer followed by two ints and two bools. The writers
     in this file name these arguments offset, length, binary and fin.
     NOTE(review): binary and fin look like message flags; their exact
     meaning is not established in this file -- confirm.
   - flush: takes no payload.
   - ping, pong: receive a buffer followed by two ints (offset and length in
     the writers of this file).
   - close: receives an int code.
   - exn: receives an exception.
   The request itself returns unit; results arrive only through the
   callbacks. *)
type read =
data:(buffer -> int -> int -> bool -> bool -> unit) ->
flush:(unit -> unit) ->
ping:(buffer -> int -> int -> unit) ->
pong:(buffer -> int -> int -> unit) ->
close:(int -> unit) ->
exn:(exn -> unit) ->
unit
(* The callback tail shared by every write operation: after its payload
   arguments, a write takes ~close (int code), ~exn (exception) and one final
   positional unit -> unit callback, which the functions in this file call
   ok. The write itself returns unit; its outcome is reported through these
   callbacks. *)
type write =
close:(int -> unit) ->
exn:(exn -> unit) ->
(unit -> unit) ->
unit
(* The reading half of a stream. *)
type reader = {
(* Issues one read request; see the read type for the callbacks. *)
read : read;
(* Closes the reader with an int code. *)
close : int -> unit;
(* Aborts the reader with an exception. *)
abort : exn -> unit;
}
(* The writing half of a stream. Every operation other than close and abort
   ends in the write callback tail (~close, ~exn, then a positional
   continuation). *)
type writer = {
(* Writes a chunk: buffer, then two ints and two bools, named offset, length,
   binary and fin where this field is used in this file. *)
data : buffer -> int -> int -> bool -> bool -> write;
(* Writes a flush item, which carries no payload. *)
flush : write;
(* Writes a ping carrying a buffer slice (buffer, offset, length). *)
ping : buffer -> int -> int -> write;
(* Writes a pong carrying a buffer slice (buffer, offset, length). *)
pong : buffer -> int -> int -> write;
(* Closes the writer with an int code. *)
close : int -> unit;
(* Aborts the writer with an exception. *)
abort : exn -> unit;
}
(* A stream is simply a reading half paired with a writing half. The two
   halves are independent values; nothing in this type ties them to the same
   underlying channel. *)
type stream = {
reader : reader;
writer : writer;
}
(* Builds a stream value out of a reading half and a writing half. *)
let stream reader writer =
  { reader = reader; writer = writer }
(* Reading half for streams that cannot be read: any read request raises
   Failure, while close and abort silently do nothing. *)
let no_reader =
  let read ~data:_ ~flush:_ ~ping:_ ~pong:_ ~close:_ ~exn:_ =
    failwith "read from a non-readable stream"
  in
  { read; close = ignore; abort = ignore }
(* Writing half for read-only streams: every write operation raises Failure
   once it is fully applied, while close and abort silently do nothing. *)
let no_writer =
  let data _buffer _offset _length _binary _fin ~close:_ ~exn:_ _ok =
    failwith "write to a read-only stream"
  in
  let flush ~close:_ ~exn:_ _ok =
    failwith "flush of a read-only stream"
  in
  let ping _buffer _offset _length ~close:_ ~exn:_ _ok =
    failwith "ping on a read-only stream"
  in
  let pong _buffer _offset _length ~close:_ ~exn:_ _ok =
    failwith "pong on a read-only stream"
  in
  { data; flush; ping; pong; close = ignore; abort = ignore }
(* Assembles a reading half from its three operations, passed as labeled
   arguments. *)
let reader ~read ~close ~abort =
  ({ read; close; abort } : reader)
(* A stream that can be neither read nor written: it pairs no_reader with
   no_writer, so both directions raise Failure when used. *)
let null : stream =
  { reader = no_reader; writer = no_writer }
(* Reading half with no content: every read request immediately invokes the
   close callback with code 1000. Closing and aborting it do nothing. *)
let empty_reader =
  let read ~data:_ ~flush:_ ~ping:_ ~pong:_ ~close ~exn:_ = close 1000 in
  reader ~read ~close:ignore ~abort:ignore
(* A read-only stream with no content: reads report close (via
   empty_reader), and writes raise Failure (via no_writer). *)
let empty : stream =
  { reader = empty_reader; writer = no_writer }
(* Builds a reading half that yields the_string as a single data chunk.

   - The first read copies the string into a fresh buffer and passes it to
     the data callback with offset 0, the full length, and both flags true.
   - Every later read calls close with code 1000.
   - Closing drops the stored string, so later reads report close 1000.
   - Aborting drops the string and remembers the exception; every later read
     then calls the exn callback with it. *)
let string_reader the_string =
  (* Holds the string until it is delivered, closed, or aborted. *)
  let pending = ref (Some the_string) in
  (* Set once by abort; takes priority over everything else in read. *)
  let failure = ref None in
  let read ~data ~flush:_ ~ping:_ ~pong:_ ~close ~exn =
    match !failure, !pending with
    | Some the_exn, _ ->
      exn the_exn
    | None, Some contents ->
      (* Clear the reference before calling out, so the string is handed
         over at most once. *)
      pending := None;
      let length = String.length contents in
      let buffer = Bigstringaf.of_string ~off:0 ~len:length contents in
      data buffer 0 length true true
    | None, None ->
      close 1000
  in
  let close _code = pending := None in
  let abort exn =
    pending := None;
    failure := Some exn
  in
  reader ~read ~close ~abort
(* Wraps a string as a read-only stream. The empty string maps to the shared
   empty stream; any other string gets its own string_reader. Writing to the
   result raises Failure (no_writer). *)
let string the_string =
  match the_string with
  | "" -> empty
  | _ -> { reader = string_reader the_string; writer = no_writer }
(* Issues one read request on the reading half of stream, passing the six
   outcome callbacks through unchanged. *)
let read stream ~data ~flush ~ping ~pong ~close ~exn =
  let { reader; _ } = stream in
  reader.read ~data ~flush ~ping ~pong ~close ~exn
(* Closes both halves of stream with the given code: the reading half first,
   then the writing half. *)
let close stream code =
  let { reader; writer } = stream in
  reader.close code;
  writer.close code
(* Aborts both halves of stream with the given exception: the reading half
   first, then the writing half. *)
let abort stream exn =
  let { reader; writer } = stream in
  reader.abort exn;
  writer.abort exn
(* Writes a data chunk (buffer, offset, length, binary, fin) to the writing
   half of stream; the outcome is reported through ~close, ~exn or ok. *)
let write stream buffer offset length binary fin ~close ~exn ok =
  let { writer; _ } = stream in
  writer.data buffer offset length binary fin ~close ~exn ok
(* Writes a flush item to the writing half of stream; the outcome is
   reported through ~close, ~exn or ok. *)
let flush stream ~close ~exn ok =
  let { writer; _ } = stream in
  writer.flush ~close ~exn ok
(* Writes a ping carrying the given buffer slice to the writing half of
   stream; the outcome is reported through ~close, ~exn or ok. *)
let ping stream buffer offset length ~close ~exn ok =
  let { writer; _ } = stream in
  writer.ping buffer offset length ~close ~exn ok
(* Writes a pong carrying the given buffer slice to the writing half of
   stream; the outcome is reported through ~close, ~exn or ok. *)
let pong stream buffer offset length ~close ~exn ok =
  let { writer; _ } = stream in
  writer.pong buffer offset length ~close ~exn ok
(* Shared mutable state behind the two ends of a pipe. All fields are mutable
   and are overwritten as reads and writes are registered and completed. *)
type pipe = {
(* Current phase of the pipe:
   - `Idle: no reader is currently registered.
   - `Reader_waiting: a reader has stored its callbacks and is waiting for a
     writer.
   - `Closed code: the pipe was closed with this code.
   - `Aborted exn: the pipe was aborted with this exception. *)
mutable state : [
| `Idle
| `Reader_waiting
| `Closed of int
| `Aborted of exn
];
(* Callbacks supplied by the reader that is currently waiting, one per
   possible read outcome. *)
mutable read_data_callback : buffer -> int -> int -> bool -> bool -> unit;
mutable read_flush_callback : unit -> unit;
mutable read_ping_callback : buffer -> int -> int -> unit;
mutable read_pong_callback : buffer -> int -> int -> unit;
mutable read_close_callback : int -> unit;
mutable read_abort_callback : exn -> unit;
(* Callbacks stored by a writer that arrived while no reader was waiting. *)
mutable write_ok_callback : unit -> unit;
mutable write_close_callback : int -> unit;
mutable write_abort_callback : exn -> unit;
}
(* Placeholder for the data callback slot of a pipe: accepts the five data
   arguments and does nothing. Excluded from coverage measurement. *)
let dummy_read_data_callback _chunk _start _size _is_binary _is_final =
  () [@coverage off]
(* Placeholder for the ping and pong callback slots of a pipe: accepts the
   three payload arguments and does nothing. Excluded from coverage
   measurement. *)
let dummy_ping_pong_callback _payload _start _size =
  () [@coverage off]
(* Resets every reader callback slot of the pipe record to a do-nothing
   placeholder, so the record no longer references the callbacks of a reader
   that has been served. The six assignments are independent of each other. *)
let clean_up_reader_fields pipe =
  pipe.read_abort_callback <- ignore;
  pipe.read_close_callback <- ignore;
  pipe.read_pong_callback <- dummy_ping_pong_callback;
  pipe.read_ping_callback <- dummy_ping_pong_callback;
  pipe.read_flush_callback <- ignore;
  pipe.read_data_callback <- dummy_read_data_callback
(* Resets every writer callback slot of the pipe record to ignore, so the
   record no longer references the callbacks of a writer that has been
   resumed or notified. The three assignments are independent of each
   other. *)
let clean_up_writer_fields pipe =
  pipe.write_abort_callback <- ignore;
  pipe.write_close_callback <- ignore;
  pipe.write_ok_callback <- ignore
(* Creates an in-memory pipe and returns its two ends as (reader, writer).

   Both ends share one mutable record, internal, which acts as a small state
   machine: a read stores its callbacks and waits; a write that finds a
   waiting reader hands its item straight to the matching reader callback and
   then calls its own ok continuation. A write that finds no waiting reader
   parks itself and is retried when the next read arrives.

   Throughout, the state is updated and the relevant callback is copied out
   of the record before that callback is invoked, so a callback that
   immediately reads or writes again sees a consistent record.

   NOTE(review): there is no dedicated state for a parked writer. In the
   `Idle state each write unconditionally overwrites the three write_*
   callback slots, so if a second write is issued before a read arrives, only
   the most recent one is retried -- confirm that callers never overlap
   writes. *)
let pipe () =
(* Starts idle with every callback slot holding a do-nothing placeholder. *)
let internal = {
state = `Idle;
read_data_callback = dummy_read_data_callback;
read_flush_callback = ignore;
read_ping_callback = dummy_ping_pong_callback;
read_pong_callback = dummy_ping_pong_callback;
read_close_callback = ignore;
read_abort_callback = ignore;
write_ok_callback = ignore;
write_close_callback = ignore;
write_abort_callback = ignore;
} in
(* Reader side: register the callbacks, then wake a parked writer if any. *)
let read ~data ~flush ~ping ~pong ~close ~exn =
match internal.state with
| `Idle ->
internal.state <- `Reader_waiting;
internal.read_data_callback <- data;
internal.read_flush_callback <- flush;
internal.read_ping_callback <- ping;
internal.read_pong_callback <- pong;
internal.read_close_callback <- close;
internal.read_abort_callback <- exn;
(* A parked writer stored a retry thunk here; otherwise this is ignore.
   Copy it out and clear the writer slots before running it. *)
let write_ok_callback = internal.write_ok_callback in
clean_up_writer_fields internal;
write_ok_callback ()
| `Reader_waiting ->
(* Only one read may be outstanding at a time. *)
raise (Failure "stream read: the previous read has not completed")
| `Closed code ->
close code
| `Aborted the_exn ->
exn the_exn
in
(* Writer side, data items. *)
let rec data buffer offset length binary fin ~close ~exn ok =
match internal.state with
| `Idle ->
(* No reader yet: park this write. The stored thunk re-runs the same write,
   which will then find the state set by the read that triggered it. *)
internal.write_ok_callback <- (fun () ->
data buffer offset length binary fin ~close ~exn ok);
internal.write_close_callback <- close;
internal.write_abort_callback <- exn
| `Reader_waiting ->
(* Hand the chunk to the waiting reader, then report success to the
   writer. *)
internal.state <- `Idle;
let read_data_callback = internal.read_data_callback in
clean_up_reader_fields internal;
read_data_callback buffer offset length binary fin;
ok ()
| `Closed code ->
close code
| `Aborted the_exn ->
exn the_exn
in
(* Writer side, flush items. Same protocol as data, without a payload. *)
let rec flush ~close ~exn ok =
match internal.state with
| `Idle ->
internal.write_ok_callback <- (fun () ->
flush ~close ~exn ok);
internal.write_close_callback <- close;
internal.write_abort_callback <- exn
| `Reader_waiting ->
internal.state <- `Idle;
let read_flush_callback = internal.read_flush_callback in
clean_up_reader_fields internal;
read_flush_callback ();
ok ()
| `Closed code ->
close code
| `Aborted the_exn ->
exn the_exn
in
(* Writer side, ping items. Same protocol as data. *)
let rec ping buffer offset length ~close ~exn ok =
match internal.state with
| `Idle ->
internal.write_ok_callback <- (fun () ->
ping buffer offset length ~close ~exn ok);
internal.write_close_callback <- close;
internal.write_abort_callback <- exn
| `Reader_waiting ->
internal.state <- `Idle;
let read_ping_callback = internal.read_ping_callback in
clean_up_reader_fields internal;
read_ping_callback buffer offset length;
ok ()
| `Closed code ->
close code
| `Aborted the_exn ->
exn the_exn
in
(* Writer side, pong items. Same protocol as data. *)
let rec pong buffer offset length ~close ~exn ok =
match internal.state with
| `Idle ->
internal.write_ok_callback <- (fun () ->
pong buffer offset length ~close ~exn ok);
internal.write_close_callback <- close;
internal.write_abort_callback <- exn
| `Reader_waiting ->
internal.state <- `Idle;
let read_pong_callback = internal.read_pong_callback in
clean_up_reader_fields internal;
read_pong_callback buffer offset length;
ok ()
| `Closed code ->
close code
| `Aborted the_exn ->
exn the_exn
in
(* Closes the pipe with a code. The first close or abort wins; later calls
   are no-ops. *)
let close code =
match internal.state with
| `Idle ->
(* Notify the parked writer, if any; otherwise the slot holds ignore. *)
internal.state <- `Closed code;
let write_close_callback = internal.write_close_callback in
clean_up_writer_fields internal;
write_close_callback code
| `Reader_waiting ->
(* Notify the waiting reader. *)
internal.state <- `Closed code;
let read_close_callback = internal.read_close_callback in
clean_up_reader_fields internal;
read_close_callback code
| `Closed _code ->
()
| `Aborted _the_exn ->
()
in
(* Aborts the pipe with an exception. Mirrors close, using the abort
   callbacks. *)
let abort exn =
match internal.state with
| `Idle ->
internal.state <- `Aborted exn;
let write_abort_callback = internal.write_abort_callback in
clean_up_writer_fields internal;
write_abort_callback exn
| `Reader_waiting ->
internal.state <- `Aborted exn;
let read_abort_callback = internal.read_abort_callback in
clean_up_reader_fields internal;
read_abort_callback exn
| `Closed _code ->
()
| `Aborted _the_exn ->
()
in
(* Both ends are given the same close and abort functions. *)
let reader = {
read;
close;
abort;
}
and writer = {
data;
flush;
ping;
pong;
close;
abort;
} in
(reader, writer)
(* Copies everything read from reader into the writing half of stream.

   Each item read (data, flush, ping, pong) is written to the stream with the
   same arguments, and the next read is issued from the ok continuation of
   that write. If the writer reports close or an exception, it is passed to
   the close or abort operation of reader. If reader reports close or an
   exception, it is passed to the close or abort operation of the writing
   half of the stream, and no further read is issued. *)
let forward (reader : reader) stream =
  let writer = stream.writer in
  (* How writer-side termination is fed back to the source. *)
  let upstream_close = reader.close in
  let upstream_abort = reader.abort in
  let rec pump () =
    let forward_data buffer offset length binary fin =
      writer.data
        buffer offset length
        binary fin
        ~close:upstream_close ~exn:upstream_abort
        pump
    in
    let forward_flush () =
      writer.flush ~close:upstream_close ~exn:upstream_abort pump
    in
    let forward_ping buffer offset length =
      writer.ping
        buffer offset length ~close:upstream_close ~exn:upstream_abort pump
    in
    let forward_pong buffer offset length =
      writer.pong
        buffer offset length ~close:upstream_close ~exn:upstream_abort pump
    in
    reader.read
      ~data:forward_data
      ~flush:forward_flush
      ~ping:forward_ping
      ~pong:forward_pong
      ~close:writer.close
      ~exn:writer.abort
  in
  pump ()
(* Reads the next data chunk from stream as a promise.

   The promise resolves to Some of the chunk contents (copied into a string)
   when a data item arrives, or to None when the stream reports close. It is
   rejected if the stream reports an exception. Flush and pong items are
   skipped by reading again. A ping is answered by writing a pong with the
   same payload to the writing half of the stream, after which reading
   continues. *)
let read_convenience stream =
  let promise, resolver = Lwt.wait () in
  let resolve result = Lwt.wakeup_later resolver result in
  let close _code = resolve None in
  let abort exn = Lwt.wakeup_later_exn resolver exn in
  let rec loop () =
    let on_data buffer offset length _binary _fin =
      let chunk = Bigstringaf.sub buffer ~off:offset ~len:length in
      resolve (Some (Bigstringaf.to_string chunk))
    in
    let on_ping buffer offset length =
      stream.writer.pong buffer offset length ~close ~exn:abort loop
    in
    let on_pong _buffer _offset _length = loop () in
    stream.reader.read
      ~data:on_data
      ~flush:loop
      ~ping:on_ping
      ~pong:on_pong
      ~close
      ~exn:abort
  in
  loop ();
  promise
(* Reads stream until it reports close and returns everything received as
   one string, through a promise.

   Data chunks are appended to a growable buffer that starts at 4096 bytes.
   When the stream reports close, the promise resolves to the accumulated
   bytes; if it reports an exception, the promise is rejected. Flush and pong
   items are skipped. A ping is answered by writing a pong with the same
   payload to the writing half of the stream, after which reading
   continues. *)
let read_until_close stream =
  let promise, resolver = Lwt.wait () in
  (* Number of bytes of storage that hold received data. *)
  let filled = ref 0 in
  let storage = ref (Bigstringaf.create 4096) in
  let close _code =
    let contents = Bigstringaf.sub !storage ~off:0 ~len:!filled in
    Lwt.wakeup_later resolver (Bigstringaf.to_string contents)
  in
  let abort exn = Lwt.wakeup_later_exn resolver exn in
  (* Appends one chunk, growing storage first when it would not fit. *)
  let append chunk offset chunk_length =
    let required = !filled + chunk_length in
    if required > Bigstringaf.length !storage then begin
      (* Allocate twice the required size and carry over existing data. *)
      let larger = Bigstringaf.create (required * 2) in
      Bigstringaf.blit
        !storage ~src_off:0 larger ~dst_off:0 ~len:!filled;
      storage := larger
    end;
    Bigstringaf.blit
      chunk ~src_off:offset !storage ~dst_off:!filled ~len:chunk_length;
    filled := required
  in
  let rec loop () =
    stream.reader.read
      ~data:(fun chunk offset chunk_length _binary _fin ->
        append chunk offset chunk_length;
        loop ())
      ~flush:loop
      ~ping:(fun payload offset payload_length ->
        stream.writer.pong
          payload offset payload_length ~close ~exn:abort loop)
      ~pong:(fun _payload _offset _length ->
        loop ())
      ~close
      ~exn:abort
  in
  loop ();
  promise