1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
(** Compression scheme selector for message streams: [`None] leaves the
    framed bytes as they are, [`Packing] runs them through the [Packing]
    module. *)
type compression_t = [ `None | `Packing ]
(** Reasons why a frame could not be extracted from a stream. These are the
    error payloads returned by the [get_next_frame] functions below. *)
module FramingError = struct
type t =
| Incomplete (** less than a full frame is available *)
| Unsupported (** frame header describes a segment count or segment size that
is too large for the implementation *)
end
(** Incremental decoder for an uncompressed stream of framed messages.
    Fragments are appended with [add_fragment]; [get_next_frame] extracts one
    complete message once enough bytes have accumulated. *)
module UncompStream = struct
  type incomplete_frame_t = {
    frame_header : string;
    (** Complete frame header, as removed from the fragment buffer. *)
    complete_segments : string Res.Array.t;
    (** Segments of the current frame that have been fully received. *)
  }

  type decoder_state_t =
    | IncompleteHeader
    (** The header of the next frame has not been consumed yet. *)
    | IncompleteFrame of incomplete_frame_t
    (** The header was consumed; segments are still being collected. *)

  type t = {
    fragment_buffer : FragmentBuffer.t;
    (** Primary storage for incoming stream segments. *)
    mutable decoder_state : decoder_state_t;
    (** Partially-decoded frame information *)
  }

  (** [empty ()] creates a stream with no buffered data, waiting for a
      frame header. *)
  let empty () = {
    fragment_buffer = FragmentBuffer.empty ();
    decoder_state = IncompleteHeader;
  }

  (** [add_fragment stream fragment] appends [fragment] to the buffered
      input of [stream]. *)
  let add_fragment stream fragment =
    FragmentBuffer.add_fragment stream.fragment_buffer fragment

  (** [bytes_available stream] is the number of bytes held by [stream]:
      the bytes still in the fragment buffer plus, when a frame is partially
      decoded, its header and the segments already extracted. *)
  let bytes_available stream =
    match stream.decoder_state with
    | IncompleteHeader ->
      FragmentBuffer.byte_count stream.fragment_buffer
    | IncompleteFrame partial_frame ->
      (String.length partial_frame.frame_header) +
      (Res.Array.fold_left (fun acc x -> acc + (String.length x))
         0
         partial_frame.complete_segments) +
      (FragmentBuffer.byte_count stream.fragment_buffer)

  (** [is_empty stream] is [true] only when no frame is partially decoded
      and the fragment buffer holds zero bytes. *)
  let is_empty stream =
    match stream.decoder_state with
    | IncompleteHeader ->
      FragmentBuffer.byte_count stream.fragment_buffer = 0
    | IncompleteFrame _ ->
      false

  (** [get_next_frame stream] attempts to decode one message from [stream].
      Returns [Result.Ok message] on success,
      [Result.Error FramingError.Incomplete] when more input is needed, and
      [Result.Error FramingError.Unsupported] when the header describes a
      frame that cannot be represented. Progress made before an
      [Incomplete] result is kept in [stream.decoder_state]. *)
  let rec get_next_frame stream =
    match stream.decoder_state with
    | IncompleteHeader -> unpack_header stream
    | IncompleteFrame incomplete_frame ->
      unpack_frame stream incomplete_frame

  (* Reads the frame header.  The first four bytes hold (segment count - 1),
     which determines the length of the full header. *)
  and unpack_header stream =
    match FragmentBuffer.peek_exact stream.fragment_buffer 4 with
    | Some partial_header ->
      begin try
        let segment_count =
          let bytes_header = Bytes.unsafe_of_string partial_header in
          Util.int_of_uint32_exn (BytesStorage.get_uint32 bytes_header 0)
        in
        (* Guard against integer overflow in the header size computation
           below, which multiplies (segment_count + 2) by 4. *)
        let () =
          if segment_count > (max_int / 4) - 2 then
            Util.out_of_int_range "Uint32.to_int"
        in
        let segment_count = segment_count + 1 in
        (* Four bytes for the count plus four bytes per segment size,
           rounded up to a whole number of 8-byte words. *)
        let frame_header_size =
          let word_size = 8 in
          (Util.ceil_ratio (4 * (segment_count + 1)) word_size) * word_size
        in
        (* The full header length is now known; try to consume all of it. *)
        begin match FragmentBuffer.remove_exact stream.fragment_buffer
                      frame_header_size with
        | Some frame_header ->
          let () = stream.decoder_state <- IncompleteFrame {
              frame_header;
              complete_segments = Res.Array.empty ();
            }
          in
          get_next_frame stream
        | None ->
          Result.Error FramingError.Incomplete
        end
      with Util.Out_of_int_range _ ->
        Result.Error FramingError.Unsupported
      end
    | None ->
      Result.Error FramingError.Incomplete

  (* Collects the segments announced by an already-consumed header, one
     segment per recursive step, and builds the message once all of them
     have arrived. *)
  and unpack_frame stream incomplete_frame =
    let frame_header_bytes =
      Bytes.unsafe_of_string incomplete_frame.frame_header
    in
    let segment_count_u32 = BytesStorage.get_uint32 frame_header_bytes 0 in
    let segment_count = 1 + (Util.int_of_uint32_exn segment_count_u32) in
    let segments_decoded = Res.Array.length incomplete_frame.complete_segments in
    if segments_decoded = segment_count then
      (* Frame complete: reset the decoder before handing out the message. *)
      let () = stream.decoder_state <- IncompleteHeader in
      let string_segments = Res.Array.to_list incomplete_frame.complete_segments in
      let bytes_segments = ListLabels.map string_segments ~f:Bytes.unsafe_of_string in
      Result.Ok (Message.BytesMessage.Message.of_storage bytes_segments)
    else
      let () = assert (segments_decoded < segment_count) in
      (* Size (in 8-byte words) of the next segment to read; the sizes follow
         the 4-byte segment count in the header. *)
      let segment_size_words_u32 = BytesStorage.get_uint32
          frame_header_bytes (4 + (4 * segments_decoded))
      in
      begin try
        let segment_size = 8 * (Util.int_of_uint32_exn segment_size_words_u32) in
        begin match FragmentBuffer.remove_exact stream.fragment_buffer
                      segment_size with
        | Some segment ->
          let () = Res.Array.add_one incomplete_frame.complete_segments segment in
          unpack_frame stream incomplete_frame
        | None ->
          Result.Error FramingError.Incomplete
        end
      with Util.Out_of_int_range _ | Invalid_argument _ ->
        (* [unpack_header] treats [Util.Out_of_int_range] as the failure
           mode of the same uint32 conversion, so an unrepresentable segment
           size is reported the same way here instead of escaping as an
           exception. *)
        Result.Error FramingError.Unsupported
      end
end
(** Decoder for a packed message stream: packed input is accumulated,
    unpacked on demand, and then decoded by an [UncompStream]. *)
module PackedStream = struct
  type t = {
    packed : FragmentBuffer.t;
    (** Packed fragments waiting to be unpacked *)
    unpacked : UncompStream.t;
    (** Unpacked fragments waiting to be decoded as messages *)
  }

  (** [empty ()] creates a stream holding no packed or unpacked data. *)
  let empty () =
    let packed = FragmentBuffer.empty () in
    let unpacked = UncompStream.empty () in
    { packed; unpacked }

  (** [add_fragment stream fragment] queues a packed [fragment]. *)
  let add_fragment { packed; _ } fragment =
    FragmentBuffer.add_fragment packed fragment

  (** [bytes_available stream] sums the packed bytes still queued and the
      bytes held by the underlying uncompressed stream. *)
  let bytes_available { packed; unpacked } =
    FragmentBuffer.byte_count packed + UncompStream.bytes_available unpacked

  (** [is_empty stream] holds when neither the packed queue nor the
      uncompressed stream contains data. *)
  let is_empty { packed; unpacked } =
    FragmentBuffer.byte_count packed = 0 && UncompStream.is_empty unpacked

  (** [get_next_frame stream] unpacks the pending packed data into the
      uncompressed stream's fragment buffer, then tries to decode a frame
      from it. *)
  let get_next_frame { packed; unpacked } =
    Packing.unpack ~packed ~unpacked:unpacked.UncompStream.fragment_buffer;
    UncompStream.get_next_frame unpacked
