Skip to content

Commit

Permalink
email skipped
Browse files Browse the repository at this point in the history
  • Loading branch information
Amjad committed Jun 6, 2024
1 parent 9f8a514 commit f3356c4
Showing 1 changed file with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,26 @@ private boolean executePreparedStatement(String insertQuery, String topicName,
continue;
}

boolean error = false;
if (CdcRecordState.CDC_RECORD_STATE_BEFORE == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
error = insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
true, config, columnToDataTypeMap, engine, conn);
} else if (CdcRecordState.CDC_RECORD_STATE_AFTER == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
error = insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
false, config, columnToDataTypeMap, engine, conn);
} else if (CdcRecordState.CDC_RECORD_STATE_BOTH == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
if (engine != null && engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE.getEngine())) {
insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
error = insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
true, config, columnToDataTypeMap, engine, conn);
}
insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
error = insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
false, config, columnToDataTypeMap, engine, conn);
} else {
log.error("INVALID CDC RECORD STATE");
}
if (error) {
continue;
}

ps.addBatch();
}
Expand Down Expand Up @@ -199,14 +203,23 @@ private boolean executePreparedStatement(String insertQuery, String topicName,
* @param fields
* @param record
*/
public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, PreparedStatement ps, List<Field> fields,
public boolean insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, PreparedStatement ps, List<Field> fields,
ClickHouseStruct record, Struct struct, boolean beforeSection,
ClickHouseSinkConnectorConfig config,
Map<String, String> columnNameToDataTypeMap,
DBMetadata.TABLE_ENGINE engine,
ClickHouseConnection conn) throws Exception {



try {
Object emailVal = struct.get("email");
if (emailVal == null || emailVal.toString().isEmpty()) {
log.error("Email empty skipping");
return true;
}
} catch (Exception e) {
log.error("Email data fetch exception:- ", e);
return true;
}
// int index = 1;
// Use this map's key natural ordering as the source of truth.
for (Map.Entry<String, String> entry : columnNameToDataTypeMap.entrySet()) {
Expand Down Expand Up @@ -394,6 +407,7 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
}
}
}
return false;
}

/**
Expand Down

0 comments on commit f3356c4

Please sign in to comment.