Skip to content

Patch/abhi tablename : Adding import query type property and modifying tableName property for the Redshift and Postgres plugins #607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -22,7 +22,8 @@
import io.cdap.plugin.db.CommonSchemaReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -72,17 +73,16 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
int precision = metadata.getPrecision(index);
if (precision == 0) {
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
+ "converting into STRING type to avoid any precision loss.",
metadata.getColumnName(index),
metadata.getColumnTypeName(index)));
+ "converting into STRING type to avoid any precision loss.",
metadata.getColumnName(index),
metadata.getColumnTypeName(index)));
return Schema.of(Schema.Type.STRING);
}
}

if (typeName.equalsIgnoreCase("timestamp")) {
return Schema.of(Schema.LogicalType.DATETIME);
}

return super.getSchema(metadata, index);
}

@@ -113,5 +113,4 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
}
return schemaFields;
}

}
Original file line number Diff line number Diff line change
@@ -17,13 +17,15 @@
package io.cdap.plugin.amazon.redshift;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
@@ -36,6 +38,9 @@
import io.cdap.plugin.util.DBUtils;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
@@ -59,6 +64,20 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
this.redshiftSourceConfig = redshiftSourceConfig;
}

/**
 * Validates at deployment time that either 'tableName' or 'importQuery' is provided
 * (skipped when either property is a macro that will only resolve at runtime),
 * then delegates to the parent for the standard pipeline configuration.
 *
 * @param pipelineConfigurer configurer used to obtain the stage's failure collector
 */
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
  if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")
    && Strings.isNullOrEmpty(sourceConfig.getTableName())
    && Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
    // Attach the failure to the property NAMES, not their values: at this point both
    // values are null/empty, so passing getTableName()/getImportQuery() (as before)
    // gave the collector nothing to associate the failure with in the UI.
    collector.addFailure(
      "Either 'tableName' or 'importQuery' must be specified.",
      "Provide a value for either 'tableName' or 'importQuery' in the configuration.")
      .withConfigProperty("tableName")
      .withConfigProperty("importQuery");
  }
  super.configurePipeline(pipelineConfigurer);
}

