Array #2

Open: wants to merge 4 commits into base branch multi-threads

Changes from all commits
@@ -234,9 +234,23 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, Offs
     Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
     try {
         this.flush(currentOffsets);
+        // Collect the topics that still have unprocessed records buffered in any
+        // of the per-task record maps; their offsets must not be committed yet.
+        Map<String, Boolean> notProcessedTopics = new HashMap<>();
+        Iterator iterator = this.records_map_queue.iterator();
+        while (iterator.hasNext()) {
+            ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> recordsMap =
+                    (ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>>) iterator.next();
+            for (Map.Entry<String, ConcurrentLinkedQueue<ClickHouseStruct>> entry : recordsMap.entrySet()) {
+                // Skip the marker entry for records currently in execution; note the
+                // String comparison uses equals(), not ==.
+                if (!entry.getKey().equals(RECORDS_VAR_IN_EXECUTION)) {
+                    notProcessedTopics.put(entry.getKey(), false);
+                }
+            }
+        }
         currentOffsets.forEach(
                 (topicPartition, offsetAndMetadata) -> {
-                    committedOffsets.put(topicPartition, new OffsetAndMetadata(offsetAndMetadata.offset()));
+                    // Commit offsets only for topics whose buffers have been flushed.
+                    if (!notProcessedTopics.containsKey(topicPartition.topic())) {
+                        committedOffsets.put(topicPartition, new OffsetAndMetadata(offsetAndMetadata.offset()));
+                    }
                 });
     } catch (Exception e) {
         log.error("preCommit({}):{}", this.id, e.getMessage());
@@ -16,10 +16,12 @@

 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Array;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.ArrayList;

 /**
  * Function that maps the debezium/kafka connect

@@ -39,16 +41,16 @@ public class ClickHouseDataTypeMapper {
     dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), null), ClickHouseDataType.Int64);

     // Float
     dataTypesMap.put(new MutablePair(Schema.FLOAT32_SCHEMA.type(), null), ClickHouseDataType.Float32);
     dataTypesMap.put(new MutablePair(Schema.FLOAT64_SCHEMA.type(), null), ClickHouseDataType.Float32);

     // String
     dataTypesMap.put(new MutablePair(Schema.STRING_SCHEMA.type(), null), ClickHouseDataType.String);

     // BLOB -> String
     dataTypesMap.put(new MutablePair(Schema.BYTES_SCHEMA.type(), Decimal.LOGICAL_NAME), ClickHouseDataType.Decimal);

     // DATE
     dataTypesMap.put(new MutablePair<>(Schema.INT32_SCHEMA.type(), Date.SCHEMA_NAME), ClickHouseDataType.Date32);

     // TIME

@@ -92,6 +94,9 @@ public class ClickHouseDataTypeMapper {
     // Geometry -> Geometry
     dataTypesMap.put(new MutablePair<>(Schema.Type.STRUCT, Geometry.LOGICAL_NAME), ClickHouseDataType.String);

+    // Array -> Array
+    dataTypesMap.put(new MutablePair<>(Schema.Type.ARRAY, null), ClickHouseDataType.Array);

 }

@@ -106,7 +111,7 @@ public class ClickHouseDataTypeMapper {
  * @return true, if handled, false if the data type is not currently handled.
  * @throws SQLException
  */
-    public static boolean convert(Schema.Type type, String schemaName,
+    public static boolean convert(Schema.Type type, String schemaName, Schema schema,
                                   Object value,
                                   int index,
                                   PreparedStatement ps) throws SQLException {

@@ -224,6 +229,15 @@ else if (value instanceof Long) {
     } else {
         ps.setString(index, "");
     }
+} else if (type == Schema.Type.ARRAY) {
+    // For now the Kafka Connect List is bound as-is; element-wise conversion
+    // driven by schema.valueSchema() is left as a follow-up.
+    ps.setObject(index, value);
+}
 else {
     result = false;
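The pass-through works because setObject() hands the List straight to the driver, but logical element types (decimals, dates) would need per-element conversion. Here is a sketch of what the removed scaffolding appeared to be reaching for; convertArrayElement is a hypothetical helper, not part of the connector:

import org.apache.kafka.connect.data.Schema;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class ArrayBindingSketch {

    // Hypothetical per-element hook; real code would dispatch on the element's
    // Schema.Type much like convert() does for scalar columns.
    private static Object convertArrayElement(Object element, Schema elementSchema) {
        return element; // identity conversion for this sketch
    }

    // Bind a Kafka Connect ARRAY value as an Object[] after converting each element.
    public static void bindArray(PreparedStatement ps, int index, Object value,
                                 Schema arraySchema) throws SQLException {
        Schema elementSchema = arraySchema.valueSchema(); // element type of the ARRAY schema
        List<?> list = (List<?>) value; // Kafka Connect represents ARRAY values as java.util.List
        Object[] converted = new Object[list.size()];
        for (int i = 0; i < list.size(); i++) {
            converted[i] = convertArrayElement(list.get(i), elementSchema);
        }
        ps.setObject(index, converted);
    }
}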
@@ -405,7 +405,7 @@ public Map<TopicPartition, Long> insert(ConcurrentLinkedQueue<ClickHouseStruct>
  *
  * @param queryToRecordsMap
  */
-public BlockMetaData addToPreparedStatementBatch(String topicName, Map<MutablePair<String, Map<String, Integer>>,
+public boolean addToPreparedStatementBatch(String topicName, Map<MutablePair<String, Map<String, Integer>>,
         List<ClickHouseStruct>> queryToRecordsMap, BlockMetaData bmd) throws SQLException {

     boolean success = false;

@@ -508,7 +508,7 @@ public BlockMetaData addToPreparedStatementBatch(String topicName, Map<MutablePa
         }
     }

-    return bmd;
+    return success;
 }


@@ -600,9 +600,10 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
     Field f = getFieldByColumnName(fields, colName);
     Schema.Type type = f.schema().type();
     String schemaName = f.schema().name();
+    Schema schema = f.schema();
     Object value = struct.get(f);

-    if(false == ClickHouseDataTypeMapper.convert(type, schemaName, value, index, ps)) {
+    if (!ClickHouseDataTypeMapper.convert(type, schemaName, schema, value, index, ps)) {
         log.error(String.format("**** DATA TYPE NOT HANDLED type(%s), name(%s), column name(%s)", type.toString(),
                 schemaName, colName));
     }
@@ -34,7 +34,11 @@ public String createAlterTableSyntax(String tableName, Map<String, String> colNa
     } else {
         alterTableSyntax.append(ClickHouseDbConstants.ALTER_TABLE_DELETE_COLUMN).append(" ");
     }
-    alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(",");
+    // Array columns cannot be wrapped in Nullable(); every other type is made Nullable.
+    if (entry.getValue().contains("Array")) {
+        alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(",");
+    } else {
+        alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" Nullable(").append(entry.getValue()).append("),");
+    }
 }
 alterTableSyntax.deleteCharAt(alterTableSyntax.lastIndexOf(","));
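To make the branching concrete, here is a standalone sketch of the rule with made-up table and column names; it mirrors the loop above and prints the clause it builds:

import java.util.LinkedHashMap;
import java.util.Map;

public class AlterTableSketch {
    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("tags", "Array(String)"); // hypothetical array column
        cols.put("age", "Int32");          // hypothetical scalar column

        StringBuilder sql = new StringBuilder("ALTER TABLE products ");
        for (Map.Entry<String, String> e : cols.entrySet()) {
            sql.append("ADD COLUMN `").append(e.getKey()).append("` ");
            if (e.getValue().contains("Array")) {
                // ClickHouse rejects Nullable(Array(...)), so arrays stay bare.
                sql.append(e.getValue());
            } else {
                sql.append("Nullable(").append(e.getValue()).append(")");
            }
            sql.append(", ");
        }
        sql.setLength(sql.length() - 2); // trim the trailing ", "

        // ALTER TABLE products ADD COLUMN `tags` Array(String), ADD COLUMN `age` Nullable(Int32)
        System.out.println(sql);
    }
}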

@@ -27,10 +27,9 @@ public void createNewTable(ArrayList<String> primaryKey, String tableName, Field
     Map<String, String> colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields);
     String createTableQuery = this.createTableSyntax(primaryKey, tableName, fields, colNameToDataTypeMap);
     // ToDO: need to run it before a session is created.
+    log.info("**** AUTO CREATE TABLE " + createTableQuery);
     for (int i = 0; i < connections.size(); i++) {
-        String createTableQueryFinal = createTableQuery.replace("{replica}", "" + i);
-        log.info("**** AUTO CREATE TABLE " + createTableQueryFinal);
-        this.runQuery(createTableQueryFinal, connections.get(i));
+        // {replica} is no longer substituted client-side; each server resolves
+        // its own macros from its configuration.
+        this.runQuery(createTableQuery, connections.get(i));
     }
}

@@ -45,7 +44,7 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

StringBuilder createTableSyntax = new StringBuilder();

-    createTableSyntax.append(CREATE_TABLE).append(" ").append(tableName).append("(");
+    createTableSyntax.append(CREATE_TABLE).append(" ").append(tableName).append(" ON CLUSTER '{cluster}' (");

for(Field f: fields) {
String colName = f.name();
@@ -60,7 +59,9 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t
     if (dataType != null && dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name())) {
         // ignore adding nulls;
         createTableSyntax.append(" ").append(NULL);
-    } else {
+    } else if (dataType != null && dataType.contains("Array")) {
+        // Arrays cannot be Nullable in ClickHouse; emit no NULL/NOT NULL modifier.
+    } else {
if (isNull) {
createTableSyntax.append(" ").append(NULL);
} else {
@@ -81,7 +82,7 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

createTableSyntax.append(")");
createTableSyntax.append(" ");
createTableSyntax.append("ENGINE = ReplicatedReplacingMergeTree(").append("'/clickhouse/tables/0/{database}/{table}', ").append("'{replica}', ").append(VERSION_COLUMN).append(")");
createTableSyntax.append("ENGINE = ReplicatedReplacingMergeTree(").append("'/clickhouse/{cluster}/tables/{shard}/{database}/{table}', ").append("'{replica}', ").append(VERSION_COLUMN).append(")");
createTableSyntax.append(" ");

if(primaryKey != null) {
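Combined with the ON CLUSTER clause added above, the generated DDL now relies entirely on server-side macro substitution. For a hypothetical orders table (the column names and the _version version column are illustrative, not taken from this PR), the output would look roughly like:

CREATE TABLE orders ON CLUSTER '{cluster}' (
    `id` String,
    `tags` Array(String),
    `amount` Nullable(Float32),
    `_version` UInt64
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{cluster}/tables/{shard}/{database}/{table}', '{replica}', _version)
ORDER BY (id)

Putting {shard} in the ZooKeeper path (it was previously hardcoded to 0) gives each shard its own replication metadata, and dropping the client-side {replica} substitution lets every server resolve the macro from its own configuration.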
@@ -68,7 +68,12 @@ public Map<String, String> getColumnNameToCHDataTypeMapping(Field[] fields) {
         } else {
             columnToDataTypesMap.put(colName, dataType.name());
         }
-    } else {
+    } else if (dataType == ClickHouseDataType.Array) {
+        // Derive the element type from the array's value schema, e.g. Array(Int32).
+        Schema.Type valueSchemaType = f.schema().valueSchema().type();
+        String valueSchemaName = f.schema().valueSchema().name();
+        ClickHouseDataType valueSchemaDataType = mapper.getClickHouseDataType(valueSchemaType, valueSchemaName);
+        columnToDataTypesMap.put(colName, "Array(" + valueSchemaDataType.name() + ")");
+    } else {
         columnToDataTypesMap.put(colName, dataType.name());
     }
 } else {
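For instance, an array-of-int32 field resolves through this branch as follows (sketch; SchemaBuilder is Kafka Connect's schema factory, and the connector-side mapper lookup is shown in comments):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ArrayDdlMappingSketch {
    public static void main(String[] args) {
        // Kafka Connect schema for an array-of-int32 field.
        Schema arraySchema = SchemaBuilder.array(Schema.INT32_SCHEMA).build();

        Schema.Type valueSchemaType = arraySchema.valueSchema().type(); // INT32
        String valueSchemaName = arraySchema.valueSchema().name();      // null: no logical type

        // In the connector, mapper.getClickHouseDataType(INT32, null) resolves to
        // Int32, so the column is registered as "Array(Int32)".
        System.out.println(valueSchemaType + "/" + valueSchemaName + " -> Array(Int32)");
    }
}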
@@ -14,6 +14,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.ArrayList;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
@@ -101,18 +102,20 @@ public void run() {
log.debug(String.format("No records to process ThreadId(%s), TaskId(%s)", Thread.currentThread().getName(), taskId));
return;
}

List<String> processedTopicList = new ArrayList();
// Topic Name -> List of records
for (Map.Entry<String, ConcurrentLinkedQueue<ClickHouseStruct>> entry : this.records.entrySet()) {
// indicator to check that records are in execution
if (entry.getKey() == RECORDS_VAR_IN_EXECUTION){
continue;
}
if (entry.getValue().size() > 0) {
processRecordsByTopic(entry.getKey(), entry.getValue());
if (processRecordsByTopic(entry.getKey(), entry.getValue())){
processedTopicList.add(entry.getKey());
}
}
}

processedTopicList.forEach((topic) -> this.records.remove(topic));
this.records.put(RECORDS_VAR_IN_EXECUTION, new ConcurrentLinkedQueue<>());
} catch(Exception e) {
log.error(String.format("ClickHouseBatchRunnable exception - Task(%s)", taskId), e);
@@ -170,15 +173,15 @@ public DbWriter getDbWriterForTable(String topicName, String tableName, ClickHou
  * @param topicName
  * @param records
  */
-private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<ClickHouseStruct> records) throws SQLException {
+private boolean processRecordsByTopic(String topicName, ConcurrentLinkedQueue<ClickHouseStruct> records) throws SQLException {

     // The user parameter will override the topic mapping to table.
     String tableName = getTableFromTopic(topicName);
     DbWriter writer = getDbWriterForTable(topicName, tableName, records.peek());

     if (writer == null || !writer.wasTableMetaDataRetrieved()) {
         log.error("*** TABLE METADATA not retrieved, retry next time");
-        return;
+        return false;
     }
     // Step 1: The Batch Insert with preparedStatement in JDBC
     // works by forming the Query and then adding records to the Batch.
@@ -197,11 +200,8 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<Click
log.info("groupQueryWithRecords time taken in millis: " + (groupQueryEndTime - groupQueryStartTime));

BlockMetaData bmd = new BlockMetaData();

if(flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd)) {
// Remove the entry.
queryToRecordsMap.remove(topicName);
}
boolean result = false;
result = flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd);

if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET)) {
log.info("***** KAFKA OFFSET MANAGEMENT ENABLED *****");
Expand All @@ -218,7 +218,7 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<Click
// Metrics.updateSinkRecordsCounter(blockUuid.toString(), taskId, topicName, tableName,
// bmd.getPartitionToOffsetMap(), numRecords, bmd.getMinSourceLag(),
// bmd.getMaxSourceLag(), bmd.getMinConsumerLag(), bmd.getMaxConsumerLag());

return result;
}

@@ -237,14 +237,14 @@ private boolean flushRecordsToClickHouse(String topicName, DbWriter writer, Map<
     long diffInMs = currentTime - lastFlushTimeInMs;
     long bufferFlushTimeout = this.config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIMEOUT);

-    writer.addToPreparedStatementBatch(topicName, queryToRecordsMap, bmd);
+    result = writer.addToPreparedStatementBatch(topicName, queryToRecordsMap, bmd);

     try {
         Metrics.updateMetrics(bmd);
     } catch (Exception e) {
         log.error("****** Error updating Metrics ******");
     }
-    result = true;
+    // result now reflects whether addToPreparedStatementBatch() actually succeeded,
+    // rather than being set to true unconditionally.

     // // Step 2: Check if the buffer can be flushed
     // // One if the max buffer size is reached