Source file capability.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
open Lwt.Infix
open Capnp_core
module Log = Capnp_rpc.Debug.Log
module StructStorage = Capnp.BytesMessage.StructStorage
type 'a t = Core_types.cap
type 'a capability_t = 'a t
type ('t, 'a, 'b) method_t = ('t, 'a, 'b) Capnp.RPC.MethodID.t
module Request = Request
let inc_ref = Core_types.inc_ref
let dec_ref = Core_types.dec_ref
let with_ref t fn =
Lwt.finalize
(fun () -> fn t)
(fun () -> dec_ref t; Lwt.return_unit)
let pp f x = x#pp f
let broken = Core_types.broken_cap
let when_broken = Core_types.when_broken
let when_released (x:Core_types.cap) f = x#when_released f
let problem x = x#problem
let wait_until_settled (x : _ t) =
let result, set_result = Lwt.wait () in
let rec aux x =
if x#blocker = None then (
Lwt.wakeup set_result ()
) else (
x#when_more_resolved (fun x ->
Core_types.dec_ref x;
aux x
)
)
in
aux x;
result
let await_settled t =
wait_until_settled t >|= fun () ->
match problem t with
| None -> Ok ()
| Some ex -> Error ex
let await_settled_exn t =
wait_until_settled t >|= fun () ->
match problem t with
| None -> ()
| Some e -> Fmt.failwith "%a" Capnp_rpc.Exception.pp e
let equal a b =
match a#blocker, b#blocker with
| None, None ->
let a = a#shortest in
let b = b#shortest in
begin match a#problem, b#problem with
| None, None -> Ok (a = b)
| Some a, Some b -> Ok (a = b)
| _ -> Ok false
end
| _ -> Error `Unsettled
let call (target : 't capability_t) (m : ('t, 'a, 'b) method_t) (req : 'a Request.t) =
Log.debug (fun f -> f "Calling %a" Capnp.RPC.MethodID.pp m);
let msg = Request.finish m req in
let results, resolver = Local_struct_promise.make () in
target#call resolver msg;
results
let call_and_wait cap (m : ('t, 'a, 'b StructStorage.reader_t) method_t) req =
let p, r = Lwt.task () in
let result = call cap m req in
let finish = lazy (Core_types.dec_ref result) in
Lwt.on_cancel p (fun () -> Lazy.force finish);
result#when_resolved (function
| Error e -> Lwt.wakeup r (Error (`Capnp e))
| Ok resp ->
Lazy.force finish;
let payload = Msg.Response.readable resp in
let release_response_caps () = Core_types.Response_payload.release resp in
let contents = Schema.Reader.Payload.content_get payload |> Schema.Reader.of_pointer in
Lwt.wakeup r @@ Ok (contents, release_response_caps)
);
p
let call_for_value cap m req =
call_and_wait cap m req >|= function
| Error _ as response -> response
| Ok (response, release_response_caps) ->
release_response_caps ();
Ok response
let call_for_value_exn cap m req =
call_for_value cap m req >>= function
| Ok x -> Lwt.return x
| Error (`Capnp e) ->
Log.debug (fun f -> f "Error calling %t(%a): %a"
cap#pp
Capnp.RPC.MethodID.pp m
Capnp_rpc.Error.pp e);
Fmt.failwith "%a: %a" Capnp.RPC.MethodID.pp m Capnp_rpc.Error.pp e
let call_for_unit cap m req =
call_for_value cap m req >|= function
| Ok _ -> Ok ()
| Error _ as e -> e
let call_for_unit_exn cap m req = call_for_value_exn cap m req >|= ignore
let call_for_caps cap m req fn =
let q = call cap m req in
match fn q with
| r -> Core_types.dec_ref q; r
| exception ex -> Core_types.dec_ref q; raise ex
type 'a resolver = Cap_proxy.resolver_cap
let promise () =
let cap = Cap_proxy.local_promise () in
(cap :> Core_types.cap), (cap :> 'a resolver)
let resolve_ok r x = r#resolve x
let resolve_exn r ex = r#resolve (Core_types.broken_cap ex)