1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
open Core_kernel
open Async_kernel
module Handler_result = Transport_intf.Handler_result
module Reader = struct
module type S = Transport_intf.Reader
type t = T : (module S with type t = 'a) * 'a -> t
let pack m t = T (m, t)
let sexp_of_t (T ((module M), t)) : Sexp.t = M.sexp_of_t t
let close (T ((module M), t)) : unit Deferred.t = M.close t
let is_closed (T ((module M), t)) : bool = M.is_closed t
let read_forever (T ((module M), t)) ~on_message ~on_end_of_batch : _ Deferred.t =
M.read_forever t ~on_message ~on_end_of_batch
let read_one_message_bin_prot t
(bin_reader : _ Bin_prot.Type_class.reader) =
read_forever t
~on_message:(fun buf ~pos ~len ->
let pos_ref = ref pos in
let x = bin_reader.read buf ~pos_ref in
if !pos_ref <> pos + len then
failwithf "message length (%d) did not match expected length (%d)"
(!pos_ref - pos) len ()
else
Stop x)
~on_end_of_batch:ignore
;;
end
module Send_result = Transport_intf.Send_result
module Writer = struct
module type S = Transport_intf.Writer
type 'a writer =
{ impl : (module S with type t = 'a)
; t : 'a
; stopped : unit Deferred.t
}
type t = T : 'a writer -> t
let pack (type a) (module M : S with type t = a) t =
T { impl = (module M)
; t
; stopped = M.stopped t
}
;;
let sexp_of_t (T { impl=(module M); t; _ }) : Sexp.t = M.sexp_of_t t
let close (T { impl=(module M); t; _ }) : unit Deferred.t = M.close t
let is_closed (T { impl=(module M); t; _ }) : bool = M.is_closed t
let monitor (T { impl=(module M); t; _ }) : Monitor.t = M.monitor t
let bytes_to_write (T { impl=(module M); t; _ }) : int = M.bytes_to_write t
let flushed (T { impl=(module M); t; _ }) : unit Deferred.t = M.flushed t
let ready_to_write (T { impl=(module M); t; _ }) : unit Deferred.t = M.ready_to_write t
let send_bin_prot (T { impl=(module M); t; _ }) bin_writer x : _ Send_result.t =
M.send_bin_prot t bin_writer x
let send_bin_prot_and_bigstring (T { impl = (module M); t; _ }) bin_writer x
~buf ~pos ~len : _ Send_result.t =
M.send_bin_prot_and_bigstring t bin_writer x ~buf ~pos ~len
let send_bin_prot_and_bigstring_non_copying (T { impl = (module M); t; _ }) bin_writer x
~buf ~pos ~len : _ Send_result.t =
M.send_bin_prot_and_bigstring_non_copying t bin_writer x ~buf ~pos ~len
let stopped (T { stopped; _ }) = stopped
let can_send (T { impl = (module M); t; stopped }) =
not (M.is_closed t || Deferred.is_determined stopped)
;;
let transfer t ?(max_num_values_per_read=1_000) pipe f =
let consumer =
Pipe.add_consumer pipe
~downstream_flushed:(fun () -> flushed t >>| fun () -> `Ok)
in
let end_of_pipe =
Deferred.create (fun ivar ->
let rec iter () =
if can_send t then begin
match
Pipe.read_now' pipe ~consumer ~max_queue_length:max_num_values_per_read
with
| `Ok q ->
Queue.iter q ~f;
Pipe.Consumer.values_sent_downstream consumer;
ready_to_write t >>> iter
| `Nothing_available ->
Pipe.values_available pipe
>>> fun (`Ok | `Eof) -> iter ()
| `Eof -> Ivar.fill ivar ()
end
in
iter ())
in
Deferred.any
[ end_of_pipe
; stopped t
]
>>| fun () ->
Pipe.close_read pipe
;;
end
type t =
{ reader : Reader.t
; writer : Writer.t
}
[@@deriving sexp_of]
let close t =
Writer.close t.writer
>>= fun () ->
Reader.close t.reader
;;