1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
open Core
open Import
include Writer0
module Unix = Unix_syscalls
let of_pipe ?time_source info pipe_w =
let%map `Reader reader_fd, `Writer writer_fd = Unix.pipe info in
let reader = Reader.create reader_fd in
let writer = create ?time_source writer_fd in
if Debug.writer
then
Debug.log
"Writer.of_pipe"
(pipe_w, reader, writer)
[%sexp_of: string Pipe.Writer.t * Reader.t * t];
let closed_and_flushed_downstream =
let%bind () = Reader.transfer reader pipe_w in
if raise_when_consumer_leaves writer && not (is_closed writer)
then
Monitor.send_exn
(monitor writer)
(Unix.Unix_error (EPIPE, "Writer.of_pipe", Sexp.to_string (Info.sexp_of_t info)));
let%map (), () = Deferred.both (Reader.close reader) (close writer) in
if not (Pipe.is_closed pipe_w) then Pipe.close pipe_w
in
writer, `Closed_and_flushed_downstream closed_and_flushed_downstream
;;