Source file proxy.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
open Internal

(* A proxy value. The representation is [Internal.proxy], which only carries
   the interface parameter and the role parameter; the middle parameter (the
   version tag) exists purely at the type level. *)
type ('a, 'v, 'role) t = ('a, 'role) Internal.proxy

(* Alias for [t], for use in scopes where the name [t] is rebound to
   something else (the handler class types further down do this). *)
type ('a, 'v, 'role) proxy = ('a, 'v, 'role) t

(* A proxy whose interface parameter has been hidden existentially. *)
type ('v, 'role) generic = Proxy : ('a, 'v, 'role) t -> ('v, 'role) generic

(* Message-tracing hooks for connections of a single [role].
   [outbound] is what [send] calls with each message before queuing it.
   [inbound] is presumably invoked for each received message; its call site
   is not in this file (TODO confirm). *)
module type TRACE = sig
  type role
  val outbound : ('a, 'v, role) t -> ('a, [`W]) Msg.t -> unit
  val inbound : ('a, 'v, role) t -> ('a, [`R]) Msg.t -> unit
end

(* Pretty-printer for proxies; forwards to [pp_proxy] from [Internal]. *)
let pp ppf proxy = pp_proxy ppf proxy

(* Dispatch function of the placeholder handler: whatever it is applied to,
   it raises [Failure], because no real handler has been attached yet. *)
let missing_dispatch = fun _ -> failwith "no handler registered!"

(* Marker used as the [user_data] of the placeholder handler built by
   [missing_handler]. [complete_accept] tests for it to check that the
   proxy has not been given a real handler yet. *)
type (_, _) S.user_data += Missing_handler

(* [missing_handler md] builds a placeholder handler that reports [md] as its
   metadata, carries [Missing_handler] as user data, and fails on dispatch. *)
let missing_handler md =
  object (_ : _ handler)
    method dispatch = missing_dispatch
    method user_data = Missing_handler
    method metadata = md
  end

(* [make_proxy ~conn ~version ~handler id] builds a fresh proxy record for
   [id]. The proxy starts able to send and receive, with no deletion
   callbacks. It is NOT added to [conn.objects]; callers do that. *)
let make_proxy ~conn ~version ~handler id =
  let handler = (handler :> _ handler) in
  {
    id;
    conn;
    version;
    handler;
    can_send = true;
    can_recv = true;
    on_delete = Queue.create ();
  }

(* [accept_new t ~version ~handler id] registers [id] as a new object on
   the connection of [t] and returns its proxy. Callers in this file pass the
   placeholder from [missing_handler] as [handler]; call [complete_accept]
   on the result before blocking.

   The ID must lie in the range allocated by the other side: at or above
   0xFF000000 (unsigned) when we are the client, below it when we are the
   server. This is checked with [assert].
   Raises [Failure] if [id] is already registered. *)
let accept_new t ~version ~handler id =
  let conn = t.conn in
  let service_allocated = Int32.unsigned_compare id 0xFF000000l >= 0 in
  let id_in_peer_range =
    match conn.role with
    | `Client -> service_allocated
    | `Server -> not service_allocated
  in
  assert id_in_peer_range;
  if Objects.mem id conn.objects then
    Fmt.failwith "An object with ID %lu already exists!" id;
  let proxy = make_proxy ~conn ~version ~handler id in
  conn.objects <- Objects.add id (Generic proxy) conn.objects;
  proxy

(* [complete_accept t handler] replaces the placeholder handler of [t] with
   [handler]. Asserts that the current handler is still the placeholder,
   i.e. that its user data is physically [Missing_handler]. *)
let complete_accept t handler =
  let still_placeholder () = t.handler#user_data == Missing_handler in
  assert (still_placeholder ());
  t.handler <- handler

(* [cast_version t] returns [t] itself, re-typed with a different version
   parameter. Only the static type changes: the version parameter is not part
   of the underlying [Internal.proxy] representation. *)
let cast_version t = (t : ('a, _, 'role) t :> ('a, _, 'role) t)

(* The version stored in the proxy when it was created. *)
let version proxy = proxy.version

(* The metadata module reported by the handler currently attached to the
   proxy. *)
let metadata proxy =
  let handler = proxy.handler in
  handler#metadata

(* Whether the proxy may still be used for sending ([shutdown_send] and
   [delete] clear this flag). *)
let can_send proxy = proxy.can_send

(* Asks the transport of the connection behind [t] for its [up] status. *)
let transport_up t =
  let transport = t.conn.transport in
  transport#up

(* [ty t] is the [T] constructor of the metadata module attached to [t]. *)
let ty (type a) t =
  let module M = (val metadata t : Metadata.S with type t = a) in
  M.T

(* [interface t] is the [interface] value of the metadata module attached
   to [t]. *)
let interface (type a) t =
  let module M = (val metadata t : Metadata.S with type t = a) in
  M.interface

(* Handlers: the objects that receive the messages addressed to a proxy. *)
module Handler = struct
  (* Object type of a handler: user data, interface metadata, and the
     function that processes one incoming message for a given proxy. *)
  class type ['a, 'v, 'role] t = object
    method user_data : ('a, 'role) S.user_data
    method metadata : (module Metadata.S with type t = 'a)
    method dispatch : ('a, 'v, 'role) proxy -> ('a, [`R]) Msg.t -> unit
  end

  (* The [interface] value of the metadata module of the handler. *)
  let interface (type a) (t : (_, _ ,_) #t) =
    let module M = (val t#metadata : Metadata.S with type t = a) in
    M.interface

  (* Coerces a handler to the plain handler object type. *)
  let cast_version t = (t :> _ t)

  (* Registers [id] on the connection of [proxy], at the version of [proxy],
     with a placeholder handler carrying metadata [M]. Finish with
     [attach]. *)
  let accept_new (type a) proxy (module M : Metadata.S with type t = a) id =
    let placeholder = missing_handler (module M) in
    let version = proxy.version in
    accept_new proxy id ~version ~handler:placeholder

  (* Replaces the placeholder handler of [proxy] with [handler]. *)
  let attach proxy handler =
    let handler = (handler :> _ handler) in
    complete_accept proxy handler
end

(* Handlers that additionally declare the range of versions they support. *)
module Service_handler = struct
  (* A [Handler.t] extended with the inclusive version bounds it accepts. *)
  class type ['a, 'v, 'role] t = object
    inherit ['a, 'v, 'role] Handler.t
    method min_version : int32
    method max_version : int32
  end

  (* Same as [Handler.interface]. *)
  let interface handler = Handler.interface handler

  (* The lowest version the handler accepts. *)
  let min_version handler = handler#min_version

  (* Coerces a handler to the plain service-handler object type. *)
  let cast_version t = (t :> _ t)

  (* Registers [id] on the connection of [proxy] at the given [version], with
     a placeholder handler carrying metadata [M]. Finish with [attach]. *)
  let accept_new (type a) proxy id (module M : Metadata.S with type t = a) ~version : (a, [`Unknown], _) proxy =
    let placeholder = missing_handler (module M) in
    accept_new proxy id ~version ~handler:placeholder

  (* [attach proxy t] installs [t] as the handler of [proxy].
     Raises [Invalid_argument] if the version of [proxy] lies outside
     the range from [t#min_version] to [t#max_version]. *)
  let attach (type a) (proxy : (a, _, _) proxy) (t:(_, _, _) #t) =
    let v = proxy.version in
    if Int32.compare v t#min_version < 0 then
      Fmt.invalid_arg "attach: %a has version %ld, but handler requires version >= %ld" pp proxy v t#min_version;
    if Int32.compare v t#max_version > 0 then
      Fmt.invalid_arg "attach: %a has version %ld, but handler requires version <= %ld" pp proxy v t#max_version;
    complete_accept proxy (t :> _ handler)

  (* Like [attach], but returns the proxy afterwards. *)
  let attach_proxy proxy t =
    let () = attach proxy t in
    proxy
end

(* [id t] is the object ID of [t].
   Raises [Invalid_argument] if [t] can no longer send. *)
let id t =
  match t.can_send with
  | true -> t.id
  | false -> Fmt.invalid_arg "Attempt to use %a after destroying it" pp t

(* [id_opt p] is the ID of the proxy inside [p], or [0l] when [p] is [None].
   Raises like [id] when the proxy can no longer send. *)
let id_opt proxy =
  match proxy with
  | Some p -> id p
  | None -> 0l

(* [alloc t] is [Msg.alloc] with the object argument fixed to the ID of [t].
   The ID field is read directly, so no [can_send] check happens here. *)
let alloc t =
  let obj = t.id in
  Msg.alloc ~obj

(* [send t msg] queues [msg] on the connection of [t].
   The outbound trace hook runs first, so an attempt is traced even when it
   is then rejected.
   Raises [Failure] if [t] can no longer send. *)
let send (type a) (t:_ t) (msg : (a, [`W]) Msg.t) =
  t.conn.trace.outbound t msg;
  match t.can_send with
  | true -> enqueue t.conn (Msg.cast msg)
  | false -> Fmt.failwith "Attempt to use object %a after calling destructor!" pp t

(* [spawn_generic t ~version handler] takes an unused ID from the connection
   of [t], creates a proxy for it with [handler] at [version], registers it
   in the object table, and returns it. *)
let spawn_generic t ~version (handler : (_, _, _) #Handler.t) =
  let conn = t.conn in
  let id = get_unused_id conn in
  let child = make_proxy ~conn ~version ~handler:(handler :> _ handler) id in
  conn.objects <- Objects.add id (Generic child) conn.objects;
  child

(* [spawn t handler] is [spawn_generic] using the version of [t] for the
   new proxy. *)
let spawn t (handler : (_, _, _) #Handler.t) =
  let version = t.version in
  spawn_generic t ~version handler

(* [spawn_bind t (handler, version)] creates a new proxy on the connection of
   [t] at [version].
   Raises [Failure] if [version] is outside the range from
   [handler#min_version] to [handler#max_version]. *)
let spawn_bind (t : (_, _, 'role) t) ((handler : ('a, 'v, 'role) #Service_handler.t), version) : ('a, 'v, 'role) t =
  let min_version = handler#min_version in
  let max_version = handler#max_version in
  let too_old = Int32.compare version min_version < 0 in
  let too_new = Int32.compare version max_version > 0 in
  if too_old then
    Fmt.failwith "Can't ask for %s version %ld when handler requires version >= %ld"
      (Handler.interface handler) version min_version;
  if too_new then
    Fmt.failwith "Can't ask for %s version %ld when handler requires version <= %ld"
      (Handler.interface handler) version max_version;
  spawn_generic t ~version handler

(* The user data exposed by the handler currently attached to [t]. *)
let user_data (t:_ t) =
  let handler = t.handler in
  handler#user_data

(* Marks [t] as unable to send.
   Raises [Failure] if sending was already shut down. *)
let shutdown_send t =
  if not t.can_send then Fmt.failwith "%a already shut down!" pp t;
  t.can_send <- false

(* Marks [t] as unable to receive.
   Raises [Failure] if receiving was already shut down. *)
let shutdown_recv t =
  if not t.can_recv then Fmt.failwith "%a already shut down!" pp t;
  t.can_recv <- false

(* [add_root conn handler] creates the proxy for object ID 1 at version 1,
   registers it in the object table of [conn], and returns it. *)
let add_root conn (handler : (_, _, _) #Handler.t) =
  let root = make_proxy ~conn ~version:1l ~handler 1l in
  conn.objects <- Objects.add root.id (Generic root) conn.objects;
  root

(* [on_delete t fn] appends [fn] to the callbacks that run, in registration
   order, when [t] is deleted. *)
let on_delete t fn =
  Queue.push fn t.on_delete

(* [delete t] unregisters [t] from its connection, releases its ID, and runs
   the callbacks registered with [on_delete].

   Raises [Failure] if the entry registered under the ID of [t] is missing or
   is not [t] itself. *)
let delete t =
  let conn = t.conn in
  match Objects.find_opt t.id conn.objects with
  (* The table entry must be this very proxy, not merely one with the same
     ID. The [Generic] wrapper hides the type parameters of the entry, so the
     two values are compared by physical identity through [Obj.repr]. *)
  | Some (Generic t') when Obj.repr t == Obj.repr t' ->
    (* Disable the proxy in both directions before unregistering it. *)
    t.can_recv <- false;
    t.can_send <- false;
    conn.objects <- Objects.remove t.id conn.objects;
    Internal.free_id conn t.id;
    begin match conn.role with
      | `Client -> ()
      | `Server ->
        (* As server, when the ID was not allocated by us, send opcode 1 on
           object 1 with the freed ID as its single integer argument
           (presumably the delete-id notification of the display object;
           confirm against the protocol definition). *)
        if not (id_allocated_by_us conn t.id) then (
          (* NOTE(review): [Objects.find] presumably raises if object 1 is no
             longer registered; verify that cannot happen here. *)
          let Generic display = Objects.find 1l conn.objects in
          let msg = alloc display ~op:1 ~ints:1 ~strings:[] ~arrays:[] in
          Msg.add_int msg t.id;
          send display msg
        )
    end;
    (* Run the deletion callbacks in registration order, then drop them. If
       a callback raises, the remaining ones are skipped and the queue is
       left uncleared. *)
    Queue.iter (fun f -> f ()) t.on_delete;
    Queue.clear t.on_delete
  | _ -> Fmt.failwith "Object %a is not registered!" pp t

(* [delete_other proxy id] removes the object registered under [id] from the
   connection of [proxy]: it stops the object from receiving, unregisters it,
   releases the ID, and runs its [on_delete] callbacks.

   Unlike [delete], it leaves [can_send] untouched and sends nothing to the
   peer.

   Raises [Failure] if no object is registered under [id]. The ID is printed
   as unsigned ([%lu], as in [accept_new]) because IDs at or above 0x80000000
   would otherwise show up as negative numbers. *)
let delete_other proxy id =
  let conn = proxy.conn in
  match Objects.find_opt id conn.objects with
  | None -> Fmt.failwith "Object %lu does not exist!" id
  | Some (Generic target) ->
    (* [target] is the object being removed; it is usually not [proxy]. *)
    target.can_recv <- false;
    conn.objects <- Objects.remove id conn.objects;
    Internal.free_id conn id;
    Queue.iter (fun f -> f ()) target.on_delete;
    Queue.clear target.on_delete

(* Fallback names for opcodes that have no known event or request.
   Both take the opcode explicitly so that every call formats from scratch,
   instead of reusing whatever formatter state a partially applied
   [Fmt.str] captured once at module initialisation. *)
let unknown_event op = Fmt.str "<unknown event %d>" op
let unknown_request op = Fmt.str "<unknown request %d>" op

(* [lookup_other t id] finds the object registered under [id] on the
   connection of [t] and returns it wrapped as a [Proxy].

   Raises [Failure] if no such object exists, or if it can no longer
   receive messages. The ID is printed as unsigned ([%lu], as in
   [accept_new]) because IDs at or above 0x80000000 would otherwise show up
   as negative numbers. *)
let lookup_other (t : _ t) id =
  match Objects.find_opt id t.conn.objects with
  | None -> Fmt.failwith "Proxy with ID %lu not found!" id
  | Some (Generic p) ->
    if p.can_recv then
      Proxy p
    else
      Fmt.failwith "Message referred to object %a, which cannot receive further messages" pp p

(* [wrong_type ~parent ~expected t] raises [Failure], reporting that [parent]
   referred to [t] although [t] is not of the interface named [expected].
   It never returns. (The message used to end in a stray apostrophe.) *)
let wrong_type ~parent ~expected t =
  Fmt.failwith "Object %a referenced object %a, which should be of type %S but isn't" pp parent pp t expected

(* [trace (module T)] packs the two hooks of a [TRACE] implementation into
   the record of tracing functions used by connections. *)
let trace (type r) (module T : TRACE with type role = r) =
  let inbound = T.inbound in
  let outbound = T.outbound in
  { outbound; inbound }

(* [pp_transport f t] prints the transport of the connection behind [t] on
   the formatter [f], using the [pp] method of the transport. *)
let pp_transport f t =
  let transport = t.conn.transport in
  transport#pp f