Source file Client.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

open Lwt.Infix
module R = Redis_lwt.Client

type config = {
  debug: bool;
  prefix: string;
  host: string;
  port: int;
}

type t = {
  config: config;
  send: R.connection;
  received: R.connection;
  stream: Message_t.message Lwt_stream.t;
  stop: unit Lwt.u;
  join: unit Lwt.t;
}

let received self = self.stream
let join self = self.join

let on_received self f : unit =
  Lwt.async (fun () -> Lwt_stream.iter_p f self.stream)

let send (self:t) (m:Message_t.send) : unit Lwt.t =
  if self.config.debug then (
    Printf.eprintf "send msg %s\n%!" (Message_j.string_of_send m);
  );
  R.publish self.send (self.config.prefix ^ ":send") (Message_j.string_of_send m)
  >|= fun (_:int) -> ()

let get_msg (l:R.reply list) : _ list Lwt.t =
  Lwt_list.filter_map_p
    (function
      | `Bulk (Some s) ->
        begin match Message_j.message_of_string s with
          | m -> Lwt.return_some m
          | exception _ -> Lwt.return None
        end
      | _ -> Lwt.return None)
    l

let close (self:t) =
  Lwt.wakeup self.stop ();
  R.disconnect self.received >>= fun () ->
  R.disconnect self.send

let connect_ (config:config) : (t,_) result Lwt.t =
  Lwt.catch
    (fun () ->
       let {host;port;_} = config in
       let spec = {R.host;port} in
       if config.debug then Printf.eprintf "connecting to redis on %s:%d...\n%!" host port;
       let join, stop = Lwt.wait () in
       R.connect spec >>= fun send ->
       R.connect spec >>= fun received ->
       let str = R.stream received in
       Lwt.on_termination (Lwt_stream.closed str) (fun _ -> Lwt.wakeup stop ());
       R.subscribe received [config.prefix ^ ":received"] >>= fun () ->
       let stream = Lwt_stream.map_list_s get_msg str in
       if config.debug then (
         Printf.eprintf "connected to redis on %s:%d\n%!" host port;
       );
       let st = {config; send; received; join; stop; stream} in
       Lwt.return (Ok st))
    (fun e ->
       Lwt.return (Error ("redis lib : " ^ Printexc.to_string e)))

let make ?(debug=false) ?(prefix="irc") ?(host="127.0.0.1") ?(port=6379) () : (t,_) result Lwt.t =
  let config = {prefix; port; host; debug} in
  connect_ config