end
(** Message stream decoder that dispatches to [UncompStream] or
    [PackedStream] depending on the compression chosen at creation. *)
module FramedStream = struct
  type t =
    | NoPack of UncompStream.t
    | Pack of PackedStream.t

  (** [empty compression] creates an empty stream for the given
      compression scheme. *)
  let empty = function
    | `None -> NoPack (UncompStream.empty ())
    | `Packing -> Pack (PackedStream.empty ())

  (** [of_string ~compression s] creates a stream whose initial content is
      the single fragment [s]. *)
  let of_string ~compression s =
    match compression with
    | `None ->
      let uncomp = UncompStream.empty () in
      UncompStream.add_fragment uncomp s;
      NoPack uncomp
    | `Packing ->
      let packed = PackedStream.empty () in
      PackedStream.add_fragment packed s;
      Pack packed

  (** [add_fragment stream fragment] appends [fragment] to [stream]. *)
  let add_fragment stream fragment =
    match stream with
    | NoPack uncomp -> UncompStream.add_fragment uncomp fragment
    | Pack packed -> PackedStream.add_fragment packed fragment

  (** [bytes_available stream] is the byte count reported by the
      underlying stream. *)
  let bytes_available = function
    | NoPack uncomp -> UncompStream.bytes_available uncomp
    | Pack packed -> PackedStream.bytes_available packed

  (** [is_empty stream] tells whether the underlying stream is empty. *)
  let is_empty = function
    | NoPack uncomp -> UncompStream.is_empty uncomp
    | Pack packed -> PackedStream.is_empty packed

  (** [get_next_frame stream] tries to decode the next message from the
      underlying stream. *)
  let get_next_frame = function
    | NoPack uncomp -> UncompStream.get_next_frame uncomp
    | Pack packed -> PackedStream.get_next_frame packed
end
(** [make_header segment_descrs] builds the frame header for a message made
    of the given segment descriptors: a 32-bit (segment count - 1), followed
    by one 32-bit size in 8-byte words per segment, padded with four zero
    bytes when needed so that the header length is a multiple of 8.
    @raise Invalid_argument if [segment_descrs] is empty. *)
let make_header segment_descrs : string =
  let buf = Buffer.create 8 in
  let () = ListLabels.iter segment_descrs ~f:(fun descr ->
      let size_buf = Bytes.create 4 in
      let seg_len = descr.Message.BytesMessage.Message.bytes_consumed in
      (* Segments are expected to be a whole number of 8-byte words. *)
      let () = assert ((seg_len mod 8) = 0) in
      let seg_word_count = seg_len / 8 in
      let () = BytesStorage.set_uint32 size_buf 0
          (Util.uint32_of_int_exn seg_word_count)
      in
      Buffer.add_string buf (Bytes.unsafe_to_string size_buf))
  in
  let segment_sizes = Buffer.contents buf in
  (* Each segment contributed exactly four bytes to [segment_sizes]. *)
  let segment_count = (String.length segment_sizes) / 4 in
  if segment_count = 0 then
    invalid_arg "make_header requires nonempty message"
  else
    let count_buf = Bytes.create 4 in
    let () = BytesStorage.set_uint32 count_buf 0
        (Util.uint32_of_int_exn (segment_count - 1))
    in
    let count_buf = Bytes.unsafe_to_string count_buf in
    (* 4 bytes of count + 4 bytes per segment: an even segment count leaves
       the header 4 bytes short of a word boundary. *)
    if segment_count mod 2 = 0 then
      count_buf ^ segment_sizes ^ (String.make 4 '\x00')
    else
      count_buf ^ segment_sizes
