Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,13 @@ dynamodb-local-metadata*
# Claude Code configuration files
.claude/settings.local.json

/flamingock-cli-dist
/flamingock-cli-dist

# SQLite test DB files generated by tests
test_*.db
test_*.db-wal
test_*.db-shm

# Generic SQLite artifacts
*.db-wal
*.db-shm
1 change: 1 addition & 0 deletions community/flamingock-auditstore-sql/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
testImplementation("com.h2database:h2:2.2.224")
testImplementation("org.mockito:mockito-inline:4.11.0")
testImplementation("org.xerial:sqlite-jdbc:3.41.2.1")
testImplementation("com.ibm.informix:jdbc:4.50.10")
}

description = "SQL audit store implementation for distributed change auditing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.flamingock.internal.common.core.audit.AuditEntry;
import io.flamingock.internal.common.core.audit.AuditReader;
import io.flamingock.internal.common.core.audit.AuditTxType;
import io.flamingock.internal.common.sql.SqlDialect;
import io.flamingock.internal.core.store.audit.LifecycleAuditWriter;
import io.flamingock.internal.util.Result;

Expand Down Expand Up @@ -53,35 +54,53 @@ public void initialize() {

@Override
public Result writeEntry(AuditEntry auditEntry) {
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(
dialectHelper.getInsertSqlString(auditTableName))) {
ps.setString(1, auditEntry.getExecutionId());
ps.setString(2, auditEntry.getStageId());
ps.setString(3, auditEntry.getTaskId());
ps.setString(4, auditEntry.getAuthor());
ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt()));
ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null);
ps.setString(7, auditEntry.getClassName());
ps.setString(8, auditEntry.getMethodName());
ps.setString(9, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null);
ps.setLong(10, auditEntry.getExecutionMillis());
ps.setString(11, auditEntry.getExecutionHostname());
ps.setString(12, auditEntry.getErrorTrace());
ps.setString(13, auditEntry.getType() != null ? auditEntry.getType().name() : null);
ps.setString(14, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null);
ps.setString(15, auditEntry.getTargetSystemId());
ps.setString(16, auditEntry.getOrder());
ps.setString(17, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null);
ps.setObject(18, auditEntry.getTransactionFlag());
ps.setObject(19, auditEntry.getSystemChange());
ps.executeUpdate();
Connection conn = null;
try {
conn = dataSource.getConnection();

// For Informix, ensure autoCommit is enabled for audit writes
if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) {
conn.setAutoCommit(true);
}

try (PreparedStatement ps = conn.prepareStatement(
dialectHelper.getInsertSqlString(auditTableName))) {
ps.setString(1, auditEntry.getExecutionId());
ps.setString(2, auditEntry.getStageId());
ps.setString(3, auditEntry.getTaskId());
ps.setString(4, auditEntry.getAuthor());
ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt()));
ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null);
ps.setString(7, auditEntry.getClassName());
ps.setString(8, auditEntry.getMethodName());
ps.setString(9, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null);
ps.setLong(10, auditEntry.getExecutionMillis());
ps.setString(11, auditEntry.getExecutionHostname());
ps.setString(12, auditEntry.getErrorTrace());
ps.setString(13, auditEntry.getType() != null ? auditEntry.getType().name() : null);
ps.setString(14, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null);
ps.setString(15, auditEntry.getTargetSystemId());
ps.setString(16, auditEntry.getOrder());
ps.setString(17, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null);
ps.setObject(18, auditEntry.getTransactionFlag());
ps.setObject(19, auditEntry.getSystemChange());
ps.executeUpdate();
}
return Result.OK();
} catch (SQLException e) {
return new Result.Error(e);
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
// Log but don't throw
}
}
}
}


