1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
open Lwt.Infix
let max_chunk_size = 4096
type t = {
mutable state : [
| `Open of Lwt_unix.file_descr * unit Lwt_condition.t
| `Readonly of string
| `Empty
| `Finished
];
mutable len : int;
}
let with_dup fd fn =
let fd = Lwt_unix.dup ~cloexec:true fd in
Lwt.finalize
(fun () -> fn fd)
(fun () -> Lwt_unix.close fd)
let catch_cancel fn =
Lwt.catch fn
(function
| Lwt.Canceled -> Lwt_result.fail `Cancelled
| ex -> Lwt.fail ex
)
let tail ?switch t dst =
let rec readonly_tail ch buf =
Lwt_io.read_into ch buf 0 max_chunk_size >>= function
| 0 -> Lwt_result.return ()
| n -> dst (Bytes.sub_string buf 0 n); readonly_tail ch buf
in
let rec open_tail fd cond buf i =
match switch with
| Some sw when not (Lwt_switch.is_on sw) -> Lwt_result.fail `Cancelled
| Some _ | None ->
let avail = min (t.len - i) max_chunk_size in
if avail > 0 then (
Lwt_unix.pread fd ~file_offset:i buf 0 avail >>= fun n ->
dst (Bytes.sub_string buf 0 n);
open_tail fd cond buf (i + avail)
) else (
match t.state with
| `Open _ -> Lwt_condition.wait cond >>= fun () -> open_tail fd cond buf i
| `Readonly _ | `Empty | `Finished -> Lwt_result.return ()
)
in
let interrupt th =
catch_cancel @@ fun () ->
Lwt_switch.add_hook_or_exec switch (fun () -> Lwt.cancel th; Lwt.return_unit) >>= fun () ->
th
in
match t.state with
| `Finished -> invalid_arg "tail: log is finished!"
| `Readonly path ->
let flags = [Unix.O_RDONLY; Unix.O_NONBLOCK; Unix.O_CLOEXEC] in
Lwt_io.(with_file ~mode:input ~flags) path @@ fun ch ->
let buf = Bytes.create max_chunk_size in
interrupt (readonly_tail ch buf)
| `Empty -> Lwt_result.return ()
| `Open (fd, cond) ->
with_dup fd @@ fun fd ->
let buf = Bytes.create max_chunk_size in
interrupt (open_tail fd cond buf 0)
let create path =
Lwt_unix.openfile path Lwt_unix.[O_CREAT; O_TRUNC; O_RDWR; O_CLOEXEC] 0o666 >|= fun fd ->
let cond = Lwt_condition.create () in
{
state = `Open (fd, cond);
len = 0;
}
let finish t =
match t.state with
| `Finished -> invalid_arg "Log is already finished!"
| `Open (fd, cond) ->
t.state <- `Finished;
Lwt_unix.close fd >|= fun () ->
Lwt_condition.broadcast cond ()
| `Readonly _ ->
t.state <- `Finished;
Lwt.return_unit
| `Empty ->
Lwt.return_unit
let write t data =
match t.state with
| `Finished -> invalid_arg "write: log is finished!"
| `Readonly _ | `Empty -> invalid_arg "Log is read-only!"
| `Open (fd, cond) ->
let len = String.length data in
Os.write_all fd (Bytes.of_string data) 0 len >>= fun () ->
t.len <- t.len + len;
Lwt_condition.broadcast cond ();
Lwt.return_unit
let of_saved path =
Lwt_unix.lstat path >|= fun stat ->
{
state = `Readonly path;
len = stat.st_size;
}
let printf t fmt =
Fmt.kstr (write t) fmt
let empty = {
state = `Empty;
len = 0;
}
let copy ~src ~dst =
let buf = Bytes.create max_chunk_size in
let rec aux () =
Lwt_unix.read src buf 0 (Bytes.length buf) >>= function
| 0 -> Lwt.return_unit
| n -> write dst (Bytes.sub_string buf 0 n) >>= aux
in
aux ()