Source file smtp_events.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
open! Core
open! Async
open Async_smtp_types
module Time = Time_unix
(** Events published by this module. An event is a timestamp paired with a
    tagged payload; the only tag defined here is [`Envelope_received]. *)
module Event = struct
(** Payload of an [`Envelope_received] event. *)
module Envelope_received = struct
type t =
{ sender : string (** filled from [Smtp_envelope.string_sender] by [envelope_received] *)
; recipients : string list (** filled from [Smtp_envelope.string_recipients] by [envelope_received] *)
}
[@@deriving sexp, bin_io]
end
(* The first component is set to [Time.now ()] by [envelope_received] below.
   Both this type and the payload derive sexp and bin_io converters, so the
   declarations determine the serialized form; change them with care. *)
type t = Time.t * [ `Envelope_received of Envelope_received.t ]
[@@deriving sexp, bin_io]
end
(** Publisher state: a single read-write bus whose subscriber callbacks each
    take one [Event.t] and return [unit]. *)
type t = { event_stream : (Event.t -> unit) Bus.Read_write.t }
(** [envelope_received t envelope] publishes an [`Envelope_received] event for
    [envelope] on [t]'s bus.

    The payload holds the sender and recipients in the string form returned by
    [Smtp_envelope.string_sender] and [Smtp_envelope.string_recipients]. It is
    paired with [Time.now ()], which is read only after both strings have been
    extracted, and the pair is handed to [Bus.write]. *)
let envelope_received t envelope =
  let from = Smtp_envelope.string_sender envelope in
  let rcpts = Smtp_envelope.string_recipients envelope in
  let payload = { Event.Envelope_received.sender = from; recipients = rcpts } in
  Bus.write t.event_stream (Time.now (), `Envelope_received payload)
;;
(** [create ()] builds a fresh publisher around a newly created bus.

    The bus is created for single-argument callbacks ([Arity1]), with
    [~on_subscription_after_first_write:Allow] and
    [~on_callback_raise:Error.raise].
    NOTE(review): going by the option names, late subscribers are accepted and
    an error from a subscriber callback is re-raised rather than swallowed;
    confirm against the [Bus] documentation. *)
let create () =
  { event_stream =
      Bus.create
        [%here]
        Arity1
        ~on_subscription_after_first_write:Allow
        ~on_callback_raise:Error.raise
  }
;;
(** [event_stream t] passes [t]'s bus, together with this call site's source
    position, to [Async_bus.pipe1_exn] and returns the result.
    NOTE(review): presumably every call sets up its own subscription and pipe,
    and the [_exn] suffix suggests it can raise; confirm against [Async_bus]. *)
let event_stream { event_stream = bus } = Async_bus.pipe1_exn bus [%here]