Source file curl_lwt.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
(** Lwt support for Curl *)

module M = Curl.Multi

(** State kept for one curl multi handle driven by the Lwt engine. *)
type multi = {
  mt : Curl.Multi.mt;
  (** the underlying curl multi handle *)
  all_events : (Unix.file_descr, Lwt_engine.event list) Hashtbl.t;
  (** Lwt engine events currently registered for each socket *)
  wakeners : (Curl.t, Curl.curlCode Lwt.u) Hashtbl.t;
  (** resolver to wake with the result code once the keyed handle finishes *)
}

(** [create ()] makes a new curl multi handle and wires its timer and socket
    callbacks into the Lwt engine.

    Returns the multi handle together with the (initially empty) tables of
    registered socket events and of pending wakeners. *)
let create () =
  let mt = M.create () in
  let timer_event = ref Lwt_engine.fake_event in
  let all_events = Hashtbl.create 32 in
  let wakeners = Hashtbl.create 32 in
  (* Drain every transfer curl reports as finished and wake the promise
     waiting on it with the transfer's result code. *)
  let rec finished () =
    match M.remove_finished mt with
    | None -> ()
    | Some (h,code) ->
      (* only the lookup is guarded, so a [Not_found] coming from anywhere
         else is not mistaken for a missing wakener *)
      let wakener = try Some (Hashtbl.find wakeners h) with Not_found -> None in
      begin match wakener with
      | Some w ->
        Hashtbl.remove wakeners h;
        Lwt.wakeup w code
      | None ->
        prerr_endline "curl_lwt: orphan handle, how come?"
      end;
      finished ()
  in
  let on_readable fd _ =
    let (_:int) = M.action mt fd M.EV_IN in
    finished ()
  in
  let on_writable fd _ =
    let (_:int) = M.action mt fd M.EV_OUT in
    finished ()
  in
  (* duplicate stop_event is ok *)
  let stop_timer () = Lwt_engine.stop_event !timer_event in
  let on_timer _ =
    stop_timer ();
    M.action_timeout mt;
    finished ()
  in
  M.set_timer_function mt begin fun timeout ->
    stop_timer ();
    (* [timeout] is in milliseconds. libcurl passes -1 to ask for the timer
       to be deleted, so in that case nothing is re-armed. *)
    if timeout >= 0 then begin
      timer_event := Lwt_engine.on_timer (float_of_int timeout /. 1000.) false on_timer
    end
  end;
  M.set_socket_function mt begin fun fd what ->
    (* drop whatever was registered for this socket before installing the
       new set of events *)
    begin
    try
      List.iter Lwt_engine.stop_event (Hashtbl.find all_events fd);
      Hashtbl.remove all_events fd
    with
      Not_found -> () (* first event for the socket - no association *)
    end;
    let events = match what with
    | M.POLL_REMOVE | M.POLL_NONE -> []
    | M.POLL_IN -> [Lwt_engine.on_readable fd (on_readable fd)]
    | M.POLL_OUT -> [Lwt_engine.on_writable fd (on_writable fd)]
    | M.POLL_INOUT -> [Lwt_engine.on_readable fd (on_readable fd); Lwt_engine.on_writable fd (on_writable fd)]
    in
    match events with
    | [] -> ()
    | _ -> Hashtbl.replace all_events fd events
  end;
  { mt; all_events; wakeners; }

(* The single multi handle shared by every function below, created lazily on
   first use. Lwt may not run in parallel so one global is OK'ish. *)
let global = lazy (create ())

(** [setopt opt] applies the option [opt] to the global multi handle,
    forcing its creation if it does not exist yet. *)
let setopt opt =
  let { mt; _ } = Lazy.force global in
  M.setopt mt opt

(** [perform h] adds the handle [h] to the global multi handle and returns a
    promise that is resolved with the transfer's result code once the
    transfer is reported as finished.

    Cancelling the returned promise removes [h] from the multi handle and
    forgets its wakener. Any exception raised while adding [h] to the multi
    handle propagates to the caller. *)
let perform h =
  let t = Lazy.force global in
  (* Attach the handle before touching the wakeners table: if adding fails
     (e.g. [h] is already in progress) no stale wakener is left behind to
     shadow the one belonging to the transfer in flight. *)
  M.add t.mt h;
  let (waiter,wakener) = Lwt.wait () in
  Hashtbl.replace t.wakeners h wakener;
  (* callers get a cancelable promise wrapping the internal one *)
  let waiter = Lwt.protected waiter in
  Lwt.on_cancel waiter (fun () ->
    Curl.Multi.remove t.mt h;
    Hashtbl.remove t.wakeners h;
  );
  waiter