Source file paf.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
module type RUNTIME = sig
  type t

  val next_read_operation : t -> [ `Read | `Yield | `Close ]
  (** [next_read_connection t] returns a value describing the next operation
      that the caller should conduit on behalf of the connection. *)

  val read : t -> Bigstringaf.t -> off:int -> len:int -> int
  (** [read t bigstring ~off ~len] reads bytes of input from the provided range
      of [bigstring] an returns the number of bytes consumed by the connection.
      {!read} should be called after {!next_read_operation} returns a [`Read]
      value an additional input is available for the connection to consume. *)

  val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
  (** [read_eof t bigstring ~off ~len] reads bytes of input from the provided
      range of [bigstring] and returns the number of bytes consumed by the
      connection. {!read_eof} should be called after {!next_read_operation}
      returns a [`Read] and an EOF has been received from the communication
      channel. The connection will attempt to consume any buffered input and
      then shutdown the HTTP parser for the connection. *)

  val yield_reader : t -> (unit -> unit) -> unit
  (** [yield_reader t continue] registers with the connection to call [continue]
      when reading should resume. {!yield_reader} should be called after
      {!next_read_operation} returns a [`Yield] value. *)

  val next_write_operation :
    t -> [ `Write of Bigstringaf.t Faraday.iovec list | `Yield | `Close of int ]
  (** [next_write_operation t] returns a value describing the next operation
      that the caller should conduct on behalf the connection. *)

  val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
  (** [report_write_result t result] reports the result of the latest write
      attempt to the connection. {!report_write_result} should be called after a
      call to {!next_write_operation} that returns a [`Write buffer] value.

      - [`Ok n] indicates that the caller successfully wrote [n] bytes of output
        from the buffer that the caller was provided by {!next_write_operation}
        that returns a [`Write buffer] value.
      - [`Closed] indicates that the output destination will no longer accept
        bytes from the write processor. *)

  val yield_writer : t -> (unit -> unit) -> unit
  (** [yield_writer t continue] registers with the connection to call [continue]
      when writing should resume. {!yield_writer} should be called after
      {!next_write_operation} returns a [`Yield] value. *)

  val report_exn : t -> exn -> unit
  (** [report_exn t exn] reports that an error [exn] has been caught and that it
      has been attributed to [t]. Calling this function will switch [t] into an
      error state. Depending on the tate [t] is transitioning from, it may call
      its error handler before terminating the connection. *)

  val is_closed : t -> bool
  (** [is_closed t] is [true] if both the read and write processors have been
      shutdown. When this is the case {!next_read_operation} will return
      [`Close _] and {!next_write_operation} will return a [`Write _] until all
      buffered output has been flushed, at which point it will return [`Close]. *)

  val shutdown : t -> unit
end

type sleep = int64 -> unit Lwt.t
type 'conn runtime = (module RUNTIME with type t = 'conn)

module Make (Flow : Mirage_flow.S) = struct
  let src = Logs.Src.create "paf-flow"

  module Log = (val Logs.src_log src : Logs.LOG)

  type flow = {
    flow : Flow.flow;
    sleep : sleep;
    queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t;
    mutable rd_closed : bool;
    mutable wr_closed : bool;
  }

  let create ~sleep flow =
    let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
    Lwt.return { flow; sleep; queue; rd_closed = false; wr_closed = false }

  let safely_close flow =
    if flow.rd_closed && flow.wr_closed
    then (
      Log.debug (fun m -> m "Close the connection.") ;
      Flow.close flow.flow)
    else Lwt.return ()

  let blit src src_off dst dst_off len =
    let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in
    Cstruct.blit src src_off dst 0 len

  open Lwt.Infix

  let recv flow ~read ~read_eof =
    (* match Ke.Rke.N.peek flow.queue with
       | src :: _ ->
         let len = Bigstringaf.length src in
         let shift = read src ~off:0 ~len in
         Ke.Rke.N.shift_exn flow.queue shift ;
         Lwt.return `Continue
       | [] when flow.rd_closed ->
         let _ = read_eof Bigstringaf.empty ~off:0 ~len:0 in
         Lwt.return `Closed
       | [] -> *)
    Ke.Rke.compress flow.queue ;
    Flow.read flow.flow >>= function
    | Error _ | Ok `Eof ->
        flow.rd_closed <- true ;
        safely_close flow >>= fun () ->
        let _shift =
          match
            Ke.Rke.compress flow.queue ;
            Ke.Rke.N.peek flow.queue
          with
          | [] -> read_eof Bigstringaf.empty ~off:0 ~len:0
          | [ slice ] -> read_eof slice ~off:0 ~len:(Bigstringaf.length slice)
          | _ -> assert false
          (* XXX(dinosaure): impossible due to [compress]. *) in
        Lwt.return `Closed
    | Ok (`Data v) ->
        let len = Cstruct.length v in
        Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.length ~off:0 ~len v ;
        let[@warning "-8"] (slice :: _) = Ke.Rke.N.peek flow.queue in
        let shift = read slice ~off:0 ~len:(Bigstringaf.length slice) in
        Ke.Rke.N.shift_exn flow.queue shift ;
        Lwt.return `Continue
  (* XXX(dinosaure): semantically, this is the closer impl. of [recv] if we
   * compare with HTTP/AF. [compress] is called before any [read] and it ensures
   * some assumptions needed by HTTP/AF (or Angstrom) to parse requests.
   *
   * Indeed, without [compress] at the beginning, it seems that HTTP/AF is not
   * able to decide to close the connection.
   *
   * On the other side, introspect [flow.queue] before and gives slices and limit
   * calls to [read] can finish to a situation with ["\r\n"] into the queue and
   * HTTP/AF is not able to shift nor to finalize.
   *
   * In others words, [compress] seems the key to ensure that we deliver something
   * good for HTTP/AF to terminate or not the connection properly. *)

  let sleep flow timeout =
    flow.sleep timeout >>= fun () -> Lwt.return (Error `Closed)

  let writev ?(timeout = 5_000_000_000L) flow iovecs =
    let rec go acc = function
      | [] -> Lwt.return (`Ok acc)
      | { Faraday.buffer; off; len } :: rest -> (
          let raw = Cstruct.of_bigarray buffer ~off ~len in
          Lwt.pick [ Flow.write flow.flow raw; sleep flow timeout ] >>= function
          | Ok () -> go (acc + len) rest
          | Error `Closed ->
              flow.wr_closed <- true ;
              safely_close flow >>= fun () -> Lwt.return `Closed
          | Error _ -> assert false) in
    go 0 iovecs

  let send flow iovecs =
    if flow.wr_closed
    then safely_close flow >>= fun () -> Lwt.return `Closed
    else writev flow iovecs

  let close flow =
    match (flow.rd_closed, flow.wr_closed) with
    | true, true -> Lwt.return_unit
    | _ ->
        flow.rd_closed <- true ;
        flow.wr_closed <- true ;
        Flow.close flow.flow
end

module Server (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
  val server : sleep:sleep -> Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
  let src = Logs.Src.create "paf-server"

  module Log = (val Logs.src_log src : Logs.LOG)
  module Easy_flow = Make (Flow)
  open Lwt.Infix

  let server ~sleep connection flow =
    Easy_flow.create ~sleep flow >>= fun flow ->
    let rd_exit, notify_rd_exit = Lwt.wait () in
    let wr_exit, notify_wr_exit = Lwt.wait () in
    let rec rd_fiber () =
      let rec go () =
        match Runtime.next_read_operation connection with
        | `Read ->
            Log.debug (fun m -> m "next read operation: `read") ;
            Easy_flow.recv flow ~read:(Runtime.read connection)
              ~read_eof:(Runtime.read_eof connection)
            >>= fun _ -> go ()
        | `Yield ->
            Log.debug (fun m -> m "next read operation: `yield") ;
            Runtime.yield_reader connection rd_fiber ;
            Lwt.return_unit
        | `Close ->
            Log.debug (fun m -> m "next read operation: `close") ;
            Lwt.wakeup_later notify_rd_exit () ;
            flow.Easy_flow.rd_closed <- true ;
            Easy_flow.safely_close flow in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    let rec wr_fiber () =
      let rec go () =
        match Runtime.next_write_operation connection with
        | `Write iovecs ->
            Log.debug (fun m -> m "next write operation: `write") ;
            Easy_flow.send flow iovecs >>= fun res ->
            Runtime.report_write_result connection res ;
            go ()
        | `Yield ->
            Log.debug (fun m -> m "next write operation: `yield") ;
            Runtime.yield_writer connection wr_fiber ;
            Lwt.return_unit
        | `Close _ ->
            Log.debug (fun m -> m "next write operation: `close") ;
            Lwt.wakeup_later notify_wr_exit () ;
            flow.Easy_flow.wr_closed <- true ;
            Easy_flow.safely_close flow in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          (* Runtime.report_write_result connection `Closed ; *)
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    rd_fiber () ;
    wr_fiber () ;
    Lwt.join [ rd_exit; wr_exit ] >>= fun () ->
    Log.debug (fun m -> m "End of transmission.") ;
    Easy_flow.close flow
end

module Client (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
  val run : sleep:sleep -> Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
  open Lwt.Infix

  let src = Logs.Src.create "paf"

  module Log = (val Logs.src_log src : Logs.LOG)
  module Easy_flow = Make (Flow)

  let run ~sleep connection flow =
    Easy_flow.create ~sleep flow >>= fun flow ->
    let rd_exit, notify_rd_exit = Lwt.wait () in
    let wr_exit, notify_wr_exit = Lwt.wait () in

    let rec rd_loop () =
      let rec go () =
        match Runtime.next_read_operation connection with
        | `Read ->
            Log.debug (fun m -> m "[`read] start to read.") ;
            Easy_flow.recv flow ~read:(Runtime.read connection)
              ~read_eof:(Runtime.read_eof connection)
            >>= fun _ -> go ()
        | `Yield ->
            Log.debug (fun m -> m "next read operation: `yield") ;
            Runtime.yield_reader connection rd_loop ;
            Lwt.return_unit
        | `Close ->
            Log.debug (fun m -> m "[`read] close the connection.") ;
            Lwt.wakeup_later notify_rd_exit () ;
            flow.Easy_flow.rd_closed <- true ;
            Easy_flow.safely_close flow in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    let rec wr_loop () =
      let rec go () =
        match Runtime.next_write_operation connection with
        | `Write iovecs ->
            Log.debug (fun m -> m "[`write] start to write.") ;
            Easy_flow.send flow iovecs >>= fun res ->
            Runtime.report_write_result connection res ;
            go ()
        | `Yield ->
            Log.debug (fun m -> m "[`write] yield.") ;
            Runtime.yield_writer connection wr_loop ;
            Lwt.return ()
        | `Close _ ->
            Log.debug (fun m -> m "[`write] close.") ;
            Lwt.wakeup_later notify_wr_exit () ;
            Lwt.return_unit in

      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return ()) in
    wr_loop () ;
    rd_loop () ;
    Lwt.join [ rd_exit; wr_exit ] >>= fun () -> Easy_flow.close flow
