1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
open Lwt.Infix
(** Upper bound, in bytes, on the amount of log data handed back by a single
    call to [stream]; longer logs are read as a sequence of chunks. *)
let max_chunk_size = 10240L
(** An in-memory log that can be appended to and read concurrently. *)
type t = {
(* Everything written to the log so far, in order. *)
data : Buffer.t;
(* [`Running c] while the log is open: [c] is broadcast whenever data is
   appended or the log is closed, which wakes readers blocked in [stream].
   [`Finished] once [close] has been called. *)
mutable cond : [ `Running of unit Lwt_condition.t
| `Finished ]
}
(** [create ()] is a fresh, empty log in the running (open) state. *)
let create () =
  let cond = `Running (Lwt_condition.create ()) in
  { data = Buffer.create 10240; cond }
(** [stream t ~start] resolves to [(chunk, next)], where [chunk] is the log
    data beginning at byte offset [start] (at most [max_chunk_size] bytes)
    and [next] is the offset to pass to the following call.

    A negative [start] is interpreted relative to the current end of the
    log and clamped to the beginning. If no data is available at [start],
    the call waits for more while the log is running, and resolves to
    [("", start)] once the log is finished. Fails (via [Fmt.failwith]) if
    [start] lies beyond the end of the log. *)
let rec stream t ~start =
  let total = Int64.of_int (Buffer.length t.data) in
  (* Resolve an end-relative (negative) offset to an absolute one. *)
  let offset =
    if start >= 0L then start
    else max 0L (Int64.add total start)
  in
  let remaining = Int64.sub total offset in
  if remaining < 0L then Fmt.failwith "Start value out of range!"
  else if remaining > 0L then begin
    (* Data is ready: hand back at most one chunk of it. *)
    let size = min remaining max_chunk_size in
    let text = Buffer.sub t.data (Int64.to_int offset) (Int64.to_int size) in
    Lwt.return (text, Int64.add offset size)
  end else begin
    (* Nothing new yet: either the log is over, or we wait for a writer. *)
    match t.cond with
    | `Finished -> Lwt.return ("", offset)
    | `Running wakeup ->
      Lwt_condition.wait wakeup >>= fun () ->
      stream t ~start:offset
  end
(** [write t data] appends [data] to the log and wakes every reader that is
    currently waiting for new data. Fails (via [Fmt.failwith]) if the log
    has already been closed. *)
let write t data =
  match t.cond with
  | `Finished ->
    Fmt.failwith "Attempt to write to log after close: %S" data
  | `Running readers ->
    Buffer.add_string t.data data;
    Lwt_condition.broadcast readers ()
(** [copy_from_stream t src] repeatedly reads up to 4096 bytes from [src]
    and appends each piece to [t], resolving once a read returns the empty
    string. *)
let copy_from_stream t src =
  let rec pump () =
    Lwt_io.read ~count:4096 src >>= fun piece ->
    if piece = "" then Lwt.return_unit
    else begin
      write t piece;
      pump ()
    end
  in
  pump ()
(** [close t] marks the log as finished and wakes every waiting reader so
    that it can observe the end of the log. Fails (via [Fmt.failwith]) if
    the log was already closed. *)
let close t =
  match t.cond with
  | `Finished ->
    Fmt.failwith "Log already closed!"
  | `Running waiters ->
    (* Flip the state before waking readers so they see [`Finished]. *)
    t.cond <- `Finished;
    Lwt_condition.broadcast waiters ()
(** [info t fmt ...] formats a message according to [fmt], with ["@."]
    appended to the format, and writes the resulting string to the log. *)
let info t fmt =
  let emit line = write t line in
  Fmt.kstr emit (fmt ^^ "@.")