diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java index 8cbb45041..e82b19951 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java @@ -234,9 +234,23 @@ public Map preCommit(Map committedOffsets = new HashMap<>(); try { this.flush(currentOffsets); + Iterator iterator = this.records_map_queue.iterator(); + Map notProccessedTopics = new HashMap<>(); + while (iterator.hasNext()) { + ConcurrentHashMap> records_map = (ConcurrentHashMap>) iterator.next(); + for(Map.Entry> entry : records_map.entrySet()) { + if (entry.getKey() == RECORDS_VAR_IN_EXECUTION){ + continue; + }else{ + notProccessedTopics.put(entry.getKey(), false); + } + } + } currentOffsets.forEach( (topicPartition, offsetAndMetadata) -> { - committedOffsets.put(topicPartition, new OffsetAndMetadata(offsetAndMetadata.offset())); + if (!notProccessedTopics.containsKey(topicPartition.topic())){ + committedOffsets.put(topicPartition, new OffsetAndMetadata(offsetAndMetadata.offset())); + } }); } catch (Exception e) { log.error("preCommit({}):{}", this.id, e.getMessage()); diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java index 69a0d59d2..7916e3136 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java @@ -16,10 +16,12 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.Array; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.ArrayList; /** * Function that maps the debezium/kafka connect @@ -39,16 +41,16 @@ public class ClickHouseDataTypeMapper { dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), null), ClickHouseDataType.Int64); // Float - dataTypesMap.put(new MutablePair(Schema.FLOAT32_SCHEMA.type(), null), ClickHouseDataType.Float32); - dataTypesMap.put(new MutablePair(Schema.FLOAT64_SCHEMA.type(), null), ClickHouseDataType.Float32); + dataTypesMap.put(new MutablePair(Schema.FLOAT32_SCHEMA.type(), null), ClickHouseDataType.Float32); + dataTypesMap.put(new MutablePair(Schema.FLOAT64_SCHEMA.type(), null), ClickHouseDataType.Float32); - // String - dataTypesMap.put(new MutablePair(Schema.STRING_SCHEMA.type(), null), ClickHouseDataType.String); + // String + dataTypesMap.put(new MutablePair(Schema.STRING_SCHEMA.type(), null), ClickHouseDataType.String); - // BLOB -> String - dataTypesMap.put(new MutablePair(Schema.BYTES_SCHEMA.type(), Decimal.LOGICAL_NAME), ClickHouseDataType.Decimal); + // BLOB -> String + dataTypesMap.put(new MutablePair(Schema.BYTES_SCHEMA.type(), Decimal.LOGICAL_NAME), ClickHouseDataType.Decimal); - // DATE + // DATE dataTypesMap.put(new MutablePair<>(Schema.INT32_SCHEMA.type(), Date.SCHEMA_NAME), ClickHouseDataType.Date32); // TIME @@ -92,6 +94,9 @@ public class ClickHouseDataTypeMapper { // Geometry -> Geometry dataTypesMap.put(new MutablePair<>(Schema.Type.STRUCT, Geometry.LOGICAL_NAME), ClickHouseDataType.String); + // Array -> Array + dataTypesMap.put(new MutablePair<>(Schema.Type.ARRAY, null), ClickHouseDataType.Array); + } /** @@ -106,7 +111,7 @@ public class ClickHouseDataTypeMapper { * @return true, if handled, false if the data type is not current handled. * @throws SQLException */ - public static boolean convert(Schema.Type type, String schemaName, + public static boolean convert(Schema.Type type, String schemaName, Schema schema, Object value, int index, PreparedStatement ps) throws SQLException { @@ -224,6 +229,15 @@ else if (value instanceof Long) { } else { ps.setString(index, ""); } + } else if (type == Schema.Type.ARRAY){ + // Object[] objects = ((ArrayList) value).toArray(); + // Schema.Type valueSchemaType = schema.valueSchema().type(); + + + // for(int i = 0;i < al1.size();i++){ + + // } + ps.setObject(index, value); } else { result = false; diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index 317d20280..67d35fef0 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -405,7 +405,7 @@ public Map insert(ConcurrentLinkedQueue * * @param queryToRecordsMap */ - public BlockMetaData addToPreparedStatementBatch(String topicName, Map>, + public boolean addToPreparedStatementBatch(String topicName, Map>, List> queryToRecordsMap, BlockMetaData bmd) throws SQLException { boolean success = false; @@ -508,7 +508,7 @@ public BlockMetaData addToPreparedStatementBatch(String topicName, Map columnNameToIndexMap, P Field f = getFieldByColumnName(fields, colName); Schema.Type type = f.schema().type(); String schemaName = f.schema().name(); + Schema schema = f.schema(); Object value = struct.get(f); - if(false == ClickHouseDataTypeMapper.convert(type, schemaName, value, index, ps)) { + if(false == ClickHouseDataTypeMapper.convert(type, schemaName,schema, value, index, ps)) { log.error(String.format("**** DATA TYPE NOT HANDLED type(%s), name(%s), column name(%s)", type.toString(), schemaName, colName)); } diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java index 170246949..56f8bcab2 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java @@ -34,7 +34,11 @@ public String createAlterTableSyntax(String tableName, Map colNa } else { alterTableSyntax.append(ClickHouseDbConstants.ALTER_TABLE_DELETE_COLUMN).append(" "); } - alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(","); + if (entry.getValue().contains("Array")){ + alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(","); + }else{ + alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" Nullable(").append(entry.getValue()).append("),"); + } } alterTableSyntax.deleteCharAt(alterTableSyntax.lastIndexOf(",")); diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 63870cc6f..d86cfbd3b 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -27,10 +27,9 @@ public void createNewTable(ArrayList primaryKey, String tableName, Field Map colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields); String createTableQuery = this.createTableSyntax(primaryKey, tableName, fields, colNameToDataTypeMap); // ToDO: need to run it before a session is created. + log.info("**** AUTO CREATE TABLE " + createTableQuery); for (int i=0;i primaryKey, String t StringBuilder createTableSyntax = new StringBuilder(); - createTableSyntax.append(CREATE_TABLE).append(" ").append(tableName).append("("); + createTableSyntax.append(CREATE_TABLE).append(" ").append(tableName).append(" on CLUSTER '{cluster}' ("); for(Field f: fields) { String colName = f.name(); @@ -60,7 +59,9 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t if(dataType != null && dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name())) { // ignore adding nulls; createTableSyntax.append(" ").append(NULL); - } else { + } else if (dataType.contains("Array")){ + createTableSyntax.append(""); + }else { if (isNull) { createTableSyntax.append(" ").append(NULL); } else { @@ -81,7 +82,7 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t createTableSyntax.append(")"); createTableSyntax.append(" "); - createTableSyntax.append("ENGINE = ReplicatedReplacingMergeTree(").append("'/clickhouse/tables/0/{database}/{table}', ").append("'{replica}', ").append(VERSION_COLUMN).append(")"); + createTableSyntax.append("ENGINE = ReplicatedReplacingMergeTree(").append("'/clickhouse/{cluster}/tables/{shard}/{database}/{table}', ").append("'{replica}', ").append(VERSION_COLUMN).append(")"); createTableSyntax.append(" "); if(primaryKey != null) { diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java index d83d3da3b..8caaac252 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java @@ -68,7 +68,12 @@ public Map getColumnNameToCHDataTypeMapping(Field[] fields) { } else { columnToDataTypesMap.put(colName, dataType.name()); } - } else { + } else if (dataType == ClickHouseDataType.Array){ + Schema.Type valueSchemaType = f.schema().valueSchema().type(); + String valueSchemaName = f.schema().valueSchema().name(); + ClickHouseDataType valueSchemaDataType = mapper.getClickHouseDataType(valueSchemaType, valueSchemaName); + columnToDataTypesMap.put(colName, "Array("+ valueSchemaDataType.name()+")"); + }else { columnToDataTypesMap.put(colName, dataType.name()); } }else { diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index b63606587..609efbfe4 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.sql.SQLException; import java.util.HashMap; import java.util.List; @@ -101,7 +102,7 @@ public void run() { log.debug(String.format("No records to process ThreadId(%s), TaskId(%s)", Thread.currentThread().getName(), taskId)); return; } - + List processedTopicList = new ArrayList(); // Topic Name -> List of records for (Map.Entry> entry : this.records.entrySet()) { // indicator to check that records are in execution @@ -109,10 +110,12 @@ public void run() { continue; } if (entry.getValue().size() > 0) { - processRecordsByTopic(entry.getKey(), entry.getValue()); + if (processRecordsByTopic(entry.getKey(), entry.getValue())){ + processedTopicList.add(entry.getKey()); + } } } - + processedTopicList.forEach((topic) -> this.records.remove(topic)); this.records.put(RECORDS_VAR_IN_EXECUTION, new ConcurrentLinkedQueue<>()); } catch(Exception e) { log.error(String.format("ClickHouseBatchRunnable exception - Task(%s)", taskId), e); @@ -170,7 +173,7 @@ public DbWriter getDbWriterForTable(String topicName, String tableName, ClickHou * @param topicName * @param records */ - private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue records) throws SQLException { + private boolean processRecordsByTopic(String topicName, ConcurrentLinkedQueue records) throws SQLException { //The user parameter will override the topic mapping to table. String tableName = getTableFromTopic(topicName); @@ -178,7 +181,7 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue