Skip to content

Commit 349f142

Browse files
authoredMar 13, 2025··
[Improve] easysearch options (#8951)
1 parent 710044e commit 349f142

File tree

15 files changed

+228
-287
lines changed

15 files changed

+228
-287
lines changed
 

‎seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,11 @@ private Set<String> buildWhiteList() {
183183
whiteList.add("PulsarSourceOptions");
184184
whiteList.add("MongodbSinkOptions");
185185
whiteList.add("IoTDBSinkOptions");
186-
whiteList.add("EasysearchSourceOptions");
187186
whiteList.add("IcebergSourceOptions");
188187
whiteList.add("PaimonSourceOptions");
189188
whiteList.add("IoTDBSourceOptions");
190189
whiteList.add("SlsSourceOptions");
191190
whiteList.add("SentrySinkOptions");
192-
whiteList.add("EasysearchSinkOptions");
193191
whiteList.add("QdrantSinkOptions");
194192
whiteList.add("MilvusSourceOptions");
195193
whiteList.add("RocketMqSinkOptions");

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.catalog;
1919

2020
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
21-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2221

22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2323
import org.apache.seatunnel.api.configuration.util.ConfigUtil;
2424
import org.apache.seatunnel.api.table.catalog.Catalog;
2525
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -57,12 +57,13 @@ public class EasysearchCatalog implements Catalog {
5757

5858
private final String catalogName;
5959
private final String defaultDatabase;
60-
private final Config pluginConfig;
60+
private final ReadonlyConfig pluginConfig;
6161

6262
private EasysearchClient ezsClient;
6363

6464
// todo: do we need default database?
65-
public EasysearchCatalog(String catalogName, String defaultDatabase, Config easySearchConfig) {
65+
public EasysearchCatalog(
66+
String catalogName, String defaultDatabase, ReadonlyConfig easySearchConfig) {
6667
this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
6768
this.defaultDatabase = defaultDatabase;
6869
this.pluginConfig = checkNotNull(easySearchConfig, "easySearchConfig cannot be null");

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java

+18-56
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
2222
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
2323
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
24-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2524

25+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2626
import org.apache.seatunnel.common.utils.JsonUtils;
27-
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig;
27+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkCommonOptions;
2828
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.BulkResponse;
2929
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.EasysearchClusterInfo;
3030
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.IndexDocsCount;
@@ -78,61 +78,23 @@ private EasysearchClient(RestClient restClient) {
7878
this.restClient = restClient;
7979
}
8080

81-
public static EasysearchClient createInstance(Config pluginConfig) {
82-
List<String> hosts = pluginConfig.getStringList(EzsClusterConnectionConfig.HOSTS.key());
83-
Optional<String> username = Optional.empty();
84-
Optional<String> password = Optional.empty();
85-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.USERNAME.key())) {
86-
username =
87-
Optional.of(pluginConfig.getString(EzsClusterConnectionConfig.USERNAME.key()));
88-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.PASSWORD.key())) {
89-
password =
90-
Optional.of(
91-
pluginConfig.getString(EzsClusterConnectionConfig.PASSWORD.key()));
92-
}
93-
}
94-
Optional<String> keystorePath = Optional.empty();
95-
Optional<String> keystorePassword = Optional.empty();
96-
Optional<String> truststorePath = Optional.empty();
97-
Optional<String> truststorePassword = Optional.empty();
81+
public static EasysearchClient createInstance(ReadonlyConfig pluginConfig) {
82+
List<String> hosts = pluginConfig.get(EasysearchSinkCommonOptions.HOSTS);
83+
Optional<String> username = pluginConfig.getOptional(EasysearchSinkCommonOptions.USERNAME);
84+
Optional<String> password = pluginConfig.getOptional(EasysearchSinkCommonOptions.PASSWORD);
85+
Optional<String> keystorePath =
86+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_KEY_STORE_PATH);
87+
Optional<String> keystorePassword =
88+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_KEY_STORE_PASSWORD);
89+
Optional<String> truststorePath =
90+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_TRUST_STORE_PATH);
91+
Optional<String> truststorePassword =
92+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_TRUST_STORE_PASSWORD);
9893
boolean tlsVerifyCertificate =
99-
EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.defaultValue();
100-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key())) {
101-
tlsVerifyCertificate =
102-
pluginConfig.getBoolean(
103-
EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key());
104-
}
105-
if (tlsVerifyCertificate) {
106-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_KEY_STORE_PATH.key())) {
107-
keystorePath =
108-
Optional.of(
109-
pluginConfig.getString(
110-
EzsClusterConnectionConfig.TLS_KEY_STORE_PATH.key()));
111-
}
112-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key())) {
113-
keystorePassword =
114-
Optional.of(
115-
pluginConfig.getString(
116-
EzsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key()));
117-
}
118-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key())) {
119-
truststorePath =
120-
Optional.of(
121-
pluginConfig.getString(
122-
EzsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key()));
123-
}
124-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key())) {
125-
truststorePassword =
126-
Optional.of(
127-
pluginConfig.getString(
128-
EzsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()));
129-
}
130-
}
131-
boolean tlsVerifyHostnames = EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.defaultValue();
132-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key())) {
133-
tlsVerifyHostnames =
134-
pluginConfig.getBoolean(EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key());
135-
}
94+
pluginConfig.get(EasysearchSinkCommonOptions.TLS_VERIFY_CERTIFICATE);
95+
96+
boolean tlsVerifyHostnames =
97+
pluginConfig.get(EasysearchSinkCommonOptions.TLS_VERIFY_HOSTNAME);
13698
return createInstance(
13799
hosts,
138100
username,
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import java.util.List;
2424

25-
public class EzsClusterConnectionConfig {
25+
public class EasysearchSinkCommonOptions {
2626

2727
public static final Option<List<String>> HOSTS =
2828
Options.key("hosts")
@@ -31,6 +31,12 @@ public class EzsClusterConnectionConfig {
3131
.withDescription(
3232
"Easysearch cluster http address, the format is host:port, allowing multiple hosts to be specified. Such as [\"host1:9200\", \"host2:9200\"]");
3333

34+
public static final Option<String> INDEX =
35+
Options.key("index")
36+
.stringType()
37+
.noDefaultValue()
38+
.withDescription("Easysearch index name, support * fuzzy matching");
39+
3440
public static final Option<String> USERNAME =
3541
Options.key("username")
3642
.stringType()
+1-7
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,8 @@
2222

2323
import java.util.List;
2424

25-
public class SinkConfig {
25+
public class EasysearchSinkOptions extends EasysearchSinkCommonOptions {
2626

27-
public static final Option<String> INDEX =
28-
Options.key("index")
29-
.stringType()
30-
.noDefaultValue()
31-
.withDescription(
32-
"Easysearch index name.Index support contains variables of field name,such as seatunnel_${age},and the field must appear at seatunnel row. If not, we will treat it as a normal index");
3327
public static final Option<List<String>> PRIMARY_KEYS =
3428
Options.key("primary_keys")
3529
.listType(String.class)
+8-14
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,7 @@
2525
import java.util.List;
2626
import java.util.Map;
2727

28-
public class SourceConfig {
29-
30-
public static final Option<String> INDEX =
31-
Options.key("index")
32-
.stringType()
33-
.noDefaultValue()
34-
.withDescription("Easysearch index name, support * fuzzy matching");
35-
36-
public static final Option<List<String>> SOURCE =
37-
Options.key("source")
38-
.listType()
39-
.noDefaultValue()
40-
.withDescription(
41-
"The fields of index. You can get the document id by specifying the field _id.If sink _id to other index,you need specify an alias for _id due to the Easysearch limit");
28+
public class EasysearchSourceOptions extends EasysearchSinkCommonOptions {
4229

4330
public static final Option<String> SCROLL_TIME =
4431
Options.key("scroll_time")
@@ -61,4 +48,11 @@ public class SourceConfig {
6148
Collections.singletonMap("match_all", new HashMap<String, String>()))
6249
.withDescription(
6350
"Easysearch query language. You can control the range of data read");
51+
52+
public static final Option<List<String>> SOURCE =
53+
Options.key("source")
54+
.listType()
55+
.noDefaultValue()
56+
.withDescription(
57+
"The fields of index. You can get the document id by specifying the field _id.If sink _id to other index,you need specify an alias for _id due to the Easysearch limit");
6458
}

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/dto/IndexInfo.java

+8-13
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.dto;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkCommonOptions;
22+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
2323

2424
import lombok.Data;
2525

@@ -31,17 +31,12 @@ public class IndexInfo {
3131
private String[] primaryKeys;
3232
private String keyDelimiter;
3333

34-
public IndexInfo(Config pluginConfig) {
35-
index = pluginConfig.getString(SinkConfig.INDEX.key());
36-
if (pluginConfig.hasPath(SinkConfig.PRIMARY_KEYS.key())) {
34+
public IndexInfo(ReadonlyConfig pluginConfig) {
35+
index = pluginConfig.get(EasysearchSinkCommonOptions.INDEX);
36+
if (pluginConfig.getOptional(EasysearchSinkOptions.PRIMARY_KEYS).isPresent()) {
3737
primaryKeys =
38-
pluginConfig
39-
.getStringList(SinkConfig.PRIMARY_KEYS.key())
40-
.toArray(new String[0]);
41-
}
42-
keyDelimiter = SinkConfig.KEY_DELIMITER.defaultValue();
43-
if (pluginConfig.hasPath(SinkConfig.KEY_DELIMITER.key())) {
44-
keyDelimiter = pluginConfig.getString(SinkConfig.KEY_DELIMITER.key());
38+
pluginConfig.get(EasysearchSinkOptions.PRIMARY_KEYS).toArray(new String[0]);
4539
}
40+
keyDelimiter = pluginConfig.get(EasysearchSinkOptions.KEY_DELIMITER);
4641
}
4742
}

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java

+8-33
Original file line numberDiff line numberDiff line change
@@ -17,66 +17,41 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2321
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2422
import org.apache.seatunnel.api.sink.SinkWriter;
2523
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2624
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
27-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2825
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchAggregatedCommitInfo;
2926
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchCommitInfo;
3027
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchSinkState;
3128

32-
import com.google.auto.service.AutoService;
33-
3429
import java.util.Optional;
3530

36-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_BATCH_SIZE;
37-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_RETRY_COUNT;
38-
39-
@AutoService(SeaTunnelSink.class)
4031
public class EasysearchSink
4132
implements SeaTunnelSink<
4233
SeaTunnelRow,
4334
EasysearchSinkState,
4435
EasysearchCommitInfo,
4536
EasysearchAggregatedCommitInfo> {
4637

47-
private Config pluginConfig;
48-
private SeaTunnelRowType seaTunnelRowType;
49-
50-
private int maxBatchSize = MAX_BATCH_SIZE.defaultValue();
38+
private final ReadonlyConfig pluginConfig;
39+
private final CatalogTable catalogTable;
5140

52-
private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();
53-
54-
@Override
55-
public String getPluginName() {
56-
return "Easysearch";
57-
}
58-
59-
@Override
60-
public void prepare(Config pluginConfig) throws PrepareFailException {
41+
public EasysearchSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
42+
this.catalogTable = catalogTable;
6143
this.pluginConfig = pluginConfig;
62-
if (pluginConfig.hasPath(MAX_BATCH_SIZE.key())) {
63-
maxBatchSize = pluginConfig.getInt(MAX_BATCH_SIZE.key());
64-
}
65-
if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
66-
maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
67-
}
6844
}
6945

7046
@Override
71-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
72-
this.seaTunnelRowType = seaTunnelRowType;
47+
public String getPluginName() {
48+
return "Easysearch";
7349
}
7450

7551
@Override
7652
public SinkWriter<SeaTunnelRow, EasysearchCommitInfo, EasysearchSinkState> createWriter(
7753
SinkWriter.Context context) {
78-
return new EasysearchSinkWriter(
79-
context, seaTunnelRowType, pluginConfig, maxBatchSize, maxRetryCount);
54+
return new EasysearchSinkWriter(context, catalogTable.getSeaTunnelRowType(), pluginConfig);
8055
}
8156

8257
@Override

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java

+21-28
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,14 @@
1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.table.connector.TableSink;
2122
import org.apache.seatunnel.api.table.factory.Factory;
2223
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
24+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
25+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
2326

2427
import com.google.auto.service.AutoService;
2528

26-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.HOSTS;
27-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.PASSWORD;
28-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
29-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_KEY_STORE_PATH;
30-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
31-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
32-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
33-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
34-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.USERNAME;
35-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.KEY_DELIMITER;
36-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_BATCH_SIZE;
37-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_RETRY_COUNT;
38-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.PRIMARY_KEYS;
39-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig.INDEX;
40-
4129
@AutoService(Factory.class)
4230
public class EasysearchSinkFactory implements TableSinkFactory {
4331
@Override
@@ -48,20 +36,25 @@ public String factoryIdentifier() {
4836
@Override
4937
public OptionRule optionRule() {
5038
return OptionRule.builder()
51-
.required(HOSTS, INDEX)
39+
.required(EasysearchSinkOptions.HOSTS, EasysearchSinkOptions.INDEX)
5240
.optional(
53-
PRIMARY_KEYS,
54-
KEY_DELIMITER,
55-
USERNAME,
56-
PASSWORD,
57-
MAX_RETRY_COUNT,
58-
MAX_BATCH_SIZE,
59-
TLS_VERIFY_CERTIFICATE,
60-
TLS_VERIFY_HOSTNAME,
61-
TLS_KEY_STORE_PATH,
62-
TLS_KEY_STORE_PASSWORD,
63-
TLS_TRUST_STORE_PATH,
64-
TLS_TRUST_STORE_PASSWORD)
41+
EasysearchSinkOptions.USERNAME,
42+
EasysearchSinkOptions.PASSWORD,
43+
EasysearchSinkOptions.PRIMARY_KEYS,
44+
EasysearchSinkOptions.KEY_DELIMITER,
45+
EasysearchSinkOptions.MAX_RETRY_COUNT,
46+
EasysearchSinkOptions.MAX_BATCH_SIZE,
47+
EasysearchSinkOptions.TLS_VERIFY_CERTIFICATE,
48+
EasysearchSinkOptions.TLS_VERIFY_HOSTNAME,
49+
EasysearchSinkOptions.TLS_KEY_STORE_PATH,
50+
EasysearchSinkOptions.TLS_KEY_STORE_PASSWORD,
51+
EasysearchSinkOptions.TLS_TRUST_STORE_PATH,
52+
EasysearchSinkOptions.TLS_TRUST_STORE_PASSWORD)
6553
.build();
6654
}
55+
56+
@Override
57+
public TableSink createSink(TableSinkFactoryContext context) {
58+
return () -> new EasysearchSink(context.getOptions(), context.getCatalogTable());
59+
}
6760
}

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkWriter.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2221
import org.apache.seatunnel.api.sink.SinkWriter;
2322
import org.apache.seatunnel.api.table.type.RowKind;
2423
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2524
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2625
import org.apache.seatunnel.common.utils.RetryUtils;
2726
import org.apache.seatunnel.common.utils.RetryUtils.RetryMaterial;
2827
import org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
28+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
2929
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.BulkResponse;
3030
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.IndexInfo;
3131
import org.apache.seatunnel.connectors.seatunnel.easysearch.exception.EasysearchConnectorErrorCode;
@@ -60,19 +60,21 @@ public class EasysearchSinkWriter
6060
public EasysearchSinkWriter(
6161
SinkWriter.Context context,
6262
SeaTunnelRowType seaTunnelRowType,
63-
Config pluginConfig,
64-
int maxBatchSize,
65-
int maxRetryCount) {
63+
ReadonlyConfig pluginConfig) {
6664
this.context = context;
67-
this.maxBatchSize = maxBatchSize;
65+
this.maxBatchSize = pluginConfig.get(EasysearchSinkOptions.MAX_BATCH_SIZE);
6866

6967
IndexInfo indexInfo = new IndexInfo(pluginConfig);
7068
ezsClient = EasysearchClient.createInstance(pluginConfig);
7169
this.seaTunnelRowSerializer = new EasysearchRowSerializer(indexInfo, seaTunnelRowType);
7270

7371
this.requestEzsList = new ArrayList<>(maxBatchSize);
7472
this.retryMaterial =
75-
new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS);
73+
new RetryMaterial(
74+
pluginConfig.get(EasysearchSinkOptions.MAX_RETRY_COUNT),
75+
true,
76+
exception -> true,
77+
DEFAULT_SLEEP_TIME_MS);
7678
}
7779

7880
@Override

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSource.java

+16-69
Original file line numberDiff line numberDiff line change
@@ -17,108 +17,55 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.source;
1919

20-
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
21-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
22-
23-
import org.apache.seatunnel.api.common.PrepareFailException;
24-
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2521
import org.apache.seatunnel.api.source.Boundedness;
2622
import org.apache.seatunnel.api.source.SeaTunnelSource;
2723
import org.apache.seatunnel.api.source.SourceReader;
2824
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2925
import org.apache.seatunnel.api.source.SupportColumnProjection;
3026
import org.apache.seatunnel.api.source.SupportParallelism;
31-
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
32-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
27+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
3328
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
34-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
35-
import org.apache.seatunnel.connectors.seatunnel.easysearch.catalog.EasysearchDataTypeConvertor;
36-
import org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
37-
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig;
38-
39-
import org.apache.commons.collections4.CollectionUtils;
40-
41-
import com.google.auto.service.AutoService;
4229

43-
import java.util.ArrayList;
44-
import java.util.Arrays;
30+
import java.util.Collections;
4531
import java.util.List;
46-
import java.util.Map;
4732

48-
@AutoService(SeaTunnelSource.class)
4933
public class EasysearchSource
5034
implements SeaTunnelSource<SeaTunnelRow, EasysearchSourceSplit, EasysearchSourceState>,
5135
SupportParallelism,
5236
SupportColumnProjection {
5337

54-
private Config pluginConfig;
38+
private final ReadonlyConfig pluginConfig;
39+
private final List<String> source;
40+
private final CatalogTable catalogTable;
5541

56-
private SeaTunnelRowType rowTypeInfo;
57-
58-
private List<String> source;
42+
public EasysearchSource(
43+
ReadonlyConfig pluginConfig, List<String> source, CatalogTable catalogTable) {
44+
this.pluginConfig = pluginConfig;
45+
this.source = source;
46+
this.catalogTable = catalogTable;
47+
}
5948

6049
@Override
6150
public String getPluginName() {
6251
return "Easysearch";
6352
}
6453

65-
@Override
66-
public void prepare(Config pluginConfig) throws PrepareFailException {
67-
this.pluginConfig = pluginConfig;
68-
if (pluginConfig.hasPath(ConnectorCommonOptions.SCHEMA.key())) {
69-
// todo: We need to remove the schema in EZS.
70-
rowTypeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
71-
source = Arrays.asList(rowTypeInfo.getFieldNames());
72-
} else {
73-
if (pluginConfig.hasPath(SourceConfig.SOURCE.key())) {
74-
source = pluginConfig.getStringList(SourceConfig.SOURCE.key());
75-
} else {
76-
source = Lists.newArrayList();
77-
}
78-
EasysearchClient ezsClient = EasysearchClient.createInstance(this.pluginConfig);
79-
Map<String, String> ezsFieldType =
80-
ezsClient.getFieldTypeMapping(
81-
pluginConfig.getString(SourceConfig.INDEX.key()), source);
82-
ezsClient.close();
83-
EasysearchDataTypeConvertor easySearchDataTypeConvertor =
84-
new EasysearchDataTypeConvertor();
85-
if (CollectionUtils.isEmpty(source)) {
86-
List<String> keys = new ArrayList<>(ezsFieldType.keySet());
87-
SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[keys.size()];
88-
for (int i = 0; i < keys.size(); i++) {
89-
String esType = ezsFieldType.get(keys.get(i));
90-
SeaTunnelDataType seaTunnelDataType =
91-
easySearchDataTypeConvertor.toSeaTunnelType(keys.get(i), esType);
92-
fieldTypes[i] = seaTunnelDataType;
93-
}
94-
rowTypeInfo = new SeaTunnelRowType(keys.toArray(new String[0]), fieldTypes);
95-
} else {
96-
SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[source.size()];
97-
for (int i = 0; i < source.size(); i++) {
98-
String esType = ezsFieldType.get(source.get(i));
99-
SeaTunnelDataType seaTunnelDataType =
100-
easySearchDataTypeConvertor.toSeaTunnelType(source.get(i), esType);
101-
fieldTypes[i] = seaTunnelDataType;
102-
}
103-
rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[0]), fieldTypes);
104-
}
105-
}
106-
}
107-
10854
@Override
10955
public Boundedness getBoundedness() {
11056
return Boundedness.BOUNDED;
11157
}
11258

11359
@Override
114-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
115-
return this.rowTypeInfo;
60+
public List<CatalogTable> getProducedCatalogTables() {
61+
return Collections.singletonList(catalogTable);
11662
}
11763

11864
@Override
11965
public SourceReader<SeaTunnelRow, EasysearchSourceSplit> createReader(
12066
SourceReader.Context readerContext) {
121-
return new EasysearchSourceReader(readerContext, pluginConfig, rowTypeInfo);
67+
return new EasysearchSourceReader(
68+
readerContext, pluginConfig, catalogTable.getSeaTunnelRowType());
12269
}
12370

12471
@Override

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceFactory.java

+110-27
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,37 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.source;
1919

20+
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
21+
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2023
import org.apache.seatunnel.api.configuration.util.OptionRule;
2124
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
2225
import org.apache.seatunnel.api.source.SeaTunnelSource;
26+
import org.apache.seatunnel.api.source.SourceSplit;
27+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
28+
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
29+
import org.apache.seatunnel.api.table.catalog.Column;
30+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
31+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
32+
import org.apache.seatunnel.api.table.catalog.TableSchema;
33+
import org.apache.seatunnel.api.table.connector.TableSource;
2334
import org.apache.seatunnel.api.table.factory.Factory;
2435
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
36+
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
37+
import org.apache.seatunnel.connectors.seatunnel.easysearch.catalog.EasysearchDataTypeConvertor;
38+
import org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
39+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSourceOptions;
40+
41+
import org.apache.commons.collections4.CollectionUtils;
2542

2643
import com.google.auto.service.AutoService;
2744

28-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.HOSTS;
29-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.PASSWORD;
30-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
31-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_KEY_STORE_PATH;
32-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
33-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
34-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
35-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
36-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.USERNAME;
37-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig.INDEX;
38-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig.QUERY;
39-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig.SCROLL_SIZE;
40-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig.SCROLL_TIME;
41-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig.SOURCE;
45+
import java.io.Serializable;
46+
import java.util.ArrayList;
47+
import java.util.Arrays;
48+
import java.util.Collections;
49+
import java.util.List;
50+
import java.util.Map;
4251

4352
@AutoService(Factory.class)
4453
public class EasysearchSourceFactory implements TableSourceFactory {
@@ -50,23 +59,97 @@ public String factoryIdentifier() {
5059
@Override
5160
public OptionRule optionRule() {
5261
return OptionRule.builder()
53-
.required(HOSTS, INDEX)
62+
.required(EasysearchSourceOptions.HOSTS, EasysearchSourceOptions.INDEX)
5463
.optional(
55-
USERNAME,
56-
PASSWORD,
57-
SCROLL_TIME,
58-
SCROLL_SIZE,
59-
QUERY,
60-
TLS_VERIFY_CERTIFICATE,
61-
TLS_VERIFY_HOSTNAME,
62-
TLS_KEY_STORE_PATH,
63-
TLS_KEY_STORE_PASSWORD,
64-
TLS_TRUST_STORE_PATH,
65-
TLS_TRUST_STORE_PASSWORD)
66-
.exclusive(SOURCE, ConnectorCommonOptions.SCHEMA)
64+
EasysearchSourceOptions.USERNAME,
65+
EasysearchSourceOptions.PASSWORD,
66+
EasysearchSourceOptions.SCROLL_TIME,
67+
EasysearchSourceOptions.SCROLL_SIZE,
68+
EasysearchSourceOptions.QUERY,
69+
EasysearchSourceOptions.TLS_VERIFY_CERTIFICATE,
70+
EasysearchSourceOptions.TLS_VERIFY_HOSTNAME,
71+
EasysearchSourceOptions.TLS_KEY_STORE_PATH,
72+
EasysearchSourceOptions.TLS_KEY_STORE_PASSWORD,
73+
EasysearchSourceOptions.TLS_TRUST_STORE_PATH,
74+
EasysearchSourceOptions.TLS_TRUST_STORE_PASSWORD)
75+
.exclusive(EasysearchSourceOptions.SOURCE, ConnectorCommonOptions.SCHEMA)
6776
.build();
6877
}
6978

79+
@Override
80+
public <T, SplitT extends SourceSplit, StateT extends Serializable>
81+
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
82+
ReadonlyConfig contextOptions = context.getOptions();
83+
List<String> source;
84+
CatalogTable catalogTable;
85+
if (contextOptions.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
86+
// todo: We need to remove the schema in EZS.
87+
catalogTable = CatalogTableUtil.buildWithConfig(contextOptions);
88+
source =
89+
Arrays.asList(
90+
CatalogTableUtil.buildWithConfig(contextOptions)
91+
.getSeaTunnelRowType()
92+
.getFieldNames());
93+
} else {
94+
if (contextOptions.getOptional(EasysearchSourceOptions.SOURCE).isPresent()) {
95+
source = contextOptions.get(EasysearchSourceOptions.SOURCE);
96+
} else {
97+
source = Lists.newArrayList();
98+
}
99+
EasysearchClient ezsClient = EasysearchClient.createInstance(contextOptions);
100+
Map<String, String> ezsFieldType =
101+
ezsClient.getFieldTypeMapping(
102+
contextOptions.get(EasysearchSourceOptions.INDEX), source);
103+
ezsClient.close();
104+
EasysearchDataTypeConvertor easySearchDataTypeConvertor =
105+
new EasysearchDataTypeConvertor();
106+
List<Column> columns = new ArrayList<>();
107+
if (CollectionUtils.isEmpty(source)) {
108+
List<String> keys = new ArrayList<>(ezsFieldType.keySet());
109+
for (int i = 0; i < keys.size(); i++) {
110+
String esType = ezsFieldType.get(keys.get(i));
111+
PhysicalColumn physicalColumn =
112+
PhysicalColumn.of(
113+
keys.get(i),
114+
easySearchDataTypeConvertor.toSeaTunnelType(
115+
keys.get(i), esType),
116+
null,
117+
null,
118+
true,
119+
null,
120+
null);
121+
columns.add(physicalColumn);
122+
}
123+
} else {
124+
for (int i = 0; i < source.size(); i++) {
125+
String esType = ezsFieldType.get(source.get(i));
126+
PhysicalColumn physicalColumn =
127+
PhysicalColumn.of(
128+
source.get(i),
129+
easySearchDataTypeConvertor.toSeaTunnelType(
130+
source.get(i), esType),
131+
null,
132+
null,
133+
true,
134+
null,
135+
null);
136+
columns.add(physicalColumn);
137+
}
138+
}
139+
catalogTable =
140+
CatalogTable.of(
141+
TableIdentifier.of("default", "default", "default"),
142+
TableSchema.builder().columns(columns).build(),
143+
Collections.emptyMap(),
144+
Collections.emptyList(),
145+
"");
146+
}
147+
148+
return () ->
149+
(SeaTunnelSource<T, SplitT, StateT>)
150+
new EasysearchSource(contextOptions, source, catalogTable);
151+
}
152+
70153
@Override
71154
public Class<? extends SeaTunnelSource> getSourceClass() {
72155
return EasysearchSource.class;

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.source;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2221
import org.apache.seatunnel.api.source.Collector;
2322
import org.apache.seatunnel.api.source.SourceReader;
2423
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -44,14 +43,16 @@ public class EasysearchSourceReader implements SourceReader<SeaTunnelRow, Easyse
4443

4544
private final SeaTunnelRowDeserializer deserializer;
4645
private final long pollNextWaitTime = 1000L;
47-
private final Config pluginConfig;
46+
private final ReadonlyConfig pluginConfig;
4847
SourceReader.Context context;
4948
Deque<EasysearchSourceSplit> splits = new LinkedList<>();
5049
boolean noMoreSplit;
5150
private EasysearchClient ezsClient;
5251

5352
public EasysearchSourceReader(
54-
SourceReader.Context context, Config pluginConfig, SeaTunnelRowType rowTypeInfo) {
53+
SourceReader.Context context,
54+
ReadonlyConfig pluginConfig,
55+
SeaTunnelRowType rowTypeInfo) {
5556
this.context = context;
5657
this.pluginConfig = pluginConfig;
5758
this.deserializer = new DefaultSeaTunnelRowDeserializer(rowTypeInfo);

‎seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceSplitEnumerator.java

+10-19
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.source;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2221
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2322
import org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
24-
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig;
23+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSourceOptions;
2524
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.IndexDocsCount;
2625
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.SourceIndexInfo;
2726
import org.apache.seatunnel.connectors.seatunnel.easysearch.exception.EasysearchConnectorException;
@@ -47,23 +46,23 @@ public class EasysearchSourceSplitEnumerator
4746

4847
private final Object stateLock = new Object();
4948
private final SourceSplitEnumerator.Context<EasysearchSourceSplit> context;
50-
private final Config pluginConfig;
49+
private final ReadonlyConfig pluginConfig;
5150
private final Map<Integer, List<EasysearchSourceSplit>> pendingSplit;
5251
private final List<String> source;
5352
private EasysearchClient ezsClient;
5453
private volatile boolean shouldEnumerate;
5554

5655
public EasysearchSourceSplitEnumerator(
5756
SourceSplitEnumerator.Context<EasysearchSourceSplit> context,
58-
Config pluginConfig,
57+
ReadonlyConfig pluginConfig,
5958
List<String> source) {
6059
this(context, null, pluginConfig, source);
6160
}
6261

6362
public EasysearchSourceSplitEnumerator(
6463
SourceSplitEnumerator.Context<EasysearchSourceSplit> context,
6564
EasysearchSourceState sourceState,
66-
Config pluginConfig,
65+
ReadonlyConfig pluginConfig,
6766
List<String> source) {
6867
this.context = context;
6968
this.pluginConfig = pluginConfig;
@@ -136,21 +135,13 @@ private void assignSplit(Collection<Integer> readers) {
136135

137136
private List<EasysearchSourceSplit> getEasysearchSplit() {
138137
List<EasysearchSourceSplit> splits = new ArrayList<>();
139-
String scrollTime = SourceConfig.SCROLL_TIME.defaultValue();
140-
if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME.key())) {
141-
scrollTime = pluginConfig.getString(SourceConfig.SCROLL_TIME.key());
142-
}
143-
int scrollSize = SourceConfig.SCROLL_SIZE.defaultValue();
144-
if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE.key())) {
145-
scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE.key());
146-
}
147-
Map query = SourceConfig.QUERY.defaultValue();
148-
if (pluginConfig.hasPath(SourceConfig.QUERY.key())) {
149-
query = (Map) pluginConfig.getAnyRef(SourceConfig.QUERY.key());
150-
}
138+
String scrollTime = pluginConfig.get(EasysearchSourceOptions.SCROLL_TIME);
139+
140+
int scrollSize = pluginConfig.get(EasysearchSourceOptions.SCROLL_SIZE);
141+
Map query = pluginConfig.get(EasysearchSourceOptions.QUERY);
151142

152143
List<IndexDocsCount> indexDocsCounts =
153-
ezsClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX.key()));
144+
ezsClient.getIndexDocsCount(pluginConfig.get(EasysearchSourceOptions.INDEX));
154145
indexDocsCounts =
155146
indexDocsCounts.stream()
156147
.filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0)

‎seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
2222
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
2323
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
24-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
25-
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
2624

25+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2726
import org.apache.seatunnel.api.table.catalog.Catalog;
2827
import org.apache.seatunnel.api.table.catalog.TablePath;
2928
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -84,7 +83,7 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
8483

8584
private EasysearchClient easysearchClient;
8685

87-
private Config easysearchConfig;
86+
private ReadonlyConfig easysearchConfig;
8887

8988
private Catalog catalog;
9089

@@ -126,7 +125,7 @@ private void initConnection() {
126125
config.put("tls_verify_certificate", false);
127126
config.put("tls_verify_hostname", false);
128127

129-
easysearchConfig = ConfigFactory.parseMap(config);
128+
easysearchConfig = ReadonlyConfig.fromMap(config);
130129

131130
easysearchClient = EasysearchClient.createInstance(easysearchConfig);
132131
catalog = new EasysearchCatalog("easysearch", "default", easysearchConfig);

0 commit comments

Comments
 (0)
Please sign in to comment.