Skip to content

Commit 679f9c5

Browse files
committed
Convert DispatchManager.getQueries to return a stream
This avoid materialization of potentially large lists
1 parent 5bef02b commit 679f9c5

File tree

8 files changed

+39
-51
lines changed

8 files changed

+39
-51
lines changed

core/trino-main/src/main/java/io/trino/connector/system/QuerySystemTable.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.trino.execution.QueryInfo;
2121
import io.trino.execution.QueryStats;
2222
import io.trino.security.AccessControl;
23-
import io.trino.server.BasicQueryInfo;
2423
import io.trino.spi.ErrorCode;
2524
import io.trino.spi.block.Block;
2625
import io.trino.spi.block.BlockBuilder;
@@ -104,15 +103,11 @@ public ConnectorTableMetadata getTableMetadata()
104103
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
105104
{
106105
checkState(dispatchManager.isPresent(), "Query system table can return results only on coordinator");
107-
108-
List<BasicQueryInfo> queries = dispatchManager.get().getQueries();
109-
queries = filterQueries(((FullConnectorSession) session).getSession().getIdentity(), queries, accessControl);
110-
111106
Builder table = InMemoryRecordSet.builder(QUERY_TABLE);
112-
for (BasicQueryInfo queryInfo : queries) {
107+
filterQueries(dispatchManager.get()::getQueries, ((FullConnectorSession) session).getSession().getIdentity(), accessControl).forEach(queryInfo -> {
113108
Optional<QueryInfo> fullQueryInfo = dispatchManager.get().getFullQueryInfo(queryInfo.getQueryId());
114109
if (fullQueryInfo.isEmpty()) {
115-
continue;
110+
return;
116111
}
117112
QueryStats queryStats = fullQueryInfo.get().getQueryStats();
118113
table.addRow(
@@ -134,7 +129,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
134129

135130
Optional.ofNullable(queryInfo.getErrorType()).map(Enum::name).orElse(null),
136131
Optional.ofNullable(queryInfo.getErrorCode()).map(ErrorCode::getName).orElse(null));
137-
}
132+
});
138133
return table.build().cursor();
139134
}
140135

core/trino-main/src/main/java/io/trino/dispatcher/DispatchManager.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,12 @@
5151
import org.weakref.jmx.Managed;
5252
import org.weakref.jmx.Nested;
5353

54-
import java.util.List;
5554
import java.util.Optional;
5655
import java.util.concurrent.Executor;
5756
import java.util.concurrent.ScheduledExecutorService;
57+
import java.util.stream.Stream;
5858

5959
import static com.google.common.base.Preconditions.checkArgument;
60-
import static com.google.common.collect.ImmutableList.toImmutableList;
6160
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
6261
import static io.trino.execution.QueryState.FINISHING;
6362
import static io.trino.execution.QueryState.QUEUED;
@@ -311,11 +310,10 @@ public ListenableFuture<Void> waitForDispatched(QueryId queryId)
311310
.orElseGet(Futures::immediateVoidFuture);
312311
}
313312

314-
public List<BasicQueryInfo> getQueries()
313+
public Stream<BasicQueryInfo> getQueries()
315314
{
316315
return queryTracker.getAllQueries()
317-
.map(DispatchQuery::getBasicQueryInfo)
318-
.collect(toImmutableList());
316+
.map(DispatchQuery::getBasicQueryInfo);
319317
}
320318

321319
@Managed

core/trino-main/src/main/java/io/trino/security/AccessControlUtil.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@
1313
*/
1414
package io.trino.security;
1515

16+
import com.google.common.base.Supplier;
1617
import io.trino.SessionRepresentation;
1718
import io.trino.server.BasicQueryInfo;
1819
import io.trino.spi.security.Identity;
1920

2021
import java.util.Collection;
21-
import java.util.List;
22+
import java.util.stream.Stream;
2223

2324
import static com.google.common.collect.ImmutableList.toImmutableList;
2425

@@ -34,22 +35,23 @@ public static void checkCanViewQueryOwnedBy(Identity identity, Identity queryOwn
3435
accessControl.checkCanViewQueryOwnedBy(identity, queryOwner);
3536
}
3637