end

type impl = Runtime : 'conn runtime * 'conn -> impl

type 't service =
  | Service : {
      accept : 't -> ('socket, ([> `Closed ] as 'error)) result Lwt.t;
      handshake : 'socket -> ('flow, ([> `Closed ] as 'error)) result Lwt.t;
      connection : 'flow -> (Mimic.flow * impl, 'error) result Lwt.t;
      close : 't -> unit Lwt.t;
    }
      -> 't service

and ('t, 'socket, 'flow, 'error) posix = {
  accept : 't -> ('socket, 'error) result Lwt.t;
  handshake : 'socket -> ('flow, 'error) result Lwt.t;
  close : 't -> unit Lwt.t;
}
  constraint 'error = [> `Closed ]

let service connection handshake accept close =
  Service { accept; connection; handshake; close }

open Lwt.Infix

let serve_when_ready :
    type t socket flow.
    (t, socket, flow, _) posix ->
    ?stop:Lwt_switch.t ->
    handler:(flow -> unit Lwt.t) ->
    t ->
    [ `Initialized of unit Lwt.t ] =
 fun service ?stop ~handler t ->
  let { accept; handshake; close } = service in
  `Initialized
    (let switched_off =
       let t, u = Lwt.wait () in
       Lwt_switch.add_hook stop (fun () ->
           Lwt.wakeup_later u (Ok `Stopped) ;
           Lwt.return_unit) ;
       t in
     let rec loop () =
       accept t >>= function
       | Ok socket ->
           Lwt.async (fun () ->
               handshake socket >>= function
               | Ok flow -> handler flow
               | Error `Closed ->
                   Logs.info (fun m -> m "Connection closed by peer") ;
                   Lwt.return ()
               | Error _err ->
                   Logs.err (fun m ->
                       m "Got an error from a TCP/IP connection.") ;
                   Lwt.return ()) ;
           loop ()
       | Error `Closed -> Lwt.return_error `Closed
       | Error _ -> Lwt.pause () >>= loop in
     let stop_result =
       Lwt.pick [ switched_off; loop () ] >>= function
       | Ok `Stopped -> close t >>= fun () -> Lwt.return_ok ()
       | Error _ as err -> close t >>= fun () -> Lwt.return err in
     stop_result >>= function Ok () | Error `Closed -> Lwt.return_unit)

let server : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t =
 fun (module Runtime) ~sleep conn flow ->
  let module Server = Server (Mimic) (Runtime) in
  Server.server ~sleep conn flow

let serve ~sleep ?stop service t =
  let (Service { accept; handshake; connection; close }) = service in
  let handler flow =
    connection flow >>= function
    | Ok (flow, Runtime (runtime, conn)) -> server runtime ~sleep conn flow
    | Error _ -> Lwt.return_unit in
  serve_when_ready ?stop ~handler { accept; handshake; close } t

let run : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t =
 fun (module Runtime) ~sleep conn flow ->
  let module Client = Client (Mimic) (Runtime) in
  Client.run ~sleep conn flow