1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
module Database = Sihl_database
(** Migration-state repository issuing MariaDB-flavoured SQL against the
    [core_migration_state] table. Every query goes through
    [Database.Service.query] and the Caqti result is passed to
    [Database.Service.raise_error].
    NOTE(review): [raise_error] presumably raises on an [Error] result;
    confirm against [Sihl_database]. *)
module MariaDb : Sig.REPO = struct
  (** Statement that creates [core_migration_state] unless it already exists.
      The table is keyed on [namespace] and uses InnoDB with utf8mb4. *)
  let create_request =
    let sql =
      {sql|
CREATE TABLE IF NOT EXISTS core_migration_state (
namespace VARCHAR(128) NOT NULL,
version INTEGER,
dirty BOOL NOT NULL,
PRIMARY KEY (namespace)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
|sql}
    in
    Caqti_request.exec Caqti_type.unit sql
  ;;

  (** [create_table_if_not_exists ()] executes [create_request] on a
      connection supplied by [Database.Service.query]. *)
  let create_table_if_not_exists () =
    let run connection =
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Lwt.map Database.Service.raise_error (Connection.exec create_request ())
    in
    Database.Service.query run
  ;;

  (** Statement selecting the [(namespace, version, dirty)] row for one
      namespace; yields at most one row. *)
  let get_request =
    let row = Caqti_type.tup3 Caqti_type.string Caqti_type.int Caqti_type.bool in
    let sql =
      {sql|
SELECT
namespace,
version,
dirty
FROM core_migration_state
WHERE namespace = ?;
|sql}
    in
    Caqti_request.find_opt Caqti_type.string row sql
  ;;

  (** [get ~namespace] looks up the stored row for [namespace] and converts it
      with [Model.of_tuple]; the result is [None] when no row matches. *)
  let get ~namespace =
    let fetch connection =
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Lwt.map
        Database.Service.raise_error
        (Connection.find_opt get_request namespace)
    in
    Lwt.map (Option.map Model.of_tuple) (Database.Service.query fetch)
  ;;

  (** Statement inserting a [(namespace, version, dirty)] row, or overwriting
      [version] and [dirty] when the [namespace] key is already present. *)
  let upsert_request =
    let row = Caqti_type.tup3 Caqti_type.string Caqti_type.int Caqti_type.bool in
    let sql =
      {sql|
INSERT INTO core_migration_state (
namespace,
version,
dirty
) VALUES (
?,
?,
?
) ON DUPLICATE KEY UPDATE
version = VALUES(version),
dirty = VALUES(dirty)
|sql}
    in
    Caqti_request.exec row sql
  ;;

  (** [upsert ~state] writes [state], flattened with [Model.to_tuple], using
      [upsert_request]. *)
  let upsert ~state =
    let store connection =
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      (* The tuple is built inside the callback, i.e. only once a connection
         has been handed over. *)
      Lwt.map
        Database.Service.raise_error
        (Connection.exec upsert_request (Model.to_tuple state))
    in
    Database.Service.query store
  ;;
end
(** Migration-state repository issuing PostgreSQL-flavoured SQL against the
    [core_migration_state] table. Every query goes through
    [Database.Service.query] and the Caqti result is passed to
    [Database.Service.raise_error].
    NOTE(review): [raise_error] presumably raises on an [Error] result;
    confirm against [Sihl_database]. *)
module PostgreSql : Sig.REPO = struct
  (** Statement that creates [core_migration_state] unless it already exists,
      with [namespace] as the primary key. *)
  let create_request =
    let sql =
      {sql|
CREATE TABLE IF NOT EXISTS core_migration_state (
namespace VARCHAR(128) NOT NULL PRIMARY KEY,
version INTEGER,
dirty BOOL NOT NULL
);
|sql}
    in
    Caqti_request.exec Caqti_type.unit sql
  ;;

  (** [create_table_if_not_exists ()] executes [create_request] on a
      connection supplied by [Database.Service.query]. *)
  let create_table_if_not_exists () =
    let run connection =
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Lwt.map Database.Service.raise_error (Connection.exec create_request ())
    in
    Database.Service.query run
  ;;

  (** Statement selecting the [(namespace, version, dirty)] row for one
      namespace; yields at most one row. *)
  let get_request =
    let row = Caqti_type.tup3 Caqti_type.string Caqti_type.int Caqti_type.bool in
    let sql =
      {sql|
SELECT
namespace,
version,
dirty
FROM core_migration_state
WHERE namespace = ?;
|sql}
    in
    Caqti_request.find_opt Caqti_type.string row sql
  ;;

  (** [get ~namespace] looks up the stored row for [namespace] and converts it
      with [Model.of_tuple]; the result is [None] when no row matches. *)
  let get ~namespace =
    let fetch connection =
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Lwt.map
        Database.Service.raise_error
        (Connection.find_opt get_request namespace)
    in
    Lwt.map (Option.map Model.of_tuple) (Database.Service.query fetch)
  ;;

  (** Statement inserting a [(namespace, version, dirty)] row; on a conflict
      on [namespace] it overwrites [version] and [dirty] with the proposed
      values. *)
  let upsert_request =
    let row = Caqti_type.tup3 Caqti_type.string Caqti_type.int Caqti_type.bool in
    let sql =
      {sql|
INSERT INTO core_migration_state (
namespace,
version,
dirty
) VALUES (
?,
?,
?
) ON CONFLICT (namespace)
DO UPDATE SET version = EXCLUDED.version,
dirty = EXCLUDED.dirty
|sql}
    in
    Caqti_request.exec row sql
  ;;

  (** [upsert ~state] writes [state], flattened with [Model.to_tuple], using
      [upsert_request]. *)
  let upsert ~state =
    let store connection =
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      (* The tuple is built inside the callback, i.e. only once a connection
         has been handed over. *)
      Lwt.map
        Database.Service.raise_error
        (Connection.exec upsert_request (Model.to_tuple state))
    in
    Database.Service.query store
  ;;
end