1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
open! Core_kernel
open! Async_kernel
open! Import
open! Bus
(** [pipe1_exn t here] subscribes to the single-argument bus [t] and returns the read
    end of a freshly created pipe. Each value delivered to the subscription is written
    to the pipe without pushback, provided the pipe is still open.

    The pipe is closed when the subscription's [on_close] callback runs, and the
    subscription is removed from [t] once the pipe's write end is closed (for whatever
    reason). An exception from [subscribe_exn] is not caught here. *)
let pipe1_exn (t : ('a -> unit, _) t) here =
  let reader, writer = Pipe.create () in
  (* Forward each bus value into the pipe; silently dropped if the pipe is closed. *)
  let on_value v = Pipe.write_without_pushback_if_open writer v in
  let on_close () = Pipe.close writer in
  let subscription = subscribe_exn t here ~f:on_value ~on_close in
  (* Tie the subscription's lifetime to the pipe's. *)
  upon (Pipe.closed writer) (fun () -> unsubscribe t subscription);
  reader
;;
(* Arity witnesses used by [first_exn] to build a bus callback of the right shape
   from a user-supplied function. *)
module First_arity = struct
(* [(c, f, r) t] relates three types for a given number of arguments (1 to 5):
   - [c]: a callback type taking those arguments and returning [unit];
   - [f]: a function type taking the same arguments and returning ['r option];
   - [r]: the type carried by the [option] that [f] returns.
   Matching on a constructor lets the type checker refine [c] and [f] to the
   concrete arrow types listed below. *)
type (_, _, _) t =
| Arity1 : ('a -> unit, 'a -> 'r option, 'r) t
| Arity2 : ('a -> 'b -> unit, 'a -> 'b -> 'r option, 'r) t
| Arity3 : ('a -> 'b -> 'c -> unit, 'a -> 'b -> 'c -> 'r option, 'r) t
| Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 'a -> 'b -> 'c -> 'd -> 'r option, 'r) t
| Arity5
: ( 'a -> 'b -> 'c -> 'd -> 'e -> unit
, 'a -> 'b -> 'c -> 'd -> 'e -> 'r option
, 'r )
t
[@@deriving sexp_of]
end
(** [first_exn ?stop t here first_arity ~f] subscribes to [t] and returns a deferred
    that is filled with [r] the first time [f], applied to the arguments of a bus
    callback, returns [Some r]. The subscription is removed at that point.

    - [first_arity] is a witness of how many arguments the callback takes; it selects
      the shape of the wrapper built around [f].
    - [stop], if given, removes the subscription once it becomes determined, and [f] is
      no longer called once [stop] is determined. This function does not fill the
      returned deferred in that case.
    - Errors passed to [on_callback_raise] for this subscription are sent, as
      exceptions, to the monitor that was current when the subscription was made.

    An exception from [Bus.subscribe_exn] is not caught here. *)
let first_exn (type c f r) ?stop t here (first_arity : (c, f, r) First_arity.t) ~(f : f) =
  let module A = First_arity in
  Deferred.create (fun ivar ->
    let subscriber : c Bus.Subscriber.t option ref = ref None in
    (* [subscriber] is [None] until [Bus.subscribe_exn] has returned, and stays [None]
       if it raises. Unsubscribing must therefore tolerate a missing handle instead of
       raising from [Option.value_exn]. *)
    let unsubscribe () =
      Option.iter !subscriber ~f:(fun subscription -> Bus.unsubscribe t subscription)
    in
    let finish : r option -> unit = function
      | None -> ()
      | Some r ->
        Ivar.fill ivar r;
        unsubscribe ()
    in
    (* [can_finish] is checked before [f] is called, so that [f] is never run once
       [stop] is determined. *)
    let can_finish =
      match stop with
      | None -> fun () -> true
      | Some stop ->
        (* Registered before subscribing: if subscribing fails, this handler still runs
           later and must then be a no-op. *)
        upon stop unsubscribe;
        fun () -> not (Deferred.is_determined stop)
    in
    let callback : c =
      match first_arity with
      | A.Arity1 -> fun a -> if can_finish () then finish (f a)
      | A.Arity2 -> fun a1 a2 -> if can_finish () then finish (f a1 a2)
      | A.Arity3 -> fun a1 a2 a3 -> if can_finish () then finish (f a1 a2 a3)
      | A.Arity4 -> fun a1 a2 a3 a4 -> if can_finish () then finish (f a1 a2 a3 a4)
      | A.Arity5 -> fun a1 a2 a3 a4 a5 -> if can_finish () then finish (f a1 a2 a3 a4 a5)
    in
    let on_callback_raise =
      (* Capture the monitor now so errors go to the subscriber's context. *)
      let monitor = Monitor.current () in
      fun error -> Monitor.send_exn monitor (Error.to_exn error)
    in
    subscriber := Some (Bus.subscribe_exn t here ~on_callback_raise ~f:callback);
    (* NOTE(review): if the bus can invoke [callback] during [subscribe_exn] (before the
       handle above was stored), [finish] could not unsubscribe at that time; do it now.
       In the usual case the ivar is still empty here and this is a no-op. *)
    if Ivar.is_full ivar then unsubscribe ())
;;