Source file block_request.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
open Blkproto.Req
type request = {
id: int64 list;
op: op;
sector: int64;
length: int;
buffers: Cstruct.t list;
depends: int64 list;
}
let string_of_request r =
let int64 x = Int64.to_string x in
let int x = string_of_int x in
let list ty xs = String.concat "; " (List.map ty xs) in
Printf.sprintf "{ id = [ %s ]; op = %s; sector = %Ld; length = %d; buffers = [ %s ]; depends = [ %s ]}"
(list int64 r.id) (string_of_op r.op) r.sector r.length (list int (List.map Cstruct.length r.buffers)) (list int64 r.depends)
type t = request list
let empty = []
let conflicts a b = match a.op, b.op with
| Read, Read -> false
| _, _ ->
let open Int64 in
not (add a.sector (of_int a.length) < b.sector
|| (add b.sector (of_int b.length) < a.sector))
let add t id op sector buffers =
let length = List.fold_left (+) 0 (List.map Cstruct.length buffers) / 512 in
let r = { id = [id]; op; sector; length; buffers; depends = [] } in
let depends = List.(concat (map (fun r -> r.id) (filter (conflicts r) t))) in
let r = { r with depends } in
r :: t
let coalesce requests =
let rec reqs finished offset current = function
| [] -> List.rev (if current = [] then finished else (List.rev current) :: finished)
| r :: rs when r.sector = offset -> reqs finished (Int64.(add offset (of_int r.length))) (r :: current) rs
| r :: rs -> reqs (if current = [] then finished else current :: finished) (Int64.(add r.sector (of_int r.length))) [ r ] rs in
let rec merge_buffers finished current = function
| [] -> List.rev (if Cstruct.length current = 0 then finished else current :: finished)
| b :: bs -> merge_buffers (if Cstruct.length current = 0 then finished else current :: finished) b bs in
let merge requests =
let batches = reqs [] (-1L) [] requests in
List.map (function
| [] -> []
| r :: rs -> [ { r with id = List.concat (List.map (fun r -> r.id) (r :: rs));
length = List.fold_left (+) 0 (List.map (fun r -> r.length) (r :: rs));
buffers = merge_buffers [] (Cstruct.create 0) (List.concat (List.map (fun r -> r.buffers) (r :: rs))) } ]
) batches in
let sorted = List.sort (fun a b -> compare a.sector b.sector) requests in
let reads = List.filter (fun r -> r.op = Read) sorted in
let writes = List.filter (fun r -> r.op = Write) sorted in
List.concat (merge reads @ (merge writes))
let pop t =
let nodeps, deps = List.partition (fun t -> t.depends = []) t in
let nodeps_ids = List.(concat (map (fun t -> t.id) nodeps)) in
let deps = List.map (fun t -> { t with depends = List.filter (fun x -> not(List.mem x nodeps_ids)) t.depends }) deps in
let nodeps = List.rev nodeps in
coalesce nodeps, deps