1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
let make content =
let content_len = String.length content in
let payload = Bytes.create @@ (content_len + 1 + 4) in
Bytes.set payload 0 '\x00';
let length = String.length content in
Bytes.set_uint16_be payload 1 (length lsr 16);
Bytes.set_uint16_be payload 3 (length land 0xFFFF);
Bytes.blit_string content 0 payload 5 content_len;
Bytes.to_string payload
(** [extract_message buf] extracts the grpc message starting in [buf]
in the buffer if there is one *)
let buf =
if Buffer.length buf >= 5 then (
let compressed =
Buffer.get_u8 buf ~pos:0 == 1
and length =
Buffer.get_u32_be buf ~pos:1
in
if compressed then failwith "Compressed flag set but not supported";
if Buffer.length buf - 5 >= length then
Some (Buffer.sub buf ~start:5 ~length |> Buffer.to_string)
else None)
else None
(** [get_message_and_shift buf] tries to extract the first grpc message
from [buf] and if successful shifts these bytes out of the buffer *)
let get_message_and_shift buf =
let message = extract_message buf in
match message with
| None -> None
| Some message ->
let mlen = String.length message in
Buffer.shift_left buf ~by:(5 + mlen);
Some message
let buf = get_message_and_shift buf
let f buf =
let rec loop () =
match extract buf with
| None -> ()
| Some message ->
f message;
loop ()
in
loop ()