Source file versioned_plain_rpc.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
open! Core
open! Async_kernel
open! Import
include Versioned_plain_rpc_intf
module Caller_converts = struct
module type S = Caller_converts
module Make (Model : sig
val name : string
type query
type response
end) =
struct
let name = Model.name
type dispatch_fun =
Rpc.Connection.t -> Model.query -> Model.response Or_error.t Deferred.Or_error.t
let registry : dispatch_fun Callers_rpc_version_table.t =
Callers_rpc_version_table.create ~rpc_name:name
;;
let dispatch_multi' query =
let conn = Versioned_rpc.Connection_with_menu.connection conn_with_menu in
let = Versioned_rpc.Connection_with_menu.menu conn_with_menu in
match Callers_rpc_version_table.lookup_most_recent registry ~callee_menu:menu with
| Error e -> return (Error e)
| Ok dispatch' -> dispatch' conn query
;;
let dispatch_multi query =
dispatch_multi' conn_with_menu query |> Deferred.map ~f:Or_error.join
;;
module Register (Version : sig
type query [@@deriving bin_io]
type response
module Response : Main.S_rpc with type t = response
val version : int
val query_of_model : Model.query -> query
val model_of_response : response -> Model.response
val client_pushes_back : bool
end) =
struct
include Plain_rpc.Make (struct
let name = name
include Version
end)
let version = Version.version
let dispatch' conn query =
let open Deferred.Or_error.Let_syntax in
let query = Version.query_of_model query in
let%bind server_response = Plain_rpc.dispatch' rpc conn query in
Or_error.map server_response ~f:Version.model_of_response |> return
;;
let () = Callers_rpc_version_table.add_exn registry ~version dispatch'
end
end
end
module Callee_converts = struct
module type S = Callee_converts
module Make (Model : sig
val name : string
type query
type response
end) =
struct
let name = Model.name
type implementation =
{ implement :
's.
?on_exception:Rpc.On_exception.t
-> ('s -> version:int -> Model.query -> Model.response Deferred.Or_error.t)
-> 's Rpc.Implementation.t
}
let registry : implementation Callers_rpc_version_table.t =
Callers_rpc_version_table.create ~rpc_name:name
;;
let implement_multi ?on_exception f =
List.map (Callers_rpc_version_table.data registry) ~f:(fun { implement } ->
implement ?on_exception f)
;;
module Register (Version : sig
type query [@@deriving bin_io]
type response
module Response : Main.S_rpc with type t = response
val version : int
val model_of_query : query -> Model.query
val response_of_model : Model.response -> response
val client_pushes_back : bool
end) =
struct
include Plain_rpc.Make (struct
let name = name
include Version
end)
let version = Version.version
let implement
(type s)
?on_exception
(f : s -> version:int -> Model.query -> Model.response Deferred.Or_error.t)
=
Plain_rpc.implement ?on_exception rpc (fun conn_state query ->
let open Deferred.Or_error.Let_syntax in
let query = Version.model_of_query query in
let%bind response = f ~version conn_state query in
return (Version.response_of_model response))
;;
let () = Callers_rpc_version_table.add_exn registry ~version { implement }
end
end
end
module Both_convert = struct
module type S = Both_convert
module Make (Model : sig
val name : string
module Caller : sig
type query
type response
end
module Callee : sig
type query
type response
end
end) =
struct
let name = Model.name
module Caller = Caller_converts.Make (struct
let name = name
include Model.Caller
end)
module Callee = Callee_converts.Make (struct
let name = name
include Model.Callee
end)
module Register (Version : sig
val version : int
type query [@@deriving bin_io]
type response
module Response : Main.S_rpc with type t = response
val query_of_caller_model : Model.Caller.query -> query
val callee_model_of_query : query -> Model.Callee.query
val response_of_callee_model : Model.Callee.response -> response
val caller_model_of_response : response -> Model.Caller.response
val client_pushes_back : bool
end) =
struct
include Callee.Register (struct
include Version
let model_of_query = callee_model_of_query
let response_of_model = response_of_callee_model
end)
include Caller.Register (struct
include Version
let query_of_model = query_of_caller_model
let model_of_response = caller_model_of_response
end)
end
let dispatch_multi = Caller.dispatch_multi
let dispatch_multi' = Caller.dispatch_multi'
let implement_multi = Callee.implement_multi
end
end