1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
open Internal
type ('a, 'v, 'role) t = ('a, 'role) Internal.proxy
type ('a, 'v, 'role) proxy = ('a, 'v, 'role) t
type ('v, 'role) generic = Proxy : ('a, 'v, 'role) t -> ('v, 'role) generic
module type TRACE = sig
type role
val outbound : ('a, 'v, role) t -> ('a, [`W]) Msg.t -> unit
val inbound : ('a, 'v, role) t -> ('a, [`R]) Msg.t -> unit
end
let pp = pp_proxy
let missing_dispatch _ = failwith "no handler registered!"
type (_, _) S.user_data += Missing_handler
let missing_handler metadata = object (_ : _ handler)
method metadata = metadata
method user_data = Missing_handler
method dispatch = missing_dispatch
end
let make_proxy ~conn ~version ~handler id =
{ conn; id; version; can_send = true; can_recv = true; handler = (handler :> _ handler); on_delete = Queue.create () }
let accept_new t ~version ~handler id =
let conn = t.conn in
let is_service_allocated_id = (Int32.unsigned_compare id 0xFF000000l >= 0) in
begin match conn.role with
| `Client -> assert is_service_allocated_id
| `Server -> assert (not is_service_allocated_id)
end;
if Objects.mem id conn.objects then
Fmt.failwith "An object with ID %lu already exists!" id;
let t' = make_proxy id ~version ~conn ~handler in
conn.objects <- Objects.add id (Generic t') conn.objects;
t'
let complete_accept t handler =
assert (t.handler#user_data == Missing_handler);
t.handler <- handler
let cast_version t = (t : ('a, _, 'role) t :> ('a, _, 'role) t)
let version t = t.version
let metadata t = t.handler#metadata
let can_send t = t.can_send
let transport_up t = t.conn.transport#up
let ty (type a) t =
let (module M : Metadata.S with type t = a) = metadata t in
M.T
let interface (type a) t =
let (module M : Metadata.S with type t = a) = metadata t in
M.interface
module Handler = struct
class type ['a, 'v, 'role] t = object
method user_data : ('a, 'role) S.user_data
method metadata : (module Metadata.S with type t = 'a)
method dispatch : ('a, 'v, 'role) proxy -> ('a, [`R]) Msg.t -> unit
end
let interface (type a) (t : (_, _ ,_) #t) =
let (module M : Metadata.S with type t = a) = t#metadata in
M.interface
let cast_version t = (t :> _ t)
let accept_new (type a) proxy (module M : Metadata.S with type t = a) id =
accept_new proxy id ~version:proxy.version ~handler:(missing_handler (module M))
let attach proxy handler =
complete_accept proxy (handler :> _ handler)
end
module Service_handler = struct
class type ['a, 'v, 'role] t = object
inherit ['a, 'v, 'role] Handler.t
method min_version : int32
method max_version : int32
end
let interface t = Handler.interface t
let min_version t = t#min_version
let cast_version t = (t :> _ t)
let accept_new (type a) proxy id (module M : Metadata.S with type t = a) ~version : (a, [`Unknown], _) proxy =
accept_new proxy id ~version ~handler:(missing_handler (module M))
let attach (type a) (proxy : (a, _, _) proxy) (t:(_, _, _) #t) =
if proxy.version < t#min_version then
Fmt.invalid_arg "attach: %a has version %ld, but handler requires version >= %ld" pp proxy proxy.version t#min_version;
if proxy.version > t#max_version then
Fmt.invalid_arg "attach: %a has version %ld, but handler requires version <= %ld" pp proxy proxy.version t#max_version;
complete_accept proxy (t :> _ handler)
let attach_proxy proxy t =
attach proxy t;
proxy
end
let id t =
if t.can_send then t.id
else Fmt.invalid_arg "Attempt to use %a after destroying it" pp t
let id_opt = function
| None -> 0l
| Some t -> id t
let alloc t = Msg.alloc ~obj:t.id
let send (type a) (t:_ t) (msg : (a, [`W]) Msg.t) =
t.conn.trace.outbound t msg;
if t.can_send then
enqueue t.conn (Msg.cast msg)
else
Fmt.failwith "Attempt to use object %a after calling destructor!" pp t
let spawn_generic t ~version (handler : (_, _, _) #Handler.t) =
let conn = t.conn in
let id = get_unused_id conn in
let t' = make_proxy id ~version ~conn:t.conn ~handler:(handler :> _ handler) in
conn.objects <- Objects.add id (Generic t') conn.objects;
t'
let spawn t (handler : (_, _, _) #Handler.t) =
spawn_generic t ~version:t.version handler
let spawn_bind (t : (_, _, 'role) t) ((handler : ('a, 'v, 'role) #Service_handler.t), version) : ('a, 'v, 'role) t =
let min_version = handler#min_version in
let max_version = handler#max_version in
if version < min_version then
Fmt.failwith "Can't ask for %s version %ld when handler requires version >= %ld"
(Handler.interface handler) version min_version;
if version > max_version then
Fmt.failwith "Can't ask for %s version %ld when handler requires version <= %ld"
(Handler.interface handler) version max_version;
spawn_generic t ~version handler
let user_data (t:_ t) = t.handler#user_data
let shutdown_send t =
if t.can_send then
t.can_send <- false
else
Fmt.failwith "%a already shut down!" pp t
let shutdown_recv t =
if t.can_recv then
t.can_recv <- false
else
Fmt.failwith "%a already shut down!" pp t
let add_root conn (handler : (_, _, _) #Handler.t) =
let display_proxy = make_proxy 1l ~version:1l ~conn ~handler in
conn.objects <- Objects.add display_proxy.id (Generic display_proxy) conn.objects;
display_proxy
let on_delete t fn =
Queue.add fn t.on_delete
let delete t =
let conn = t.conn in
match Objects.find_opt t.id conn.objects with
| Some (Generic t') when Obj.repr t == Obj.repr t' ->
t.can_recv <- false;
t.can_send <- false;
conn.objects <- Objects.remove t.id conn.objects;
Internal.free_id conn t.id;
begin match conn.role with
| `Client -> ()
| `Server ->
if not (id_allocated_by_us conn t.id) then (
let Generic display = Objects.find 1l conn.objects in
let msg = alloc display ~op:1 ~ints:1 ~strings:[] ~arrays:[] in
Msg.add_int msg t.id;
send display msg
)
end;
Queue.iter (fun f -> f ()) t.on_delete;
Queue.clear t.on_delete
| _ -> Fmt.failwith "Object %a is not registered!" pp t
let delete_other proxy id =
let conn = proxy.conn in
match Objects.find_opt id conn.objects with
| None -> Fmt.failwith "Object %ld does not exist!" id
| Some (Generic proxy) ->
proxy.can_recv <- false;
conn.objects <- Objects.remove id conn.objects;
Internal.free_id conn id;
Queue.iter (fun f -> f ()) proxy.on_delete;
Queue.clear proxy.on_delete
let unknown_event = Fmt.str "<unknown event %d>"
let unknown_request = Fmt.str "<unknown request %d>"
let lookup_other (t : _ t) id =
match Objects.find_opt id t.conn.objects with
| None -> Fmt.failwith "Proxy with ID %ld not found!" id
| Some (Generic p) ->
if p.can_recv then
Proxy p
else
Fmt.failwith "Message referred to object %a, which cannot receive further messages" pp p
let wrong_type ~parent ~expected t =
Fmt.failwith "Object %a referenced object %a, which should be of type %S but isn't'" pp parent pp t expected
let trace (type r) (module T : TRACE with type role = r) = {
inbound = T.inbound;
outbound = T.outbound;
}
let pp_transport f t =
t.conn.transport#pp f