Source file flow_lwt_unix.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
open Lwt.Infix
type error = [`Unix of Unix.error]
type write_error = [`Closed | `Unix of Unix.error]
let pp_error ppf (`Unix e) = Fmt.string ppf (Unix.error_message e)
let pp_write_error ppf = function
| #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e
| #error as e -> pp_error ppf e
type flow = {
fd: Lwt_unix.file_descr;
read_buffer_size: int;
mutable read_buffer: Cstruct.t;
mutable shutdown: [ `None | `Read | `Write | `Both ];
}
let connect fd =
let read_buffer_size = 32768 in
let read_buffer = Cstruct.create read_buffer_size in
let shutdown = `None in
{ fd; read_buffer_size; read_buffer; shutdown }
let close t =
match t.shutdown with
| `None ->
t.shutdown <- `Both;
Lwt_unix.close t.fd
| `Read ->
t.shutdown <- `Both;
Lwt_unix.shutdown t.fd Lwt_unix.SHUTDOWN_SEND;
Lwt.return_unit
| `Write ->
t.shutdown <- `Both;
Lwt_unix.shutdown t.fd Lwt_unix.SHUTDOWN_RECEIVE;
Lwt.return_unit
| `Both ->
Lwt.return_unit
let safe op f r =
Lwt.catch (fun () -> op f r) (function
| Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0
| e -> Lwt.fail e)
let read flow =
match flow.shutdown with
| `Read | `Both -> Lwt.return (Ok `Eof)
| _ ->
if Cstruct.length flow.read_buffer = 0
then flow.read_buffer <- Cstruct.create flow.read_buffer_size;
safe Lwt_cstruct.read flow.fd flow.read_buffer >|= function
| 0 -> Ok `Eof
| n ->
let result = Cstruct.sub flow.read_buffer 0 n in
flow.read_buffer <- Cstruct.shift flow.read_buffer n;
Ok (`Data result)
let protect f =
Lwt.catch f (function
| End_of_file -> Lwt.return (Error `Closed)
| Unix.Unix_error (e, _, _) -> Lwt.return (Error (`Unix e))
| e -> Lwt.fail e
)
let write flow buf =
match flow.shutdown with
| `Write | `Both -> Lwt.return (Error `Closed)
| _ ->
protect (fun () ->
Lwt_cstruct.complete (safe Lwt_cstruct.write flow.fd) buf >|= fun () ->
Ok ()
)
let writev flow bufs =
let rec loop = function
| [] -> Lwt.return (Ok ())
| x :: xs ->
match flow.shutdown with
| `Write | `Both -> Lwt.return (Error `Closed)
| _ ->
Lwt_cstruct.complete (safe Lwt_cstruct.write flow.fd) x >>= fun () ->
loop xs
in
protect (fun () -> loop bufs)
let shutdown flow cmd =
let cmd', status = match flow.shutdown, cmd with
| `Both, _ -> None, `Both
| `None, x -> Some x, (match cmd with `write -> `Write | `read -> `Read | `read_write -> `Both)
| `Read, (`write | `read_write) -> Some `write, `Both
| `Write, (`read | `read_write) -> Some `read, `Both
| s, _ -> None, s
in
let lwt_cmd = Option.map (function
| `write -> Lwt_unix.SHUTDOWN_SEND
| `read -> SHUTDOWN_RECEIVE
| `read_write -> SHUTDOWN_ALL)
cmd'
in
flow.shutdown <- status;
Option.iter (Lwt_unix.shutdown flow.fd) lwt_cmd;
Lwt.return_unit