Skip to content

Commit f69dc74

Browse files
feat: improve sql audit store test (#722)
1 parent fa9082c commit f69dc74

File tree

6 files changed

+415
-329
lines changed

6 files changed

+415
-329
lines changed

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlAuditorDialectHelper.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -138,28 +138,33 @@ public String getCreateTableSqlString(String tableName) {
138138
")'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;", tableName, getAutoIncrementType(), getClobType(), getBigIntType(), getClobType(), getBooleanType(), getBooleanType());
139139
case DB2:
140140
return String.format(
141-
"CREATE TABLE %s (" +
142-
"id %s PRIMARY KEY, " +
143-
"execution_id VARCHAR(255), " +
144-
"stage_id VARCHAR(255), " +
145-
"task_id VARCHAR(255), " +
146-
"author VARCHAR(255), " +
147-
"created_at TIMESTAMP DEFAULT CURRENT TIMESTAMP, " +
148-
"state VARCHAR(255), " +
149-
"class_name VARCHAR(255), " +
150-
"method_name VARCHAR(255), " +
151-
"metadata %s, " +
152-
"execution_millis %s, " +
153-
"execution_hostname VARCHAR(255), " +
154-
"error_trace %s, " +
155-
"type VARCHAR(50), " +
156-
"tx_type VARCHAR(50), " +
157-
"target_system_id VARCHAR(255), " +
158-
"order_col VARCHAR(50), " +
159-
"recovery_strategy VARCHAR(50), " +
160-
"transaction_flag %s, " +
161-
"system_change %s" +
162-
")", tableName, getAutoIncrementType(), getClobType(), getBigIntType(), getClobType(), getBooleanType(), getBooleanType());
141+
"BEGIN\n" +
142+
" DECLARE CONTINUE HANDLER FOR SQLSTATE '42710' BEGIN END;\n" +
143+
" EXECUTE IMMEDIATE 'CREATE TABLE %s (" +
144+
"id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY," +
145+
"execution_id VARCHAR(255)," +
146+
"stage_id VARCHAR(255)," +
147+
"task_id VARCHAR(255)," +
148+
"author VARCHAR(255)," +
149+
"created_at TIMESTAMP," +
150+
"state VARCHAR(32)," +
151+
"class_name VARCHAR(255)," +
152+
"method_name VARCHAR(255)," +
153+
"metadata CLOB," +
154+
"execution_millis BIGINT," +
155+
"execution_hostname VARCHAR(255)," +
156+
"error_trace CLOB," +
157+
"type VARCHAR(50)," +
158+
"tx_type VARCHAR(50)," +
159+
"target_system_id VARCHAR(255)," +
160+
"order_col VARCHAR(50)," +
161+
"recovery_strategy VARCHAR(50)," +
162+
"transaction_flag SMALLINT," +
163+
"system_change SMALLINT" +
164+
")';\n" +
165+
"END",
166+
tableName);
167+
163168
case SQLITE:
164169
return String.format(
165170
"CREATE TABLE IF NOT EXISTS %s (" +

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlLockDialectHelper.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,14 @@ public String getCreateTableSqlString(String tableName) {
7878
")'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;", tableName);
7979
case DB2:
8080
return String.format(
81-
"CREATE TABLE %s (" +
82-
"\"key\" VARCHAR(255) PRIMARY KEY," +
83-
"status VARCHAR(32)," +
84-
"owner VARCHAR(255)," +
85-
"expires_at TIMESTAMP" +
86-
")", tableName);
81+
"BEGIN " +
82+
"DECLARE CONTINUE HANDLER FOR SQLSTATE '42710' BEGIN END; " +
83+
"EXECUTE IMMEDIATE 'CREATE TABLE %s (" +
84+
"lock_key VARCHAR(255) NOT NULL PRIMARY KEY, " +
85+
"status VARCHAR(32), " +
86+
"owner VARCHAR(255), " +
87+
"expires_at TIMESTAMP)'; " +
88+
"END", tableName);
8789
default:
8890
throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name());
8991
}
@@ -93,6 +95,9 @@ public String getSelectLockSqlString(String tableName) {
9395
switch (sqlDialect) {
9496
case POSTGRESQL:
9597
return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ?", tableName);
98+
case DB2:
99+
// Select lock_key as the first column (getLockEntry expects rs.getString(1) to be the key)
100+
return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName);
96101
case SQLSERVER:
97102
case SYBASE:
98103
return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName);
@@ -145,12 +150,13 @@ public String getInsertOrUpdateLockSqlString(String tableName) {
145150
"MERGE INTO %s (`key`, status, owner, expires_at) KEY (`key`) VALUES (?, ?, ?, ?)",
146151
tableName);
147152
case DB2:
153+
// Use a VALUES-derived table and a target alias for DB2 to avoid parsing issues
148154
return String.format(
149-
"MERGE INTO %s USING (SELECT ? AS \"key\", ? AS status, ? AS owner, ? AS expires_at FROM SYSIBM.SYSDUMMY1) AS src " +
150-
"ON (%s.\"key\" = src.\"key\") " +
155+
"MERGE INTO %s tgt USING (VALUES (?, ?, ?, ?)) src(lock_key, status, owner, expires_at) " +
156+
"ON (tgt.lock_key = src.lock_key) " +
151157
"WHEN MATCHED THEN UPDATE SET status = src.status, owner = src.owner, expires_at = src.expires_at " +
152-
"WHEN NOT MATCHED THEN INSERT (\"key\", status, owner, expires_at) VALUES (src.\"key\", src.status, src.owner, src.expires_at)",
153-
tableName, tableName);
158+
"WHEN NOT MATCHED THEN INSERT (lock_key, status, owner, expires_at) VALUES (src.lock_key, src.status, src.owner, src.expires_at)",
159+
tableName);
154160
case FIREBIRD:
155161
return String.format(
156162
"UPDATE OR INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) MATCHING (`key`)",
@@ -164,12 +170,41 @@ public String getDeleteLockSqlString(String tableName) {
164170
if (Objects.requireNonNull(sqlDialect) == SqlDialect.POSTGRESQL) {
165171
return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName);
166172
}
173+
if (sqlDialect == SqlDialect.DB2) {
174+
return String.format("DELETE FROM %s WHERE lock_key = ?", tableName);
175+
}
167176
return String.format("DELETE FROM %s WHERE `key` = ?", tableName);
168177
}
169178

