Source file flow.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
open Eio.Std

(* When copying between a source with an FD and a sink with an FD, we can share the chunk
   and avoid copying. *)
let fast_copy src dst =
  let fallback () =
    (* No chunks available. Use regular memory instead. *)
    let buf = Cstruct.create 4096 in
    try
      while true do
        let got = Low_level.readv src [buf] in
        Low_level.writev dst [Cstruct.sub buf 0 got]
      done
    with End_of_file -> ()
  in
  Low_level.with_chunk ~fallback @@ fun chunk ->
  let chunk_size = Uring.Region.length chunk in
  try
    while true do
      let got = Low_level.read_upto src chunk chunk_size in
      Low_level.write dst chunk got
    done
  with End_of_file -> ()

(* Try a fast copy using splice. If the FDs don't support that, switch to copying. *)
let _fast_copy_try_splice src dst =
  try
    while true do
      let _ : int = Low_level.splice src ~dst ~len:max_int in
      ()
    done
  with
  | End_of_file -> ()
  | Eio.Exn.Io (Eio.Exn.X Eio_unix.Unix_error ((EAGAIN | EINVAL), "splice", _), _) -> fast_copy src dst

(* XXX workaround for issue #319, PR #327 *)
let fast_copy_try_splice src dst = fast_copy src dst
    
let[@tail_mod_cons] rec list_take n = function
  | [] -> []
  | x :: xs ->
    if n = 0 then []
    else x :: list_take (n - 1) xs

let truncate_to_iomax xs =
  if List.compare_length_with xs Uring.iov_max <= 0 then xs
  else list_take Uring.iov_max xs

(* Copy using the [Read_source_buffer] optimisation.
   Avoids a copy if the source already has the data. *)
let copy_with_rsb rsb dst =
  let write xs = Low_level.writev_single dst (truncate_to_iomax xs) in
  try
    while true do rsb write done
  with End_of_file -> ()

(* Copy by allocating a chunk from the pre-shared buffer and asking
   the source to write into it. This used when the other methods
   aren't available. *)
let fallback_copy (type src) (module Src : Eio.Flow.Pi.SOURCE with type t = src) src dst =
  let fallback () =
    (* No chunks available. Use regular memory instead. *)
    let buf = Cstruct.create 4096 in
    try
      while true do
        let got = Src.single_read src buf in
        Low_level.writev dst [Cstruct.sub buf 0 got]
      done
    with End_of_file -> ()
  in
  Low_level.with_chunk ~fallback @@ fun chunk ->
  let chunk_cs = Uring.Region.to_cstruct chunk in
  try
    while true do
      let got = Src.single_read src chunk_cs in
      Low_level.write dst chunk got
    done
  with End_of_file -> ()

module Impl = struct
  type tag = [`Generic | `Unix]

  type t = Eio_unix.Fd.t

  let fd t = t

  let close = Eio_unix.Fd.close

  let stat = Low_level.fstat

  let single_read t buf =
    Low_level.readv t [buf]

  let pread t ~file_offset bufs =
    Low_level.readv ~file_offset t bufs

  let pwrite t ~file_offset bufs =
    Low_level.writev_single ~file_offset t (truncate_to_iomax bufs)

  let read_methods = []

  let single_write t bufs = Low_level.writev_single t (truncate_to_iomax bufs)

  let copy t ~src =
    match Eio_unix.Resource.fd_opt src with
    | Some src -> fast_copy_try_splice src t
    | None ->
      let Eio.Resource.T (src, ops) = src in
      let module Src = (val (Eio.Resource.get ops Eio.Flow.Pi.Source)) in
      let rec aux = function
        | Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb (rsb src) t
        | _ :: xs -> aux xs
        | [] -> fallback_copy (module Src) src t
      in
      aux Src.read_methods

  let shutdown t cmd =
    Low_level.shutdown t @@ match cmd with
    | `Receive -> Unix.SHUTDOWN_RECEIVE
    | `Send -> Unix.SHUTDOWN_SEND
    | `All -> Unix.SHUTDOWN_ALL

  let send_msg t ~fds data =
    Low_level.send_msg t ~fds data

  let recv_msg_with_fds t ~sw ~max_fds data =
    let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds data in
    n, fds

  let seek = Low_level.lseek
  let sync = Low_level.fsync
  let truncate = Low_level.ftruncate
end

let flow_handler = Eio_unix.Pi.flow_handler (module Impl)

let of_fd fd =
  let r = Eio.Resource.T (fd, flow_handler) in
  (r : [`Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r :>
     [< `Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r)

let source fd = (of_fd fd :> Eio_unix.source_ty r)
let sink   fd = (of_fd fd :> Eio_unix.sink_ty r)

let stdin = source Eio_unix.Fd.stdin
let stdout = sink Eio_unix.Fd.stdout
let stderr = sink Eio_unix.Fd.stderr

module Secure_random = struct
  type t = unit
  let single_read () buf = Low_level.getrandom buf; Cstruct.length buf
  let read_methods = []
end

let secure_random =
  let ops = Eio.Flow.Pi.source (module Secure_random) in
  Eio.Resource.T ((), ops)