Skip to content

Commit 8826eac

Browse files
updating RedshiftSchemaReader and resolved pr commnets .
1 parent 0590df4 commit 8826eac

File tree

12 files changed

+50
-228
lines changed

12 files changed

+50
-228
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 26 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,32 @@ public RedshiftSchemaReader(String sessionID) {
5757
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5858
String typeName = metadata.getColumnTypeName(index);
5959
int columnType = metadata.getColumnType(index);
60-
int precision = metadata.getPrecision(index);
61-
int scale = metadata.getScale(index);
62-
String columnName = metadata.getColumnName(index);
63-
getSchema(typeName, columnType , precision , scale , columnName);
60+
61+
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
62+
return Schema.of(Schema.Type.STRING);
63+
}
64+
if (typeName.equalsIgnoreCase("INT")) {
65+
return Schema.of(Schema.Type.INT);
66+
}
67+
if (typeName.equalsIgnoreCase("BIGINT")) {
68+
return Schema.of(Schema.Type.LONG);
69+
}
70+
71+
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
72+
if (Types.NUMERIC == columnType) {
73+
int precision = metadata.getPrecision(index);
74+
if (precision == 0) {
75+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
76+
+ "converting into STRING type to avoid any precision loss.",
77+
metadata.getColumnName(index),
78+
metadata.getColumnTypeName(index)));
79+
return Schema.of(Schema.Type.STRING);
80+
}
81+
}
82+
83+
if (typeName.equalsIgnoreCase("timestamp")) {
84+
return Schema.of(Schema.LogicalType.DATETIME);
85+
}
6486
return super.getSchema(metadata, index);
6587
}
6688

@@ -91,70 +113,4 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
91113
}
92114
return schemaFields;
93115
}
94-
95-
/**
96-
* Override: Fetches schema fields for a specific table using database metadata.
97-
*/
98-
@Override
99-
public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
100-
DatabaseMetaData dbMetaData = connection.getMetaData();
101-
String schema = null;
102-
String table = tableName;
103-
if (tableName.contains(".")) {
104-
String[] parts = tableName.split("\\.", 2);
105-
schema = parts[0];
106-
table = parts[1];
107-
}
108-
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
109-
List<Schema.Field> schemaFields = Lists.newArrayList();
110-
while (columns.next()) {
111-
String columnName = columns.getString("COLUMN_NAME");
112-
String typeName = columns.getString("TYPE_NAME");
113-
int columnType = columns.getInt("DATA_TYPE");
114-
int precision = columns.getInt("COLUMN_SIZE");
115-
int scale = columns.getInt("DECIMAL_DIGITS");
116-
int nullable = columns.getInt("NULLABLE");
117-
Schema columnSchema = getSchema(typeName, columnType, precision, scale, columnName);
118-
if (nullable == DatabaseMetaData.columnNullable) {
119-
columnSchema = Schema.nullableOf(columnSchema);
120-
}
121-
Schema.Field field = Schema.Field.of(columnName, columnSchema);
122-
schemaFields.add(field);
123-
}
124-
return schemaFields;
125-
}
126-
}
127-
128-
/**
129-
* Maps database column type information to a corresponding {@link Schema}.
130-
*
131-
* @param typeName the SQL type name
132-
* @param columnType the JDBC type code
133-
* @param precision the column precision
134-
* @param scale the column scale
135-
* @param columnName the column name
136-
* @return the mapped {@link Schema} type
137-
*/
138-
139-
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName) {
140-
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
141-
return Schema.of(Schema.Type.STRING);
142-
}
143-
if ("INT".equalsIgnoreCase(typeName)) {
144-
return Schema.of(Schema.Type.INT);
145-
}
146-
if ("BIGINT".equalsIgnoreCase(typeName)) {
147-
return Schema.of(Schema.Type.LONG);
148-
}
149-
if (Types.NUMERIC == columnType && precision == 0) {
150-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale," +
151-
" converting into STRING type to avoid any precision loss.",
152-
columnName, typeName));
153-
return Schema.of(Schema.Type.STRING);
154-
}
155-
if ("timestamp".equalsIgnoreCase(typeName)) {
156-
return Schema.of(Schema.LogicalType.DATETIME);
157-
}
158-
return Schema.of(Schema.Type.STRING);
159-
}
160116
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,14 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
6767
@Override
6868
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
6969
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
70-
if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) {
71-
if ((Strings.isNullOrEmpty(sourceConfig.getTableName()))
72-
&& (Strings.isNullOrEmpty(sourceConfig.getImportQuery()))) {
70+
if ((!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery"))
71+
&& (Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
72+
(Strings.isNullOrEmpty(sourceConfig.getImportQuery())))) {
7373
collector.addFailure(
7474
"Either 'tableName' or 'importQuery' must be specified.",
7575
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
7676
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
7777
}
78-
}
7978
super.configurePipeline(pipelineConfigurer);
8079
}
8180

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -275,30 +275,6 @@
275275
"name": "port"
276276
}
277277
]
278-
},
279-
{
280-
"name": "ImportQuery",
281-
"condition": {
282-
"expression": "importQueryType == 'importQuery'"
283-
},
284-
"show": [
285-
{
286-
"type": "property",
287-
"name": "importQuery"
288-
}
289-
]
290-
},
291-
{
292-
"name": "NativeTableName",
293-
"condition": {
294-
"expression": "importQueryType == 'tableName'"
295-
},
296-
"show": [
297-
{
298-
"type": "property",
299-
"name": "tableName"
300-
}
301-
]
302278
}
303279
],
304280
"jump-config": {

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -279,30 +279,6 @@
279279
"name": "port"
280280
}
281281
]
282-
},
283-
{
284-
"name": "ImportQuery",
285-
"condition": {
286-
"expression": "importQueryType == 'importQuery'"
287-
},
288-
"show": [
289-
{
290-
"type": "property",
291-
"name": "importQuery"
292-
}
293-
]
294-
},
295-
{
296-
"name": "NativeTableName",
297-
"condition": {
298-
"expression": "importQueryType == 'tableName'"
299-
},
300-
"show": [
301-
{
302-
"type": "property",
303-
"name": "tableName"
304-
}
305-
]
306282
}
307283
],
308284
"jump-config": {

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,16 @@ public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws
6363
return false;
6464
}
6565