170179
public void upsertLockEntry(Connection conn, String tableName, String key, String owner, LocalDateTime expiresAt) throws SQLException {
171180
String sql = getInsertOrUpdateLockSqlString(tableName);
172181

182+
if (sqlDialect == SqlDialect.DB2) {
183+
// UPDATE first
184+
try (PreparedStatement update = conn.prepareStatement(
185+
"UPDATE " + tableName + " SET owner = ?, expires_at = ? WHERE lock_key = ?")) {
186+
update.setString(1, owner);
187+
update.setTimestamp(2, Timestamp.valueOf(expiresAt));
188+
update.setString(3, key);
189+
int updated = update.executeUpdate();
190+
if (updated > 0) {
191+
return;
192+
}
193+
}
194+
195+
// If no row updated, try INSERT
196+
try (PreparedStatement insert = conn.prepareStatement(
197+
"INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) {
198+
insert.setString(1, key);
199+
// Use "LOCKED" string to avoid using a non-existing enum constant (previous "ACQUIRED" caused failures)
200+
insert.setString(2, LockStatus.LOCK_HELD.name());
201+
insert.setString(3, owner);
202+
insert.setTimestamp(4, Timestamp.valueOf(expiresAt));
203+
insert.executeUpdate();
204+
return;
205+
}
206+
}
207+
173208
if (getSqlDialect() == SqlDialect.SQLSERVER || getSqlDialect() == SqlDialect.SYBASE) {
174209
// For SQL Server/Sybase, use Statement and format SQL
175210
try (Statement stmt = conn.createStatement()) {
@@ -186,6 +221,7 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin
186221
}
187222
} else {
188223
try (PreparedStatement ps = conn.prepareStatement(sql)) {
224+
// For DB2 we use lock_key but callers pass key as first parameter - that's correct
189225
ps.setString(1, key);
190226
ps.setString(2, LockStatus.LOCK_HELD.name());
191227
ps.setString(3, owner);

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlLockService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void releaseLock(LockKey lockKey, RunnerId owner) {
136136
String keyStr = lockKey.toString();
137137
try (Connection conn = dataSource.getConnection();
138138
PreparedStatement ps = conn.prepareStatement(
139-
"SELECT owner FROM " + lockRepositoryName + " WHERE `key` = ?")) {
139+
dialectHelper.getSelectLockSqlString(lockRepositoryName))) {
140140
ps.setString(1, keyStr);
141141
try (ResultSet rs = ps.executeQuery()) {
142142
if (rs.next()) {
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2025 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.community.sql;
17+
18+
import org.testcontainers.containers.*;
19+
import org.testcontainers.containers.wait.strategy.Wait;
20+
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
21+
import org.testcontainers.utility.DockerImageName;
22+
23+
import java.time.Duration;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import javax.sql.DataSource;
26+
import com.zaxxer.hikari.HikariConfig;
27+
import com.zaxxer.hikari.HikariDataSource;
28+
29+
public final class SharedSqlContainers {
30+
31+
private static final ConcurrentHashMap<String, JdbcDatabaseContainer<?>> CONTAINERS = new ConcurrentHashMap<>();
32+
33+
private SharedSqlContainers() { }
34+
35+
public static JdbcDatabaseContainer<?> getContainer(String dialectName) {
36+
boolean isCi = System.getenv("CI") != null || System.getenv("GITHUB_ACTIONS") != null;
37+
return CONTAINERS.computeIfAbsent(dialectName, key -> createContainerInternal(key, isCi));
38+
}
39+
40+
private static JdbcDatabaseContainer<?> createContainerInternal(String dialectName, boolean isCi) {
41+
switch (dialectName) {
42+
case "mysql": {
43+
MySQLContainer<?> c = new MySQLContainer<>("mysql:8.0")
44+
.withDatabaseName("testdb")
45+
.withUsername("testuser")
46+
.withPassword("testpass");
47+
if (!isCi) c.withReuse(true);
48+
return c;
49+
}
50+
case "sqlserver": {
51+
MSSQLServerContainer<?> c = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-CU18-ubuntu-20.04")
52+
.acceptLicense()
53+
.withPassword("TestPass123!");
54+
if (!isCi) c.withReuse(true);
55+
return c;
56+
}
57+
case "oracle": {
58+
OracleContainer c = new OracleContainer(
59+
DockerImageName.parse("gvenzl/oracle-free:23-slim-faststart")
60+
.asCompatibleSubstituteFor("gvenzl/oracle-xe")) {
61+
@Override
62+
public String getDatabaseName() {
63+
return "FREEPDB1";
64+
}
65+
}
66+
.withPassword("oracle123")
67+
.withSharedMemorySize(1073741824L)
68+
.withStartupTimeout(Duration.ofMinutes(20))
69+
.waitingFor(new WaitAllStrategy()
70+
.withStrategy(Wait.forListeningPort())
71+
.withStrategy(Wait.forLogMessage(".*DATABASE IS READY TO USE.*\\n", 1))
72+
)
73+
.withEnv("ORACLE_CHARACTERSET", "AL32UTF8");
74+
if (!isCi) c.withReuse(true);
75+
return c;
76+
}
77+
case "postgresql": {
78+
PostgreSQLContainer<?> c = new PostgreSQLContainer<>(DockerImageName.parse("postgres:15"))
79+
.withDatabaseName("testdb")
80+
.withUsername("test")
81+
.withPassword("test");
82+
if (!isCi) c.withReuse(true);
83+
return c;
84+
}
85+
case "mariadb": {
86+
MariaDBContainer<?> c = new MariaDBContainer<>("mariadb:11.3.2")
87+
.withDatabaseName("testdb")
88+
.withUsername("testuser")
89+
.withPassword("testpass");
90+
if (!isCi) c.withReuse(true);
91+
return c;
92+
}
93+
default:
94+
throw new IllegalArgumentException("Unsupported dialect: " + dialectName);
95+
}
96+
}
97+
98+
public static void stopAll() {
99+
CONTAINERS.values().forEach(JdbcDatabaseContainer::stop);
100+
CONTAINERS.clear();
101+
}
102+
103+
public static DataSource createDataSource(JdbcDatabaseContainer<?> container) {
104+
HikariConfig config = new HikariConfig();
105+
config.setJdbcUrl(container.getJdbcUrl());
106+
config.setUsername(container.getUsername());
107+
config.setPassword(container.getPassword());
108+
config.setDriverClassName(container.getDriverClassName());
109+
return new HikariDataSource(config);
110+
}
111+
}

0 commit comments

Comments
 (0)