37-
public static List<BasicQueryInfo> filterQueries(Identity identity, List<BasicQueryInfo> queries, AccessControl accessControl)
38+
public static Stream<BasicQueryInfo> filterQueries(Supplier<Stream<BasicQueryInfo>> queries, Identity identity, AccessControl accessControl)
3839
{
39-
Collection<Identity> owners = queries.stream()
40+
Collection<Identity> owners = queries
41+
.get()
4042
.map(BasicQueryInfo::getSession)
4143
.map(SessionRepresentation::toIdentity)
4244
.filter(owner -> !owner.getUser().equals(identity.getUser()))
4345
.distinct()
4446
.collect(toImmutableList());
4547
Collection<Identity> allowedOwners = accessControl.filterQueriesOwnedBy(identity, owners);
4648

47-
return queries.stream()
49+
return queries
50+
.get()
4851
.filter(queryInfo -> {
4952
Identity queryIdentity = queryInfo.getSession().toIdentity();
5053
return queryIdentity.getUser().equals(identity.getUser()) || allowedOwners.contains(queryIdentity);
51-
})
52-
.collect(toImmutableList());
54+
});
5355
}
5456

5557
public static void checkCanKillQueryOwnedBy(Identity identity, Identity queryOwner, AccessControl accessControl)

core/trino-main/src/main/java/io/trino/server/QueryResource.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,12 @@
3636
import jakarta.ws.rs.core.Response;
3737
import jakarta.ws.rs.core.Response.Status;
3838

39-
import java.util.List;
4039
import java.util.Locale;
4140
import java.util.NoSuchElementException;
4241
import java.util.Optional;
4342
import java.util.Set;
43+
import java.util.stream.Stream;
4444

45-
import static com.google.common.collect.ImmutableList.toImmutableList;
4645
import static com.google.common.collect.ImmutableSet.toImmutableSet;
4746
import static io.trino.connector.system.KillQueryProcedure.createKillQueryException;
4847
import static io.trino.connector.system.KillQueryProcedure.createPreemptQueryException;
@@ -73,22 +72,18 @@ public QueryResource(DispatchManager dispatchManager, AccessControl accessContro
7372
}
7473

7574
@GET
76-
public List<BasicQueryInfo> getAllQueryInfo(@QueryParam("state") Set<String> stateFilters, @Context HttpServletRequest servletRequest, @Context HttpHeaders httpHeaders)
75+
public Stream<BasicQueryInfo> getAllQueryInfo(@QueryParam("state") Set<String> stateFilters, @Context HttpServletRequest servletRequest, @Context HttpHeaders httpHeaders)
7776
{
7877
Set<QueryState> expectedStates = stateFilters.stream()
7978
.map(state -> state.toUpperCase(Locale.ENGLISH))
8079
.map(QueryState::valueOf)
8180
.collect(toImmutableSet());
8281

83-
List<BasicQueryInfo> queries = dispatchManager.getQueries();
84-
queries = filterQueries(sessionContextFactory.extractAuthorizedIdentity(servletRequest, httpHeaders), queries, accessControl);
85-
82+
Stream<BasicQueryInfo> queries = filterQueries(dispatchManager::getQueries, sessionContextFactory.extractAuthorizedIdentity(servletRequest, httpHeaders), accessControl);
8683
if (expectedStates.isEmpty()) {
8784
return queries;
8885
}
89-
return queries.stream()
90-
.filter(query -> expectedStates.contains(query.getState()))
91-
.collect(toImmutableList());
86+
return queries.filter(query -> expectedStates.contains(query.getState()));
9287
}
9388

9489
@GET

core/trino-main/src/main/java/io/trino/server/ui/ClusterStatsResource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import jakarta.ws.rs.Produces;
3131
import jakarta.ws.rs.core.MediaType;
3232

33+
import java.util.Iterator;
34+
3335
import static com.google.common.math.DoubleMath.roundToLong;
3436
import static io.trino.server.security.ResourceSecurity.AccessType.WEB_UI;
3537
import static java.math.RoundingMode.HALF_UP;
@@ -77,7 +79,9 @@ public ClusterStats getClusterStats()
7779
long totalInputBytes = dispatchManager.getStats().getConsumedInputBytes().getTotalCount();
7880
long totalCpuTimeSecs = dispatchManager.getStats().getConsumedCpuTimeSecs().getTotalCount();
7981

