1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
open Conan.Sigs
(* [Make (S)] relates values of the one-parameter type ['a S.t] to Conan's
   two-parameter [('a, t) io] type, where [t] is a fresh abstract type that
   stands for [S.t] as a whole. This looks like the usual brand-type encoding
   of higher-kinded polymorphism (review: confirm against Conan.Sigs). *)
module Make (S : sig
type +'a t
end) : sig
(* Abstract tag identifying the effect type [S.t]; it has no values. *)
type t
(* [prj] recovers the underlying ['a S.t] from its [io] wrapper. *)
val prj : ('a, t) io -> 'a S.t
(* [inj] wraps an ['a S.t] as an [io] value tagged with [t]. *)
val inj : 'a S.t -> ('a, t) io
end = struct
type t
(* Both conversions are bound to the [%identity] primitive: the value is
   returned unchanged and only its static type differs. *)
external prj : ('a, t) io -> 'a S.t = "%identity"
external inj : 'a S.t -> ('a, t) io = "%identity"
end
(** Instance of {!Make} for Lwt promises: [inj]/[prj] convert between
    ['a Lwt.t] and [('a, Lwt_scheduler.t) io]. [Lwt] already exposes the
    required [type +'a t], so it is passed to the functor directly. *)
module Lwt_scheduler = Make (Lwt)
(** [(f <.> g) x] is [f (g x)]: right-to-left function composition. *)
let ( <.> ) f g = fun x -> g x |> f
(** Scheduler record handed to Conan: monadic [bind] and [return] expressed
    over [Lwt_scheduler]-tagged [io] values and implemented with Lwt. *)
let lwt =
  let open Lwt_scheduler in
  (* Lift an immediate value into an already-resolved promise. *)
  let return value = inj (Lwt.return value) in
  (* Unwrap the promise, run the continuation, and unwrap its result so that
     Lwt can chain the two promises. *)
  let bind promise continue =
    inj (Lwt.bind (prj promise) (fun value -> prj (continue value)))
  in
  { bind; return }
(* Fixed-width loads from a string at a byte offset, bound directly to
   compiler primitives. They are only used by the [read_int*_ne] functions
   of [Stream], i.e. as native-endian reads.
   Despite the [uint] names, the 32- and 64-bit variants return the raw
   [int32] / [int64] value; no unsigned conversion is applied here. *)
external get_uint16 : string -> int -> int = "%caml_string_get16"
external get_uint32 : string -> int -> int32 = "%caml_string_get32"
external get_uint64 : string -> int -> int64 = "%caml_string_get64"
(** Seekable, buffered view over a forward-only stream of string chunks.

    Every chunk pulled from [stream] is appended to [buffer] and kept, so any
    position that has already been fetched can be revisited without reading
    the stream again. *)
module Stream = struct
  type t = {
    mutable buffer : Bstr.t;
        (** Backing store; only its first [save] bytes hold stream data. *)
    mutable save : int;  (** Number of bytes copied from [stream] so far. *)
    mutable seek : int;  (** Absolute position of the read cursor. *)
    stream : unit -> string option Lwt.t;
        (** Produces the next chunk; [None] is treated as end of input. *)
  }

  (** [openfile stream] wraps [stream] with an empty 4 KiB buffer and the
      cursor at position 0. Nothing is read from [stream] yet. *)
  let openfile stream =
    { buffer = Bstr.create 0x1000; save = 0; seek = 0; stream }

  (** [resize t more] replaces [t.buffer] by a buffer able to hold at least
      [more] bytes beyond the current capacity, preserving the first [t.save]
      bytes. The capacity grows by doubling.

      The target is computed from the current {e capacity}, not from
      [t.save]: callers pass the number of bytes that do not fit in the free
      space, so the total needed is [capacity + more]. Sizing from [t.save]
      could allocate a buffer smaller than the data about to be written. *)
  let resize t more =
    let capacity = Bstr.length t.buffer in
    let required = capacity + more in
    let new_len = ref (Int.max 1 capacity) in
    while !new_len < required do
      new_len := 2 * !new_len
    done;
    let buffer = Bstr.create !new_len in
    Bstr.blit t.buffer ~src_off:0 buffer ~dst_off:0 ~len:t.save;
    t.buffer <- buffer

  (* Largest seek offset representable as a native [int]. *)
  let _max_int = Int64.of_int max_int

  open Lwt.Infix

  let ( >>? ) = Lwt_result.bind

  (* [append t str] copies [str] right after the saved bytes, growing the
     buffer first when the free space is too small. *)
  let append t str =
    let len = String.length str in
    let free = Bstr.length t.buffer - t.save in
    if len > free then resize t (len - free);
    Bstr.blit_from_string str ~src_off:0 t.buffer ~dst_off:t.save ~len;
    t.save <- t.save + len

  (** [consume_and_save_to ~abs_offset t] pulls chunks from the stream until
      the byte at [abs_offset] has been buffered ([abs_offset < t.save]).
      Returns [Error `Out_of_bound] if the stream ends before that. *)
  let rec consume_and_save_to ~abs_offset t =
    if abs_offset < t.save then Lwt.return_ok ()
    else
      t.stream () >>= function
      | None -> Lwt.return_error `Out_of_bound
      | Some str ->
          append t str;
          consume_and_save_to ~abs_offset t

  (** [save_all t] drains the stream into the buffer until it returns
      [None]. *)
  let rec save_all t =
    t.stream () >>= function
    | None -> Lwt.return_unit
    | Some str ->
        append t str;
        save_all t

  (** [seek t offset whence] moves the cursor.

      - [SET]: to absolute position [offset].
      - [CUR]: [offset] bytes forward from the current position.
      - [END]: [offset] bytes before the end, after draining the stream.

      Negative offsets, offsets that do not fit in an [int], and targets that
      are not the position of an existing byte yield [Error `Out_of_bound];
      the cursor is left unchanged in that case. *)
  let seek t offset seek =
    if offset > _max_int || offset < 0L then Lwt.return_error `Out_of_bound
    else
      let offset = Int64.to_int offset in
      match seek with
      | Conan.Sigs.SET ->
          consume_and_save_to ~abs_offset:offset t >>? fun () ->
          t.seek <- offset;
          Lwt.return_ok ()
      | Conan.Sigs.CUR ->
          (* Reject targets past [max_int]: the addition would wrap to a
             negative position that passes the bound check below. *)
          if offset > max_int - t.seek then Lwt.return_error `Out_of_bound
          else
            let abs_offset = t.seek + offset in
            consume_and_save_to ~abs_offset t >>? fun () ->
            t.seek <- abs_offset;
            Lwt.return_ok ()
      | Conan.Sigs.END ->
          save_all t >>= fun () ->
          let abs_offset = t.save - offset in
          if abs_offset >= 0 && abs_offset < t.save then begin
            t.seek <- abs_offset;
            Lwt.return_ok ()
          end
          else Lwt.return_error `Out_of_bound

  (* [read t required] returns up to [required] bytes starting at the cursor,
     or [None] when no byte is available there. The cursor is not moved.
     A short result means the stream ended early. *)
  let read t required =
    (* The last byte wanted is at [seek + required - 1]; asking for one past
       it would pull an extra chunk even when enough data is buffered. The
       error is ignored on purpose: whatever was buffered is returned. *)
    consume_and_save_to ~abs_offset:(t.seek + required - 1) t >>= fun _ ->
    let len = Int.min required (t.save - t.seek) in
    if len <= 0 then Lwt.return_none
    else Lwt.return_some (Bstr.sub_string t.buffer ~off:t.seek ~len)

  (** [read_int8 t] is the byte under the cursor as an integer in
      [0..255], or [Error `Out_of_bound] at end of input. *)
  let read_int8 t =
    read t 1 >>= function
    | Some str -> Lwt.return_ok (Char.code str.[0])
    | None -> Lwt.return_error `Out_of_bound

  (** [read_int16_ne t] loads 2 bytes at the cursor with [get_uint16];
      [Error `Out_of_bound] if fewer than 2 bytes are available. *)
  let read_int16_ne t =
    read t 2 >>= function
    | Some str when String.length str >= 2 -> Lwt.return_ok (get_uint16 str 0)
    | _ -> Lwt.return_error `Out_of_bound

  (** [read_int32_ne t] loads 4 bytes at the cursor with [get_uint32];
      [Error `Out_of_bound] if fewer than 4 bytes are available. *)
  let read_int32_ne t =
    read t 4 >>= function
    | Some str when String.length str >= 4 -> Lwt.return_ok (get_uint32 str 0)
    | _ -> Lwt.return_error `Out_of_bound

  (** [read_int64_ne t] loads 8 bytes at the cursor with [get_uint64];
      [Error `Out_of_bound] if fewer than 8 bytes are available. *)
  let read_int64_ne t =
    read t 8 >>= function
    | Some str when String.length str >= 8 -> Lwt.return_ok (get_uint64 str 0)
    | _ -> Lwt.return_error `Out_of_bound

  (* Absolute index of the first [chr] in [buf] within [pos, limit).
     Raises [Not_found] when there is none. *)
  let rec index buf chr pos limit =
    if pos >= limit then raise Not_found;
    if Bstr.get buf pos = chr then pos else index buf chr (succ pos) limit

  (** [index str chr ~off ~len] is the position, relative to [off], of the
      first [chr] among the [len] bytes of [str] starting at [off].
      @raise Not_found if [chr] does not occur in that window. *)
  let index str chr ~off ~len = index str chr off (off + len) - off

  (** [line t] looks for a ['\n'] within the next 80 bytes from the cursor.

      On success it returns [(0, pos, str)] where [pos] is the distance from
      the cursor to the newline and [str] holds the bytes up to and including
      it; the cursor is advanced by [pos]. Without a newline in that window
      the result is [Error `Out_of_bound] and the cursor does not move.

      NOTE(review): the cursor ends up on the newline itself rather than
      after it; confirm this is what the Conan caller expects. *)
  let line t =
    (* Only used to make sure up to 80 bytes are buffered. *)
    read t 80 >>= fun _ ->
    let len = Int.min (t.save - t.seek) 80 in
    let off = t.seek in
    match index t.buffer '\n' ~off ~len with
    | pos ->
        t.seek <- t.seek + pos;
        let str = Bstr.sub_string t.buffer ~off ~len:(pos + 1) in
        Lwt.return_ok (0, pos, str)
    | exception Not_found -> Lwt.return_error `Out_of_bound

  (** [read t required] is exactly [required] bytes from the cursor, or
      [Error `Out_of_bound] when fewer are available. The cursor is not
      moved. Shadows the option-returning version above. *)
  let read t required =
    read t required >>= function
    | Some str when String.length str >= required -> Lwt.return_ok str
    | _ -> Lwt.return_error `Out_of_bound

  (** The operations above packaged as the record of system calls Conan
      expects, with every result injected into [Lwt_scheduler]. *)
  let syscall =
    let open Lwt_scheduler in
    {
      seek = (fun f p w -> inj (seek f p w));
      read = (fun f l -> inj (read f l));
      read_int8 = (fun f -> inj (read_int8 f));
      read_int16_ne = (fun f -> inj (read_int16_ne f));
      read_int32_ne = (fun f -> inj (read_int32_ne f));
      read_int64_ne = (fun f -> inj (read_int64_ne f));
      line = (fun f -> inj (line f));
    }
end
(** [msgf fmt args...] formats like [Format.asprintf] and wraps the
    resulting string as [`Msg s]. *)
let msgf fmt =
  let wrap text = `Msg text in
  Format.kasprintf wrap fmt
open Conan
(** [run ~database stream] runs Conan's [Process.descending_walk] over the
    chunks produced by [stream], using [database] as the decision tree.

    Resolves to [Ok result] with whatever the walk returns. Any exception
    raised or rejected during the walk is turned into
    [Error (`Msg "Internal error: ...")] instead of propagating. *)
let run ~database stream =
  let fd = Stream.openfile stream in
  (* Started inside the thunk so that a synchronous exception is also
     routed to the handler. *)
  let walk () =
    let outcome = Process.descending_walk lwt Stream.syscall fd database in
    Lwt.bind (Lwt_scheduler.prj outcome) Lwt.return_ok
  in
  let on_exception exn =
    Lwt.return_error (msgf "Internal error: %s" (Printexc.to_string exn))
  in
  Lwt.catch walk on_exception