From 63005b9eb0f2a4bc3c22559530a62b7af1239165 Mon Sep 17 00:00:00 2001 From: Sunil Kumar Saini Date: Thu, 16 Feb 2023 16:07:44 +0530 Subject: [PATCH 1/4] Array --- .../converters/ClickHouseDataTypeMapper.java | 30 ++++++++++++++----- .../sink/connector/db/DbWriter.java | 3 +- .../operations/ClickHouseAutoCreateTable.java | 4 ++- .../ClickHouseTableOperationsBase.java | 7 ++++- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java index 69a0d59d2..7916e3136 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java @@ -16,10 +16,12 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.Array; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.ArrayList; /** * Function that maps the debezium/kafka connect @@ -39,16 +41,16 @@ public class ClickHouseDataTypeMapper { dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), null), ClickHouseDataType.Int64); // Float - dataTypesMap.put(new MutablePair(Schema.FLOAT32_SCHEMA.type(), null), ClickHouseDataType.Float32); - dataTypesMap.put(new MutablePair(Schema.FLOAT64_SCHEMA.type(), null), ClickHouseDataType.Float32); + dataTypesMap.put(new MutablePair(Schema.FLOAT32_SCHEMA.type(), null), ClickHouseDataType.Float32); + dataTypesMap.put(new MutablePair(Schema.FLOAT64_SCHEMA.type(), null), ClickHouseDataType.Float32); - // String - dataTypesMap.put(new MutablePair(Schema.STRING_SCHEMA.type(), null), ClickHouseDataType.String); + // String + dataTypesMap.put(new MutablePair(Schema.STRING_SCHEMA.type(), null), ClickHouseDataType.String); - // BLOB -> String - dataTypesMap.put(new MutablePair(Schema.BYTES_SCHEMA.type(), Decimal.LOGICAL_NAME), ClickHouseDataType.Decimal); + // BLOB -> String + dataTypesMap.put(new MutablePair(Schema.BYTES_SCHEMA.type(), Decimal.LOGICAL_NAME), ClickHouseDataType.Decimal); - // DATE + // DATE dataTypesMap.put(new MutablePair<>(Schema.INT32_SCHEMA.type(), Date.SCHEMA_NAME), ClickHouseDataType.Date32); // TIME @@ -92,6 +94,9 @@ public class ClickHouseDataTypeMapper { // Geometry -> Geometry dataTypesMap.put(new MutablePair<>(Schema.Type.STRUCT, Geometry.LOGICAL_NAME), ClickHouseDataType.String); + // Array -> Array + dataTypesMap.put(new MutablePair<>(Schema.Type.ARRAY, null), ClickHouseDataType.Array); + } /** @@ -106,7 +111,7 @@ public class ClickHouseDataTypeMapper { * @return true, if handled, false if the data type is not current handled. * @throws SQLException */ - public static boolean convert(Schema.Type type, String schemaName, + public static boolean convert(Schema.Type type, String schemaName, Schema schema, Object value, int index, PreparedStatement ps) throws SQLException { @@ -224,6 +229,15 @@ else if (value instanceof Long) { } else { ps.setString(index, ""); } + } else if (type == Schema.Type.ARRAY){ + // Object[] objects = ((ArrayList) value).toArray(); + // Schema.Type valueSchemaType = schema.valueSchema().type(); + + + // for(int i = 0;i < al1.size();i++){ + + // } + ps.setObject(index, value); } else { result = false; diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index 317d20280..cb51d0ab0 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -600,9 +600,10 @@ public void insertPreparedStatement(Map columnNameToIndexMap, P Field f = getFieldByColumnName(fields, colName); Schema.Type type = f.schema().type(); String schemaName = f.schema().name(); + Schema schema = f.schema(); Object value = struct.get(f); - if(false == ClickHouseDataTypeMapper.convert(type, schemaName, value, index, ps)) { + if(false == ClickHouseDataTypeMapper.convert(type, schemaName,schema, value, index, ps)) { log.error(String.format("**** DATA TYPE NOT HANDLED type(%s), name(%s), column name(%s)", type.toString(), schemaName, colName)); } diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 63870cc6f..04d0c5acb 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -60,7 +60,9 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t if(dataType != null && dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name())) { // ignore adding nulls; createTableSyntax.append(" ").append(NULL); - } else { + } else if (dataType.contains("Array")){ + createTableSyntax.append(""); + }else { if (isNull) { createTableSyntax.append(" ").append(NULL); } else { diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java index d83d3da3b..8caaac252 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java @@ -68,7 +68,12 @@ public Map getColumnNameToCHDataTypeMapping(Field[] fields) { } else { columnToDataTypesMap.put(colName, dataType.name()); } - } else { + } else if (dataType == ClickHouseDataType.Array){ + Schema.Type valueSchemaType = f.schema().valueSchema().type(); + String valueSchemaName = f.schema().valueSchema().name(); + ClickHouseDataType valueSchemaDataType = mapper.getClickHouseDataType(valueSchemaType, valueSchemaName); + columnToDataTypesMap.put(colName, "Array("+ valueSchemaDataType.name()+")"); + }else { columnToDataTypesMap.put(colName, dataType.name()); } }else { From 313ccda21c848dc740132185a3805bdf419d989e Mon Sep 17 00:00:00 2001 From: Sunil Kumar Saini Date: Fri, 10 Mar 2023 16:30:47 +0530 Subject: [PATCH 2/4] create table on cluster --- .../db/operations/ClickHouseAutoCreateTable.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 04d0c5acb..d86cfbd3b 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -27,10 +27,9 @@ public void createNewTable(ArrayList primaryKey, String tableName, Field Map colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields); String createTableQuery = this.createTableSyntax(primaryKey, tableName, fields, colNameToDataTypeMap); // ToDO: need to run it before a session is created. + log.info("**** AUTO CREATE TABLE " + createTableQuery); for (int i=0;i primaryKey, String t StringBuilder createTableSyntax = new StringBuilder(); - createTableSyntax.append(CREATE_TABLE).append(" ").append(tableName).append("("); + createTableSyntax.append(CREATE_TABLE).append(" ").append(tableName).append(" on CLUSTER '{cluster}' ("); for(Field f: fields) { String colName = f.name(); @@ -83,7 +82,7 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t createTableSyntax.append(")"); createTableSyntax.append(" "); - createTableSyntax.append("ENGINE = ReplicatedReplacingMergeTree(").append("'/clickhouse/tables/0/{database}/{table}', ").append("'{replica}', ").append(VERSION_COLUMN).append(")"); + createTableSyntax.append("ENGINE = ReplicatedReplacingMergeTree(").append("'/clickhouse/{cluster}/tables/{shard}/{database}/{table}', ").append("'{replica}', ").append(VERSION_COLUMN).append(")"); createTableSyntax.append(" "); if(primaryKey != null) { From fc3049df576ff7acdced7c691564c851cb264e19 Mon Sep 17 00:00:00 2001 From: Sunil Kumar Saini Date: Wed, 12 Apr 2023 10:35:42 +0530 Subject: [PATCH 3/4] don't commit for kafka topic if error --- .../sink/connector/ClickHouseSinkTask.java | 16 +++++++++++- .../sink/connector/db/DbWriter.java | 4 +-- .../executor/ClickHouseBatchRunnable.java | 26 +++++++++---------- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java index 8cbb45041..e82b19951 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java @@ -234,9 +234,23 @@ public Map preCommit(Map committedOffsets = new HashMap<>(); try { this.flush(currentOffsets); + Iterator iterator = this.records_map_queue.iterator(); + Map notProccessedTopics = new HashMap<>(); + while (iterator.hasNext()) { + ConcurrentHashMap> records_map = (ConcurrentHashMap>) iterator.next(); + for(Map.Entry> entry : records_map.entrySet()) { + if (entry.getKey() == RECORDS_VAR_IN_EXECUTION){ + continue; + }else{ + notProccessedTopics.put(entry.getKey(), false); + } + } + } currentOffsets.forEach( (topicPartition, offsetAndMetadata) -> { - committedOffsets.put(topicPartition, new OffsetAndMetadata(offsetAndMetadata.offset())); + if (!notProccessedTopics.containsKey(topicPartition.topic())){ + committedOffsets.put(topicPartition, new OffsetAndMetadata(offsetAndMetadata.offset())); + } }); } catch (Exception e) { log.error("preCommit({}):{}", this.id, e.getMessage()); diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index cb51d0ab0..67d35fef0 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -405,7 +405,7 @@ public Map insert(ConcurrentLinkedQueue * * @param queryToRecordsMap */ - public BlockMetaData addToPreparedStatementBatch(String topicName, Map>, + public boolean addToPreparedStatementBatch(String topicName, Map>, List> queryToRecordsMap, BlockMetaData bmd) throws SQLException { boolean success = false; @@ -508,7 +508,7 @@ public BlockMetaData addToPreparedStatementBatch(String topicName, Map processedTopicList = new ArrayList(); // Topic Name -> List of records for (Map.Entry> entry : this.records.entrySet()) { // indicator to check that records are in execution @@ -109,10 +110,12 @@ public void run() { continue; } if (entry.getValue().size() > 0) { - processRecordsByTopic(entry.getKey(), entry.getValue()); + if (processRecordsByTopic(entry.getKey(), entry.getValue())){ + processedTopicList.add(entry.getKey()); + } } } - + processedTopicList.forEach((topic) -> this.records.remove(topic)); this.records.put(RECORDS_VAR_IN_EXECUTION, new ConcurrentLinkedQueue<>()); } catch(Exception e) { log.error(String.format("ClickHouseBatchRunnable exception - Task(%s)", taskId), e); @@ -170,7 +173,7 @@ public DbWriter getDbWriterForTable(String topicName, String tableName, ClickHou * @param topicName * @param records */ - private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue records) throws SQLException { + private boolean processRecordsByTopic(String topicName, ConcurrentLinkedQueue records) throws SQLException { //The user parameter will override the topic mapping to table. String tableName = getTableFromTopic(topicName); @@ -178,7 +181,7 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue Date: Tue, 1 Aug 2023 11:08:09 +0530 Subject: [PATCH 4/4] nullable in alter table --- .../sink/connector/db/operations/ClickHouseAlterTable.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java index 170246949..56f8bcab2 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java @@ -34,7 +34,11 @@ public String createAlterTableSyntax(String tableName, Map colNa } else { alterTableSyntax.append(ClickHouseDbConstants.ALTER_TABLE_DELETE_COLUMN).append(" "); } - alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(","); + if (entry.getValue().contains("Array")){ + alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(","); + }else{ + alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" Nullable(").append(entry.getValue()).append("),"); + } } alterTableSyntax.deleteCharAt(alterTableSyntax.lastIndexOf(","));