66-
66+
/**
67+
* Returns the schema fields for the specified table using JDBC metadata.
68+
* Supports schema-qualified table names (e.g. "schema.table").
69+
* Throws SQLException if the table has no columns.
70+
*
71+
* @param connection JDBC connection
72+
* @param tableName table name, optionally schema-qualified
73+
* @return list of schema fields
74+
* @throws SQLException if no columns found or on database error
75+
*/
6776
@Override
6877
public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
6978
DatabaseMetaData dbMetaData = connection.getMetaData();
@@ -85,16 +94,17 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
8594
int scale = columns.getInt("DECIMAL_DIGITS");
8695
int nullable = columns.getInt("NULLABLE");
8796

88-
// Use DBUtils to map SQL type to CDAP schema
8997
Schema columnSchema = DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
90-
9198
if (nullable == DatabaseMetaData.columnNullable) {
9299
columnSchema = Schema.nullableOf(columnSchema);
93100
}
94-
95101
Schema.Field field = Schema.Field.of(columnName, columnSchema);
96102
schemaFields.add(field);
97103
}
104+
if (schemaFields.isEmpty()) {
105+
throw new SQLException("No columns found for table: " +
106+
(schema != null ? schema + "." : "") + table);
107+
}
98108
return schemaFields;
99109
}
100110
}

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,21 +144,19 @@ public void validate(FailureCollector collector) {
144144
TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
145145
}
146146

147-
if (!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) {
148-
if (Strings.isNullOrEmpty(tableName) && Strings.isNullOrEmpty(importQuery)) {
147+
if ((!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) &&
148+
(Strings.isNullOrEmpty(tableName) && Strings.isNullOrEmpty(importQuery))) {
149149
collector.addFailure(" Import Query must be specified.",
150150
" Import Query, Can not be empty.")
151151
.withConfigProperty(IMPORT_QUERY);
152-
}
153152
}
154-
if (!Strings.isNullOrEmpty(importQuery)) {
155-
if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
153+
if (!Strings.isNullOrEmpty(importQuery) &&
154+
(!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS"))) {
156155
collector.addFailure(String.format(
157156
"Import Query %s must contain the string '$CONDITIONS'. " +
158157
"if Number of Splits is not set to 1.", importQuery),
159158
"Include '$CONDITIONS' in the Import Query")
160159
.withConfigProperty(IMPORT_QUERY);
161-
}
162160
}
163161

164162
if (!hasOneSplit && !containsMacro(SPLIT_BY) && (splitBy == null || splitBy.isEmpty())) {
@@ -201,7 +199,6 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) {
201199
actualField.getSchema().getNonNullable() : actualField.getSchema();
202200
Schema expectedFieldSchema = field.getSchema().isNullable() ?
203201
field.getSchema().getNonNullable() : field.getSchema();
204-
205202
validateField(collector, field, actualFieldSchema, expectedFieldSchema);
206203
}
207204
}

database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -66,31 +66,19 @@ public void before() {
6666
*/
6767
@Test
6868
public void testGetSchemaFieldsWithConnection() throws Exception {
69-
// Setup mocks for DatabaseMetaData and columns ResultSet
7069
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
71-
// Simulate resolveTableName: table exists with name "MYTABLE"
72-
when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
73-
when(mockTables.next()).thenReturn(true, false);
74-
when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE");
7570

76-
// Simulate columns: two columns, one nullable, one not
7771
when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns);
7872
when(mockColumns.next()).thenReturn(true, true, false);
79-
// Column 1
8073
when(mockColumns.getString("COLUMN_NAME")).thenReturn("id", "name");
8174
when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR");
8275
when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR);
8376
when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10, 255);
8477
when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0);
8578
when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls, DatabaseMetaData.columnNullable);
8679