@Override
public List<AuditEntry> getAuditHistory() {
List<AuditEntry> entries = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public String getCreateTableSqlString(String tableName) {
case H2:
case HSQLDB:
case FIREBIRD:
case INFORMIX:
return String.format(
"CREATE TABLE IF NOT EXISTS %s (" +
"id %s PRIMARY KEY, " +
Expand Down Expand Up @@ -189,6 +188,32 @@ public String getCreateTableSqlString(String tableName) {
"transaction_flag INTEGER, " +
"system_change INTEGER" +
")", tableName);
case INFORMIX:
return String.format(
"CREATE TABLE IF NOT EXISTS %s (" +
"id SERIAL8 PRIMARY KEY, " +
"execution_id VARCHAR(100), " +
"stage_id VARCHAR(100), " +
"task_id VARCHAR(100), " +
"author VARCHAR(100), " +
"created_at DATETIME YEAR TO FRACTION(3) DEFAULT CURRENT YEAR TO FRACTION(3), " +
"state VARCHAR(50), " +
"class_name VARCHAR(200), " +
"method_name VARCHAR(100), " +
"metadata LVARCHAR(8000), " +
"execution_millis BIGINT, " +
"execution_hostname VARCHAR(100), " +
"error_trace LVARCHAR(8000), " +
"type VARCHAR(50), " +
"tx_type VARCHAR(50), " +
"target_system_id VARCHAR(100), " +
"order_col VARCHAR(50), " +
"recovery_strategy VARCHAR(50), " +
"transaction_flag BOOLEAN, " +
"system_change BOOLEAN" +
")", tableName);


default:
throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public String getCreateTableSqlString(String tableName) {
case H2:
case HSQLDB:
case FIREBIRD:
case INFORMIX:
return String.format(
"CREATE TABLE IF NOT EXISTS %s (" +
"`key` VARCHAR(255) PRIMARY KEY," +
Expand Down Expand Up @@ -86,6 +85,14 @@ public String getCreateTableSqlString(String tableName) {
"owner VARCHAR(255), " +
"expires_at TIMESTAMP)'; " +
"END", tableName);
case INFORMIX:
return String.format(
"CREATE TABLE %s (" +
"lock_key VARCHAR(255) PRIMARY KEY, " +
"status VARCHAR(32), " +
"owner VARCHAR(255), " +
"expires_at DATETIME YEAR TO FRACTION(3)" +
")", tableName);
default:
throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name());
}
Expand All @@ -103,6 +110,8 @@ public String getSelectLockSqlString(String tableName) {
return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName);
case ORACLE:
return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName);
case INFORMIX:
return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName);
default:
return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName);
}
Expand All @@ -112,7 +121,6 @@ public String getInsertOrUpdateLockSqlString(String tableName) {
switch (sqlDialect) {
case MYSQL:
case MARIADB:
case INFORMIX:
return String.format(
"INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE status = VALUES(status), owner = VALUES(owner), expires_at = VALUES(expires_at)",
Expand Down Expand Up @@ -161,6 +169,15 @@ public String getInsertOrUpdateLockSqlString(String tableName) {
return String.format(
"UPDATE OR INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) MATCHING (`key`)",
tableName);
case INFORMIX:
// Informix doesn't support ON DUPLICATE KEY UPDATE
// Use a procedural approach similar to SQL Server
return String.format(
"UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " +
"INSERT INTO %s (lock_key, status, owner, expires_at) " +
"SELECT ?, ?, ?, ? FROM sysmaster:sysdual " +
"WHERE NOT EXISTS (SELECT 1 FROM %s WHERE lock_key = ?)",
tableName, tableName, tableName);
default:
throw new UnsupportedOperationException("Dialect not supported for upsert: " + sqlDialect.name());
}
Expand All @@ -170,7 +187,7 @@ public String getDeleteLockSqlString(String tableName) {
if (Objects.requireNonNull(sqlDialect) == SqlDialect.POSTGRESQL) {
return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName);
}
if (sqlDialect == SqlDialect.DB2) {
if (sqlDialect == SqlDialect.INFORMIX || sqlDialect == SqlDialect.DB2) {
return String.format("DELETE FROM %s WHERE lock_key = ?", tableName);
}
return String.format("DELETE FROM %s WHERE `key` = ?", tableName);
Expand All @@ -196,13 +213,38 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin
try (PreparedStatement insert = conn.prepareStatement(
"INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) {
insert.setString(1, key);
// Use "LOCKED" string to avoid using a non-existing enum constant (previous "ACQUIRED" caused failures)
insert.setString(2, LockStatus.LOCK_HELD.name());
insert.setString(3, owner);
insert.setTimestamp(4, Timestamp.valueOf(expiresAt));
insert.executeUpdate();
return;
}
return;
}

if (getSqlDialect() == SqlDialect.INFORMIX) {
// Try UPDATE first
try (PreparedStatement update = conn.prepareStatement(
"UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) {
update.setString(1, LockStatus.LOCK_HELD.name());
update.setString(2, owner);
update.setTimestamp(3, Timestamp.valueOf(expiresAt));
update.setString(4, key);
int updated = update.executeUpdate();
if (updated > 0) {
return;
}
}

// If no row updated, try INSERT
try (PreparedStatement insert = conn.prepareStatement(
"INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) {
insert.setString(1, key);
insert.setString(2, LockStatus.LOCK_HELD.name());
insert.setString(3, owner);
insert.setTimestamp(4, Timestamp.valueOf(expiresAt));
insert.executeUpdate();
}
return;
}

if (getSqlDialect() == SqlDialect.SQLSERVER || getSqlDialect() == SqlDialect.SYBASE) {
Expand All @@ -219,18 +261,20 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin
.replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'");
stmt.execute(formattedSql);
}
} else {
try (PreparedStatement ps = conn.prepareStatement(sql)) {
// For DB2 we use lock_key but callers pass key as first parameter - that's correct
ps.setString(1, key);
ps.setString(2, LockStatus.LOCK_HELD.name());
ps.setString(3, owner);
ps.setTimestamp(4, Timestamp.valueOf(expiresAt));
ps.executeUpdate();
}
return;
}

// Default case for other dialects
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, key);
ps.setString(2, LockStatus.LOCK_HELD.name());
ps.setString(3, owner);
ps.setTimestamp(4, Timestamp.valueOf(expiresAt));
ps.executeUpdate();
}
}


public SqlDialect getSqlDialect() {
return sqlDialect;
}
Expand Down
Loading
Loading