Source file kerberized_rw.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
open! Core
open! Async
open Import.Internal
(** The plaintext endpoints of a connection, as assembled by [create]. *)
type t =
{ plaintext_reader : Reader.t
(* In [`Test_mode] this is the connection's own reader; otherwise it is the reader
returned by [create_reader]. *)
; plaintext_writer : Writer.t
(* In [`Test_mode] this is the connection's own writer; otherwise it is the writer
returned by [create_writer]. *)
; writer_closed_and_flushed : unit Deferred.t
(* In [`Test_mode] this is [Writer.close_finished] of the connection's writer;
otherwise it is the [`Closed_and_flushed_downstream] deferred from [create_writer]. *)
}
[@@deriving fields]
(** [make_reader_writer info] opens a Unix pipe labelled with [info] and wraps its read
    end in a [Reader.t] and its write end in a [Writer.t]. The writer is created with
    [~buffer_age_limit:`Unlimited] and [~raise_when_consumer_leaves:false]. *)
let make_reader_writer info =
  let%map (`Reader read_fd, `Writer write_fd) = Unix.pipe info in
  let pipe_reader = Reader.create read_fd in
  let pipe_writer =
    Writer.create ~buffer_age_limit:`Unlimited ~raise_when_consumer_leaves:false write_fd
  in
  pipe_reader, pipe_writer
;;
(** [can_actually_write writer] is [true] only when [Writer.can_write writer] holds and
    the writer's file descriptor is still open. The fd is only inspected when
    [Writer.can_write] has already returned [true]. *)
let can_actually_write writer =
  match Writer.can_write writer with
  | false -> false
  | true -> Fd.is_open (Writer.fd writer)
(** [create_writer conn_type auth_context writer] returns the writer that callers should
    send plaintext to, paired with a [`Closed_and_flushed_downstream] deferred.

    - [Auth]: no transformation is applied; [writer] itself is returned together with
      [Writer.close_finished writer].
    - [Safe] / [Priv]: a pipe is created and its write end is returned. A background job
      reads chunks from the read end, encodes each with [Auth_context.Safe.encode] or
      [Auth_context.Priv.encode] respectively, and writes the result to [writer] as a
      bin-prot [Bigstring.Stable.V1] value. The returned deferred becomes determined once
      [writer] has finished closing and both pipe ends have been closed. *)
let create_writer conn_type auth_context writer =
  let wrap_with encode =
    let pipe_info =
      Info.create "Kerberos encryption" (Writer.id writer) Writer.Id.sexp_of_t
    in
    let%map plaintext_r, plaintext_w = make_reader_writer pipe_info in
    let plaintext_monitor = Writer.monitor plaintext_w in
    (* Errors raised in [writer]'s monitor are re-sent to the plaintext writer's monitor,
       which is the one callers of the returned writer can observe. *)
    Monitor.detach_and_iter_errors
      (Writer.monitor writer)
      ~f:(Monitor.send_exn plaintext_monitor);
    let downstream_closed =
      let%bind () = Writer.close_finished writer in
      Deferred.all_unit [ Writer.close plaintext_w; Reader.close plaintext_r ]
    in
    (* Encodes one chunk and forwards it. [ok_exn] raises on an encode error; that
       exception is caught by the [Monitor.try_with] below. Forwarding stops as soon as
       [writer] can no longer be written to. *)
    let handle_chunk buf ~pos ~len =
      let%map encoded =
        encode auth_context (Bigsubstring.create ~pos ~len buf) >>| ok_exn
      in
      if can_actually_write writer
      then (
        Writer.write_bin_prot writer Bigstring.Stable.V1.bin_writer_t encoded;
        `Continue)
      else `Stop ()
    in
    let forward_until_done () =
      match%map Reader.read_one_chunk_at_a_time plaintext_r ~handle_chunk with
      | `Eof | `Stopped () -> ()
      | `Eof_with_unconsumed_data _ ->
        failwith "Impossible b/c we always consume all data above."
    in
    don't_wait_for
      (let%bind () =
         match%map
           Monitor.try_with
             ~run:`Schedule
             ~name:"Kerberized_rw.create_writer"
             forward_until_done
         with
         | Ok () -> ()
         | Error exn -> Monitor.send_exn plaintext_monitor exn
       in
       (* Whether forwarding ended normally or with an error, tear down the pipe first
          and then the underlying writer. *)
       let%bind () =
         Deferred.all_unit [ Reader.close plaintext_r; Writer.close plaintext_w ]
       in
       Writer.close writer);
    plaintext_w, `Closed_and_flushed_downstream downstream_closed
  in
  match (conn_type : Conn_type.t) with
  | Auth -> return (writer, `Closed_and_flushed_downstream (Writer.close_finished writer))
  | Safe -> wrap_with Auth_context.Safe.encode
  | Priv -> wrap_with Auth_context.Priv.encode
;;
(** [reader_read_all reader read_one] returns a pipe fed by calling [read_one reader]
    repeatedly. Each [`Ok] value is written to the pipe (waiting on [Pipe.write] before
    reading again). Reading stops on [`Eof], or when a value has been read but the pipe
    is already closed; in that case the value is dropped. The pipe's write end is closed
    once the loop finishes. *)
let reader_read_all reader read_one =
  let source, sink = Pipe.create () in
  let step () =
    read_one reader
    >>= function
    | `Eof -> return (`Finished ())
    | `Ok item ->
      (match Pipe.is_closed sink with
       | true -> return (`Finished ())
       | false -> Pipe.write sink item >>| fun () -> `Repeat ())
  in
  upon (Deferred.repeat_until_finished () step) (fun () -> Pipe.close sink);
  source
;;
(** [create_reader conn_type auth_context ~writer reader] returns the reader that callers
    should read plaintext from.

    - [Auth]: no transformation is applied; [reader] itself is returned.
    - [Safe] / [Priv]: a pipe is created and its read end is returned. A background job
      reads bin-prot [Bigstring.Stable.V1] messages from [reader], decodes each with
      [Auth_context.Safe.decode] or [Auth_context.Priv.decode] respectively, and writes
      the plaintext to the pipe's write end.

    Decode errors and exceptions from the background job are sent to [writer]'s monitor.
    When the job ends, [writer], [reader] and the pipe's write end are closed, in that
    order. *)
let create_reader conn_type auth_context ~writer reader =
  let wrap_with decode =
    let pipe_info =
      Info.create "Kerberos decryption" (Reader.id reader) Reader.Id.sexp_of_t
    in
    let%map plaintext_r, plaintext_w = make_reader_writer pipe_info in
    let error_monitor = Writer.monitor writer in
    (* Errors raised in the plaintext writer's monitor are re-sent to [writer]'s
       monitor. *)
    Monitor.detach_and_iter_errors
      (Writer.monitor plaintext_w)
      ~f:(Monitor.send_exn error_monitor);
    (* Decodes a single message and forwards the plaintext. If [outcome] already holds an
       error, the message is skipped and the error is carried through unchanged. *)
    let decode_one outcome message =
      return outcome
      >>=? fun () ->
      decode auth_context (Bigsubstring.create message)
      >>|? fun plaintext ->
      if can_actually_write plaintext_w
      then Writer.write_bigstring plaintext_w plaintext
    in
    (* Handles one batch from the pipe; the whole batch is skipped when the plaintext
       writer can no longer be written to. *)
    let decode_batch outcome messages =
      match Writer.can_write plaintext_w with
      | true -> Deferred.Queue.fold messages ~init:outcome ~f:decode_one
      | false -> return outcome
    in
    (* Note that the fold keeps draining the message pipe until it is closed, even after
       a decode error has been recorded. *)
    let decode_everything () =
      let messages =
        reader_read_all reader (fun r ->
          Reader.read_bin_prot r Bigstring.Stable.V1.bin_reader_t)
      in
      Pipe.fold' messages ~init:(Ok ()) ~f:decode_batch
    in
    don't_wait_for
      (let%bind () =
         match%map
           Monitor.try_with_or_error
             ~here:[%here]
             ~name:"Kerberized_rw.create_reader"
             decode_everything
         with
         | Ok (Ok ()) -> ()
         | Ok (Error krb_error) ->
           let tagged = Error.tag ~tag:"kerberos decryption failed" krb_error in
           Monitor.send_exn error_monitor (Error.to_exn tagged)
         | Error e -> Monitor.send_exn error_monitor (Error.to_exn e)
       in
       let%bind () = Writer.close writer in
       let%bind () = Reader.close reader in
       Writer.close plaintext_w);
    plaintext_r
  in
  match (conn_type : Conn_type.t) with
  | Auth -> return reader
  | Safe -> wrap_with Auth_context.Safe.decode
  | Priv -> wrap_with Auth_context.Priv.decode
;;
(** [create connection] builds a [t] from [connection]'s reader, writer, connection type
    and auth context.

    - [`Test_mode]: the connection's reader and writer are used directly, and
      [writer_closed_and_flushed] is [Writer.close_finished] of that writer.
    - [`Prod auth_context]: the endpoints come from [create_reader] and [create_writer],
      and [writer_closed_and_flushed] is the deferred returned by [create_writer]. *)
let create connection =
  let module Connection = Async_protocol.Connection in
  let conn_type = Connection.conn_type connection in
  let reader = Connection.reader connection in
  let writer = Connection.writer connection in
  match Connection.auth_context connection with
  | `Test_mode ->
    let writer_closed_and_flushed = Writer.close_finished writer in
    return
      { plaintext_reader = reader; plaintext_writer = writer; writer_closed_and_flushed }
  | `Prod auth_context ->
    let%map ( plaintext_reader
            , (plaintext_writer, `Closed_and_flushed_downstream writer_closed_and_flushed)
            )
      =
      Deferred.both
        (create_reader conn_type auth_context ~writer reader)
        (create_writer conn_type auth_context writer)
    in
    { plaintext_reader; plaintext_writer; writer_closed_and_flushed }
;;