1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
open Capnp_core
open Lwt.Infix
module Log = Capnp_rpc.Debug.Log
module Response = Response
module RO_array = Capnp_rpc.RO_array
(** A response promise carrying no static information about the result type.
    It is an alias for [Core_types.struct_ref]. *)
type abstract_response_promise = Core_types.struct_ref

(** A type with no definition in this file. It is used as the parameter of
    [Schema.reader_t] where the concrete struct type is not tracked. *)
type abstract

(** An untyped method implementation: it takes the request content as an
    [abstract Schema.reader_t] and a [unit -> unit] callback, and produces an
    {!abstract_response_promise}. In {!local}, the callback passed is one that
    releases the request payload. *)
type abstract_method_t =
abstract Schema.reader_t -> (unit -> unit) -> abstract_response_promise

(** Same representation as {!abstract_response_promise}. The parameter ['a]
    does not appear in the definition, so it only tags the type. *)
type 'a response_promise = abstract_response_promise

(** A method taking a value of type ['a] and a [unit -> unit] callback, and
    producing a [Core_types.struct_ref]. The parameter ['b] does not appear in
    the definition, so it only tags the type. *)
type ('a, 'b) method_t = 'a -> (unit -> unit) -> Core_types.struct_ref
(** [pp_method ppf id] prints the method identifier [id] on [ppf] by
    delegating to [Capnp.RPC.Registry.pp_method]. Within this file [id] is an
    [(interface_id, method_id)] pair. *)
let pp_method ppf id = Capnp.RPC.Registry.pp_method ppf id
(** The interface a service implementation must provide in order to be
    wrapped by {!local}. *)
class type generic = object
(** Look up the implementation of the method identified by [interface_id]
    and [method_id]. {!local} calls this once for every incoming call. *)
method dispatch : interface_id:Stdint.Uint64.t -> method_id:int -> abstract_method_t

(** Invoked by the release method of the {!local} wrapper, right after the
    inherited release has run. *)
method release : unit

(** Print a description of the service. {!local} uses it as the first part
    of its own printer. *)
method pp : Format.formatter -> unit
end
(** [local impl] wraps the service implementation [impl] as a
    [Core_types.cap], inheriting from [Core_types.service].

    - Printing shows [impl#pp] followed by the inherited ref-count printer.
    - Releasing runs the inherited release first, then [impl#release].
    - A call is decoded, dispatched through [impl#dispatch], and the returned
      method is applied to the request content together with a callback that
      releases the request payload. *)
let local (impl : #generic) =
  object (_ : Core_types.cap)
    inherit Core_types.service as super

    method! pp ppf = Fmt.pf ppf "%t(%t)" impl#pp super#pp_refcount

    method! private release =
      begin
        super#release;
        impl#release
      end

    method call results msg =
      let open Schema.Reader in
      let request = Msg.Request.readable msg in
      let interface_id = Call.interface_id_get request in
      let method_id = Call.method_id_get request in
      (* Identifier of the invoked method, used only in log and error text. *)
      let target = (interface_id, method_id) in
      Log.debug (fun f -> f "Invoking local method %a" pp_method target);
      let params = Call.params_get request in
      let handler : abstract_method_t = impl#dispatch ~interface_id ~method_id in
      let release_params () = Core_types.Request_payload.release msg in
      let contents : abstract Schema.reader_t =
        Schema.ReaderOps.cast_struct
          (Schema.ReaderOps.deref_opt_struct_pointer (Payload.content_get params))
      in
      (* Recovery path for an exception escaping the handler: release the
         parameters, log the exception, and resolve [results] with an error. *)
      let on_exception ex =
        release_params ();
        Log.warn (fun f ->
            f "Uncaught exception handling %a: %a" pp_method target Fmt.exn ex);
        Core_types.resolve_payload results
          (Error (Capnp_rpc.Error.exn "Internal error from %a" pp_method target))
      in
      (* Only the handler invocation is guarded; an exception raised while
         resolving [results] with the response is not intercepted here. *)
      match handler contents release_params with
      | response -> results#resolve response
      | exception ex -> on_exception ex
  end
(** [return resp] finishes the response [resp] with [Response.finish] and
    passes the result to [Core_types.return]. *)
let return resp =
  let finished = Response.finish resp in
  Core_types.return finished
(** [return_empty ()] is {!return} applied to a response freshly created with
    [Response.create_empty ()]. *)
let return_empty () =
  let empty = Response.create_empty () in
  return empty
(** [return_lwt fn] returns a promise obtained from
    [Local_struct_promise.make] and starts [fn ()] under [Lwt.async] to
    resolve it:

    - [Ok resp] resolves the promise with [Response.finish resp];
    - [Error (`Capnp e)] resolves it with the error payload [e];
    - an exception from [fn] or from the resolution step is logged as a
      warning and the promise is resolved with an "Internal error" exception.

    The promise is returned without waiting for the Lwt thread to finish. *)
let return_lwt fn =
  let promise, resolver = Local_struct_promise.make () in
  (* Translate the outcome of [fn] into a resolution of the promise. *)
  let settle = function
    | Ok resp -> Core_types.resolve_ok resolver (Response.finish resp)
    | Error (`Capnp err) -> Core_types.resolve_payload resolver (Error err)
  in
  (* Exceptions are logged but not exposed in detail to the caller. *)
  let on_failure ex =
    Log.warn (fun f -> f "Uncaught exception: %a" Fmt.exn ex);
    Core_types.resolve_exn resolver (Capnp_rpc.Exception.v "Internal error");
    Lwt.return_unit
  in
  Lwt.async (fun () ->
      Lwt.catch (fun () -> Lwt.map settle (fn ())) on_failure);
  promise
(** Alias for [Core_types.fail], re-exported so that service code can use it
    from this module.
    NOTE(review): presumably builds a failed response from a format string;
    confirm against [Core_types]. *)
let fail = Core_types.fail
(** [fail_lwt ?ty fmt ...] formats a message according to [fmt] and returns
    [Lwt_result.fail] of a [`Capnp (`Exception ex)] error, where [ex] is
    built by [Capnp_rpc.Exception.v ?ty] from the formatted message. *)
let fail_lwt ?ty fmt =
  (* Continuation run by [Fmt.kstr] once the message has been rendered. *)
  let fail_with msg =
    Lwt_result.fail (`Capnp (`Exception (Capnp_rpc.Exception.v ?ty msg)))
  in
  Fmt.kstr fail_with fmt