1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
open StdLabels
let sprintf = Printf.sprintf
let debug = false
let log fmt = match debug with
| true -> Printf.kfprintf (fun _ -> ()) stderr ("%s: " ^^ fmt ^^ "\n%!") __MODULE__
| false -> Printf.ikfprintf (fun _ -> ()) stderr fmt
let _ = log
let empty_sha = Authorization.hash_sha256 "" |> Authorization.to_hex
let get_chunked_length ~chunk_size payload_length =
let lengths =
(sprintf "%x" chunk_size |> String.length) * (payload_length / chunk_size) +
(match payload_length mod chunk_size with
| 0 -> 0
| n -> (sprintf "%x" n |> String.length))
in
let chunks = (payload_length + (chunk_size - 1)) / chunk_size + 1 in
chunks * (4 + 17 + 64) + lengths + 1 +
payload_length
let%test "get_chunk_length" =
get_chunked_length ~chunk_size:(64*1024) (65*1024) = 66824
module Make(Io : Types.Io) = struct
module Body = Body.Make(Io)
module Http = Http.Make(Io)
open Io
open Deferred
let chunk_writer ~signing_key ~scope ~initial_signature ~date ~time ~chunk_size reader =
let open Deferred in
let sub ~pos ?len str =
let res = match pos, len with
| 0, None -> str
| 0, Some len when len = String.length str -> str
| pos, None ->
String.sub ~pos ~len:(String.length str - pos) str
| pos, Some len ->
String.sub ~pos ~len str
in
res
in
let send writer sha previous_signature elements length =
let flush_done = Pipe.flush writer in
let sha = Digestif.SHA256.get sha in
let signature = Authorization.chunk_signature ~signing_key ~date ~time ~scope
~previous_signature ~sha |> Authorization.to_hex in
Io.Pipe.write writer
(Printf.sprintf "%x;chunk-signature=%s\r\n" length signature) >>= fun () ->
List.fold_left
~init:(Deferred.return ()) ~f:(fun x data -> x >>= fun () -> Io.Pipe.write writer data)
elements >>= fun () ->
Io.Pipe.write writer "\r\n" >>= fun () ->
flush_done >>= fun () ->
return signature
in
let rec transfer previous_signature ctx (buffered:int) queue current writer =
begin
match current with
| Some v -> return (Some v)
| None ->
Io.Pipe.read reader >>= function
| None ->
return None
| Some v ->
return (Some (v, 0))
end >>= function
| None ->
send writer ctx previous_signature (List.rev queue) buffered >>= fun signature ->
send writer Digestif.SHA256.empty signature [] 0 >>= fun _signature ->
Deferred.return ()
| Some (data, offset) -> begin
let remain = chunk_size - buffered in
match (String.length data) - offset with
| n when n >= remain ->
let elem = sub data ~pos:offset ~len:remain in
let ctx = Digestif.SHA256.feed_string ctx elem in
let elements = elem :: queue |> List.rev in
send writer ctx previous_signature elements chunk_size >>= fun signature ->
let data = match String.length data > remain with
| true ->
Some (data, offset + remain)
| false ->
None
in
transfer signature Digestif.SHA256.empty 0 [] data writer
| _ ->
let elem = sub ~pos:offset data in
let ctx = Digestif.SHA256.feed_string ctx elem in
transfer previous_signature ctx
(buffered + String.length elem)
(elem :: queue) None writer
end
in
Pipe.create_reader ~f:(transfer initial_signature Digestif.SHA256.empty 0 [] None)
let make_request ~(endpoint: Region.endpoint) ?connect_timeout_ms ?(expect=false) ~sink ?(body=Body.Empty) ?(credentials:Credentials.t option) ~ ~meth ~path ~query () =
let (date, time) = Unix.gettimeofday () |> Time.iso8601_of_time in
let content_length =
match meth, body with
| (`PUT | `POST), Body.String body -> Some (String.length body |> string_of_int)
| (`PUT | `POST), Body.Chunked { length; chunk_size; _ } ->
Some (get_chunked_length ~chunk_size length |> string_of_int )
| (`PUT | `POST), Body.Empty -> Some "0"
| _ -> None
in
let payload_sha = match body with
| Body.Empty -> empty_sha
| Body.String body -> Authorization.hash_sha256 body |> Authorization.to_hex
| Body.Chunked _ -> "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
in
let token = match credentials with
| Some { Credentials.token ; _ } -> token
| None -> None
in
let , body =
let decoded_content_length = match body with
| Body.Chunked { length; _ } -> Some (string_of_int length)
| _ -> None
in
let open Headers in
let =
List.fold_left ~init:empty ~f:(fun m (key, value) -> add ~key ~value m) headers
|> add ~key:"Host" ~value:endpoint.host
|> add ~key:"x-amz-content-sha256" ~value:payload_sha
|> add ~key:"x-amz-date" ~value:(sprintf "%sT%sZ" date time)
|> add_opt ~key:"x-amz-security-token" ~value:token
|> add_opt ~key:"Content-Length" ~value:content_length
|> change ~key:"User-Agent" ~f:(function Some _ as r -> r | None -> Some "aws-s3 ocaml client")
|> add_opt ~key:"x-amz-decoded-content-length" ~value:decoded_content_length
|> change ~key:"Content-Encoding" ~f:(fun v -> match body, v with
| Body.Chunked _, Some v -> Some ("aws-chunked," ^ v)
| Body.Chunked _, None -> Some "aws-chunked"
| _, v -> v)
in
let auth, body =
match credentials with
| Some credentials ->
let verb = Http.string_of_method meth in
let region = Region.to_string endpoint.region in
let signing_key =
Authorization.make_signing_key ~date ~region ~service:"s3" ~credentials ()
in
let scope = Authorization.make_scope ~date ~region ~service:"s3" in
let signature, =
Authorization.make_signature ~date ~time ~verb ~path
~headers ~query:query ~scope ~signing_key ~payload_sha
in
let auth = (Authorization.make_auth_header ~credentials ~scope ~signed_headers ~signature) in
let body = match body with
| Body.String body ->
let reader, writer = Pipe.create () in
Pipe.write writer body >>= fun () ->
Pipe.close writer;
return (Some reader)
| Body.Empty -> return None
| Body.Chunked { pipe; chunk_size; _ } ->
let pipe =
chunk_writer ~signing_key ~scope
~initial_signature:signature ~date ~time ~chunk_size pipe
in
return (Some pipe)
in
Some auth, body
| None ->
let body = match body with
| Body.String body ->
let reader, writer = Pipe.create () in
Pipe.write writer body >>= fun () ->
Pipe.close writer;
return (Some reader)
| Body.Empty -> return None
| Body.Chunked { pipe; _} ->
return (Some pipe)
in
None, body
in
(Headers.add_opt ~key:"Authorization" ~value:auth headers), body
in
body >>= fun body ->
Http.call ?connect_timeout_ms ~endpoint ~path ~query ~headers ~expect ~sink ?body meth >>=? fun (code, msg, , body) ->
Deferred.Or_error.return (code, msg, headers, body)
end