1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
open! Import
module Io = struct
include Awsm.Http.Monad.Make (struct
type +'a t = 'a Deferred.t
end)
let monad =
{ Awsm.Http.Monad.bind = (fun x f -> inj (prj x >>= fun x -> prj (f x)))
; return = (fun x -> inj (return x))
}
;;
let make_stream pipe =
let read pipe = inj (Pipe.read pipe) in
let ( >>= ) = monad.bind in
let return = monad.return in
fun () ->
read pipe
>>= function
| `Ok v -> return (Some v)
| `Eof -> return None
;;
module Call : sig
val cohttp_async
: ?endpoint_url:string
-> cfg:Awsm.Cfg.t
-> service:Awsm.Service.t
-> Awsm.Http.Meth.t
-> Awsm.Http.Request.t
-> Uri.t
-> (t Awsm.Http.Response.t, Awsm.Http.Io.Error.call) result Deferred.t
end = struct
let find_xml_redirect_endpoint xml =
let get x = Awsm.Xml.child_exn xml x |> Awsm.Xml.string_data_exn in
let code = get "Code" in
assert (String.equal "PermanentRedirect" code);
get "Endpoint"
;;
let ~host = Cohttp.Header.replace headers "host" host
let set_host request ~host =
{ request with
Cohttp.Request.headers =
request |> Cohttp.Request.headers |> set_host_headers ~host
}
;;
let rec interpret_response ~limit request (resp, body)
: (Cohttp.Response.t * Cohttp.Body.t, Awsm.Http.Io.Error.call) result s
=
if limit >= 50
then return (Error `Too_many_redirects)
else (
match Cohttp.Response.status resp with
| #Cohttp.Code.success_status -> return (Ok (resp, body))
| #Cohttp.Code.redirection_status ->
Cohttp.Body.to_string body
>>= fun body ->
let xml = Awsm.Xml.parse_response body in
let host = find_xml_redirect_endpoint xml in
let new_request = set_host request ~host in
Cohttp.Client.request new_request
>>= interpret_response ~limit:(succ limit) new_request
| code ->
Cohttp.Body.to_string body
>>= fun body ->
let x_amzn_error_type =
let = Cohttp.Response.headers resp in
match Cohttp.Header.get headers "x-amzn-ErrorType" with
| None -> None
| Some value -> (
match String.lsplit2 value ~on:':' with
| None -> Some value
| Some (v, _) -> Some v)
in
let bad_response =
{ Awsm.Http.Io.Error.code = Cohttp.Code.code_of_status code
; body
; x_amzn_error_type
}
in
return (Error (`Bad_response bad_response)))
;;
let interpret_response = interpret_response ~limit:0
(** Wrapper around [Cohttp.Client.request] that always uses https.
@see <https://github.com/mirage/ocaml-cohttp/issues/670> *)
let cohttp_async_client_request request req_body =
let uri = Uri.with_scheme (Cohttp.Request.uri request) (Some "https") in
let body = Cohttp.Body.of_string req_body in
Cohttp.Client.request ~uri ~body request
;;
let request_and_follow request req_body =
cohttp_async_client_request request req_body >>= interpret_response request
;;
let stream_of_body = function
| `Empty -> fun () -> monad.return None
| `Pipe pipe -> make_stream pipe
| `String x ->
let pipe = Pipe.of_list [ x ] in
make_stream pipe
| `Strings l ->
let pipe = Pipe.of_list l in
make_stream pipe
;;
let cohttp_async ?endpoint_url ~cfg ~service meth request uri =
let { Awsm.Cfg.region
; aws_access_key_id
; aws_secret_access_key
; aws_session_token
; _
}
=
cfg
in
let region = Option.value_exn region ~message:"config must set 'region'" in
let meth = Cohttp.to_meth meth in
let endpoint =
match endpoint_url with
| Some endpoint_url -> Uri.of_string endpoint_url
| None -> Awsm.Botocore_endpoints.lookup_uri ~region service `HTTPS
in
let uri =
Uri.with_uri ~scheme:(Uri.scheme endpoint) ~host:(Uri.host endpoint) uri
in
let host =
Core.Option.value_exn
(Uri.host endpoint)
~message:
(sprintf "could not extract 'host' from url %s" (Uri.to_string endpoint))
in
let =
let = Cohttp.to_headers request in
Cohttp.Header.add headers "host" host
in
let req_body = Awsm.Http.Request.body request in
let body_length = Int64.of_int (String.length req_body) in
let payload_hash = Awsm.Auth.payload_hash req_body in
let request =
Cohttp.Request.make_for_client ~headers ~chunked:false ~body_length meth uri
|> Awsm.Auth.sign_request
~region
~service
?session_token:aws_session_token
?aws_access_key_id
?aws_secret_access_key
~payload_hash
in
request_and_follow request req_body
>>= function
| Error _ as err -> return err
| Ok (resp, body) ->
let version = Cohttp.of_version resp in
let = Cohttp.of_headers resp in
let status = Cohttp.of_status resp in
let body = stream_of_body body in
return (Ok (Awsm.Http.Response.make ~version ~headers ~body status))
;;
end
let make_http http meth request uri = inj (http meth request uri)
let call ?endpoint_url ~cfg ~service meth request uri =
make_http (Call.cohttp_async ?endpoint_url ~cfg ~service) meth request uri
;;
end
module Response = struct
include Cohttp.Response
let resp = headers resp |> Cohttp.Header.to_list
end
module Body = Cohttp.Body
module Client = struct
let get ? uri = Cohttp.Client.get ?headers uri
let post ? ?body ?chunked uri = Cohttp.Client.post ?headers ?body ?chunked uri
let put ? ?body ?chunked uri = Cohttp.Client.put ?headers ?body ?chunked uri
let delete ? ?body ?chunked uri =
Cohttp.Client.delete ?headers ?body ?chunked uri
;;
end