diff --git a/wayang-api/pom.xml b/wayang-api/pom.xml index 9ac61fa54..ce1cd07b1 100644 --- a/wayang-api/pom.xml +++ b/wayang-api/pom.xml @@ -40,6 +40,7 @@ wayang-api-sql wayang-api-json wayang-api-utils + wayang-api-jdbc diff --git a/wayang-api/wayang-api-jdbc/pom.xml b/wayang-api/wayang-api-jdbc/pom.xml new file mode 100644 index 000000000..dded8c4d8 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/pom.xml @@ -0,0 +1,112 @@ + + + + 4.0.0 + + + wayang-api + org.apache.wayang + 1.1.2-SNAPSHOT + + + wayang-api-jdbc + Wayang API JDBC + + JDBC driver for Apache Wayang that allows external tools to connect + to Wayang as if it were a relational database. + + + + + + org.apache.wayang + wayang-api-sql + ${project.version} + + + + + org.apache.wayang + wayang-core + ${project.version} + + + + + org.apache.wayang + wayang-java + ${project.version} + + + + + org.apache.wayang + wayang-basic + ${project.version} + + + + + + com.fasterxml.jackson.core + jackson-databind + test + + + + + com.googlecode.json-simple + json-simple + 1.1.1 + test + + + + + + org.junit.jupiter + junit-jupiter + test + + + + + junit + junit + 4.13.2 + test + + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED + + + + + \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangConnection.java b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangConnection.java new file mode 100644 index 000000000..d4255e8c7 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangConnection.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.jdbc; + +import org.apache.wayang.api.sql.context.SqlContext; +import org.apache.wayang.core.api.Configuration; + +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +public class WayangConnection implements Connection { + + private final String url; + private final SqlContext sqlContext; + private boolean closed = false; + private final Properties properties; + + public WayangConnection(final String url, final String configPath, final Properties properties) throws SQLException { + this.url = url; + this.properties = properties != null ? properties : new Properties(); + try { + final Configuration configuration; + if (configPath != null && !configPath.isEmpty()) { + configuration = new Configuration(configPath); + } else { + configuration = new Configuration(); + } + this.sqlContext = new SqlContext(configuration); + } catch (Exception e) { + throw new SQLException("Failed to create WayangConnection: " + e.getMessage(), e); + } + } + + /** + * Package-private constructor for testing. + * Accepts a pre-built SqlContext directly, bypassing config file loading. + */ + WayangConnection(final String url, final SqlContext sqlContext, final Properties properties) { + this.url = url; + this.properties = properties != null ? properties : new Properties(); + this.sqlContext = sqlContext; + } + + public SqlContext getSqlContext() { + return sqlContext; + } + + @Override + public Statement createStatement() throws SQLException { + checkClosed(); + return new WayangStatement(this); + } + + @Override + public PreparedStatement prepareStatement(final String sql) throws SQLException { + checkClosed(); + return new WayangPreparedStatement(this, sql); + } + + @Override + public void close() throws SQLException { + this.closed = true; + } + + @Override + public boolean isClosed() throws SQLException { + return closed; + } + + @Override + public boolean isValid(final int timeout) throws SQLException { + return !closed; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + checkClosed(); + return new WayangDatabaseMetaData(this); + } + + @Override + public String getCatalog() throws SQLException { + return "wayang"; + } + + @Override + public void setCatalog(final String catalog) throws SQLException {} + + @Override + public int getTransactionIsolation() throws SQLException { + return Connection.TRANSACTION_NONE; + } + + @Override + public void setTransactionIsolation(final int level) throws SQLException {} + + @Override + public boolean getAutoCommit() throws SQLException { + return true; + } + + @Override + public void setAutoCommit(final boolean autoCommit) throws SQLException {} + + @Override + public void commit() throws SQLException {} + + @Override + public void rollback() throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support transactions"); + } + + @Override + public void rollback(final Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support transactions"); + } + + @Override + public Statement createStatement(final int resultSetType, final int resultSetConcurrency) throws SQLException { + return createStatement(); + } + + @Override + public Statement createStatement(final int resultSetType, final int resultSetConcurrency, + final int resultSetHoldability) throws SQLException { + return createStatement(); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int resultSetType, + final int resultSetConcurrency) throws SQLException { + return prepareStatement(sql); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int resultSetType, + final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { + return prepareStatement(sql); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException { + return prepareStatement(sql); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException { + return prepareStatement(sql); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException { + return prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(final String sql) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support stored procedures"); + } + + @Override + public CallableStatement prepareCall(final String sql, final int resultSetType, + final int resultSetConcurrency) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support stored procedures"); + } + + @Override + public CallableStatement prepareCall(final String sql, final int resultSetType, + final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support stored procedures"); + } + + @Override + public String nativeSQL(final String sql) throws SQLException { + return sql; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException {} + + @Override + public Map> getTypeMap() throws SQLException { + throw new SQLFeatureNotSupportedException("getTypeMap not supported"); + } + + @Override + public void setTypeMap(final Map> map) throws SQLException { + throw new SQLFeatureNotSupportedException("setTypeMap not supported"); + } + + @Override + public int getHoldability() throws SQLException { + return ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public void setHoldability(final int holdability) throws SQLException {} + + @Override + public Savepoint setSavepoint() throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support savepoints"); + } + + @Override + public Savepoint setSavepoint(final String name) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support savepoints"); + } + + @Override + public void releaseSavepoint(final Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support savepoints"); + } + + @Override + public boolean isReadOnly() throws SQLException { + return true; + } + + @Override + public void setReadOnly(final boolean readOnly) throws SQLException {} + + @Override + public String getSchema() throws SQLException { + return "wayang"; + } + + @Override + public void setSchema(final String schema) throws SQLException {} + + @Override + public void abort(final Executor executor) throws SQLException { + close(); + } + + @Override + public void setNetworkTimeout(final Executor executor, final int milliseconds) throws SQLException {} + + @Override + public int getNetworkTimeout() throws SQLException { + return 0; + } + + @Override + public Clob createClob() throws SQLException { + throw new SQLFeatureNotSupportedException("createClob not supported"); + } + + @Override + public Blob createBlob() throws SQLException { + throw new SQLFeatureNotSupportedException("createBlob not supported"); + } + + @Override + public NClob createNClob() throws SQLException { + throw new SQLFeatureNotSupportedException("createNClob not supported"); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + throw new SQLFeatureNotSupportedException("createSQLXML not supported"); + } + + @Override + public java.sql.Array createArrayOf(final String typeName, final Object[] elements) throws SQLException { + throw new SQLFeatureNotSupportedException("createArrayOf not supported"); + } + + @Override + public Struct createStruct(final String typeName, final Object[] attributes) throws SQLException { + throw new SQLFeatureNotSupportedException("createStruct not supported"); + } + + @Override + public void setClientInfo(final String name, final String value) throws SQLClientInfoException {} + + @Override + public void setClientInfo(final Properties properties) throws SQLClientInfoException {} + + @Override + public String getClientInfo(final String name) throws SQLException { + return null; + } + + @Override + public Properties getClientInfo() throws SQLException { + return new Properties(); + } + + @Override + public T unwrap(final Class iface) throws SQLException { + if (iface.isInstance(this)) return iface.cast(this); + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(final Class iface) throws SQLException { + return iface.isInstance(this); + } + + private void checkClosed() throws SQLException { + if (closed) throw new SQLException("Connection is closed"); + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangDatabaseMetaData.java b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangDatabaseMetaData.java new file mode 100644 index 000000000..1ea45c99b --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangDatabaseMetaData.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.jdbc; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +/** + * JDBC DatabaseMetaData implementation for Apache Wayang. + * + * Describes the Wayang "database" to external tools: + * - Product name and version + * - Supported SQL features + * - Available schemas and tables + * + * Tools like DBeaver call this immediately after connecting + * to understand what they are talking to. + */ +public class WayangDatabaseMetaData implements DatabaseMetaData { + + private final WayangConnection connection; + + public WayangDatabaseMetaData(final WayangConnection connection) { + this.connection = connection; + } + + // ------------------------------------------------------------------------- + // Product identity — what is this database? + // ------------------------------------------------------------------------- + + @Override + public String getDatabaseProductName() throws SQLException { + return "Apache Wayang"; + } + + @Override + public String getDatabaseProductVersion() throws SQLException { + return "1.1.2-SNAPSHOT"; + } + + @Override + public String getDriverName() throws SQLException { + return "Wayang JDBC Driver"; + } + + @Override + public String getDriverVersion() throws SQLException { + return WayangDriver.MAJOR_VERSION + "." + WayangDriver.MINOR_VERSION; + } + + @Override + public int getDriverMajorVersion() { + return WayangDriver.MAJOR_VERSION; + } + + @Override + public int getDriverMinorVersion() { + return WayangDriver.MINOR_VERSION; + } + + @Override + public int getDatabaseMajorVersion() throws SQLException { + return 1; + } + + @Override + public int getDatabaseMinorVersion() throws SQLException { + return 0; + } + + @Override + public int getJDBCMajorVersion() throws SQLException { + return 4; + } + + @Override + public int getJDBCMinorVersion() throws SQLException { + return 2; + } + + // ------------------------------------------------------------------------- + // Connection info + // ------------------------------------------------------------------------- + + @Override + public Connection getConnection() throws SQLException { + return connection; + } + + @Override + public String getURL() throws SQLException { + return connection.getCatalog(); + } + + @Override + public String getUserName() throws SQLException { + return "wayang"; + } + + // ------------------------------------------------------------------------- + // SQL syntax support + // ------------------------------------------------------------------------- + + @Override + public String getSQLKeywords() throws SQLException { + return ""; + } + + @Override + public String getNumericFunctions() throws SQLException { + return "ABS,CEIL,FLOOR,ROUND,MOD"; + } + + @Override + public String getStringFunctions() throws SQLException { + return "CONCAT,LOWER,UPPER,TRIM,SUBSTRING,LENGTH"; + } + + @Override + public String getSystemFunctions() throws SQLException { + return "DATABASE,USER,IFNULL"; + } + + @Override + public String getTimeDateFunctions() throws SQLException { + return "CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP"; + } + + @Override + public String getSearchStringEscape() throws SQLException { + return "\\"; + } + + @Override + public String getExtraNameCharacters() throws SQLException { + return ""; + } + + @Override + public String getIdentifierQuoteString() throws SQLException { + return "\""; + } + + @Override + public String getCatalogSeparator() throws SQLException { + return "."; + } + + @Override + public String getCatalogTerm() throws SQLException { + return "catalog"; + } + + @Override + public String getSchemaTerm() throws SQLException { + return "schema"; + } + + @Override + public String getProcedureTerm() throws SQLException { + return "procedure"; + } + + // ------------------------------------------------------------------------- + // Feature support flags + // ------------------------------------------------------------------------- + + @Override public boolean allProceduresAreCallable() throws SQLException { return false; } + @Override public boolean allTablesAreSelectable() throws SQLException { return true; } + @Override public boolean isReadOnly() throws SQLException { return true; } + @Override public boolean nullsAreSortedHigh() throws SQLException { return false; } + @Override public boolean nullsAreSortedLow() throws SQLException { return true; } + @Override public boolean nullsAreSortedAtStart() throws SQLException { return false; } + @Override public boolean nullsAreSortedAtEnd() throws SQLException { return false; } + @Override public boolean usesLocalFiles() throws SQLException { return false; } + @Override public boolean usesLocalFilePerTable() throws SQLException { return false; } + @Override public boolean supportsMixedCaseIdentifiers() throws SQLException { return false; } + @Override public boolean storesUpperCaseIdentifiers() throws SQLException { return false; } + @Override public boolean storesLowerCaseIdentifiers() throws SQLException { return true; } + @Override public boolean storesMixedCaseIdentifiers() throws SQLException { return false; } + @Override public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException { return true; } + @Override public boolean storesUpperCaseQuotedIdentifiers() throws SQLException { return false; } + @Override public boolean storesLowerCaseQuotedIdentifiers() throws SQLException { return false; } + @Override public boolean storesMixedCaseQuotedIdentifiers() throws SQLException { return true; } + @Override public boolean supportsAlterTableWithAddColumn() throws SQLException { return false; } + @Override public boolean supportsAlterTableWithDropColumn() throws SQLException { return false; } + @Override public boolean supportsColumnAliasing() throws SQLException { return true; } + @Override public boolean nullPlusNonNullIsNull() throws SQLException { return true; } + @Override public boolean supportsConvert() throws SQLException { return false; } + @Override public boolean supportsConvert(int fromType, int toType) throws SQLException { return false; } + @Override public boolean supportsTableCorrelationNames() throws SQLException { return true; } + @Override public boolean supportsDifferentTableCorrelationNames() throws SQLException { return false; } + @Override public boolean supportsExpressionsInOrderBy() throws SQLException { return true; } + @Override public boolean supportsOrderByUnrelated() throws SQLException { return false; } + @Override public boolean supportsGroupBy() throws SQLException { return true; } + @Override public boolean supportsGroupByUnrelated() throws SQLException { return false; } + @Override public boolean supportsGroupByBeyondSelect() throws SQLException { return false; } + @Override public boolean supportsLikeEscapeClause() throws SQLException { return false; } + @Override public boolean supportsMultipleResultSets() throws SQLException { return false; } + @Override public boolean supportsMultipleTransactions() throws SQLException { return false; } + @Override public boolean supportsNonNullableColumns() throws SQLException { return false; } + @Override public boolean supportsMinimumSQLGrammar() throws SQLException { return true; } + @Override public boolean supportsCoreSQLGrammar() throws SQLException { return false; } + @Override public boolean supportsExtendedSQLGrammar() throws SQLException { return false; } + @Override public boolean supportsANSI92EntryLevelSQL() throws SQLException { return true; } + @Override public boolean supportsANSI92IntermediateSQL() throws SQLException { return false; } + @Override public boolean supportsANSI92FullSQL() throws SQLException { return false; } + @Override public boolean supportsIntegrityEnhancementFacility() throws SQLException { return false; } + @Override public boolean supportsOuterJoins() throws SQLException { return true; } + @Override public boolean supportsFullOuterJoins() throws SQLException { return false; } + @Override public boolean supportsLimitedOuterJoins() throws SQLException { return true; } + @Override public boolean isCatalogAtStart() throws SQLException { return true; } + @Override public boolean supportsSchemasInDataManipulation() throws SQLException { return true; } + @Override public boolean supportsSchemasInProcedureCalls() throws SQLException { return false; } + @Override public boolean supportsSchemasInTableDefinitions() throws SQLException { return false; } + @Override public boolean supportsSchemasInIndexDefinitions() throws SQLException { return false; } + @Override public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException { return false; } + @Override public boolean supportsCatalogsInDataManipulation() throws SQLException { return false; } + @Override public boolean supportsCatalogsInProcedureCalls() throws SQLException { return false; } + @Override public boolean supportsCatalogsInTableDefinitions() throws SQLException { return false; } + @Override public boolean supportsCatalogsInIndexDefinitions() throws SQLException { return false; } + @Override public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException { return false; } + @Override public boolean supportsPositionedDelete() throws SQLException { return false; } + @Override public boolean supportsPositionedUpdate() throws SQLException { return false; } + @Override public boolean supportsSelectForUpdate() throws SQLException { return false; } + @Override public boolean supportsStoredProcedures() throws SQLException { return false; } + @Override public boolean supportsSubqueriesInComparisons() throws SQLException { return true; } + @Override public boolean supportsSubqueriesInExists() throws SQLException { return true; } + @Override public boolean supportsSubqueriesInIns() throws SQLException { return true; } + @Override public boolean supportsSubqueriesInQuantifieds() throws SQLException { return false; } + @Override public boolean supportsCorrelatedSubqueries() throws SQLException { return false; } + @Override public boolean supportsUnion() throws SQLException { return false; } + @Override public boolean supportsUnionAll() throws SQLException { return false; } + @Override public boolean supportsOpenCursorsAcrossCommit() throws SQLException { return false; } + @Override public boolean supportsOpenCursorsAcrossRollback() throws SQLException { return false; } + @Override public boolean supportsOpenStatementsAcrossCommit() throws SQLException { return false; } + @Override public boolean supportsOpenStatementsAcrossRollback() throws SQLException { return false; } + @Override public boolean supportsTransactions() throws SQLException { return false; } + @Override + public boolean supportsSavepoints() throws SQLException { + return false; + } + @Override public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQLException { return false; } + @Override public boolean supportsDataManipulationTransactionsOnly() throws SQLException { return false; } + @Override public boolean dataDefinitionCausesTransactionCommit() throws SQLException { return false; } + @Override public boolean dataDefinitionIgnoredInTransactions() throws SQLException { return false; } + @Override public boolean supportsBatchUpdates() throws SQLException { return false; } + @Override public boolean supportsNamedParameters() throws SQLException { return false; } + @Override public boolean supportsMultipleOpenResults() throws SQLException { return false; } + @Override public boolean supportsGetGeneratedKeys() throws SQLException { return false; } + @Override public boolean supportsResultSetType(int type) throws SQLException { return type == ResultSet.TYPE_FORWARD_ONLY; } + @Override public boolean supportsResultSetConcurrency(int type, int concurrency) throws SQLException { return concurrency == ResultSet.CONCUR_READ_ONLY; } + @Override public boolean ownUpdatesAreVisible(int type) throws SQLException { return false; } + @Override public boolean ownDeletesAreVisible(int type) throws SQLException { return false; } + @Override public boolean ownInsertsAreVisible(int type) throws SQLException { return false; } + @Override public boolean othersUpdatesAreVisible(int type) throws SQLException { return false; } + @Override public boolean othersDeletesAreVisible(int type) throws SQLException { return false; } + @Override public boolean othersInsertsAreVisible(int type) throws SQLException { return false; } + @Override public boolean updatesAreDetected(int type) throws SQLException { return false; } + @Override public boolean deletesAreDetected(int type) throws SQLException { return false; } + @Override public boolean insertsAreDetected(int type) throws SQLException { return false; } + @Override public boolean locatorsUpdateCopy() throws SQLException { return false; } + @Override public boolean supportsStatementPooling() throws SQLException { return false; } + @Override public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException { return false; } + @Override public boolean autoCommitFailureClosesAllResultSets() throws SQLException { return false; } + @Override public boolean generatedKeyAlwaysReturned() throws SQLException { return false; } + + // ------------------------------------------------------------------------- + // Limits + // ------------------------------------------------------------------------- + + @Override public int getMaxBinaryLiteralLength() throws SQLException { return 0; } + @Override public int getMaxCharLiteralLength() throws SQLException { return 0; } + @Override public int getMaxColumnNameLength() throws SQLException { return 0; } + @Override public int getMaxColumnsInGroupBy() throws SQLException { return 0; } + @Override public int getMaxColumnsInIndex() throws SQLException { return 0; } + @Override public int getMaxColumnsInOrderBy() throws SQLException { return 0; } + @Override public int getMaxColumnsInSelect() throws SQLException { return 0; } + @Override public int getMaxColumnsInTable() throws SQLException { return 0; } + @Override public int getMaxConnections() throws SQLException { return 0; } + @Override public int getMaxCursorNameLength() throws SQLException { return 0; } + @Override public int getMaxIndexLength() throws SQLException { return 0; } + @Override public int getMaxSchemaNameLength() throws SQLException { return 0; } + @Override public int getMaxProcedureNameLength() throws SQLException { return 0; } + @Override public int getMaxCatalogNameLength() throws SQLException { return 0; } + @Override public int getMaxRowSize() throws SQLException { return 0; } + @Override public boolean doesMaxRowSizeIncludeBlobs() throws SQLException { return false; } + @Override public int getMaxStatementLength() throws SQLException { return 0; } + @Override public int getMaxStatements() throws SQLException { return 0; } + @Override public int getMaxTableNameLength() throws SQLException { return 0; } + @Override public int getMaxTablesInSelect() throws SQLException { return 0; } + @Override public int getMaxUserNameLength() throws SQLException { return 0; } + + // ------------------------------------------------------------------------- + // Transactions + // ------------------------------------------------------------------------- + + @Override + public boolean supportsTransactionIsolationLevel(final int level) throws SQLException { + return level == Connection.TRANSACTION_NONE; + } + + @Override + public int getDefaultTransactionIsolation() throws SQLException { + return Connection.TRANSACTION_NONE; + } + + // ------------------------------------------------------------------------- + // Schema / Table / Column metadata ResultSets (return empty sets) + // ------------------------------------------------------------------------- + + @Override + public ResultSet getSchemas() throws SQLException { + final java.util.List rows = new ArrayList<>(); + final java.util.List colNames = java.util.Arrays.asList("TABLE_SCHEM", "TABLE_CATALOG"); + rows.add(new org.apache.wayang.basic.data.Record(new Object[]{"wayang", "wayang"})); + final WayangResultSet rs = new WayangResultSet(rows, "getSchemas"); + rs.overrideColumnNames(colNames); + return rs; + } + + @Override + public ResultSet getSchemas(final String catalog, final String schemaPattern) throws SQLException { + return getSchemas(); + } + + @Override + public ResultSet getCatalogs() throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getTables(final String catalog, final String schemaPattern, + final String tableNamePattern, final String[] types) throws SQLException { + final java.util.List rows = new ArrayList<>(); + final java.util.List colNames = java.util.Arrays.asList( + "TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "TABLE_TYPE", + "REMARKS", "TYPE_CAT", "TYPE_SCHEM", "TYPE_NAME", + "SELF_REFERENCING_COL_NAME", "REF_GENERATION"); + rows.add(new org.apache.wayang.basic.data.Record(new Object[]{ + "wayang", "wayang", "wayang_table", "TABLE", + "Apache Wayang virtual table", null, null, null, null, null})); + final WayangResultSet rs = new WayangResultSet(rows, "getTables"); + rs.overrideColumnNames(colNames); + return rs; + } + + @Override + public ResultSet getColumns(final String catalog, final String schemaPattern, + final String tableNamePattern, final String columnNamePattern) throws SQLException { + final java.util.List rows = new ArrayList<>(); + final java.util.List colNames = java.util.Arrays.asList( + "TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", + "DATA_TYPE", "TYPE_NAME", "COLUMN_SIZE", "BUFFER_LENGTH", + "DECIMAL_DIGITS", "NUM_PREC_RADIX", "NULLABLE", "REMARKS", + "COLUMN_DEF", "SQL_DATA_TYPE", "SQL_DATETIME_SUB", + "CHAR_OCTET_LENGTH", "ORDINAL_POSITION", "IS_NULLABLE", + "SCOPE_CATALOG", "SCOPE_SCHEMA", "SCOPE_TABLE", + "SOURCE_DATA_TYPE", "IS_AUTOINCREMENT", "IS_GENERATEDCOLUMN"); + final WayangResultSet rs = new WayangResultSet(rows, "getColumns"); + rs.overrideColumnNames(colNames); + return rs; + } + + @Override + public ResultSet getPrimaryKeys(final String catalog, final String schema, + final String table) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getTableTypes() throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getProcedures(final String catalog, final String schemaPattern, + final String procedureNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getProcedureColumns(final String catalog, final String schemaPattern, + final String procedureNamePattern, final String columnNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getColumnPrivileges(final String catalog, final String schema, + final String table, final String columnNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getTablePrivileges(final String catalog, final String schemaPattern, + final String tableNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getBestRowIdentifier(final String catalog, final String schema, + final String table, final int scope, final boolean nullable) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getVersionColumns(final String catalog, final String schema, + final String table) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getImportedKeys(final String catalog, final String schema, + final String table) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getExportedKeys(final String catalog, final String schema, + final String table) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getCrossReference(final String parentCatalog, final String parentSchema, + final String parentTable, final String foreignCatalog, final String foreignSchema, + final String foreignTable) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getTypeInfo() throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getIndexInfo(final String catalog, final String schema, final String table, + final boolean unique, final boolean approximate) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getUDTs(final String catalog, final String schemaPattern, + final String typeNamePattern, final int[] types) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getSuperTypes(final String catalog, final String schemaPattern, + final String typeNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getSuperTables(final String catalog, final String schemaPattern, + final String tableNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getAttributes(final String catalog, final String schemaPattern, + final String typeNamePattern, final String attributeNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getClientInfoProperties() throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getFunctions(final String catalog, final String schemaPattern, + final String functionNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getFunctionColumns(final String catalog, final String schemaPattern, + final String functionNamePattern, final String columnNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public ResultSet getPseudoColumns(final String catalog, final String schemaPattern, + final String tableNamePattern, final String columnNamePattern) throws SQLException { + return emptyResultSet(); + } + + @Override + public RowIdLifetime getRowIdLifetime() throws SQLException { + return RowIdLifetime.ROWID_UNSUPPORTED; + } + + @Override + public int getSQLStateType() throws SQLException { + return sqlStateSQL; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public boolean supportsResultSetHoldability(final int holdability) throws SQLException { + return holdability == ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + // ------------------------------------------------------------------------- + // Unwrap + // ------------------------------------------------------------------------- + + @Override + public T unwrap(final Class iface) throws SQLException { + if (iface.isInstance(this)) return iface.cast(this); + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(final Class iface) throws SQLException { + return iface.isInstance(this); + } + + /** Returns an empty ResultSet for metadata queries that have no data yet */ + private ResultSet emptyResultSet() throws SQLException { + return new WayangResultSet(new ArrayList<>(), ""); + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangDriver.java b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangDriver.java new file mode 100644 index 000000000..3130140c5 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangDriver.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.jdbc; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; +import java.util.logging.Logger; + +/** + * JDBC Driver for Apache Wayang. + * + * Allows external tools to connect to Wayang using standard JDBC. + * + * Connection URL format: + * jdbc:wayang: + * + * Example: + * jdbc:wayang:/path/to/wayang.properties + * + * Usage: + * Connection conn = DriverManager.getConnection("jdbc:wayang:/path/to/config.properties"); + * Statement stmt = conn.createStatement(); + * ResultSet rs = stmt.executeQuery("SELECT * FROM myTable"); + */ +public class WayangDriver implements Driver { + + /** The prefix all Wayang JDBC URLs must start with */ + public static final String URL_PREFIX = "jdbc:wayang:"; + + /** JDBC driver version */ + public static final int MAJOR_VERSION = 1; + public static final int MINOR_VERSION = 0; + + // Auto-register this driver with the DriverManager when the class is loaded + static { + try { + DriverManager.registerDriver(new WayangDriver()); + } catch (SQLException e) { + throw new RuntimeException("Failed to register WayangDriver", e); + } + } + + /** + * Attempts to connect to Wayang using the given URL. + * + * @param url Must start with "jdbc:wayang:" + * @param info Optional properties (unused for now) + * @return a WayangConnection, or null if the URL is not for this driver + */ + @Override + public Connection connect(final String url, final Properties info) throws SQLException { + if (!acceptsURL(url)) { + // Returning null tells DriverManager this driver can't handle this URL + return null; + } + + // Extract the config path from the URL + // e.g. "jdbc:wayang:/path/to/config.properties" -> "/path/to/config.properties" + final String configPath = url.substring(URL_PREFIX.length()); + + return new WayangConnection(url, configPath, info); + } + + /** + * Returns true if this driver can handle the given URL. + * Only accepts URLs starting with "jdbc:wayang:" + */ + @Override + public boolean acceptsURL(final String url) throws SQLException { + return url != null && url.startsWith(URL_PREFIX); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(final String url, final Properties info) throws SQLException { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return MAJOR_VERSION; + } + + @Override + public int getMinorVersion() { + return MINOR_VERSION; + } + + /** + * JDBC-compliant means it fully implements the JDBC spec. + * We return false since this is an incomplete/custom implementation. + */ + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new SQLFeatureNotSupportedException("WayangDriver does not use java.util.logging"); + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangPreparedStatement.java b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangPreparedStatement.java new file mode 100644 index 000000000..68c389019 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangPreparedStatement.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.jdbc; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +public class WayangPreparedStatement extends WayangStatement implements PreparedStatement { + + private final String sqlTemplate; + private final List parameters; + + public WayangPreparedStatement(final WayangConnection connection, final String sql) { + super(connection); + this.sqlTemplate = sql; + this.parameters = new ArrayList<>(); + } + + private String buildSql() throws SQLException { + final StringBuilder result = new StringBuilder(); + int paramIndex = 0; + boolean inQuote = false; + for (int i = 0; i < sqlTemplate.length(); i++) { + final char c = sqlTemplate.charAt(i); + if (c == '\'') { + inQuote = !inQuote; + result.append(c); + } else if (c == '?' && !inQuote) { + if (paramIndex >= parameters.size()) + throw new SQLException("Missing parameter at position " + (paramIndex + 1)); + result.append(formatParameter(parameters.get(paramIndex))); + paramIndex++; + } else { + result.append(c); + } + } + return result.toString(); + } + + private String formatParameter(final Object value) { + if (value == null) return "NULL"; + if (value instanceof String) return "'" + ((String) value).replace("'", "''") + "'"; + if (value instanceof java.sql.Date) return "'" + value + "'"; + if (value instanceof java.sql.Timestamp) return "'" + value + "'"; + if (value instanceof java.sql.Time) return "'" + value + "'"; + return value.toString(); + } + + @Override + public ResultSet executeQuery() throws SQLException { + return executeQuery(buildSql()); + } + + @Override + public boolean execute() throws SQLException { + return execute(buildSql()); + } + + @Override + public int executeUpdate() throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support INSERT/UPDATE/DELETE"); + } + + private void setParam(final int parameterIndex, final Object value) throws SQLException { + while (parameters.size() < parameterIndex) parameters.add(null); + parameters.set(parameterIndex - 1, value); + } + + @Override public void setNull(int i, int t) throws SQLException { setParam(i, null); } + @Override public void setNull(int i, int t, String n) throws SQLException { setParam(i, null); } + @Override public void setBoolean(int i, boolean x) throws SQLException { setParam(i, x); } + @Override public void setByte(int i, byte x) throws SQLException { setParam(i, x); } + @Override public void setShort(int i, short x) throws SQLException { setParam(i, x); } + @Override public void setInt(int i, int x) throws SQLException { setParam(i, x); } + @Override public void setLong(int i, long x) throws SQLException { setParam(i, x); } + @Override public void setFloat(int i, float x) throws SQLException { setParam(i, x); } + @Override public void setDouble(int i, double x) throws SQLException { setParam(i, x); } + @Override public void setBigDecimal(int i, BigDecimal x) throws SQLException { setParam(i, x); } + @Override public void setString(int i, String x) throws SQLException { setParam(i, x); } + @Override public void setBytes(int i, byte[] x) throws SQLException { setParam(i, x); } + @Override public void setDate(int i, Date x) throws SQLException { setParam(i, x); } + @Override public void setDate(int i, Date x, Calendar c) throws SQLException { setParam(i, x); } + @Override public void setTime(int i, Time x) throws SQLException { setParam(i, x); } + @Override public void setTime(int i, Time x, Calendar c) throws SQLException { setParam(i, x); } + @Override public void setTimestamp(int i, Timestamp x) throws SQLException { setParam(i, x); } + @Override public void setTimestamp(int i, Timestamp x, Calendar c) throws SQLException { setParam(i, x); } + @Override public void setObject(int i, Object x) throws SQLException { setParam(i, x); } + @Override public void setObject(int i, Object x, int t) throws SQLException { setParam(i, x); } + @Override public void setObject(int i, Object x, int t, int s) throws SQLException { setParam(i, x); } + @Override public void clearParameters() throws SQLException { parameters.clear(); } + + @Override public ResultSetMetaData getMetaData() throws SQLException { return null; } + @Override public ParameterMetaData getParameterMetaData() throws SQLException { throw new SQLFeatureNotSupportedException(); } + + @Override public void setAsciiStream(int i, InputStream x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setAsciiStream(int i, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setAsciiStream(int i, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setUnicodeStream(int i, InputStream x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setBinaryStream(int i, InputStream x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setBinaryStream(int i, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setBinaryStream(int i, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setCharacterStream(int i, Reader x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setCharacterStream(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setCharacterStream(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setRef(int i, Ref x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setBlob(int i, Blob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setBlob(int i, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setBlob(int i, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setClob(int i, Clob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setClob(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setClob(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setArray(int i, Array x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setURL(int i, URL x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setRowId(int i, RowId x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setNString(int i, String x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setNCharacterStream(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setNCharacterStream(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setNClob(int i, NClob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setNClob(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setNClob(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void setSQLXML(int i, SQLXML x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void addBatch() throws SQLException { throw new SQLFeatureNotSupportedException(); } +} diff --git a/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangResultSet.java b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangResultSet.java new file mode 100644 index 000000000..ab979fb05 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangResultSet.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.jdbc; + +import org.apache.wayang.basic.data.Record; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class WayangResultSet implements ResultSet { + + private final List records; + private int cursor = -1; + private boolean closed = false; + private boolean wasNull = false; + private final String sql; + private List columnNames; + + public WayangResultSet(final Collection records, final String sql) { + this.records = new ArrayList<>(records); + this.sql = sql; + this.columnNames = extractColumnNames(); + } + + private List extractColumnNames() { + final List names = new ArrayList<>(); + if (!records.isEmpty()) { + final Record lastRecord = records.get(records.size() - 1); + boolean isHeader = false; + if (lastRecord.getValues() != null) { + isHeader = true; + for (final Object val : lastRecord.getValues()) { + if (val != null && !(val instanceof String)) { + isHeader = false; + break; + } + } + } + if (isHeader) { + for (final Object val : lastRecord.getValues()) { + names.add(val != null ? val.toString() : "col_" + names.size()); + } + records.remove(records.size() - 1); + } + } + if (names.isEmpty() && !records.isEmpty()) { + final int colCount = records.get(0).getValues().length; + for (int i = 0; i < colCount; i++) names.add("col_" + i); + } + return names; + } + + /** + * Allows overriding column names externally (used by DatabaseMetaData methods). + */ + public void overrideColumnNames(final List names) { + this.columnNames = new ArrayList<>(names); + } + + private Record currentRecord() throws SQLException { + if (cursor < 0 || cursor >= records.size()) + throw new SQLException("No current row. Call next() first."); + return records.get(cursor); + } + + private Object getValue(final int columnIndex) throws SQLException { + final Object[] values = currentRecord().getValues(); + if (columnIndex < 1 || columnIndex > values.length) + throw new SQLException("Column index out of range: " + columnIndex); + final Object val = values[columnIndex - 1]; + wasNull = (val == null); + return val; + } + + private Object getValue(final String columnLabel) throws SQLException { + final int index = columnNames.indexOf(columnLabel); + if (index == -1) throw new SQLException("Column not found: " + columnLabel); + return getValue(index + 1); + } + + @Override public boolean next() throws SQLException { checkClosed(); cursor++; return cursor < records.size(); } + @Override public boolean wasNull() throws SQLException { return wasNull; } + @Override public boolean isBeforeFirst() throws SQLException { return cursor == -1; } + @Override public boolean isAfterLast() throws SQLException { return cursor >= records.size(); } + @Override public boolean isFirst() throws SQLException { return cursor == 0; } + @Override public boolean isLast() throws SQLException { return cursor == records.size() - 1; } + @Override public int getRow() throws SQLException { return cursor + 1; } + @Override public void beforeFirst() throws SQLException { cursor = -1; } + @Override public void afterLast() throws SQLException { cursor = records.size(); } + @Override public boolean first() throws SQLException { cursor = 0; return !records.isEmpty(); } + @Override public boolean last() throws SQLException { cursor = records.size() - 1; return !records.isEmpty(); } + @Override public boolean absolute(int row) throws SQLException { cursor = row - 1; return cursor >= 0 && cursor < records.size(); } + @Override public boolean relative(int rows) throws SQLException { cursor += rows; return cursor >= 0 && cursor < records.size(); } + @Override public boolean previous() throws SQLException { cursor--; return cursor >= 0; } + @Override public void close() throws SQLException { closed = true; } + @Override public boolean isClosed() throws SQLException { return closed; } + + @Override public String getString(int i) throws SQLException { Object v = getValue(i); return v == null ? null : v.toString(); } + @Override public String getString(String s) throws SQLException { Object v = getValue(s); return v == null ? null : v.toString(); } + @Override public boolean getBoolean(int i) throws SQLException { Object v = getValue(i); if (v == null) return false; if (v instanceof Boolean) return (Boolean)v; return Boolean.parseBoolean(v.toString()); } + @Override public boolean getBoolean(String s) throws SQLException { Object v = getValue(s); if (v == null) return false; if (v instanceof Boolean) return (Boolean)v; return Boolean.parseBoolean(v.toString()); } + @Override public byte getByte(int i) throws SQLException { Object v = getValue(i); if (v == null) return 0; if (v instanceof Number) return ((Number)v).byteValue(); return Byte.parseByte(v.toString()); } + @Override public byte getByte(String s) throws SQLException { Object v = getValue(s); if (v == null) return 0; if (v instanceof Number) return ((Number)v).byteValue(); return Byte.parseByte(v.toString()); } + @Override public short getShort(int i) throws SQLException { Object v = getValue(i); if (v == null) return 0; if (v instanceof Number) return ((Number)v).shortValue(); return Short.parseShort(v.toString()); } + @Override public short getShort(String s) throws SQLException { Object v = getValue(s); if (v == null) return 0; if (v instanceof Number) return ((Number)v).shortValue(); return Short.parseShort(v.toString()); } + @Override public int getInt(int i) throws SQLException { Object v = getValue(i); if (v == null) return 0; if (v instanceof Number) return ((Number)v).intValue(); return Integer.parseInt(v.toString()); } + @Override public int getInt(String s) throws SQLException { Object v = getValue(s); if (v == null) return 0; if (v instanceof Number) return ((Number)v).intValue(); return Integer.parseInt(v.toString()); } + @Override public long getLong(int i) throws SQLException { Object v = getValue(i); if (v == null) return 0L; if (v instanceof Number) return ((Number)v).longValue(); return Long.parseLong(v.toString()); } + @Override public long getLong(String s) throws SQLException { Object v = getValue(s); if (v == null) return 0L; if (v instanceof Number) return ((Number)v).longValue(); return Long.parseLong(v.toString()); } + @Override public float getFloat(int i) throws SQLException { Object v = getValue(i); if (v == null) return 0f; if (v instanceof Number) return ((Number)v).floatValue(); return Float.parseFloat(v.toString()); } + @Override public float getFloat(String s) throws SQLException { Object v = getValue(s); if (v == null) return 0f; if (v instanceof Number) return ((Number)v).floatValue(); return Float.parseFloat(v.toString()); } + @Override public double getDouble(int i) throws SQLException { Object v = getValue(i); if (v == null) return 0.0; if (v instanceof Number) return ((Number)v).doubleValue(); return Double.parseDouble(v.toString()); } + @Override public double getDouble(String s) throws SQLException { Object v = getValue(s); if (v == null) return 0.0; if (v instanceof Number) return ((Number)v).doubleValue(); return Double.parseDouble(v.toString()); } + @Override public BigDecimal getBigDecimal(int i) throws SQLException { Object v = getValue(i); if (v == null) return null; if (v instanceof BigDecimal) return (BigDecimal)v; return new BigDecimal(v.toString()); } + @Override public BigDecimal getBigDecimal(String s) throws SQLException { Object v = getValue(s); if (v == null) return null; if (v instanceof BigDecimal) return (BigDecimal)v; return new BigDecimal(v.toString()); } + @Override public BigDecimal getBigDecimal(int i, int scale) throws SQLException { BigDecimal bd = getBigDecimal(i); return bd == null ? null : bd.setScale(scale); } + @Override public BigDecimal getBigDecimal(String s, int scale) throws SQLException { BigDecimal bd = getBigDecimal(s); return bd == null ? null : bd.setScale(scale); } + @Override public Object getObject(int i) throws SQLException { return getValue(i); } + @Override public Object getObject(String s) throws SQLException { return getValue(s); } + @Override public Object getObject(int i, Map> m) throws SQLException { return getObject(i); } + @Override public Object getObject(String s, Map> m) throws SQLException { return getObject(s); } + @Override public T getObject(int i, Class t) throws SQLException { Object v = getValue(i); if (v == null) return null; if (t.isInstance(v)) return t.cast(v); throw new SQLException("Cannot convert"); } + @Override public T getObject(String s, Class t) throws SQLException { Object v = getValue(s); if (v == null) return null; if (t.isInstance(v)) return t.cast(v); throw new SQLException("Cannot convert"); } + + @Override public Date getDate(int i) throws SQLException { Object v = getValue(i); if (v == null) return null; if (v instanceof Date) return (Date)v; return Date.valueOf(v.toString()); } + @Override public Date getDate(String s) throws SQLException { Object v = getValue(s); if (v == null) return null; if (v instanceof Date) return (Date)v; return Date.valueOf(v.toString()); } + @Override public Date getDate(int i, Calendar c) throws SQLException { return getDate(i); } + @Override public Date getDate(String s, Calendar c) throws SQLException { return getDate(s); } + @Override public Time getTime(int i) throws SQLException { Object v = getValue(i); if (v == null) return null; if (v instanceof Time) return (Time)v; return Time.valueOf(v.toString()); } + @Override public Time getTime(String s) throws SQLException { Object v = getValue(s); if (v == null) return null; if (v instanceof Time) return (Time)v; return Time.valueOf(v.toString()); } + @Override public Time getTime(int i, Calendar c) throws SQLException { return getTime(i); } + @Override public Time getTime(String s, Calendar c) throws SQLException { return getTime(s); } + @Override public Timestamp getTimestamp(int i) throws SQLException { Object v = getValue(i); if (v == null) return null; if (v instanceof Timestamp) return (Timestamp)v; return Timestamp.valueOf(v.toString()); } + @Override public Timestamp getTimestamp(String s) throws SQLException { Object v = getValue(s); if (v == null) return null; if (v instanceof Timestamp) return (Timestamp)v; return Timestamp.valueOf(v.toString()); } + @Override public Timestamp getTimestamp(int i, Calendar c) throws SQLException { return getTimestamp(i); } + @Override public Timestamp getTimestamp(String s, Calendar c) throws SQLException { return getTimestamp(s); } + + @Override public ResultSetMetaData getMetaData() throws SQLException { return new WayangResultSetMetaData(columnNames, records); } + @Override public int findColumn(String s) throws SQLException { int i = columnNames.indexOf(s); if (i == -1) throw new SQLException("Column not found: " + s); return i + 1; } + + @Override public SQLWarning getWarnings() throws SQLException { return null; } + @Override public void clearWarnings() throws SQLException {} + @Override public void setFetchDirection(int d) throws SQLException {} + @Override public int getFetchDirection() throws SQLException { return FETCH_FORWARD; } + @Override public void setFetchSize(int r) throws SQLException {} + @Override public int getFetchSize() throws SQLException { return 0; } + @Override public int getType() throws SQLException { return TYPE_FORWARD_ONLY; } + @Override public int getConcurrency() throws SQLException { return CONCUR_READ_ONLY; } + @Override public int getHoldability() throws SQLException { return CLOSE_CURSORS_AT_COMMIT; } + @Override public boolean rowUpdated() throws SQLException { return false; } + @Override public boolean rowInserted() throws SQLException { return false; } + @Override public boolean rowDeleted() throws SQLException { return false; } + @Override public Statement getStatement() throws SQLException { return null; } + + @Override public byte[] getBytes(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public byte[] getBytes(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public InputStream getAsciiStream(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public InputStream getAsciiStream(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public InputStream getUnicodeStream(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public InputStream getUnicodeStream(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public InputStream getBinaryStream(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public InputStream getBinaryStream(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Reader getCharacterStream(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Reader getCharacterStream(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public String getCursorName() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Ref getRef(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Ref getRef(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Blob getBlob(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Blob getBlob(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Clob getClob(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Clob getClob(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Array getArray(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Array getArray(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public URL getURL(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public URL getURL(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public NClob getNClob(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public NClob getNClob(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public SQLXML getSQLXML(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public SQLXML getSQLXML(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public String getNString(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public String getNString(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Reader getNCharacterStream(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public Reader getNCharacterStream(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public RowId getRowId(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public RowId getRowId(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + + @Override public void insertRow() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateRow() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void deleteRow() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void refreshRow() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void cancelRowUpdates() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void moveToInsertRow() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void moveToCurrentRow() throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNull(int i) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBoolean(int i, boolean x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateByte(int i, byte x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateShort(int i, short x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateInt(int i, int x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateLong(int i, long x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateFloat(int i, float x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateDouble(int i, double x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBigDecimal(int i, BigDecimal x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateString(int i, String x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBytes(int i, byte[] x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateDate(int i, Date x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateTime(int i, Time x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateTimestamp(int i, Timestamp x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateAsciiStream(int i, InputStream x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateAsciiStream(int i, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateAsciiStream(int i, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateAsciiStream(String s, InputStream x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateAsciiStream(String s, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateAsciiStream(String s, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBinaryStream(int i, InputStream x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBinaryStream(int i, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBinaryStream(int i, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBinaryStream(String s, InputStream x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBinaryStream(String s, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBinaryStream(String s, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateCharacterStream(int i, Reader x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateCharacterStream(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateCharacterStream(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateCharacterStream(String s, Reader x, int l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateCharacterStream(String s, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateCharacterStream(String s, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateObject(int i, Object x, int s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateObject(int i, Object x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNull(String s) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBoolean(String s, boolean x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateByte(String s, byte x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateShort(String s, short x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateInt(String s, int x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateLong(String s, long x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateFloat(String s, float x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateDouble(String s, double x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBigDecimal(String s, BigDecimal x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateString(String s, String x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBytes(String s, byte[] x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateDate(String s, Date x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateTime(String s, Time x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateTimestamp(String s, Timestamp x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateObject(String s, Object x, int sc) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateObject(String s, Object x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateRef(int i, Ref x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateRef(String s, Ref x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBlob(int i, Blob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBlob(String s, Blob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBlob(int i, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBlob(String s, InputStream x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBlob(int i, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateBlob(String s, InputStream x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateClob(int i, Clob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateClob(String s, Clob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateClob(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateClob(String s, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateClob(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateClob(String s, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateArray(int i, Array x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateArray(String s, Array x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateRowId(int i, RowId x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateRowId(String s, RowId x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNString(int i, String x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNString(String s, String x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNClob(int i, NClob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNClob(String s, NClob x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNClob(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNClob(String s, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNClob(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNClob(String s, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNCharacterStream(int i, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNCharacterStream(String s, Reader x, long l) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNCharacterStream(int i, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateNCharacterStream(String s, Reader x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateSQLXML(int i, SQLXML x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + @Override public void updateSQLXML(String s, SQLXML x) throws SQLException { throw new SQLFeatureNotSupportedException(); } + + @Override public T unwrap(Class iface) throws SQLException { if (iface.isInstance(this)) return iface.cast(this); throw new SQLException("Cannot unwrap"); } + @Override public boolean isWrapperFor(Class iface) throws SQLException { return iface.isInstance(this); } + + private void checkClosed() throws SQLException { + if (closed) throw new SQLException("ResultSet is closed"); + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangResultSetMetaData.java b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangResultSetMetaData.java new file mode 100644 index 000000000..635f15544 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangResultSetMetaData.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.jdbc; + +import org.apache.wayang.basic.data.Record; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + +/** + * JDBC ResultSetMetaData implementation for Apache Wayang. + * + * Describes the columns in a WayangResultSet: + * - How many columns are there? + * - What are their names? + * - What are their SQL types? + * + * Tools like DBeaver call this to render column headers in their UI. + */ +public class WayangResultSetMetaData implements ResultSetMetaData { + + /** Column names in order */ + private final List columnNames; + + /** The actual data rows — used to infer column types */ + private final List records; + + public WayangResultSetMetaData(final List columnNames, final List records) { + this.columnNames = columnNames; + this.records = records; + } + + /** + * Returns the number of columns in this ResultSet. + */ + @Override + public int getColumnCount() throws SQLException { + return columnNames.size(); + } + + /** + * Returns the name of the column at the given index (1-based). + */ + @Override + public String getColumnName(final int column) throws SQLException { + checkColumn(column); + return columnNames.get(column - 1); + } + + /** + * Returns the label (display name) of the column — same as name for Wayang. + */ + @Override + public String getColumnLabel(final int column) throws SQLException { + return getColumnName(column); + } + + /** + * Infers the SQL type of a column by looking at the first non-null value. + * Returns Types.VARCHAR as fallback if no data is available. + */ + @Override + public int getColumnType(final int column) throws SQLException { + checkColumn(column); + if (records.isEmpty()) return Types.VARCHAR; + + // Look at first row to infer type + final Object val = records.get(0).getValues()[column - 1]; + if (val == null) return Types.VARCHAR; + if (val instanceof Integer || val instanceof Long) return Types.BIGINT; + if (val instanceof Double || val instanceof Float) return Types.DOUBLE; + if (val instanceof Boolean) return Types.BOOLEAN; + if (val instanceof java.sql.Date) return Types.DATE; + if (val instanceof java.sql.Timestamp) return Types.TIMESTAMP; + return Types.VARCHAR; // default — treat everything else as string + } + + /** + * Returns the SQL type name as a string e.g. "VARCHAR", "BIGINT". + */ + @Override + public String getColumnTypeName(final int column) throws SQLException { + switch (getColumnType(column)) { + case Types.BIGINT: return "BIGINT"; + case Types.DOUBLE: return "DOUBLE"; + case Types.BOOLEAN: return "BOOLEAN"; + case Types.DATE: return "DATE"; + case Types.TIMESTAMP: return "TIMESTAMP"; + default: return "VARCHAR"; + } + } + + /** + * Returns the Java class name for the column type. + */ + @Override + public String getColumnClassName(final int column) throws SQLException { + switch (getColumnType(column)) { + case Types.BIGINT: return Long.class.getName(); + case Types.DOUBLE: return Double.class.getName(); + case Types.BOOLEAN: return Boolean.class.getName(); + case Types.DATE: return java.sql.Date.class.getName(); + case Types.TIMESTAMP: return java.sql.Timestamp.class.getName(); + default: return String.class.getName(); + } + } + + @Override + public String getTableName(final int column) throws SQLException { + return "wayang"; + } + + @Override + public String getSchemaName(final int column) throws SQLException { + return "wayang"; + } + + @Override + public String getCatalogName(final int column) throws SQLException { + return "wayang"; + } + + @Override + public int getColumnDisplaySize(final int column) throws SQLException { + return 255; + } + + @Override + public int getPrecision(final int column) throws SQLException { + return 0; + } + + @Override + public int getScale(final int column) throws SQLException { + return 0; + } + + @Override + public boolean isAutoIncrement(final int column) throws SQLException { + return false; + } + + @Override + public boolean isCaseSensitive(final int column) throws SQLException { + return false; + } + + @Override + public boolean isSearchable(final int column) throws SQLException { + return true; + } + + @Override + public boolean isCurrency(final int column) throws SQLException { + return false; + } + + @Override + public int isNullable(final int column) throws SQLException { + return columnNullable; + } + + @Override + public boolean isSigned(final int column) throws SQLException { + return false; + } + + @Override + public boolean isReadOnly(final int column) throws SQLException { + return true; + } + + @Override + public boolean isWritable(final int column) throws SQLException { + return false; + } + + @Override + public boolean isDefinitelyWritable(final int column) throws SQLException { + return false; + } + + @Override + public T unwrap(final Class iface) throws SQLException { + if (iface.isInstance(this)) return iface.cast(this); + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(final Class iface) throws SQLException { + return iface.isInstance(this); + } + + /** Validates column index is in range */ + private void checkColumn(final int column) throws SQLException { + if (column < 1 || column > columnNames.size()) { + throw new SQLException("Column index out of range: " + column + + ". Total columns: " + columnNames.size()); + } + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangStatement.java b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangStatement.java new file mode 100644 index 000000000..68cf215e6 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/java/org/apache/wayang/api/jdbc/WayangStatement.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.api.jdbc; + +import org.apache.wayang.basic.data.Record; + +import java.sql.*; +import java.util.Collection; + +/** + * JDBC Statement implementation for Apache Wayang. + * + * This is the class that actually executes SQL queries by delegating to + * SqlContext.executeSql() and wrapping results in a WayangResultSet. + * + * Flow: tool calls statement.executeQuery("SELECT ...") -> calls + * sqlContext.executeSql(sql) -> gets back Collection + * -> wraps in WayangResultSet -> returns to tool + */ +public class WayangStatement implements Statement { + + /** + * The connection that created this statement + */ + private final WayangConnection connection; + + /** + * Whether this statement has been closed + */ + private boolean closed = false; + + /** + * The last ResultSet produced by this statement + */ + private ResultSet currentResultSet = null; + + /** + * Query timeout in seconds (0 = no limit) + */ + private int queryTimeout = 0; + + /** + * Max rows to return (0 = no limit) + */ + private int maxRows = 0; + + public WayangStatement(final WayangConnection connection) { + this.connection = connection; + } + + /** + * Executes a SQL SELECT query and returns results as a ResultSet. This is + * the core method — it calls SqlContext.executeSql(). + * + * @param sql The SQL query string e.g. "SELECT * FROM orders" + * @return A WayangResultSet containing the query results + */ + @Override + public ResultSet executeQuery(final String sql) throws SQLException { + checkClosed(); + try { + // THIS IS THE KEY LINE — delegate to Wayang's SQL engine + final Collection records = connection.getSqlContext().executeSql(sql); + currentResultSet = new WayangResultSet(records, sql); + return currentResultSet; + } catch (Exception e) { + throw new SQLException("Failed to execute query: " + sql + "\nReason: " + e.getMessage(), e); + } + } + + /** + * Executes a SQL statement. For SELECT queries, use executeQuery instead. + * Wayang is read-only so INSERT/UPDATE/DELETE are not supported. + */ + @Override + public boolean execute(final String sql) throws SQLException { + checkClosed(); + final String trimmed = sql.trim().toUpperCase(); + if (trimmed.startsWith("SELECT")) { + currentResultSet = executeQuery(sql); + return true; // true means a ResultSet was produced + } + throw new SQLFeatureNotSupportedException( + "Wayang only supports SELECT queries. Got: " + sql); + } + + @Override + public int executeUpdate(final String sql) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support INSERT/UPDATE/DELETE"); + } + + @Override + public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support INSERT/UPDATE/DELETE"); + } + + @Override + public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support INSERT/UPDATE/DELETE"); + } + + @Override + public int executeUpdate(final String sql, final String[] columnNames) throws SQLException { + throw new SQLFeatureNotSupportedException("Wayang does not support INSERT/UPDATE/DELETE"); + } + + @Override + public ResultSet getResultSet() throws SQLException { + return currentResultSet; + } + + @Override + public int getUpdateCount() throws SQLException { + return -1; // -1 means no update count (we only do SELECT) + } + + @Override + public boolean getMoreResults() throws SQLException { + return false; // Wayang only returns one result set + } + + @Override + public boolean getMoreResults(final int current) throws SQLException { + return false; + } + + @Override + public Connection getConnection() throws SQLException { + return connection; + } + + @Override + public void close() throws SQLException { + closed = true; + if (currentResultSet != null) { + currentResultSet.close(); + currentResultSet = null; + } + } + + @Override + public boolean isClosed() throws SQLException { + return closed; + } + + @Override + public int getQueryTimeout() throws SQLException { + return queryTimeout; + } + + @Override + public void setQueryTimeout(final int seconds) throws SQLException { + this.queryTimeout = seconds; + } + + @Override + public void setFetchSize(final int rows) throws SQLException { + // no-op + } + + @Override + public int getMaxRows() throws SQLException { + return maxRows; + } + + @Override + public void setMaxRows(final int max) throws SQLException { + this.maxRows = max; + } + + @Override + public int getMaxFieldSize() throws SQLException { + return 0; + } + + @Override + public void setMaxFieldSize(final int max) throws SQLException { + // no-op + } + + @Override + public void setEscapeProcessing(final boolean enable) throws SQLException { + // no-op + } + + @Override + public void cancel() throws SQLException { + // no-op + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + // no-op + } + + @Override + public void setCursorName(final String name) throws SQLException { + throw new SQLFeatureNotSupportedException("setCursorName not supported"); + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported"); + } + + @Override + public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException { + return execute(sql); + } + + @Override + public boolean execute(final String sql, final int[] columnIndexes) throws SQLException { + return execute(sql); + } + + @Override + public boolean execute(final String sql, final String[] columnNames) throws SQLException { + return execute(sql); + } + + @Override + public int getResultSetConcurrency() throws SQLException { + return ResultSet.CONCUR_READ_ONLY; + } + + @Override + public int getResultSetType() throws SQLException { + return ResultSet.TYPE_FORWARD_ONLY; + } + + @Override + public void addBatch(final String sql) throws SQLException { + throw new SQLFeatureNotSupportedException("Batch execution not supported"); + } + + @Override + public void clearBatch() throws SQLException { + throw new SQLFeatureNotSupportedException("Batch execution not supported"); + } + + @Override + public int[] executeBatch() throws SQLException { + throw new SQLFeatureNotSupportedException("Batch execution not supported"); + } + + @Override + public int getResultSetHoldability() throws SQLException { + return ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public boolean isPoolable() throws SQLException { + return false; + } + + @Override + public void setPoolable(final boolean poolable) throws SQLException { + // no-op + } + + @Override + public void closeOnCompletion() throws SQLException { + // no-op + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + return false; + } + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public void setFetchDirection(final int direction) throws SQLException { + // no-op + } + + @Override + public int getFetchDirection() throws SQLException { + return ResultSet.FETCH_FORWARD; + } + + @Override + public T unwrap(final Class iface) throws SQLException { + if (iface.isInstance(this)) { + return iface.cast(this); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(final Class iface) throws SQLException { + return iface.isInstance(this); + } + + /** + * Throws SQLException if this statement is already closed + */ + private void checkClosed() throws SQLException { + if (closed) { + throw new SQLException("Statement is closed"); + } + } +} diff --git a/wayang-api/wayang-api-jdbc/src/main/resources/META-INF/services/java.sql.Driver b/wayang-api/wayang-api-jdbc/src/main/resources/META-INF/services/java.sql.Driver new file mode 100644 index 000000000..61d7aaf11 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/main/resources/META-INF/services/java.sql.Driver @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.wayang.api.jdbc.WayangDriver diff --git a/wayang-api/wayang-api-jdbc/src/test/java/org/apache/wayang/api/jdbc/WayangDriverTest.java b/wayang-api/wayang-api-jdbc/src/test/java/org/apache/wayang/api/jdbc/WayangDriverTest.java new file mode 100644 index 000000000..5907d370c --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/test/java/org/apache/wayang/api/jdbc/WayangDriverTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.jdbc; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +import java.sql.Driver; +import java.sql.DriverManager; +import java.util.Properties; + +/** + * Unit tests for the Wayang JDBC Driver. + * Tests that the driver registers correctly and handles URLs properly. + */ +public class WayangDriverTest { + + @Test + public void testDriverRegistered() throws Exception { + // Force class loading which triggers static registration + Class.forName("org.apache.wayang.api.jdbc.WayangDriver"); + + // Check driver is registered with DriverManager + Driver driver = DriverManager.getDriver("jdbc:wayang:test"); + assertNotNull(driver, "WayangDriver should be registered"); + assertInstanceOf(WayangDriver.class, driver); + } + + @Test + public void testAcceptsWayangUrl() throws Exception { + final WayangDriver driver = new WayangDriver(); + assertTrue(driver.acceptsURL("jdbc:wayang:/path/to/config.properties")); + assertTrue(driver.acceptsURL("jdbc:wayang:")); + } + + @Test + public void testRejectsNonWayangUrl() throws Exception { + final WayangDriver driver = new WayangDriver(); + assertFalse(driver.acceptsURL("jdbc:mysql://localhost/db")); + assertFalse(driver.acceptsURL("jdbc:postgresql://localhost/db")); + assertFalse(driver.acceptsURL(null)); + } + + @Test + public void testDriverVersion() { + final WayangDriver driver = new WayangDriver(); + assertEquals(1, driver.getMajorVersion()); + assertEquals(0, driver.getMinorVersion()); + } + + @Test + public void testConnectReturnsNullForNonWayangUrl() throws Exception { + final WayangDriver driver = new WayangDriver(); + // Should return null for non-Wayang URLs (JDBC spec requirement) + assertNull(driver.connect("jdbc:mysql://localhost/db", new Properties())); + } + + @Test + public void testUrlPrefix() { + assertEquals("jdbc:wayang:", WayangDriver.URL_PREFIX); + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/test/java/org/apache/wayang/api/jdbc/WayangJdbcIntegrationTest.java b/wayang-api/wayang-api-jdbc/src/test/java/org/apache/wayang/api/jdbc/WayangJdbcIntegrationTest.java new file mode 100644 index 000000000..551204339 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/test/java/org/apache/wayang/api/jdbc/WayangJdbcIntegrationTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.api.jdbc; + +import java.io.IOException; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.wayang.api.sql.calcite.utils.ModelParser; +import org.apache.wayang.api.sql.context.SqlContext; +import org.apache.wayang.core.api.Configuration; +import org.json.simple.parser.ParseException; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class WayangJdbcIntegrationTest { + + private SqlContext sqlContext; + + @BeforeEach + public void setUp() throws Exception { + sqlContext = createSqlContext("/data/exampleInt.csv"); + } + + private WayangConnection createTestConnection() throws SQLException { + return new WayangConnection("jdbc:wayang:test", sqlContext, null); + } + + @Test + public void testConnectionNotClosed() throws Exception { + final WayangConnection conn = createTestConnection(); + assertFalse(conn.isClosed()); + assertNotNull(conn.getMetaData()); + assertEquals("wayang", conn.getCatalog()); + conn.close(); + } + + @Test + public void testConnectionClose() throws Exception { + final WayangConnection conn = createTestConnection(); + assertFalse(conn.isClosed()); + conn.close(); + assertTrue(conn.isClosed()); + } + + @Test + public void testCreateStatement() throws Exception { + final WayangConnection conn = createTestConnection(); + final Statement stmt = conn.createStatement(); + assertNotNull(stmt); + assertFalse(stmt.isClosed()); + stmt.close(); + conn.close(); + } + + @Test + public void testStatementExecuteQueryReturnsResultSet() throws Exception { + final WayangConnection conn = createTestConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM fs.exampleInt"); + assertNotNull(rs); + rs.close(); + stmt.close(); + conn.close(); + } + + @Test + public void testResultSetHasRows() throws Exception { + final WayangConnection conn = createTestConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM fs.exampleInt"); + int rowCount = 0; + while (rs.next()) { + rowCount++; + } + assertTrue(rowCount > 0, "ResultSet should have at least one row"); + rs.close(); + stmt.close(); + conn.close(); + } + + @Test + public void testResultSetMetaData() throws Exception { + final WayangConnection conn = createTestConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM fs.exampleInt"); + final ResultSetMetaData meta = rs.getMetaData(); + assertNotNull(meta); + assertTrue(meta.getColumnCount() > 0); + rs.close(); + stmt.close(); + conn.close(); + } + + @Test + public void testResultSetColumnNames() throws Exception { + final WayangConnection conn = createTestConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM fs.exampleInt"); + final ResultSetMetaData meta = rs.getMetaData(); + assertTrue(meta.getColumnCount() >= 1); + rs.close(); + stmt.close(); + conn.close(); + } + + @Test + public void testPreparedStatementBasic() throws Exception { + final WayangConnection conn = createTestConnection(); + final PreparedStatement ps = conn.prepareStatement("SELECT * FROM fs.exampleInt"); + assertNotNull(ps); + final ResultSet rs = ps.executeQuery(); + assertNotNull(rs); + rs.close(); + ps.close(); + conn.close(); + } + + @Test + public void testDatabaseMetaData() throws Exception { + final WayangConnection conn = createTestConnection(); + final DatabaseMetaData meta = conn.getMetaData(); + assertNotNull(meta); + assertEquals("Apache Wayang", meta.getDatabaseProductName()); + assertEquals("Wayang JDBC Driver", meta.getDriverName()); + assertEquals(4, meta.getJDBCMajorVersion()); + conn.close(); + } + + @Test + public void testDatabaseMetaDataSupportsSelect() throws Exception { + final WayangConnection conn = createTestConnection(); + final DatabaseMetaData meta = conn.getMetaData(); + assertTrue(meta.supportsANSI92EntryLevelSQL()); + assertTrue(meta.supportsGroupBy()); + assertTrue(meta.supportsOuterJoins()); + conn.close(); + } + + @Test + public void testDriverAcceptsUrl() throws Exception { + final WayangDriver driver = new WayangDriver(); + assertTrue(driver.acceptsURL("jdbc:wayang:/path/to/config")); + assertFalse(driver.acceptsURL("jdbc:mysql://localhost/db")); + } + + @Test + public void testStatementOnClosedConnectionThrows() throws Exception { + final WayangConnection conn = createTestConnection(); + conn.close(); + assertThrows(SQLException.class, () -> conn.createStatement()); + } + + @Test + public void testClosedStatementThrows() throws Exception { + final WayangConnection conn = createTestConnection(); + final Statement stmt = conn.createStatement(); + stmt.close(); + assertThrows(SQLException.class, + () -> stmt.executeQuery("SELECT * FROM fs.exampleInt")); + conn.close(); + } + + private SqlContext createSqlContext(final String tableResourceName) + throws IOException, ParseException, SQLException { + final String calciteModel = "{\r\n" + + " \"calcite\": {\r\n" + + " \"version\": \"1.0\",\r\n" + + " \"defaultSchema\": \"wayang\",\r\n" + + " \"schemas\": [\r\n" + + " {\r\n" + + " \"name\": \"fs\",\r\n" + + " \"type\": \"custom\",\r\n" + + " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + + " \"operand\": {\r\n" + + " \"directory\": \"" + "/" + + this.getClass().getResource("/data").getPath() + "\"\r\n" + + " }\r\n" + + " }\r\n" + + " ]\r\n" + + " }\r\n" + + " }"; + final JsonNode calciteModelJSON = new ObjectMapper().readTree(calciteModel); + final Configuration configuration = new ModelParser( + new Configuration(), calciteModelJSON).setProperties(); + assertNotNull(configuration); + final String dataPath = this.getClass().getResource(tableResourceName).getPath(); + assertNotNull(dataPath); + configuration.setProperty("wayang.fs.table.url", dataPath); + configuration.setProperty("wayang.ml.executions.file", "mle.txt"); + configuration.setProperty("wayang.ml.optimizations.file", "mlo.txt"); + configuration.setProperty("wayang.ml.experience.enabled", "false"); + return new SqlContext(configuration); + } +} diff --git a/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleInt.csv b/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleInt.csv new file mode 100644 index 000000000..bab9a6170 --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleInt.csv @@ -0,0 +1,6 @@ +id:int,name:string,value:int +1;Alice;100 +2;Bob;200 +3;Charlie;300 +4;Diana;400 +5;Eve;500 diff --git a/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleSmallA.csv b/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleSmallA.csv new file mode 100644 index 000000000..8140a8faa --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleSmallA.csv @@ -0,0 +1,3 @@ +COLA:string,COLB:string +item1;item2 +item1;item2 \ No newline at end of file diff --git a/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleSort.csv b/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleSort.csv new file mode 100644 index 000000000..836f09bbe --- /dev/null +++ b/wayang-api/wayang-api-jdbc/src/test/resources/data/exampleSort.csv @@ -0,0 +1,9 @@ +col1:int,col2:string,col3:string +0;a;a +0;b;b +0;a;b +2;a;a +2;a;a +1;a;a +1;b;b +1;a;b \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java index 3b8209979..8a44a5be0 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java @@ -67,6 +67,8 @@ public class SqlContext extends WayangContext { private static final AtomicInteger jobId = new AtomicInteger(0); private final CalciteSchema calciteSchema; + private final Configuration originalConfiguration; + public SqlContext() throws SQLException { this(new Configuration()); @@ -80,6 +82,7 @@ public SqlContext(final Configuration configuration) throws SQLException { this.withPlugin(Postgres.plugin()); calciteSchema = SchemaUtils.getSchema(configuration); + this.originalConfiguration = configuration; } public SqlContext(final Configuration configuration, final List plugins) throws SQLException { @@ -88,6 +91,7 @@ public SqlContext(final Configuration configuration, final List plugins) for (final Plugin plugin : plugins) { this.withPlugin(plugin); } + this.originalConfiguration = configuration; calciteSchema = SchemaUtils.getSchema(configuration); } @@ -215,7 +219,7 @@ public Collection executeSql(final String sql) throws SqlParseException PrintUtils.print("After translating logical intermediate plan", wayangRel); final Collection collector = new ArrayList<>(); - final WayangPlan wayangPlan = Optimizer.convert(wayangRel, collector); + final WayangPlan wayangPlan = Optimizer.convertWithConfig(wayangRel, this.originalConfiguration != null ? this.originalConfiguration : this.getConfiguration(), collector); this.execute(getJobName(), wayangPlan); @@ -227,3 +231,8 @@ private static String getJobName() { } } + + + + +