1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
open Core
open Import
include Writer0
module Unix = Unix_syscalls
(* [of_pipe ?time_source info pipe_w] opens an OS-level pipe with
   [Unix.pipe info], wraps the write end in a writer (built with [create]) and
   the read end in a [Reader.t], and forwards whatever is read from that reader
   into [pipe_w] using [Reader.transfer].

   The result is the new writer paired with
   [`Closed_and_flushed_downstream d].  [d] becomes determined once the
   transfer has finished, both the internal reader and the writer have been
   closed, and [pipe_w] has been closed (if it was not closed already).

   If the transfer finishes while the writer is still open and
   [raise_when_consumer_leaves] holds for it, a
   [Unix_error (EPIPE, "Writer.of_pipe", <info as sexp>)] is sent to the
   writer's monitor before the shutdown steps run. *)
let of_pipe ?time_source info pipe_w =
  let%map `Reader read_end, `Writer write_end = Unix.pipe info in
  let pipe_reader = Reader.create read_end in
  let fd_writer = create ?time_source write_end in
  if Debug.writer
  then
    Debug.log
      "Writer.of_pipe"
      (pipe_w, pipe_reader, fd_writer)
      [%sexp_of: string Pipe.Writer.t * Reader.t * t];
  (* Notifies the writer's monitor that the downstream side went away. *)
  let report_consumer_left () =
    let detail = info |> Info.sexp_of_t |> Sexp.to_string in
    Monitor.send_exn
      (monitor fd_writer)
      (Unix.Unix_error (EPIPE, "Writer.of_pipe", detail))
  in
  (* Closes both ends of the OS pipe, then the downstream pipe if still open. *)
  let shut_everything_down () =
    let%map (), () =
      Deferred.both (Reader.close pipe_reader) (close fd_writer)
    in
    if not (Pipe.is_closed pipe_w) then Pipe.close pipe_w
  in
  let closed_and_flushed_downstream =
    let%bind () = Reader.transfer pipe_reader pipe_w in
    (* The transfer is over; if the writer was not closed by then, the
       consumer side is what ended it. *)
    if raise_when_consumer_leaves fd_writer && not (is_closed fd_writer)
    then report_consumer_left ();
    shut_everything_down ()
  in
  fd_writer, `Closed_and_flushed_downstream closed_and_flushed_downstream
;;
(* [splice_result t ~from] copies data from the reader [from] into the writer
   [t], one chunk at a time.  Each chunk is handed to [schedule_bigstring], and
   the next chunk is only requested after [flushed_or_failed_with_result t]
   reports [Flushed].

   The resulting deferred is:
   - [`Ok] when [from] reaches end of file;
   - [`Error] when the flush result is [Error];
   - [`Consumer_left] when the flush result is [Consumer_left] or
     [Force_closed].

   [`Eof_with_unconsumed_data] is treated as impossible ([assert false]);
   presumably because the chunk handler never reports a partially consumed
   chunk -- confirm against [Reader.read_one_chunk_at_a_time]. *)
let splice_result t ~from =
  let forward_chunk buffer ~pos ~len =
    schedule_bigstring t ~pos ~len buffer;
    match%map flushed_or_failed_with_result t with
    | Flushed (_ : Time_ns.t) -> `Continue
    | Error -> `Stop `Error
    | Consumer_left | Force_closed -> `Stop `Consumer_left
  in
  match%map Reader.read_one_chunk_at_a_time from ~handle_chunk:forward_chunk with
  | `Eof -> `Ok
  | `Stopped outcome -> outcome
  | `Eof_with_unconsumed_data (_ : string) -> assert false
;;
(* [splice t ~from] runs [splice_result t ~from] and discards its outcome,
   yielding a [unit] deferred. *)
let splice t ~from = Deferred.ignore_m (splice_result t ~from)