Source file connection.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
open! Core
open! Async
(** [grpc_recv_streaming body buffer_push] reads [body] chunk by chunk and
    writes every message produced by [Grpc.Message.extract_all] to the pipe
    writer [buffer_push].

    Each chunk delivered by [H2.Body.Reader.schedule_read] is appended to a
    local [Grpc.Buffer]; [Grpc.Message.extract_all] is then run over that
    buffer and another read is scheduled. When the body reports end of input,
    [buffer_push] is closed.

    Messages are written without pushback. If [buffer_push] has already been
    closed (for example because the consumer stopped reading), extracted
    messages are dropped instead of raising from inside the read callback. *)
let grpc_recv_streaming body buffer_push =
  let request_buffer = Grpc.Buffer.v () in
  let on_eof () = Async.Pipe.close buffer_push in
  let rec on_read buffer ~off ~len =
    Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer
      ~dst:request_buffer ~length:len;
    (* [write_without_pushback] raises on a closed pipe; the [_if_open]
       variant ignores the write, so the body keeps being drained. *)
    Grpc.Message.extract_all
      (Async.Pipe.write_without_pushback_if_open buffer_push)
      request_buffer;
    (* A scheduled read fires once, so re-arm it after every chunk. *)
    H2.Body.Reader.schedule_read body ~on_read ~on_eof
  in
  H2.Body.Reader.schedule_read body ~on_read ~on_eof
(** [grpc_send_streaming_client body encoder_stream] writes every element read
    from the pipe [encoder_stream] to [body], after passing it through
    [Grpc.Message.make], and closes [body] once the pipe is exhausted.

    The body is flushed after each message, as [grpc_send_streaming] does, so
    that a message is handed to the connection when it is produced rather
    than when the stream is finally closed.

    @return a deferred that is determined after [body] has been closed. *)
let grpc_send_streaming_client body encoder_stream =
  let%map () =
    Async.Pipe.iter encoder_stream ~f:(fun encoder ->
        let payload = Grpc.Message.make encoder in
        H2.Body.Writer.write_string body payload;
        (* Request transmission of the buffered payload now; the completion
           callback carries nothing we need. *)
        H2.Body.Writer.flush body (fun () -> ());
        return ())
  in
  H2.Body.Writer.close body
(** [grpc_send_streaming request encoder_stream status_mvar] answers [request]
    with a streaming [`OK] response whose content type is
    ["application/grpc+proto"], flushing the headers immediately.

    Every element read from the pipe [encoder_stream] is passed through
    [Grpc.Message.make], written to the response body and flushed. Once the
    pipe is exhausted, a status is taken from [status_mvar] and sent as
    trailers: ["grpc-status"] always, ["grpc-message"] only when the status
    carries a message. The body is closed last.

    @return a deferred that is determined after the body has been closed. *)
let grpc_send_streaming request encoder_stream status_mvar =
  let response =
    H2.Response.create
      ~headers:
        (H2.Headers.of_list [ ("content-type", "application/grpc+proto") ])
      `OK
  in
  let body =
    H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request
      response
  in
  (* Write one message to the body and flush it straight away. *)
  let send_message input =
    H2.Body.Writer.write_string body (Grpc.Message.make input);
    H2.Body.Writer.flush body (fun () -> ());
    return ()
  in
  (* Build the trailer headers describing [status]. *)
  let trailers_of_status status =
    let code_field =
      ( "grpc-status",
        string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) )
    in
    let message_fields =
      match Grpc.Status.message status with
      | None -> []
      | Some message -> [ ("grpc-message", message) ]
    in
    H2.Headers.of_list (code_field :: message_fields)
  in
  Async.Pipe.iter encoder_stream ~f:send_message
  >>= fun () ->
  Async.Mvar.take status_mvar
  >>| fun status ->
  (* Trailers are scheduled before the body is closed. *)
  H2.Reqd.schedule_trailers request (trailers_of_status status);
  H2.Body.Writer.close body