Skip to content

Commit

Permalink
[feature](jdbc catalog) support gbase jdbc catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-kkk committed Sep 23, 2024
1 parent 00be77e commit bdfc07a
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.jdbc;

import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnType.Type;
import org.apache.doris.common.jni.vec.ColumnValueConverter;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Objects;

public class GbaseJdbcExecutor extends BaseJdbcExecutor {

public GbaseJdbcExecutor(byte[] thriftParams) throws Exception {
super(thriftParams);
}

@Override
protected Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) throws SQLException {
switch (type.getType()) {
case TINYINT:
byte tinyIntVal = resultSet.getByte(columnIndex + 1);
return resultSet.wasNull() ? null : tinyIntVal;
case SMALLINT:
short smallIntVal = resultSet.getShort(columnIndex + 1);
return resultSet.wasNull() ? null : smallIntVal;
case INT:
int intVal = resultSet.getInt(columnIndex + 1);
return resultSet.wasNull() ? null : intVal;
case BIGINT:
long bigIntVal = resultSet.getLong(columnIndex + 1);
return resultSet.wasNull() ? null : bigIntVal;
case FLOAT:
float floatVal = resultSet.getFloat(columnIndex + 1);
return resultSet.wasNull() ? null : floatVal;
case DOUBLE:
double doubleVal = resultSet.getDouble(columnIndex + 1);
return resultSet.wasNull() ? null : doubleVal;
case DECIMALV2:
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
BigDecimal decimalVal = resultSet.getBigDecimal(columnIndex + 1);
return resultSet.wasNull() ? null : decimalVal;
case DATE:
case DATEV2:
Date dateVal = resultSet.getDate(columnIndex + 1);
return resultSet.wasNull() ? null : dateVal.toLocalDate();
case DATETIME:
case DATETIMEV2:
Timestamp timestampVal = resultSet.getTimestamp(columnIndex + 1);
return resultSet.wasNull() ? null : timestampVal.toLocalDateTime();
case CHAR:
case VARCHAR:
case STRING:
String stringVal = resultSet.getString(columnIndex + 1);
return resultSet.wasNull() ? null : stringVal;
default:
throw new IllegalArgumentException("Unsupported column type: " + type.getType());
}
}

