1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
open Core
open Async_kernel
module Handler_result = Transport_intf.Handler_result
(* Type-erased handle on any reader implementation satisfying
   [Transport_intf.Reader]. Every operation below unpacks the stored
   implementation module and forwards the call to it. *)
module Reader = struct
  module type S = Transport_intf.Reader

  (** An implementation module packed together with a value of that
      implementation's own type; the concrete type is hidden behind the
      existential ['a]. *)
  type t = T : (module S with type t = 'a) * 'a -> t

  (** [pack impl reader] pairs [reader] with the module [impl] that operates on
      it. *)
  let pack impl reader = T (impl, reader)

  (** Forwards to the implementation's [sexp_of_t]. *)
  let sexp_of_t packed : Sexp.t =
    match packed with
    | T ((module Impl), reader) -> Impl.sexp_of_t reader
  ;;

  (** Forwards to the implementation's [close]. *)
  let close packed : unit Deferred.t =
    match packed with
    | T ((module Impl), reader) -> Impl.close reader
  ;;

  (** Forwards to the implementation's [is_closed]. *)
  let is_closed packed : bool =
    match packed with
    | T ((module Impl), reader) -> Impl.is_closed reader
  ;;

  (** Forwards to the implementation's [bytes_read]. *)
  let bytes_read packed : Int63.t =
    match packed with
    | T ((module Impl), reader) -> Impl.bytes_read reader
  ;;

  (** Forwards to the implementation's [read_forever], passing both callbacks
      through unchanged. *)
  let read_forever packed ~on_message ~on_end_of_batch : _ Deferred.t =
    match packed with
    | T ((module Impl), reader) -> Impl.read_forever reader ~on_message ~on_end_of_batch
  ;;

  (** Reads a single message and decodes it with [bin_reader].

      The [on_message] callback decodes starting at [pos] and returns [Stop]
      with the decoded value as soon as one message has been handled.

      Raises (via [failwithf]) when the number of bytes consumed by
      [bin_reader] differs from the [len] reported for the message. *)
  let read_one_message_bin_prot t (bin_reader : _ Bin_prot.Type_class.reader) =
    read_forever
      t
      ~on_message:(fun buf ~pos ~len ->
        let pos_ref = ref pos in
        let message = bin_reader.read buf ~pos_ref in
        (* The decoder advances [pos_ref]; it must land exactly at the end of
           the message. *)
        if !pos_ref = pos + len
        then Stop message
        else
          failwithf
            "message length (%d) did not match expected length (%d)"
            (!pos_ref - pos)
            len
            ())
      ~on_end_of_batch:ignore
  ;;
end
module Send_result = Transport_intf.Send_result
(* Type-erased handle on any writer implementation satisfying
   [Transport_intf.Writer]. Apart from [stopped], [can_send] and [transfer],
   every operation unpacks the stored implementation module and forwards the
   call to it. *)
module Writer = struct
  module type S = Transport_intf.Writer

  (** A writer value [t] together with the module [impl] that operates on it.
      [stopped] holds the result of the implementation's [stopped], captured
      once when the writer is packed. *)
  type 'a writer =
    { impl : (module S with type t = 'a)
    ; t : 'a
    ; stopped : unit Deferred.t
    }

  (** Existential wrapper hiding the concrete writer type. *)
  type t = T : 'a writer -> t

  (** [pack impl writer] bundles [writer] with its implementation module and
      records [Impl.stopped writer] at packing time. *)
  let pack (type a) (module Impl : S with type t = a) writer =
    let stopped = Impl.stopped writer in
    T { impl = (module Impl); t = writer; stopped }
  ;;

  (** Forwards to the implementation's [sexp_of_t]. *)
  let sexp_of_t packed : Sexp.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.sexp_of_t writer
  ;;

  (** Forwards to the implementation's [close]. *)
  let close packed : unit Deferred.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.close writer
  ;;

  (** Forwards to the implementation's [is_closed]. *)
  let is_closed packed : bool =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.is_closed writer
  ;;

  (** Forwards to the implementation's [monitor]. *)
  let monitor packed : Monitor.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.monitor writer
  ;;

  (** Forwards to the implementation's [bytes_to_write]. *)
  let bytes_to_write packed : int =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.bytes_to_write writer
  ;;

  (** Forwards to the implementation's [bytes_written]. *)
  let bytes_written packed : Int63.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.bytes_written writer
  ;;

  (** Forwards to the implementation's [flushed]. *)
  let flushed packed : unit Deferred.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.flushed writer
  ;;

  (** Forwards to the implementation's [ready_to_write]. *)
  let ready_to_write packed : unit Deferred.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.ready_to_write writer
  ;;

  (** Forwards to the implementation's [send_bin_prot]. *)
  let send_bin_prot packed bin_writer x : _ Send_result.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } -> Impl.send_bin_prot writer bin_writer x
  ;;

  (** Forwards to the implementation's [send_bin_prot_and_bigstring]. *)
  let send_bin_prot_and_bigstring packed bin_writer x ~buf ~pos ~len : _ Send_result.t =
    match packed with
    | T { impl = (module Impl); t = writer; _ } ->
      Impl.send_bin_prot_and_bigstring writer bin_writer x ~buf ~pos ~len
  ;;

  (** Forwards to the implementation's
      [send_bin_prot_and_bigstring_non_copying]. *)
  let send_bin_prot_and_bigstring_non_copying packed bin_writer x ~buf ~pos ~len
    : _ Send_result.t
    =
    match packed with
    | T { impl = (module Impl); t = writer; _ } ->
      Impl.send_bin_prot_and_bigstring_non_copying writer bin_writer x ~buf ~pos ~len
  ;;

  (** Returns the [stopped] deferred captured by [pack]. *)
  let stopped (T packed) = packed.stopped

  (** [true] only when the implementation reports the writer as not closed and
      the captured [stopped] deferred is not yet determined. *)
  let can_send packed =
    match packed with
    | T { impl = (module Impl); t = writer; stopped } ->
      (not (Impl.is_closed writer)) && not (Deferred.is_determined stopped)
  ;;

  (** [transfer t ?max_num_values_per_read pipe f] drains [pipe], calling [f]
      on every value read.

      A consumer is registered on [pipe] whose [downstream_flushed] waits for
      [flushed t]. While [can_send t] holds, each round reads at most
      [max_num_values_per_read] (default [1_000]) values without blocking,
      applies [f] to them, reports them as sent downstream, and then waits for
      [ready_to_write t] before the next round. When nothing is available the
      loop waits for [Pipe.values_available].

      The returned deferred becomes determined once the pipe reaches EOF or
      [stopped t] is determined, whichever comes first; at that point the read
      end of [pipe] is closed. If [can_send t] becomes false the loop simply
      ends, so completion then depends on [stopped t]. *)
  let transfer t ?(max_num_values_per_read = 1_000) pipe f =
    let consumer =
      Pipe.add_consumer pipe ~downstream_flushed:(fun () ->
        Deferred.map (flushed t) ~f:(fun () -> `Ok))
    in
    let end_of_pipe =
      Deferred.create (fun finished ->
        let rec pump () =
          if can_send t
          then (
            match
              Pipe.read_now' pipe ~consumer ~max_queue_length:max_num_values_per_read
            with
            | `Eof -> Ivar.fill_exn finished ()
            | `Nothing_available ->
              Deferred.upon (Pipe.values_available pipe) (fun (`Ok | `Eof) -> pump ())
            | `Ok batch ->
              Queue.iter batch ~f;
              Pipe.Consumer.values_sent_downstream consumer;
              (* Wait for the writer to be ready before pulling the next batch. *)
              Deferred.upon (ready_to_write t) pump)
        in
        pump ())
    in
    Deferred.map
      (Deferred.any [ end_of_pipe; stopped t ])
      ~f:(fun () -> Pipe.close_read pipe)
  ;;
end
(* A transport: a type-erased [Reader.t] paired with a type-erased [Writer.t].
   Only [sexp_of_t] is derived for it. *)
type t =
{ reader : Reader.t
; writer : Writer.t
}
[@@deriving sexp_of]
(* Closes both halves of the transport: the writer first and, once that close
   has completed, the reader. The result is the deferred returned by
   [Reader.close]. *)
let close { reader; writer } =
  Deferred.bind (Writer.close writer) ~f:(fun () -> Reader.close reader)
;;