1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
open! Core
open! Async_kernel
open! Import
open! Bus
(** [subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn] exposes the bus [t] as a
    pipe reader.

    If [t] is already closed, the result is [Pipe.empty ()] and no subscription is made.
    Otherwise a fresh pipe is created and [maybe_write_fn writer] is subscribed to [t] as
    the callback, so [maybe_write_fn] decides what (if anything) is written for each
    value it receives. The writer is closed via the subscription's [on_close], and the
    subscription is removed once the writer is closed. *)
let subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn =
  match Bus.is_closed t with
  | true -> Pipe.empty ()
  | false ->
    let reader, writer = Pipe.create () in
    let close_writer () = Pipe.close writer in
    let subscriber =
      Bus.subscribe_exn t here ~f:(maybe_write_fn writer) ~on_close:close_writer
    in
    (* Whatever closes the writer, stop receiving callbacks from the bus afterwards. *)
    upon (Pipe.closed writer) (fun () -> Bus.unsubscribe t subscriber);
    reader
;;
(** [pipe1_exn t here] returns a pipe reader fed by the bus [t]: every value passed to
    the subscription's callback is written with
    [Pipe.write_without_pushback_if_open]. See [subscribe_and_maybe_write_to_pipe1] for
    how the pipe and the subscription are tied together. *)
let pipe1_exn t here =
  subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn:(fun writer value ->
    Pipe.write_without_pushback_if_open writer value)
;;
(** [pipe1_filter_map_exn t here ~f] is like [pipe1_exn], except that each value
    received from the bus is first passed through [f]: [Some v] results are written to
    the pipe with [Pipe.write_without_pushback_if_open], and [None] results are
    dropped. *)
let pipe1_filter_map_exn t here ~f =
  let forward_if_some writer input =
    match f input with
    | Some output -> Pipe.write_without_pushback_if_open writer output
    | None -> ()
  in
  subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn:forward_if_some
;;
(** Arity witnesses accepted by [first_exn], covering callbacks of one to five
    arguments. *)
module First_arity = struct
(* [(c, f, r) t] ties together three types for a given arity:
   - [c]: a function from the arguments to [unit]; [first_exn] uses it as the type of
     the callback it subscribes to the bus;
   - [f]: a function from the same arguments to ['r option]; [first_exn] uses it as the
     type of its [~f] argument;
   - [r]: the type carried by the [option] that [f] returns.
   Matching on a constructor lets the type checker refine [c] and [f] to the concrete
   function types of that arity. *)
type (_, _, _) t =
| Arity1 : ('a -> unit, 'a -> 'r option, 'r) t
| Arity2 : ('a -> 'b -> unit, 'a -> 'b -> 'r option, 'r) t
| Arity3 : ('a -> 'b -> 'c -> unit, 'a -> 'b -> 'c -> 'r option, 'r) t
| Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 'a -> 'b -> 'c -> 'd -> 'r option, 'r) t
| Arity5
: ( 'a -> 'b -> 'c -> 'd -> 'e -> unit
, 'a -> 'b -> 'c -> 'd -> 'e -> 'r option
, 'r )
t
[@@deriving sexp_of]
end
(** [first_exn ?stop t here first_arity ~f] subscribes to the bus [t] and returns a
    deferred that is filled with [r] the first time [f] returns [Some r] on the
    arguments delivered to the callback; the subscription is removed right after the
    deferred is filled. While [f] returns [None] the subscription stays in place.

    [first_arity] selects how many arguments the callback (and therefore [f]) takes.

    If [stop] is supplied, the subscription is removed once [stop] is determined, and
    [f] is not called by any callback that runs after [stop] is determined; nothing in
    this function fills the result in that case.

    Errors handed to the subscription's [on_callback_raise] are forwarded, as
    exceptions, to the monitor that was current when the subscription was set up. *)
let first_exn (type c f r) ?stop t here (first_arity : (c, f, r) First_arity.t) ~(f : f) =
  Deferred.create (fun result ->
    (* Assigned at the very end, once [Bus.subscribe_exn] has returned the subscriber;
       the closures below dereference it only when they actually run. *)
    let subscription : c Bus.Subscriber.t option ref = ref None in
    let drop_subscription () = Bus.unsubscribe t (Option.value_exn !subscription) in
    let finish (outcome : r option) : unit =
      match outcome with
      | Some value ->
        Ivar.fill result value;
        drop_subscription ()
      | None -> ()
    in
    (* [can_finish] is kept apart from [finish] because it is consulted before [f] is
       called, so that [f] is never invoked once [stop] is determined. *)
    let can_finish : unit -> bool =
      match stop with
      | Some stop ->
        upon stop (fun () -> drop_subscription ());
        fun () -> not (Deferred.is_determined stop)
      | None -> fun () -> true
    in
    let callback : c =
      match first_arity with
      | First_arity.Arity1 -> fun x1 -> if can_finish () then finish (f x1)
      | First_arity.Arity2 -> fun x1 x2 -> if can_finish () then finish (f x1 x2)
      | First_arity.Arity3 -> fun x1 x2 x3 -> if can_finish () then finish (f x1 x2 x3)
      | First_arity.Arity4 ->
        fun x1 x2 x3 x4 -> if can_finish () then finish (f x1 x2 x3 x4)
      | First_arity.Arity5 ->
        fun x1 x2 x3 x4 x5 -> if can_finish () then finish (f x1 x2 x3 x4 x5)
    in
    let on_callback_raise =
      (* Capture the monitor now so errors are reported to it, whichever monitor is
         current when the error is later reported. *)
      let monitor = Monitor.current () in
      fun error -> Monitor.send_exn monitor (Error.to_exn error)
    in
    subscription := Some (Bus.subscribe_exn t here ~on_callback_raise ~f:callback))
;;