87-
// NOTE: In a real test, you may need to mock DBUtils.getSchema if it is static.
88-
// For demonstration, we assume the mapping is correct.
89-
90-
// Run
9180
java.util.List<Schema.Field> fields = reader.getSchemaFields(mockConn, "MYTABLE");
9281

93-
// Verify
9482
Assert.assertEquals(2, fields.size());
9583
Assert.assertEquals("id", fields.get(0).getName());
9684
Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema());
@@ -107,9 +95,6 @@ public void testGetSchemaFieldsWithConnection() throws Exception {
10795
public void testGetSchemaFieldsWithSchemaQualifiedName() throws Exception {
10896
// Setup for schema-qualified table name "myschema.MYTABLE"
10997
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
110-
when(mockDbMeta.getTables(any(), eq("myschema"), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
111-
when(mockTables.next()).thenReturn(true, false);
112-
when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE");
11398

11499
when(mockDbMeta.getColumns(any(), eq("myschema"), eq("MYTABLE"), any())).thenReturn(mockColumns);
115100
when(mockColumns.next()).thenReturn(true, false);
@@ -132,10 +117,6 @@ public void testGetSchemaFieldsWithSchemaQualifiedName() throws Exception {
132117
@Test
133118
public void testGetSchemaFieldsHandlesNullability() throws Exception {
134119
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
135-
when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
136-
when(mockTables.next()).thenReturn(true, false);
137-
when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE");
138-
139120
when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns);
140121
when(mockColumns.next()).thenReturn(true, true, false);
141122
when(mockColumns.getString("COLUMN_NAME")).thenReturn("col1", "col2");
@@ -156,8 +137,8 @@ public void testGetSchemaFieldsHandlesNullability() throws Exception {
156137
@Test(expected = SQLException.class)
157138
public void testGetSchemaFieldsThrowsWhenTableNotFound() throws Exception {
158139
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
159-
when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
160-
when(mockTables.next()).thenReturn(false); // Table not found
140+
when(mockDbMeta.getColumns(any(), any(), eq("NOTABLE"), any())).thenReturn(mockColumns);
141+
when(mockColumns.next()).thenReturn(false); // No columns found
161142

162143
reader.getSchemaFields(mockConn, "NOTABLE");
163144
}

mssql-plugin/widgets/SqlServer-batchsource.json

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -372,30 +372,6 @@
372372
"name": "connection"
373373
}
374374
]
375-
},
376-
{
377-
"name": "ImportQuery",
378-
"condition": {
379-
"expression": "importQueryType == 'importQuery'"
380-
},
381-
"show": [
382-
{
383-
"type": "property",
384-
"name": "importQuery"
385-
}
386-
]
387-
},
388-
{
389-
"name": "NativeTableName",
390-
"condition": {
391-
"expression": "importQueryType == 'tableName'"
392-
},
393-
"show": [
394-
{
395-
"type": "property",
396-
"name": "tableName"
397-
}
398-
]
399375
}
400376
],
401377
"jump-config": {

mysql-plugin/widgets/Mysql-batchsource.json

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -344,30 +344,6 @@
344344
"name": "connection"
345345
}
346346
]
347-
},
348-
{
349-
"name": "ImportQuery",
350-
"condition": {
351-
"expression": "importQueryType == 'importQuery'"
352-
},
353-
"show": [
354-
{
355-
"type": "property",
356-
"name": "importQuery"
357-
}
358-
]
359-
},
360-
{
361-
"name": "NativeTableName",
362-
"condition": {
363-
"expression": "importQueryType == 'tableName'"
364-
},
365-
"show": [
366-
{
367-
"type": "property",
368-
"name": "tableName"
369-
}
370-
]
371347
}
372348
],
373349
"jump-config": {

0 commit comments

Comments
 (0)