Skip to content

Commit 1746e89

Browse files
committed
[FLINK-38516][Table SQL/Gateway] Add config for read-only sql gateway
1 parent 58035ea commit 1746e89

File tree

4 files changed

+57
-0
lines changed

4 files changed

+57
-0
lines changed

docs/content/docs/dev/table/sql-gateway/overview.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,12 @@ $ ./sql-gateway -Dkey=value
250250
<td>Duration</td>
251251
<td>Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.</td>
252252
</tr>
253+
<tr>
254+
<td><h5>sql-gateway.read-only</h5></td>
255+
<td style="word-wrap: break-word;">5 min</td>
256+
<td>Duration</td>
257+
<td>When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations.</td>
258+
</tr>
253259
<tr>
254260
<td><h5>sql-gateway.worker.threads.max</h5></td>
255261
<td style="word-wrap: break-word;">500</td>

flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,11 @@ public class SqlGatewayServiceConfigOptions {
9797
.withDescription(
9898
"Keepalive time for an idle worker thread. When the number of workers exceeds min workers, "
9999
+ "excessive threads are killed after this time interval.");
100+
101+
public static final ConfigOption<Boolean> SQL_GATEWAY_READ_ONLY_MODE =
102+
key("sql-gateway.read-only")
103+
.booleanType()
104+
.defaultValue(false)
105+
.withDescription(
106+
"When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations.");
100107
}

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.flink.table.factories.PlannerFactoryUtil;
6161
import org.apache.flink.table.functions.FunctionDefinition;
6262
import org.apache.flink.table.functions.FunctionIdentifier;
63+
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
6364
import org.apache.flink.table.gateway.api.operation.OperationHandle;
6465
import org.apache.flink.table.gateway.api.results.FunctionInfo;
6566
import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -681,6 +682,17 @@ private ResultFetcher callModifyOperations(
681682
TableEnvironmentInternal tableEnv,
682683
OperationHandle handle,
683684
List<ModifyOperation> modifyOperations) {
685+
// Check if SQL Gateway is in read-only mode
686+
Configuration configuration = sessionContext.getSessionConf().clone();
687+
configuration.addAll(executionConfig);
688+
boolean isReadOnlyMode =
689+
configuration.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE);
690+
691+
if (isReadOnlyMode) {
692+
throw new SqlExecutionException(
693+
"SQL Gateway is in read-only mode. Modify operations are not allowed.");
694+
}
695+
684696
TableResultInternal result = tableEnv.executeInternal(modifyOperations);
685697
// DeleteFromFilterOperation doesn't have a JobClient
686698
if (modifyOperations.size() == 1

flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
111111
import static org.apache.flink.table.functions.FunctionKind.OTHER;
112112
import static org.apache.flink.table.functions.FunctionKind.SCALAR;
113+
import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE;
113114
import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
114115
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
115116
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession;
@@ -1048,6 +1049,37 @@ void testGetOperationSchemaWhenOperationGetError() throws Exception {
10481049
.satisfies(anyCauseMatches(SqlGatewayException.class, msg)));
10491050
}
10501051

1052+
@Test
1053+
void testReadOnlyModeWhenOperationInsertError() {
1054+
Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration());
1055+
config.set(SQL_GATEWAY_READ_ONLY_MODE, true);
1056+
1057+
String pipelineName = "test-job";
1058+
config.set(PipelineOptions.NAME, pipelineName);
1059+
1060+
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
1061+
String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
1062+
String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
1063+
String selectSql = "SELECT * FROM source;";
1064+
1065+
service.executeStatement(sessionHandle, sourceDdl, -1, config);
1066+
service.executeStatement(sessionHandle, sinkDdl, -1, config);
1067+
service.executeStatement(sessionHandle, selectSql, -1, config);
1068+
1069+
OperationHandle operationHandle =
1070+
service.executeStatement(
1071+
sessionHandle,
1072+
String.format("INSERT INTO sink '%s';", selectSql),
1073+
-1,
1074+
config);
1075+
1076+
assertThatThrownBy(() -> fetchAllResults(service, sessionHandle, operationHandle))
1077+
.satisfies(
1078+
anyCauseMatches(
1079+
SqlExecutionException.class,
1080+
"SQL Gateway is in read-only mode. Modify operations are not allowed."));
1081+
}
1082+
10511083
// --------------------------------------------------------------------------------------------
10521084

10531085
private OperationHandle submitDefaultOperation(

0 commit comments

Comments
 (0)