diff --git a/seatunnel-connectors-v2/connector-hbase/pom.xml b/seatunnel-connectors-v2/connector-hbase/pom.xml
index 663bdcfdd30..bda49ade0ec 100644
--- a/seatunnel-connectors-v2/connector-hbase/pom.xml
+++ b/seatunnel-connectors-v2/connector-hbase/pom.xml
@@ -47,6 +47,12 @@
${hbase.version}
+
+ org.apache.seatunnel
+ seatunnel-format-json
+ ${project.version}
+
+
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
new file mode 100644
index 00000000000..f6a48150732
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
+
+import org.apache.seatunnel.api.configuration.util.ConfigUtil;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/** Hbase catalog implementation. */
+@Slf4j
+public class HbaseCatalog implements Catalog {
+
+ private final String catalogName;
+ private final String defaultDatabase;
+ private final HbaseParameters hbaseParameters;
+
+ private HbaseClient hbaseClient;
+
+ public HbaseCatalog(
+ String catalogName, String defaultDatabase, HbaseParameters hbaseParameters) {
+ this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+ this.defaultDatabase = defaultDatabase;
+ this.hbaseParameters = checkNotNull(hbaseParameters, "Hbase Config cannot be null");
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ try {
+ hbaseClient = HbaseClient.createInstance(hbaseParameters);
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed to open catalog %s", catalogName), e);
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ hbaseClient.close();
+ }
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ return hbaseClient.databaseExists(databaseName);
+ }
+
+ @Override
+ public List listDatabases() throws CatalogException {
+ return hbaseClient.listDatabases();
+ }
+
+ @Override
+ public List listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ }
+ return hbaseClient.listTables(databaseName);
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ checkNotNull(tablePath);
+ return hbaseClient.tableExists(tablePath.getTableName());
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ throw new UnsupportedOperationException("Not implement");
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ hbaseClient.createTable(
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ hbaseParameters.getFamilyNames().values().stream()
+ .filter(value -> !"all_columns".equals(value))
+ .collect(Collectors.toList()),
+ ignoreIfExists);
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath);
+ if (!tableExists(tablePath) && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ hbaseClient.dropTable(tablePath.getDatabaseName(), tablePath.getTableName());
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (databaseExists(tablePath.getDatabaseName()) && !ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
+ }
+ hbaseClient.createNamespace(tablePath.getDatabaseName());
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ if (!databaseExists(tablePath.getDatabaseName()) && !ignoreIfNotExists) {
+ throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+ }
+ hbaseClient.deleteNamespace(tablePath.getDatabaseName());
+ }
+
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+ if (!tableExists(tablePath) && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ hbaseClient.truncateTable(tablePath.getDatabaseName(), tablePath.getTableName());
+ }
+
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ return hbaseClient.isExistsData(tablePath.getDatabaseName(), tablePath.getTableName());
+ }
+
+ private Map buildTableOptions(TablePath tablePath) {
+ Map options = new HashMap<>();
+ options.put("connector", "hbase");
+ options.put("config", ConfigUtil.convertToJsonString(tablePath));
+ return options;
+ }
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ return new InfoPreviewResult("create index " + tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new InfoPreviewResult("delete index " + tablePath.getTableName());
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new InfoPreviewResult("delete and create index " + tablePath.getTableName());
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new InfoPreviewResult("create index " + tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new InfoPreviewResult("delete index " + tablePath.getTableName());
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type: " + actionType);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
new file mode 100644
index 00000000000..b9a3fc25fd4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HbaseCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ // Create an instance of HbaseCatalog, passing in the catalog name, namespace, and Hbase
+ // parameters
+ HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(options);
+ return new HbaseCatalog(catalogName, hbaseParameters.getNamespace(), hbaseParameters);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return HbaseIdentifier.IDENTIFIER_NAME;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
new file mode 100644
index 00000000000..aec64bf7cf4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.client;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.source.HbaseSourceSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode.CONNECTION_FAILED_FOR_ADMIN;
+
+@Slf4j
+public class HbaseClient {
+
+ private final Connection connection;
+ private final Admin admin;
+ private final BufferedMutator hbaseMutator;
+ public static Configuration hbaseConfiguration;
+
+ /**
+ * Constructor for HbaseClient.
+ *
+ * @param connection Hbase connection
+ * @param hbaseParameters Hbase parameters
+ */
+ private HbaseClient(Connection connection, HbaseParameters hbaseParameters) {
+ this.connection = connection;
+ try {
+ this.admin = connection.getAdmin();
+
+ BufferedMutatorParams bufferedMutatorParams =
+ new BufferedMutatorParams(
+ TableName.valueOf(
+ hbaseParameters.getNamespace(),
+ hbaseParameters.getTable()))
+ .pool(HTable.getDefaultExecutor(hbaseConfiguration))
+ .writeBufferSize(hbaseParameters.getWriteBufferSize());
+ hbaseMutator = connection.getBufferedMutator(bufferedMutatorParams);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ CONNECTION_FAILED_FOR_ADMIN, CONNECTION_FAILED_FOR_ADMIN.getDescription(), e);
+ }
+ }
+
+ /**
+ * Create a new instance of HbaseClient.
+ *
+ * @param hbaseParameters Hbase parameters
+ * @return HbaseClient
+ */
+ public static HbaseClient createInstance(HbaseParameters hbaseParameters) {
+ return new HbaseClient(getHbaseConnection(hbaseParameters), hbaseParameters);
+ }
+
+ /**
+ * Get Hbase connection.
+ *
+ * @param hbaseParameters Hbase parameters
+ * @return Hbase connection
+ */
+ private static Connection getHbaseConnection(HbaseParameters hbaseParameters) {
+ hbaseConfiguration = HBaseConfiguration.create();
+ hbaseConfiguration.set("hbase.zookeeper.quorum", hbaseParameters.getZookeeperQuorum());
+ if (hbaseParameters.getHbaseExtraConfig() != null) {
+ hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
+ }
+ try {
+ Connection connection = ConnectionFactory.createConnection(hbaseConfiguration);
+ return connection;
+ } catch (IOException e) {
+ String errorMsg = "Build Hbase connection failed.";
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg, e);
+ }
+ }
+
+ /**
+ * Check if a database exists.
+ *
+ * @param databaseName database name
+ * @return true if the database exists, false otherwise
+ */
+ public boolean databaseExists(String databaseName) {
+ try {
+ return Arrays.stream(admin.listNamespaceDescriptors())
+ .anyMatch(descriptor -> descriptor.getName().equals(databaseName));
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * List all databases.
+ *
+ * @return List of database names
+ */
+ public List listDatabases() {
+ try {
+ return Arrays.stream(admin.listNamespaceDescriptors())
+ .map(NamespaceDescriptor::getName)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * List all tables in a database.
+ *
+ * @param databaseName database name
+ * @return List of table names
+ */
+ public List listTables(String databaseName) {
+ try {
+ return Arrays.stream(admin.listTableNamesByNamespace(databaseName))
+ .map(tableName -> tableName.getNameAsString())
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Check if a table exists.
+ *
+ * @param tableName table name
+ * @return true if the table exists, false otherwise
+ */
+ public boolean tableExists(String tableName) {
+ try {
+ return admin.tableExists(TableName.valueOf(tableName));
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Create a table.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ * @param columnFamilies column families
+ * @param ignoreIfExists ignore if the table already exists
+ */
+ public void createTable(
+ String databaseName,
+ String tableName,
+ List columnFamilies,
+ boolean ignoreIfExists) {
+ try {
+ if (!databaseExists(databaseName)) {
+ admin.createNamespace(NamespaceDescriptor.create(databaseName).build());
+ }
+ TableName table = TableName.valueOf(databaseName, tableName);
+ if (tableExists(table.getNameAsString())) {
+ log.info("Table {} already exists.", table.getNameAsString());
+ if (!ignoreIfExists) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_EXISTS_EXCEPTION,
+ HbaseConnectorErrorCode.TABLE_EXISTS_EXCEPTION.getErrorMessage());
+ }
+ return;
+ }
+ TableDescriptorBuilder hbaseTableDescriptor = TableDescriptorBuilder.newBuilder(table);
+ columnFamilies.forEach(
+ family ->
+ hbaseTableDescriptor.setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family))
+ .build()));
+ admin.createTable(hbaseTableDescriptor.build());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_CREATE_EXCEPTION,
+ HbaseConnectorErrorCode.TABLE_CREATE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Drop a table.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ */
+ public void dropTable(String databaseName, String tableName) {
+ try {
+ TableName table = TableName.valueOf(databaseName, tableName);
+ admin.disableTable(table);
+ admin.deleteTable(table);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_DELETE_EXCEPTION,
+ HbaseConnectorErrorCode.TABLE_DELETE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Create a namespace.
+ *
+ * @param namespace namespace name
+ */
+ public void createNamespace(String namespace) {
+ try {
+ admin.createNamespace(NamespaceDescriptor.create(namespace).build());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.NAMESPACE_CREATE_EXCEPTION,
+ HbaseConnectorErrorCode.NAMESPACE_CREATE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Drop a namespace.
+ *
+ * @param namespace namespace name
+ */
+ public void deleteNamespace(String namespace) {
+ try {
+ admin.deleteNamespace(namespace);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.NAMESPACE_DELETE_EXCEPTION,
+ HbaseConnectorErrorCode.NAMESPACE_DELETE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Truncate a table.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ */
+ public void truncateTable(String databaseName, String tableName) {
+ try {
+ TableName table = TableName.valueOf(databaseName, tableName);
+ admin.disableTable(table);
+ admin.truncateTable(table, true);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_TRUNCATE_EXCEPTION,
+ HbaseConnectorErrorCode.TABLE_TRUNCATE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Check if a table has data.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ * @return true if the table has data, false otherwise
+ */
+ public boolean isExistsData(String databaseName, String tableName) {
+ try {
+ Table table = connection.getTable(TableName.valueOf(databaseName, tableName));
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setLimit(1);
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ Result result = scanner.next();
+ return !result.isEmpty();
+ }
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /** Close Hbase connection. */
+ public void close() {
+ try {
+ if (hbaseMutator != null) {
+ hbaseMutator.flush();
+ hbaseMutator.close();
+ }
+ if (admin != null) {
+ admin.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (IOException e) {
+ log.error("Close Hbase connection failed.", e);
+ }
+ }
+
+ /**
+ * Mutate a Put.
+ *
+ * @param put Hbase put
+ * @throws IOException exception
+ */
+ public void mutate(Put put) throws IOException {
+ hbaseMutator.mutate(put);
+ }
+
+ /**
+ * Scan a table.
+ *
+ * @param split Hbase source split
+ * @param hbaseParameters Hbase parameters
+ * @param columnNames column names
+ * @return ResultScanner
+ * @throws IOException exception
+ */
+ public ResultScanner scan(
+ HbaseSourceSplit split, HbaseParameters hbaseParameters, List columnNames)
+ throws IOException {
+ Scan scan = new Scan();
+ scan.withStartRow(split.getStartRow(), true);
+ scan.withStopRow(split.getEndRow(), true);
+ scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
+ scan.setCaching(hbaseParameters.getCaching());
+ scan.setBatch(hbaseParameters.getBatch());
+ for (String columnName : columnNames) {
+ String[] columnNameSplit = columnName.split(":");
+ scan.addColumn(Bytes.toBytes(columnNameSplit[0]), Bytes.toBytes(columnNameSplit[1]));
+ }
+ return this.connection
+ .getTable(TableName.valueOf(hbaseParameters.getTable()))
+ .getScanner(scan);
+ }
+
+ /**
+ * Get a RegionLocator.
+ *
+ * @param tableName table name
+ * @return RegionLocator
+ * @throws IOException exception
+ */
+ public RegionLocator getRegionLocator(String tableName) throws IOException {
+ return this.connection.getRegionLocator(TableName.valueOf(tableName));
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 44a5640ffed..2921e1f91c8 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -19,10 +19,17 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
public class HbaseConfig {
private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
@@ -119,6 +126,20 @@ public class HbaseConfig {
.withDescription(
"Set the batch size to control the maximum number of cells returned each time, thereby controlling the amount of data returned by a single RPC call. The default value is -1.");
+ public static final Option SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription("schema_save_mode");
+
+ public static final Option DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .singleChoice(
+ DataSaveMode.class,
+ Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+ .defaultValue(APPEND_DATA)
+ .withDescription("data_save_mode");
+
public enum NullMode {
SKIP,
EMPTY;
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 4d020700ad6..66b4eb967b5 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -51,6 +51,8 @@ public class HbaseParameters implements Serializable {
private String zookeeperQuorum;
+ private String namespace;
+
private String table;
private List rowkeyColumns;
@@ -83,13 +85,22 @@ public class HbaseParameters implements Serializable {
public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
HbaseParametersBuilder builder = HbaseParameters.builder();
+ String table = config.get(TABLE);
+ int colonIndex = table.indexOf(':');
+ if (colonIndex != -1) {
+ String namespace = table.substring(0, colonIndex);
+ builder.namespace(namespace);
+ builder.table(table.substring(colonIndex + 1));
+ } else {
+ builder.table(table);
+ builder.namespace("default");
+ }
// required parameters
builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
builder.familyNames(config.get(FAMILY_NAME));
- builder.table(config.get(TABLE));
builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
builder.versionColumn(config.get(VERSION_COLUMN));
String nullMode = String.valueOf(config.get(NULL_MODE));
@@ -108,7 +119,15 @@ public static HbaseParameters buildWithSourceConfig(Config pluginConfig) {
// required parameters
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
- builder.table(pluginConfig.getString(TABLE.key()));
+ String table = pluginConfig.getString(TABLE.key());
+ int colonIndex = table.indexOf(':');
+ if (colonIndex != -1) {
+ String namespace = table.substring(0, colonIndex);
+ builder.namespace(namespace);
+ builder.table(table.substring(colonIndex + 1));
+ } else {
+ builder.table(table);
+ }
if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
Config extraConfig = pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
new file mode 100644
index 00000000000..3d84216d660
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.constant;
+
+public class HbaseIdentifier {
+ public static final String IDENTIFIER_NAME = "Hbase";
+}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
index 5717c933b0d..7f6a60f9558 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
@@ -21,8 +21,17 @@
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum HbaseConnectorErrorCode implements SeaTunnelErrorCode {
- CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed");
-
+ CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed"),
+ CONNECTION_FAILED_FOR_ADMIN("Hbase-02", "Build Hbase Admin failed"),
+ DATABASE_QUERY_EXCEPTION("Hbase-03", "Hbase namespace query failed"),
+ TABLE_QUERY_EXCEPTION("Hbase-04", "Hbase table query failed"),
+ TABLE_CREATE_EXCEPTION("Hbase-05", "Hbase table create failed"),
+ TABLE_DELETE_EXCEPTION("Hbase-06", "Hbase table delete failed"),
+ TABLE_EXISTS_EXCEPTION("Hbase-07", "Hbase table exists failed"),
+ NAMESPACE_CREATE_EXCEPTION("Hbase-08", "Hbase namespace create failed"),
+ NAMESPACE_DELETE_EXCEPTION("Hbase-09", "Hbase namespace delete failed"),
+ TABLE_TRUNCATE_EXCEPTION("Hbase-10", "Hbase table truncate failed"),
+ ;
private final String code;
private final String description;
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 0c592dd65a0..0a46b1baefa 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -17,52 +17,97 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseSinkState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
-public class HbaseSink extends AbstractSimpleSink
- implements SupportMultiTableSink {
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
- private Config pluginConfig;
+public class HbaseSink
+ implements SeaTunnelSink<
+ SeaTunnelRow, HbaseSinkState, HbaseCommitInfo, HbaseAggregatedCommitInfo>,
+ SupportMultiTableSink,
+ SupportSaveMode {
- private SeaTunnelRowType seaTunnelRowType;
+ private ReadonlyConfig config;
+
+ private CatalogTable catalogTable;
- private HbaseParameters hbaseParameters;
+ private final HbaseParameters hbaseParameters;
+
+ private SeaTunnelRowType seaTunnelRowType;
private List rowkeyColumnIndexes = new ArrayList<>();
private int versionColumnIndex = -1;
+ public HbaseSink(ReadonlyConfig config, CatalogTable catalogTable) {
+ this.hbaseParameters = HbaseParameters.buildWithConfig(config);
+ this.config = config;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ if (hbaseParameters.getVersionColumn() != null) {
+ this.versionColumnIndex = seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
+ }
+ }
+
@Override
public String getPluginName() {
- return HbaseSinkFactory.IDENTIFIER;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
- public HbaseSink(HbaseParameters hbaseParameters, CatalogTable catalogTable) {
- this.hbaseParameters = hbaseParameters;
- this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+ @Override
+ public HbaseSinkWriter createWriter(SinkWriter.Context context) throws IOException {
for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) {
this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn));
}
if (hbaseParameters.getVersionColumn() != null) {
this.versionColumnIndex = seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
}
+ return new HbaseSinkWriter(
+ seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, versionColumnIndex);
}
@Override
- public HbaseSinkWriter createWriter(SinkWriter.Context context) throws IOException {
- return new HbaseSinkWriter(
- seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, versionColumnIndex);
+ public Optional getSaveModeHandler() {
+ CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CatalogFactory.class,
+ getPluginName());
+ if (catalogFactory == null) {
+ return Optional.empty();
+ }
+ Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
+ SchemaSaveMode schemaSaveMode = config.get(HbaseConfig.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode = config.get(HbaseConfig.DATA_SAVE_MODE);
+ TablePath tablePath =
+ TablePath.of(hbaseParameters.getNamespace(), hbaseParameters.getTable());
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ schemaSaveMode, dataSaveMode, catalog, tablePath, null, null));
}
}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
index 1bbeb43f4e3..0992b11d710 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -17,23 +17,25 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import com.google.auto.service.AutoService;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.DATA_SAVE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.SCHEMA_SAVE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
@@ -47,29 +49,34 @@ public class HbaseSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return IDENTIFIER;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME)
+ .required(
+ ZOOKEEPER_QUORUM,
+ TABLE,
+ ROWKEY_COLUMNS,
+ FAMILY_NAME,
+ SCHEMA_SAVE_MODE,
+ DATA_SAVE_MODE)
.optional(
- SinkCommonOptions.MULTI_TABLE_SINK_REPLICA,
ROWKEY_DELIMITER,
VERSION_COLUMN,
NULL_MODE,
WAL_WRITE,
WRITE_BUFFER_SIZE,
ENCODING,
- HBASE_EXTRA_CONFIG)
+ HBASE_EXTRA_CONFIG,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
@Override
public TableSink createSink(TableSinkFactoryContext context) {
- HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(context.getOptions());
- CatalogTable catalogTable = context.getCatalogTable();
- return () -> new HbaseSink(hbaseParameters, catalogTable);
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ return () -> new HbaseSink(readonlyConfig, context.getCatalogTable());
}
}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index e1e312d3057..73ee19f9369 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -17,52 +17,46 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseSinkState;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class HbaseSinkWriter extends AbstractSinkWriter
- implements SupportMultiTableSinkWriter {
+public class HbaseSinkWriter
+ implements SinkWriter,
+ SupportMultiTableSinkWriter {
private static final String ALL_COLUMNS = "all_columns";
- private final Configuration hbaseConfiguration = HBaseConfiguration.create();
-
- private final Connection hbaseConnection;
-
- private final BufferedMutator hbaseMutator;
+ private final HbaseClient hbaseClient;
private final SeaTunnelRowType seaTunnelRowType;
private final HbaseParameters hbaseParameters;
- private final List rowkeyColumnIndexes;
+ private List rowkeyColumnIndexes;
- private final int versionColumnIndex;
+ private int versionColumnIndex;
private String defaultFamilyName = "value";
@@ -70,8 +64,7 @@ public HbaseSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HbaseParameters hbaseParameters,
List rowkeyColumnIndexes,
- int versionColumnIndex)
- throws IOException {
+ int versionColumnIndex) {
this.seaTunnelRowType = seaTunnelRowType;
this.hbaseParameters = hbaseParameters;
this.rowkeyColumnIndexes = rowkeyColumnIndexes;
@@ -82,34 +75,27 @@ public HbaseSinkWriter(
hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, defaultFamilyName);
}
- // initialize hbase configuration
- hbaseConfiguration.set("hbase.zookeeper.quorum", hbaseParameters.getZookeeperQuorum());
- if (hbaseParameters.getHbaseExtraConfig() != null) {
- hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
- }
- // initialize hbase connection
- hbaseConnection = ConnectionFactory.createConnection(hbaseConfiguration);
- // initialize hbase mutator
- BufferedMutatorParams bufferedMutatorParams =
- new BufferedMutatorParams(TableName.valueOf(hbaseParameters.getTable()))
- .pool(HTable.getDefaultExecutor(hbaseConfiguration))
- .writeBufferSize(hbaseParameters.getWriteBufferSize());
- hbaseMutator = hbaseConnection.getBufferedMutator(bufferedMutatorParams);
+ this.hbaseClient = HbaseClient.createInstance(hbaseParameters);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
Put put = convertRowToPut(element);
- hbaseMutator.mutate(put);
+ hbaseClient.mutate(put);
}
+ @Override
+ public Optional prepareCommit() throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public void abortPrepare() {}
+
@Override
public void close() throws IOException {
- if (hbaseMutator != null) {
- hbaseMutator.close();
- }
- if (hbaseConnection != null) {
- hbaseConnection.close();
+ if (hbaseClient != null) {
+ hbaseClient.close();
}
}
@@ -134,6 +120,7 @@ private Put convertRowToPut(SeaTunnelRow row) {
.collect(Collectors.toList());
for (Integer writeColumnIndex : writeColumnIndexes) {
String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+ Map configurationFamilyNames = hbaseParameters.getFamilyNames();
String familyName =
hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
index 3aca3161516..1a597eea133 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -35,11 +35,9 @@
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.collect.Lists;
import java.util.List;
@@ -51,9 +49,6 @@ public class HbaseSource
implements SeaTunnelSource,
SupportParallelism,
SupportColumnProjection {
- private static final Logger LOG = LoggerFactory.getLogger(HbaseSource.class);
- public static final String PLUGIN_NAME = "Hbase";
- private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
private HbaseParameters hbaseParameters;
@@ -61,11 +56,10 @@ public class HbaseSource
@Override
public String getPluginName() {
- return PLUGIN_NAME;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
HbaseSource(Config pluginConfig) {
- this.pluginConfig = pluginConfig;
CheckResult result =
CheckConfigUtil.checkAllExists(pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key());
if (!result.isSuccess()) {
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
index 2de385dbd18..5e250337d7b 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -26,18 +26,17 @@
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import com.google.auto.service.AutoService;
import java.io.Serializable;
-import static org.apache.seatunnel.connectors.seatunnel.hbase.sink.HbaseSinkFactory.IDENTIFIER;
-
@AutoService(Factory.class)
public class HbaseSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return IDENTIFIER;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
@Override
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
index 526ac826db1..2f78fb280c0 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -22,16 +22,12 @@
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import org.apache.seatunnel.connectors.seatunnel.hbase.format.HBaseDeserializationFormat;
-import org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -55,13 +51,13 @@ public class HbaseSourceReader implements SourceReader namesMap;
- private final SourceReader.Context context;
+ private final Context context;
private final SeaTunnelRowType seaTunnelRowType;
private volatile boolean noMoreSplit = false;
+ private final HbaseClient hbaseClient;
private HbaseParameters hbaseParameters;
private final List columnNames;
- private final transient Connection connection;
private HBaseDeserializationFormat hbaseDeserializationFormat =
new HBaseDeserializationFormat();
@@ -85,8 +81,7 @@ public HbaseSourceReader(
Preconditions.checkArgument(
column.contains(":") && column.split(":").length == 2,
"Invalid column names, it should be [ColumnFamily:Column] format"));
-
- connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ hbaseClient = HbaseClient.createInstance(hbaseParameters);
}
@Override
@@ -103,9 +98,9 @@ public void close() throws IOException {
throw new IOException("Failed to close HBase Scanner.", e);
}
}
- if (this.connection != null) {
+ if (this.hbaseClient != null) {
try {
- this.connection.close();
+ this.hbaseClient.close();
} catch (Exception e) {
throw new IOException("Failed to close HBase connection.", e);
}
@@ -119,23 +114,8 @@ public void pollNext(Collector output) throws Exception {
final HbaseSourceSplit split = sourceSplits.poll();
if (Objects.nonNull(split)) {
// read logic
- if (this.currentScanner == null) {
- Scan scan = new Scan();
- scan.withStartRow(split.getStartRow(), true);
- scan.withStopRow(split.getEndRow(), true);
- scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
- scan.setCaching(hbaseParameters.getCaching());
- scan.setBatch(hbaseParameters.getBatch());
- for (String columnName : this.columnNames) {
- String[] columnNameSplit = columnName.split(":");
- scan.addColumn(
- Bytes.toBytes(columnNameSplit[0]),
- Bytes.toBytes(columnNameSplit[1]));
- }
- this.currentScanner =
- this.connection
- .getTable(TableName.valueOf(hbaseParameters.getTable()))
- .getScanner(scan);
+ if (currentScanner == null) {
+ currentScanner = hbaseClient.scan(split, hbaseParameters, this.columnNames);
}
for (Result result : currentScanner) {
SeaTunnelRow seaTunnelRow =
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
index 094128b1747..f5508c9037d 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -18,14 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import lombok.extern.slf4j.Slf4j;
@@ -43,7 +39,6 @@ public class HbaseSourceSplitEnumerator
/** Source split enumerator context */
private final Context context;
- private Config pluginConfig;
/** The splits that has assigned */
private final Set assignedSplit;
@@ -51,24 +46,29 @@ public class HbaseSourceSplitEnumerator
private Set pendingSplit;
private HbaseParameters hbaseParameters;
- private Connection connection;
+
+ private HbaseClient hbaseClient;
public HbaseSourceSplitEnumerator(
Context context, HbaseParameters hbaseParameters) {
- this.context = context;
- this.hbaseParameters = hbaseParameters;
- this.assignedSplit = new HashSet<>();
- connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ this(context, hbaseParameters, new HashSet<>());
}
public HbaseSourceSplitEnumerator(
Context context,
HbaseParameters hbaseParameters,
HbaseSourceState sourceState) {
+ this(context, hbaseParameters, sourceState.getAssignedSplits());
+ }
+
+ private HbaseSourceSplitEnumerator(
+ Context context,
+ HbaseParameters hbaseParameters,
+ Set assignedSplit) {
this.context = context;
this.hbaseParameters = hbaseParameters;
- this.assignedSplit = sourceState.getAssignedSplits();
- connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ this.assignedSplit = assignedSplit;
+ this.hbaseClient = HbaseClient.createInstance(hbaseParameters);
}
@Override
@@ -157,8 +157,7 @@ private Set getTableSplits() {
List splits = new ArrayList<>();
try {
- RegionLocator regionLocator =
- connection.getRegionLocator(TableName.valueOf(hbaseParameters.getTable()));
+ RegionLocator regionLocator = hbaseClient.getRegionLocator(hbaseParameters.getTable());
byte[][] startKeys = regionLocator.getStartKeys();
byte[][] endKeys = regionLocator.getEndKeys();
if (startKeys.length != endKeys.length) {
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
new file mode 100644
index 00000000000..c1996dc057e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
+public class HbaseAggregatedCommitInfo implements Serializable {}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
new file mode 100644
index 00000000000..39999ceddcd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
+public class HbaseCommitInfo implements Serializable {}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
new file mode 100644
index 00000000000..6e1f068cf68
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
+public class HbaseSinkState implements Serializable {}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
deleted file mode 100644
index f006986e660..00000000000
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hbase.utils;
-
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-
-import java.io.IOException;
-
-public class HbaseConnectionUtil {
- public static Connection getHbaseConnection(HbaseParameters hbaseParameters) {
- Configuration hbaseConfiguration = HBaseConfiguration.create();
- hbaseConfiguration.set("hbase.zookeeper.quorum", hbaseParameters.getZookeeperQuorum());
- if (hbaseParameters.getHbaseExtraConfig() != null) {
- hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
- }
- // initialize hbase connection
- try {
- Connection connection = ConnectionFactory.createConnection(hbaseConfiguration);
- return connection;
- } catch (IOException e) {
- String errorMsg = "Build Hbase connection failed.";
- throw new HbaseConnectorException(HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg);
- }
- }
-}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index fe736f965ef..1957e1bd08e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -29,10 +29,12 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
@@ -47,6 +49,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
+import java.util.UUID;
@Slf4j
@DisabledOnContainer(
@@ -79,6 +82,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
public void startUp() throws Exception {
hbaseCluster = new HbaseCluster();
hbaseConnection = hbaseCluster.startService();
+ admin = hbaseConnection.getAdmin();
// Create table for hbase sink test
log.info("initial");
hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
@@ -112,6 +116,87 @@ public void testHbaseSink(TestContainer container) throws IOException, Interrupt
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}
+ @TestTemplate
+ public void testHbaseSinkWithErrorWhenDataExists(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+ insertData(table);
+ Assertions.assertEquals(5, countData(table));
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_error_when_data_exists.conf");
+ Assertions.assertEquals(1, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithRecreateSchema(TestContainer container)
+ throws IOException, InterruptedException {
+ String tableName = "seatunnel_test_with_recreate_schema";
+ TableName table = TableName.valueOf(tableName);
+ dropTable(table);
+ hbaseCluster.createTable(tableName, Arrays.asList("test_rs"));
+ TableDescriptor descriptorBefore = hbaseConnection.getTable(table).getDescriptor();
+ String[] familiesBefore =
+ Arrays.stream(descriptorBefore.getColumnFamilies())
+ .map(f -> f.getNameAsString())
+ .toArray(String[]::new);
+ Assertions.assertTrue(Arrays.equals(familiesBefore, new String[] {"test_rs"}));
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_recreate_schema.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ TableDescriptor descriptorAfter = hbaseConnection.getTable(table).getDescriptor();
+ String[] familiesAfter =
+ Arrays.stream(descriptorAfter.getColumnFamilies())
+ .map(f -> f.getNameAsString())
+ .toArray(String[]::new);
+ Assertions.assertTrue(!Arrays.equals(familiesBefore, familiesAfter));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithDropData(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+ insertData(table);
+ countData(table);
+ Assertions.assertEquals(5, countData(table));
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_drop_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(5, countData(table));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithCreateWhenNotExists(TestContainer container)
+ throws IOException, InterruptedException {
+ TableName seatunnelTestWithCreateWhenNotExists =
+ TableName.valueOf("seatunnel_test_with_create_when_not_exists");
+ dropTable(seatunnelTestWithCreateWhenNotExists);
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_create_when_not_exists.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(5, countData(seatunnelTestWithCreateWhenNotExists));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithAppendData(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+ insertData(table);
+ countData(table);
+ Assertions.assertEquals(5, countData(table));
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_append_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(10, countData(table));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithErrorWhenNotExists(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_error_when_not_exists.conf");
+ Assertions.assertEquals(1, execResult.getExitCode());
+ }
+
@TestTemplate
public void testHbaseSinkWithArray(TestContainer container)
throws IOException, InterruptedException {
@@ -223,6 +308,13 @@ private void fakeToHbase(TestContainer container) throws IOException, Interrupte
scanner.close();
}
+ private void dropTable(TableName tableName) throws IOException {
+ if (admin.tableExists(tableName)) {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ }
+ }
+
private void deleteData(TableName table) throws IOException {
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
@@ -234,6 +326,32 @@ private void deleteData(TableName table) throws IOException {
}
}
+ private void insertData(TableName table) throws IOException {
+ Table hbaseTable = hbaseConnection.getTable(table);
+ for (int i = 0; i < 5; i++) {
+ String rowKey = "row" + UUID.randomUUID();
+ String value = "value" + i;
+ hbaseTable.put(
+ new Put(Bytes.toBytes(rowKey))
+ .addColumn(
+ Bytes.toBytes(FAMILY_NAME),
+ Bytes.toBytes("name"),
+ Bytes.toBytes(value)));
+ }
+ }
+
+ private int countData(TableName table) throws IOException {
+ Table hbaseTable = hbaseConnection.getTable(table);
+ Scan scan = new Scan();
+ ResultScanner scanner = hbaseTable.getScanner(scan);
+ int count = 0;
+ for (Result result = scanner.next(); result != null; result = scanner.next()) {
+ count++;
+ }
+ scanner.close();
+ return count;
+ }
+
public ArrayList readData(TableName table) throws IOException {
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
new file mode 100644
index 00000000000..0778d8cb36b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
new file mode 100644
index 00000000000..21327170824
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_with_create_when_not_exists"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
new file mode 100644
index 00000000000..66b3981206a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_with_create_when_not_exists"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "DROP_DATA"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
new file mode 100644
index 00000000000..00e0485e3d0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "ERROR_WHEN_DATA_EXISTS"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
new file mode 100644
index 00000000000..359b71b79f3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_with_error_when_not_exists"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
new file mode 100644
index 00000000000..c8a8c43d9ce
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_with_recreate_schema"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "RECREATE_SCHEMA"
+ }
+}
\ No newline at end of file