@Override
protected SchemaReader getSchemaReader() {
return new RedshiftSchemaReader();
48 changes: 48 additions & 0 deletions amazon-redshift-plugin/widgets/Redshift-batchsource.json
Original file line number Diff line number Diff line change
@@ -108,6 +108,30 @@
{
"label": "SQL Query",
"properties": [
{
"widget-type": "radio-group",
"label": "Import Query Type",
"name": "importQueryType",
"widget-attributes": {
"layout": "inline",
"default": "importQuery",
"options": [
{
"id": "importQuery",
"label": "Native Query"
},
{
"id": "tableName",
"label": "Named Table"
}
]
}
},
{
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "textarea",
"label": "Import Query",
@@ -229,6 +253,30 @@
}
]
},
{
"name": "ImportQuery",
"condition": {
"expression": "importQueryType != 'tableName'"
},
"show": [
{
"type": "property",
"name": "importQuery"
}
]
},
{
"name": "NativeTableName",
"condition": {
"expression": "importQueryType == 'tableName'"
},
"show": [
{
"type": "property",
"name": "tableName"
}
]
}
],
"jump-config": {
"datasets": [
24 changes: 24 additions & 0 deletions cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
Original file line number Diff line number Diff line change
@@ -127,6 +127,30 @@
{
"label": "CloudSQL Properties",
"properties": [
{
"widget-type": "hidden",
"label": "Import Query Type",
"name": "importQueryType",
"widget-attributes": {
"layout": "inline",
"default": "importQuery",
"options": [
{
"id": "importQuery",
"label": "Native Query"
},
{
"id": "tableName",
"label": "Named Table"
}
]
}
},
{
"widget-type": "hidden",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "textarea",
"label": "Import Query",
Original file line number Diff line number Diff line change
@@ -127,6 +127,30 @@
{
"label": "CloudSQL Properties",
"properties": [
{
"widget-type": "hidden",
"label": "Import Query Type",
"name": "importQueryType",
"widget-attributes": {
"layout": "inline",
"default": "importQuery",
"options": [
{
"id": "importQuery",
"label": "Native Query"
},
{
"id": "tableName",
"label": "Named Table"
}
]
}
},
{
"widget-type": "hidden",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "textarea",
"label": "Import Query",
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.common.db.DBUtils;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -29,7 +31,6 @@
* Common schema reader for mapping non specific DB types.
*/
public class CommonSchemaReader implements SchemaReader {

@Override
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
List<Schema.Field> schemaFields = Lists.newArrayList();
@@ -61,4 +62,50 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
return false;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty line

Copy link
Author

@AbhishekKumar9984 AbhishekKumar9984 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

/**
* Returns the schema fields for the specified table using JDBC metadata.
* Supports schema-qualified table names (e.g. "schema.table").
* Throws SQLException if the table has no columns.
*
* @param connection JDBC connection
* @param tableName table name, optionally schema-qualified
* @return list of schema fields
* @throws SQLException if no columns found or on database error
*/
@Override
public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add javadoc whenever adding a public method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added javadoc

DatabaseMetaData dbMetaData = connection.getMetaData();
String schema = null;
String table = tableName;
// Support schema-qualified table names like "schema.table"
if (tableName != null && tableName.contains(".")) {
String[] parts = tableName.split("\\.", 2);
schema = parts[0];
table = parts[1];
}
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
List<Schema.Field> schemaFields = Lists.newArrayList();
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
String typeName = columns.getString("TYPE_NAME");
int columnType = columns.getInt("DATA_TYPE");
int precision = columns.getInt("COLUMN_SIZE");
int scale = columns.getInt("DECIMAL_DIGITS");
int nullable = columns.getInt("NULLABLE");

Schema columnSchema = DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
if (nullable == DatabaseMetaData.columnNullable) {
columnSchema = Schema.nullableOf(columnSchema);
}
Schema.Field field = Schema.Field.of(columnName, columnSchema);
schemaFields.add(field);
}
if (schemaFields.isEmpty()) {
throw new SQLException("No columns found for table: " +
(schema != null ? schema + "." : "") + table);
}
return schemaFields;
}
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

import io.cdap.cdap.api.data.schema.Schema;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -64,4 +65,6 @@ public interface SchemaReader {
* @throws SQLException
*/
boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException;

/**
 * Returns the schema fields of the given table by reading the database metadata
 * over the supplied JDBC connection.
 *
 * @param connection open JDBC connection used to read metadata
 * @param tableName name of the table whose columns define the schema
 * @return list of schema fields for the table's columns
 * @throws SQLException on any database access error
 */
List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -40,8 +40,9 @@
* Abstract Config for DB Specific Source plugin
*/
public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implements DatabaseSourceConfig {

public static final String TABLE_NAME = "tableName";
public static final String IMPORT_QUERY = "importQuery";
public static final String PROPERTY_IMPORT_QUERY_TYPE = "importQueryType";
public static final String BOUNDING_QUERY = "boundingQuery";
public static final String SPLIT_BY = "splitBy";
public static final String NUM_SPLITS = "numSplits";
@@ -54,6 +55,19 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
public String referenceName;

@Name(PROPERTY_IMPORT_QUERY_TYPE)
@Description("Whether to select Table Name or Import Query to extract the data.")
@Macro
@Nullable
public String importQueryType;

@Nullable
@Name(TABLE_NAME)
@Description("The name of the table to import data from. This can be used instead of specifying an import query.")
@Macro
protected String tableName;

@Nullable
@Name(IMPORT_QUERY)
@Description("The SELECT query to use to import data from the specified table. " +
"You can specify an arbitrary number of columns to import, or import all columns using *. " +
@@ -103,10 +117,15 @@ public String getImportQuery() {
return cleanQuery(importQuery);
}

/**
 * Returns the configured table name, or null when the source is configured
 * with an import query instead.
 */
public String getTableName() {
return tableName;
}

public String getBoundingQuery() {
return cleanQuery(boundingQuery);
}


public void validate(FailureCollector collector) {
boolean hasOneSplit = false;
if (!containsMacro(NUM_SPLITS) && numSplits != null) {
@@ -125,16 +144,19 @@ public void validate(FailureCollector collector) {
TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
}

if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
collector.addFailure("Import Query is empty.", "Specify the Import Query.")
.withConfigProperty(IMPORT_QUERY);
if ((!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) &&
(Strings.isNullOrEmpty(tableName) && Strings.isNullOrEmpty(importQuery))) {
collector.addFailure(" Import Query must be specified.",
" Import Query, Can not be empty.")
.withConfigProperty(IMPORT_QUERY);
}

if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
collector.addFailure(String.format(
"Import Query %s must contain the string '$CONDITIONS'. if Number of Splits is not set to 1.", importQuery),
"Include '$CONDITIONS' in the Import Query")
.withConfigProperty(IMPORT_QUERY);
if (!Strings.isNullOrEmpty(importQuery) &&
(!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS"))) {
collector.addFailure(String.format(
"Import Query %s must contain the string '$CONDITIONS'. " +
"if Number of Splits is not set to 1.", importQuery),
"Include '$CONDITIONS' in the Import Query")
.withConfigProperty(IMPORT_QUERY);
}

if (!hasOneSplit && !containsMacro(SPLIT_BY) && (splitBy == null || splitBy.isEmpty())) {
@@ -177,8 +199,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) {
actualField.getSchema().getNonNullable() : actualField.getSchema();
Schema expectedFieldSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();

validateField(collector, field, actualFieldSchema, expectedFieldSchema);
validateField(collector, field, actualFieldSchema, expectedFieldSchema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove space

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

}
}

Original file line number Diff line number Diff line change
@@ -90,4 +90,6 @@ public interface DatabaseSourceConfig extends DatabaseConnectionConfig {
* @return the number of rows to fetch at a time per split
*/
Integer getFetchSize();

/**
 * @return the name of the table to import data from; may be null when an
 *         import query is used instead
 */
String getTableName();
}
Loading