1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
open Lwt.Infix
let max_chunk_size = 4096
type t = {
mutable state : [
| `Open of Lwt_unix.file_descr * unit Lwt_condition.t
| `Readonly of string
| `Empty
| `Finished
];
mutable len : int;
}
let with_dup fd fn =
let fd = Lwt_unix.dup fd in
Lwt_unix.set_close_on_exec fd;
Lwt.finalize
(fun () -> fn fd)
(fun () -> Lwt_unix.close fd)
let catch_cancel fn =
Lwt.catch fn
(function
| Lwt.Canceled -> Lwt_result.fail `Cancelled
| ex -> Lwt.fail ex
)
let tail ?switch t dst =
match t.state with
| `Finished -> invalid_arg "tail: log is finished!"
| `Readonly path ->
Lwt_io.(with_file ~mode:input) path @@ fun ch ->
let buf = Bytes.create max_chunk_size in
let rec aux () =
Lwt_io.read_into ch buf 0 max_chunk_size >>= function
| 0 -> Lwt_result.return ()
| n -> dst (Bytes.sub_string buf 0 n); aux ()
in
catch_cancel @@ fun () ->
let th = aux () in
Lwt_switch.add_hook_or_exec switch (fun () -> Lwt.cancel th; Lwt.return_unit) >>= fun () ->
th
| `Empty -> Lwt_result.return ()
| `Open (fd, cond) ->
with_dup fd @@ fun fd ->
let buf = Bytes.create max_chunk_size in
let rec aux i =
match switch with
| Some sw when not (Lwt_switch.is_on sw) -> Lwt_result.fail `Cancelled
| _ ->
let avail = min (t.len - i) max_chunk_size in
if avail > 0 then (
Lwt_unix.pread fd ~file_offset:i buf 0 avail >>= fun n ->
dst (Bytes.sub_string buf 0 n);
aux (i + avail)
) else (
match t.state with
| `Open _ -> Lwt_condition.wait cond >>= fun () -> aux i
| _ -> Lwt_result.return ()
)
in
catch_cancel @@ fun () ->
let th = aux 0 in
Lwt_switch.add_hook_or_exec switch (fun () -> Lwt.cancel th; Lwt.return_unit) >>= fun () ->
th
let create path =
Lwt_unix.openfile path Lwt_unix.[O_CREAT; O_TRUNC; O_RDWR; O_CLOEXEC] 0o666 >|= fun fd ->
let cond = Lwt_condition.create () in
{
state = `Open (fd, cond);
len = 0;
}
let finish t =
match t.state with
| `Finished -> invalid_arg "Log is already finished!"
| `Open (fd, cond) ->
t.state <- `Finished;
Lwt_unix.close fd >|= fun () ->
Lwt_condition.broadcast cond ()
| `Readonly _ ->
t.state <- `Finished;
Lwt.return_unit
| `Empty ->
Lwt.return_unit
let write t data =
match t.state with
| `Finished -> invalid_arg "write: log is finished!"
| `Readonly _ | `Empty -> invalid_arg "Log is read-only!"
| `Open (fd, cond) ->
let len = String.length data in
Os.write_all fd (Bytes.of_string data) 0 len >>= fun () ->
t.len <- t.len + len;
Lwt_condition.broadcast cond ();
Lwt.return_unit
let of_saved path =
Lwt_unix.lstat path >|= fun stat ->
{
state = `Readonly path;
len = stat.st_size;
}
let printf t fmt =
Fmt.kstrf (write t) fmt
let empty = {
state = `Empty;
len = 0;
}