Source file io.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
open Async
module Deferred = struct
  type 'a t = 'a Async_kernel.Deferred.t

  module Or_error = struct
    type nonrec 'a t = ('a, exn) result t
    let return v = Async_kernel.Deferred.return (Ok v)
    let fail exn = Async_kernel.Deferred.return (Error exn)
    let catch f =
      Async_kernel.Monitor.try_with f >>= function
      | Ok v -> Async_kernel.return v
      | Error exn -> Async_kernel.return (Error exn)

    let (>>=) : 'a t -> ('a -> 'b t) -> 'b t = fun v f ->
      v >>= function
      | Ok v -> f v
      | Error exn -> Async_kernel.return (Error exn)
  end

  let (>>=) = Async_kernel.(>>=)
  let (>>|) = Async_kernel.(>>|)
  let (>>=?) v f =
    v >>= function
    | Ok v -> f v
    | Error exn -> return (Error exn)

  let return = Async_kernel.return
  let after delay = Async_kernel.after (Core_kernel.Time_ns.Span.of_sec delay)
  let catch f = Async_kernel.Monitor.try_with f
  let async = don't_wait_for
end

module Ivar = struct
  type 'a t = 'a Async.Ivar.t
  let create () = Async.Ivar.create ()
  let fill t v = Async.Ivar.fill t v
  let wait t = Async.Ivar.read t
end

module Pipe = struct
  open Async_kernel
  open Deferred

  type ('a, 'b) pipe = ('a, 'b) Pipe.pipe
  type writer_phantom = Pipe.Writer.phantom
  type reader_phantom = Pipe.Reader.phantom
  type 'a writer = 'a Pipe.Writer.t
  type 'a reader = 'a Pipe.Reader.t

  let flush writer = Pipe.downstream_flushed writer >>= fun _ -> return ()
  let read reader = Pipe.read reader >>= function
    | `Eof -> return None
    | `Ok v -> return (Some v)
  let write writer data =
    (* Pipe.write writer data *)
    Pipe.write_without_pushback writer data;
    return ()
  let close writer = Pipe.close writer
  let close_reader reader = Pipe.close_read reader
  let create_reader ~f = Pipe.create_reader ~close_on_exception:true f
  let create_writer ~f = Pipe.create_writer f
  let transfer reader writer = Pipe.transfer_id reader writer
  let create () = Pipe.create ()
  let is_closed pipe = Pipe.is_closed pipe
  let closed pipe = Pipe.closed pipe
end

module Net = struct
  let connect ?connect_timeout_ms ~inet ~host ~port ~scheme =
    let uri =
      let scheme = match scheme with
        | `Http -> "http"
        | `Https -> "https"
      in
      Uri.make ~scheme ~host:host ~port ()
    in
    let options =
      let domain : Async_unix.Unix.socket_domain =
        match inet with
          | `V4 -> PF_INET
          | `V6 -> PF_INET6
      in
      Core.Unix.[AI_FAMILY domain]
    in
    let close_socket_no_error = function
      | Conduit_async.V3.Inet_sock socket -> try Socket.shutdown socket `Both; with _ -> ()
    in
    let interrupt = match connect_timeout_ms with
      | None -> None
      | Some ms -> Some (Async.after (Core.Time.Span.of_ms (float ms)))
    in
    Async.try_with (fun () -> Conduit_async.V3.connect_uri ?interrupt ~options uri) >>=? fun (socket, ic, oc) ->
    let reader = Reader.pipe ic in
    don't_wait_for (
      Async_kernel.Pipe.closed reader >>= fun () ->
      Monitor.try_with ~name:"Io.Net.connect connection-cleanup" (fun () ->
        Writer.close oc >>= fun () ->
        Reader.close ic >>= fun () ->
        return ()
      ) >>= fun _ ->
      close_socket_no_error socket;
      return ()
    );
    let writer = Writer.pipe oc in
    Deferred.Or_error.return (reader, writer)
end