(** [serialize_fold message ~compression ~init ~f] folds [f] over the
    serialized fragments of [message], starting from [init]: first the frame
    header, then each segment truncated to its consumed length. With
    [`Packing], every fragment is passed through [Packing.pack_string]
    before being handed to [f]. *)
let rec serialize_fold message ~compression ~init ~f =
  let segment_descrs = Message.BytesMessage.Message.to_storage message in
  match compression with
  | `None ->
    let header = make_header segment_descrs in
    ListLabels.fold_left segment_descrs ~init:(f init header) ~f:(fun acc descr ->
        let open Message.BytesMessage in
        (* Copy only when the buffer is larger than the bytes in use. *)
        let seg =
          if descr.Message.bytes_consumed = Bytes.length descr.Message.segment then
            descr.Message.segment
          else
            Bytes.sub descr.Message.segment 0 descr.Message.bytes_consumed
        in
        f acc (Bytes.unsafe_to_string seg))
  | `Packing ->
    serialize_fold message ~compression:`None ~init
      ~f:(fun acc unpacked_fragment ->
          f acc (Packing.pack_string unpacked_fragment))
(** [serialize_iter message ~compression ~f] applies [f] to each serialized
    fragment of [message], in the order produced by [serialize_fold]. *)
let serialize_iter message ~compression ~f =
  let visit () fragment = f fragment in
  serialize_fold message ~compression ~init:() ~f:visit
(** [serialize_fold_copyless message ~compression ~init ~f] is like
    [serialize_fold], but [f] also receives the number of valid bytes in
    each fragment. In the [`None] case segments are passed without being
    truncated, so only the first [len] bytes of the string given to [f] are
    meaningful. With [`Packing], fragments are packed strings and the length
    is the full string length. *)
let serialize_fold_copyless message ~compression ~init ~f =
  let segment_descrs = Message.BytesMessage.Message.to_storage message in
  match compression with
  | `None ->
    let header = make_header segment_descrs in
    ListLabels.fold_left segment_descrs ~init:(f init header (String.length header)) ~f:(fun acc descr ->
        let open Message.BytesMessage in
        f acc (Bytes.unsafe_to_string descr.Message.segment) descr.Message.bytes_consumed)
  | `Packing ->
    serialize_fold message ~compression:`None ~init
      ~f:(fun acc unpacked_fragment ->
          let packed_string = Packing.pack_string unpacked_fragment in
          f acc packed_string (String.length packed_string))
(** [serialize_iter_copyless message ~compression ~f] calls [f fragment len]
    for each fragment produced by [serialize_fold_copyless], where [len] is
    the number of valid bytes in [fragment]. *)
let serialize_iter_copyless message ~compression ~f =
  serialize_fold_copyless message ~compression ~init:()
    ~f:(fun () fragment len -> f fragment len)
(** [serialize ~compression message] returns the whole framed [message] as
    a single string: the frame header followed by the consumed bytes of each
    segment. With [`Packing], the uncompressed result is passed through
    [Packing.pack_string]. *)
let rec serialize ~compression message =
  match compression with
  | `None ->
    let segment_descrs = Message.BytesMessage.Message.to_storage message in
    let header = make_header segment_descrs in
    let header_size = String.length header in
    let segments_size = Message.BytesMessage.Message.total_size message in
    let total_size = header_size + segments_size in
    let buf = Bytes.create total_size in
    let () = Bytes.blit_string header 0 buf 0 header_size in
    (* Copy each segment after the header; the accumulator is the write
       position in [buf]. *)
    let (_ : int) = ListLabels.fold_left segment_descrs ~init:header_size
        ~f:(fun pos descr ->
            let open Message.BytesMessage in
            Bytes.blit
              descr.Message.segment 0
              buf pos
              descr.Message.bytes_consumed;
            pos + descr.Message.bytes_consumed)
    in
    Bytes.unsafe_to_string buf
  | `Packing ->
    Packing.pack_string (serialize ~compression:`None message)