@Override
protected ColumnValueConverter getOutputConverter(ColumnType columnType, String replaceString) {
if (Objects.requireNonNull(columnType.getType()) == Type.CHAR) {
return createConverter(
input -> trimSpaces(input.toString()), String.class);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public static String getExecutorClass(TOdbcTableType type) {
case TRINO:
case PRESTO:
return "org/apache/doris/jdbc/TrinoJdbcExecutor";
case GBASE:
return "org/apache/doris/jdbc/GbaseJdbcExecutor";
default:
throw new IllegalArgumentException("Unsupported jdbc type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class JdbcResource extends Resource {
public static final String JDBC_PRESTO = "jdbc:presto";
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";
public static final String JDBC_DB2 = "jdbc:db2";
public static final String JDBC_GBASE = "jdbc:gbase";

public static final String MYSQL = "MYSQL";
public static final String POSTGRESQL = "POSTGRESQL";
Expand All @@ -89,6 +90,7 @@ public class JdbcResource extends Resource {
public static final String OCEANBASE = "OCEANBASE";
public static final String OCEANBASE_ORACLE = "OCEANBASE_ORACLE";
public static final String DB2 = "DB2";
public static final String GBASE = "GBASE";

public static final String JDBC_PROPERTIES_PREFIX = "jdbc.";
public static final String JDBC_URL = "jdbc_url";
Expand Down Expand Up @@ -331,6 +333,8 @@ public static String parseDbType(String url) throws DdlException {
return OCEANBASE;
} else if (url.startsWith(JDBC_DB2)) {
return DB2;
} else if (url.startsWith(JDBC_GBASE)) {
return GBASE;
}
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public class JdbcTable extends Table {
tempMap.put("oceanbase", TOdbcTableType.OCEANBASE);
tempMap.put("oceanbase_oracle", TOdbcTableType.OCEANBASE_ORACLE);
tempMap.put("db2", TOdbcTableType.DB2);
tempMap.put("gbase", TOdbcTableType.GBASE);
TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
}

Expand Down Expand Up @@ -464,6 +465,7 @@ public static String databaseProperName(TOdbcTableType tableType, String name) {
switch (tableType) {
case MYSQL:
case OCEANBASE:
case GBASE:
return formatName(name, "`", "`", false, false);
case SQLSERVER:
return formatName(name, "[", "]", false, false);
Expand All @@ -486,6 +488,7 @@ public static String properNameWithRemoteName(TOdbcTableType tableType, String r
switch (tableType) {
case MYSQL:
case OCEANBASE:
case GBASE:
return formatNameWithRemoteName(remoteName, "`", "`");
case SQLSERVER:
return formatNameWithRemoteName(remoteName, "[", "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) {
return new JdbcTrinoClient(jdbcClientConfig);
case JdbcResource.DB2:
return new JdbcDB2Client(jdbcClientConfig);
case JdbcResource.GBASE:
return new JdbcGbaseClient(jdbcClientConfig);
default:
throw new IllegalArgumentException("Unsupported DB type: " + dbType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.jdbc.client;

import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;

import com.google.common.collect.Lists;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.function.Consumer;

public class JdbcGbaseClient extends JdbcClient {

protected JdbcGbaseClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}

@Override
public List<String> getDatabaseNameList() {
Connection conn = getConnection();
ResultSet rs = null;
List<String> remoteDatabaseNames = Lists.newArrayList();
try {
if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && excludeDatabaseMap.isEmpty()) {
String currentDatabase = conn.getCatalog();
remoteDatabaseNames.add(currentDatabase);
} else {
rs = conn.getMetaData().getCatalogs();
while (rs.next()) {
remoteDatabaseNames.add(rs.getString("TABLE_CAT"));
}
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get database name list from jdbc", e);
} finally {
close(rs, conn);
}
return filterDatabaseNames(remoteDatabaseNames);
}

@Override
protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Connection conn = null;
ResultSet rs = null;
try {
conn = super.getConnection();
DatabaseMetaData databaseMetaData = conn.getMetaData();
rs = databaseMetaData.getTables(remoteDbName, null, remoteTableName, tableTypes);
resultSetConsumer.accept(rs);
} catch (SQLException e) {
throw new JdbcClientException("Failed to process table", e);
} finally {
close(rs, conn);
}
}

@Override
protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String catalogName, String remoteDbName,
String remoteTableName) throws SQLException {
return databaseMetaData.getColumns(remoteDbName, null, remoteTableName, null);
}

@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);
while (rs.next()) {
JdbcFieldSchema field = new JdbcFieldSchema(rs);
tableSchema.add(field);
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get jdbc columns info for remote table `%s.%s`: %s",
remoteDbName, remoteTableName, Util.getRootCauseMessage(e));
} finally {
close(rs, conn);
}
return tableSchema;
}

@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
switch (fieldSchema.getDataType()) {
case Types.TINYINT:
return Type.TINYINT;
case Types.SMALLINT:
return Type.SMALLINT;
case Types.INTEGER:
return Type.INT;
case Types.BIGINT:
return Type.BIGINT;
case Types.FLOAT:
case Types.REAL:
return Type.FLOAT;
case Types.DOUBLE:
return Type.DOUBLE;
case Types.NUMERIC:
case Types.DECIMAL: {
int precision = fieldSchema.getColumnSize()
.orElseThrow(() -> new IllegalArgumentException("Precision not present"));
int scale = fieldSchema.getDecimalDigits()
.orElseThrow(() -> new JdbcClientException("Scale not present"));
return createDecimalOrStringType(precision, scale);
}
case Types.DATE:
return Type.DATEV2;
case Types.TIMESTAMP: {
int scale = fieldSchema.getDecimalDigits().orElse(0);
if (scale > 6) {
scale = 6;
}
return ScalarType.createDatetimeV2Type(scale);
}
case Types.TIME:
case Types.CHAR:
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.getColumnSize()
.orElseThrow(() -> new IllegalArgumentException("Length not present")));
return charType;
case Types.VARCHAR:
case Types.LONGVARCHAR:
return ScalarType.createStringType();
default:
return Type.UNSUPPORTED;
}
}
}
3 changes: 2 additions & 1 deletion gensrc/thrift/Types.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ enum TOdbcTableType {
OCEANBASE,
OCEANBASE_ORACLE,
NEBULA, // Deprecated
DB2
DB2,
GBASE
}

struct TJdbcExecutorCtorParams {
Expand Down

Large diffs are not rendered by default.

Loading

0 comments on commit bdfc07a

Please sign in to comment.