1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
module ServiceMap = Map.Make (String)
type service = H2.Reqd.t -> unit
type t = service ServiceMap.t
let v () = ServiceMap.empty
let add_service ~name ~service t = ServiceMap.add name service t
let handle_request t reqd =
let request = H2.Reqd.request reqd in
let respond_with code =
H2.Reqd.respond_with_string reqd (H2.Response.create code) ""
in
let route () =
let parts = String.split_on_char '/' request.target in
if List.length parts > 1 then
let service_name = List.nth parts (List.length parts - 2) in
let service = ServiceMap.find_opt service_name t in
match service with
| Some service -> service reqd
| None -> respond_with `Not_found
else respond_with `Not_found
in
match request.meth with
| `POST -> (
match H2.Headers.get request.headers "content-type" with
| Some s ->
if
Stringext.chop_prefix s ~prefix:"application/grpc" |> Option.is_some
then
match H2.Headers.get request.headers "grpc-encoding" with
| None | Some "identity" -> (
match H2.Headers.get request.headers "grpc-accept-encoding" with
| None -> route ()
| Some encodings ->
let encodings = String.split_on_char ',' encodings in
if List.mem "identity" encodings then route ()
else respond_with `Not_acceptable)
| Some _ ->
respond_with `Bad_request
else respond_with `Unsupported_media_type
| None -> respond_with `Unsupported_media_type)
| _ -> respond_with `Not_found
module Rpc = struct
open Lwt.Syntax
type unary = string -> (Grpc.Status.t * string option) Lwt.t
type client_streaming =
string Lwt_stream.t -> (Grpc.Status.t * string option) Lwt.t
type server_streaming = string -> (string -> unit) -> Grpc.Status.t Lwt.t
type bidirectional_streaming =
string Lwt_stream.t -> (string -> unit) -> Grpc.Status.t Lwt.t
type t =
| Unary of unary
| Client_streaming of client_streaming
| Server_streaming of server_streaming
| Bidirectional_streaming of bidirectional_streaming
let bidirectional_streaming ~(f : bidirectional_streaming) (reqd : H2.Reqd.t)
=
let decoder_stream, decoder_push = Lwt_stream.create () in
let body = H2.Reqd.request_body reqd in
Connection.grpc_recv_streaming body decoder_push;
let encoder_stream, encoder_push = Lwt_stream.create () in
let status_mvar = Lwt_mvar.create_empty () in
Lwt.async (fun () ->
Connection.grpc_send_streaming reqd encoder_stream status_mvar);
let* status =
f decoder_stream (fun encoder -> encoder_push (Some encoder))
in
encoder_push None;
Lwt_mvar.put status_mvar status
let client_streaming ~(f : client_streaming) (reqd : H2.Reqd.t) : unit Lwt.t =
bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push ->
let+ (status : Grpc.Status.t), (encoder : string option) =
f decoder_stream
in
(match encoder with None -> () | Some encoder -> encoder_push encoder);
status)
let server_streaming ~f reqd =
bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push ->
let* decoder = Lwt_stream.get decoder_stream in
match decoder with
| None -> Lwt.return Grpc.Status.(v OK)
| Some decoder -> f decoder encoder_push)
let unary ~f reqd =
bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push ->
let* decoder = Lwt_stream.get decoder_stream in
match decoder with
| None -> Lwt.return Grpc.Status.(v OK)
| Some decoder ->
let+ status, encoder = f decoder in
(match encoder with
| None -> ()
| Some encoder -> encoder_push encoder);
status)
end
module Service = struct
module RpcMap = Map.Make (String)
type t = Rpc.t RpcMap.t
let v () = RpcMap.empty
let add_rpc ~name ~rpc t = RpcMap.add name rpc t
let handle_request (t : t) reqd =
let request = H2.Reqd.request reqd in
let respond_with code =
H2.Reqd.respond_with_string reqd (H2.Response.create code) ""
in
let parts = String.split_on_char '/' request.target in
if List.length parts > 1 then
let rpc_name = List.nth parts (List.length parts - 1) in
let rpc = RpcMap.find_opt rpc_name t in
match rpc with
| Some rpc -> (
match rpc with
| Unary f -> Lwt.async (fun () -> Rpc.unary ~f reqd)
| Client_streaming f ->
Lwt.async (fun () -> Rpc.client_streaming ~f reqd)
| Server_streaming f ->
Lwt.async (fun () -> Rpc.server_streaming ~f reqd)
| Bidirectional_streaming f ->
Lwt.async (fun () -> Rpc.bidirectional_streaming ~f reqd))
| None -> respond_with `Not_found
else respond_with `Not_found
end