Source file smart_flow.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
let ( >>? ) = Lwt_result.bind
module Flow = struct
open Lwt.Infix
let blit0 src src_off dst dst_off len =
let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in
Cstruct.blit src src_off dst 0 len
let blit1 src src_off dst dst_off len =
let src = Cstruct.of_bigarray ~off:src_off ~len src in
Cstruct.blit src 0 dst dst_off len
type t = {
queue: (char, Bigarray.int8_unsigned_elt) Ke.Rke.t;
flow: Mimic.flow;
}
type error = [ `Error of Mimic.error | `Write_error of Mimic.write_error ]
let pp_error ppf = function
| `Error err -> Mimic.pp_error ppf err
| `Write_error err -> Mimic.pp_write_error ppf err
let make flow = {flow; queue= Ke.Rke.create ~capacity:0x1000 Bigarray.char}
let recv flow payload =
if Ke.Rke.is_empty flow.queue then begin
Mimic.read flow.flow >|= Result.map_error (fun err -> `Error err)
>>? function
| `Eof -> Lwt.return_ok `End_of_flow
| `Data res ->
Ke.Rke.N.push flow.queue ~blit:blit0 ~length:Cstruct.length res;
let len = min (Cstruct.length payload) (Ke.Rke.length flow.queue) in
Ke.Rke.N.keep_exn flow.queue ~blit:blit1 ~length:Cstruct.length ~off:0
~len payload;
Ke.Rke.N.shift_exn flow.queue len;
Lwt.return_ok (`Input len)
end
else
let len = min (Cstruct.length payload) (Ke.Rke.length flow.queue) in
Ke.Rke.N.keep_exn flow.queue ~blit:blit1 ~length:Cstruct.length ~len
payload;
Ke.Rke.N.shift_exn flow.queue len;
Lwt.return_ok (`Input len)
let send flow payload =
Mimic.write flow.flow payload >|= function
| Error `Closed -> Error (`Write_error `Closed)
| Error err -> Error (`Write_error err)
| Ok () -> Ok (Cstruct.length payload)
end
let src = Logs.Src.create "smart.flow"
module Log = (val Logs.src_log src : Logs.LOG)
let run : Flow.t -> ('res, [ `Protocol of Smart.error ]) Smart.t -> 'res Lwt.t =
fun flow fiber ->
let ( >>= ) = Lwt.bind in
let tmp = Cstruct.create 65535 in
let failwithf fmt = Format.kasprintf (fun err -> raise (Failure err)) fmt in
let rec go = function
| Smart.Read {k; buffer; off; len; eof} -> (
let max = Int.min (Cstruct.length tmp) len in
Flow.recv flow (Cstruct.sub tmp 0 max) >>= function
| Ok `End_of_flow -> go (eof ())
| Ok (`Input len) ->
Cstruct.blit_to_bytes tmp 0 buffer off len;
Log.debug (fun m ->
m "-> @[<hov>%a@]"
(Hxd_string.pp Hxd.default)
(Bytes.sub_string buffer off len));
go (k len)
| Error err -> failwithf "%a" Flow.pp_error err)
| Smart.Write {k; buffer; off; len} ->
Log.debug (fun m ->
m "<- @[<hov>%a@]"
(Hxd_string.pp Hxd.default)
(String.sub buffer off len));
let rec loop tmp =
if Cstruct.length tmp = 0 then go (k len)
else begin
Flow.send flow tmp >>= function
| Ok shift -> loop (Cstruct.shift tmp shift)
| Error err -> failwithf "%a" Flow.pp_error err
end
in
loop (Cstruct.of_string buffer ~off ~len)
| Smart.Return v -> Lwt.return v
| Smart.Error (`Protocol err) -> failwithf "%a" Smart.pp_error err
in
go fiber