80-
for (BasicQueryInfo query : dispatchManager.getQueries()) {
82+
Iterator<BasicQueryInfo> iterator = dispatchManager.getQueries().iterator();
83+
while (iterator.hasNext()) {
84+
BasicQueryInfo query = iterator.next();
8185
if (query.getState() == QueryState.QUEUED) {
8286
queuedQueries++;
8387
}

core/trino-main/src/main/java/io/trino/server/ui/UiQueryResource.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.fasterxml.jackson.annotation.JsonTypeInfo;
1717
import com.fasterxml.jackson.databind.ObjectMapper;
1818
import com.fasterxml.jackson.databind.cfg.ContextAttributes;
19-
import com.google.common.collect.ImmutableList;
2019
import com.google.inject.Inject;
2120
import io.airlift.json.JsonCodec;
2221
import io.airlift.json.JsonCodecFactory;
@@ -47,10 +46,10 @@
4746
import jakarta.ws.rs.core.Response;
4847
import jakarta.ws.rs.core.Response.Status;
4948

50-
import java.util.List;
5149
import java.util.Locale;
5250
import java.util.NoSuchElementException;
5351
import java.util.Optional;
52+
import java.util.stream.Stream;
5453

5554
import static com.fasterxml.jackson.annotation.JsonIgnoreProperties.Value.forIgnoredProperties;
5655
import static io.trino.connector.system.KillQueryProcedure.createKillQueryException;
@@ -86,20 +85,16 @@ public UiQueryResource(ObjectMapper objectMapper, DispatchManager dispatchManage
8685
}
8786

8887
@GET
89-
public List<TrimmedBasicQueryInfo> getAllQueryInfo(@QueryParam("state") String stateFilter, @Context HttpServletRequest servletRequest, @Context HttpHeaders httpHeaders)
88+
public Stream<TrimmedBasicQueryInfo> getAllQueryInfo(@QueryParam("state") String stateFilter, @Context HttpServletRequest servletRequest, @Context HttpHeaders httpHeaders)
9089
{
9190
QueryState expectedState = stateFilter == null ? null : QueryState.valueOf(stateFilter.toUpperCase(Locale.ENGLISH));
92-
93-
List<BasicQueryInfo> queries = dispatchManager.getQueries();
94-
queries = filterQueries(sessionContextFactory.extractAuthorizedIdentity(servletRequest, httpHeaders), queries, accessControl);
95-
96-
ImmutableList.Builder<TrimmedBasicQueryInfo> builder = ImmutableList.builder();
97-
for (BasicQueryInfo queryInfo : queries) {
91+
Stream<BasicQueryInfo> queries = filterQueries(dispatchManager::getQueries, sessionContextFactory.extractAuthorizedIdentity(servletRequest, httpHeaders), accessControl);
92+
return queries.flatMap(queryInfo -> {
9893
if (stateFilter == null || queryInfo.getState() == expectedState) {
99-
builder.add(new TrimmedBasicQueryInfo(queryInfo));
94+
return Stream.of(new TrimmedBasicQueryInfo(queryInfo));
10095
}
101-
}
102-
return builder.build();
96+
return Stream.empty();
97+
});
10398
}
10499

105100
@GET

testing/trino-tests/src/test/java/io/trino/execution/QueryRunnerUtil.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.opentelemetry.api.trace.Span;
1818
import io.trino.Session;
1919
import io.trino.dispatcher.DispatchManager;
20-
import io.trino.server.BasicQueryInfo;
2120
import io.trino.server.SessionContext;
2221
import io.trino.server.protocol.Slug;
2322
import io.trino.spi.QueryId;
@@ -55,13 +54,14 @@ public static void waitForQueryState(QueryRunner queryRunner, QueryId queryId, S
5554
throws InterruptedException
5655
{
5756
DispatchManager dispatchManager = queryRunner.getCoordinator().getDispatchManager();
57+
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
5858
do {
5959
// Heartbeat all the running queries, so they don't die while we're waiting
60-
for (BasicQueryInfo queryInfo : dispatchManager.getQueries()) {
60+
dispatchManager.getQueries().forEach(queryInfo -> {
6161
if (queryInfo.getState() == RUNNING) {
62-
dispatchManager.getQueryInfo(queryInfo.getQueryId());
62+
queryManager.recordHeartbeat(queryInfo.getQueryId());
6363
}
64-
}
64+
});
6565
MILLISECONDS.sleep(100);
6666
}
6767
while (!expectedQueryStates.contains(dispatchManager.getQueryInfo(queryId).getState()));

testing/trino-tests/src/test/java/io/trino/security/TestSessionImpersonation.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,11 @@ private void assertSessionUsersAndGroups(
139139

140140
private BasicQueryInfo getQueryInfo(String query)
141141
{
142-
QueryId queryId = null;
143-
for (BasicQueryInfo basicQueryInfo : server.getDispatchManager().getQueries()) {
144-
if (basicQueryInfo.getQuery().equals(query)) {
145-
queryId = basicQueryInfo.getQueryId();
146-
}
147-
}
142+
QueryId queryId = server.getDispatchManager().getQueries()
143+
.filter(basicQueryInfo -> basicQueryInfo.getQuery().equals(query))
144+
.map(BasicQueryInfo::getQueryId)
145+
.findFirst()
146+
.orElseThrow(() -> new IllegalStateException("Could not find query"));
148147
return server.getDispatchManager().getQueryInfo(queryId);
149148
}
150149

0 commit comments

Comments
 (0)