Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,8 @@ public ConnectorTableMetadata getTableMetadata()
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
checkState(dispatchManager.isPresent(), "Query system table can return results only on coordinator");

List<BasicQueryInfo> queries = dispatchManager.get().getQueries();
queries = filterQueries(((FullConnectorSession) session).getSession().getIdentity(), queries, accessControl);

Builder table = InMemoryRecordSet.builder(QUERY_TABLE);
for (BasicQueryInfo queryInfo : queries) {
for (BasicQueryInfo queryInfo : filterQueries(dispatchManager.get().getQueries(), ((FullConnectorSession) session).getSession().getIdentity(), accessControl)) {
Optional<QueryInfo> fullQueryInfo = dispatchManager.get().getFullQueryInfo(queryInfo.getQueryId());
if (fullQueryInfo.isEmpty()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ private void enforceMemoryLimits()
List<QueryExecution> runningQueries = queryTracker.getAllQueries().stream()
.filter(query -> query.getState() == RUNNING)
.collect(toImmutableList());
memoryManager.process(runningQueries, this::getQueries);
memoryManager.process(runningQueries, queryTracker::tryGetQuery);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,19 @@
package io.trino.memory;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.log.Logger;
import io.trino.server.BasicQueryInfo;
import io.trino.execution.QueryExecution;
import io.trino.execution.QueryState;
import io.trino.spi.QueryId;
import it.unimi.dsi.fastutil.objects.Object2LongMap;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.function.Function;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.execution.QueryState.RUNNING;
import static java.time.Instant.now;
import static java.util.Objects.requireNonNull;
Expand All @@ -46,47 +44,48 @@ public class ClusterMemoryLeakDetector
private Set<QueryId> leakedQueries;

/**
* @param queryInfoSupplier All queries that the coordinator knows about.
* @param executionInfoSupplier Provided QueryId returns a QueryExecution if the query is still tracked by the coordinator.
* @param queryMemoryReservations The memory reservations of queries in the cluster memory pool.
*/
void checkForMemoryLeaks(Supplier<List<BasicQueryInfo>> queryInfoSupplier, Map<QueryId, Long> queryMemoryReservations)
void checkForMemoryLeaks(Function<QueryId, Optional<QueryExecution>> executionInfoSupplier, Object2LongMap<QueryId> queryMemoryReservations)
{
requireNonNull(queryInfoSupplier);
requireNonNull(queryMemoryReservations);

Map<QueryId, BasicQueryInfo> queryIdToInfo = Maps.uniqueIndex(queryInfoSupplier.get(), BasicQueryInfo::getQueryId);

Map<QueryId, Long> leakedQueryReservations = queryMemoryReservations.entrySet()
.stream()
.filter(entry -> entry.getValue() > 0)
.filter(entry -> isLeaked(queryIdToInfo, entry.getKey()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
ImmutableSet.Builder<QueryId> leakedQueriesBuilder = ImmutableSet.builder();
queryMemoryReservations.forEach((queryId, reservation) -> {
if (reservation > 0) {
if (isLeaked(executionInfoSupplier.apply(queryId))) {
leakedQueriesBuilder.add(queryId);
}
}
});

Set<QueryId> leakedQueryReservations = leakedQueriesBuilder.build();
if (!leakedQueryReservations.isEmpty()) {
log.debug("Memory leak detected. The following queries are already finished, " +
"but they have memory reservations on some worker node(s): %s", leakedQueryReservations);
}

synchronized (this) {
leakedQueries = ImmutableSet.copyOf(leakedQueryReservations.keySet());
leakedQueries = ImmutableSet.copyOf(leakedQueryReservations);
}
}

private static boolean isLeaked(Map<QueryId, BasicQueryInfo> queryIdToInfo, QueryId queryId)
private static boolean isLeaked(Optional<QueryExecution> execution)
{
BasicQueryInfo queryInfo = queryIdToInfo.get(queryId);

if (queryInfo == null) {
if (execution.isEmpty()) {
// We have a memory reservation but query isn't tracked
return true;
}

Instant queryEndTime = queryInfo.getQueryStats().getEndTime();
Optional<Instant> queryEndTime = execution.orElseThrow().getEndTime();
QueryState state = execution.orElseThrow().getState();

if (queryInfo.getState() == RUNNING || queryEndTime == null) {
if (state == RUNNING || queryEndTime.isEmpty()) {
return false;
}

return queryEndTime.plusSeconds(DEFAULT_LEAK_CLAIM_DELTA_SEC).isBefore(now());
return queryEndTime.orElseThrow().plusSeconds(DEFAULT_LEAK_CLAIM_DELTA_SEC).isBefore(now());
}

synchronized boolean wasQueryPossiblyLeaked(QueryId queryId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
Expand All @@ -30,6 +28,7 @@
import io.trino.execution.LocationFactory;
import io.trino.execution.QueryExecution;
import io.trino.execution.QueryInfo;
import io.trino.execution.QueryState;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.scheduler.NodeSchedulerConfig;
Expand All @@ -39,7 +38,6 @@
import io.trino.node.InternalNode;
import io.trino.node.InternalNodeManager;
import io.trino.operator.RetryPolicy;
import io.trino.server.BasicQueryInfo;
import io.trino.server.ServerConfig;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
Expand All @@ -51,6 +49,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -60,14 +59,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Sets.difference;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.DataSize.succinctBytes;
Expand Down Expand Up @@ -174,18 +172,23 @@ public synchronized void addChangeListener(Consumer<MemoryPoolInfo> listener)
changeListeners.add(listener);
}

public synchronized void process(Iterable<QueryExecution> runningQueries, Supplier<List<BasicQueryInfo>> allQueryInfoSupplier)
public synchronized void process(Collection<QueryExecution> allQueries, Function<QueryId, Optional<QueryExecution>> executionInfoSupplier)
{
// TODO revocable memory reservations can also leak and may need to be detected in the future
// We are only concerned about the leaks in the memory pool.
memoryLeakDetector.checkForMemoryLeaks(allQueryInfoSupplier, pool.getQueryMemoryReservations());
memoryLeakDetector.checkForMemoryLeaks(executionInfoSupplier, pool.getQueryMemoryReservations());

boolean outOfMemory = isClusterOutOfMemory();

boolean queryKilled = false;
long totalUserMemoryBytes = 0L;
long totalMemoryBytes = 0L;
for (QueryExecution query : runningQueries) {
int queriesCount = 0;
for (QueryExecution query : allQueries) {
if (query.getState() != QueryState.RUNNING) {
continue;
}
queriesCount++;
boolean resourceOvercommit = resourceOvercommit(query.getSession());
long userMemoryReservation = query.getUserMemoryReservation().toBytes();
long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
Expand Down Expand Up @@ -226,20 +229,21 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli

if (!lowMemoryKillers.isEmpty() && outOfMemory && !queryKilled) {
if (isLastKillTargetGone()) {
callOomKiller(runningQueries);
callOomKiller(allQueries, executionInfoSupplier);
}
else {
log.debug("Last killed target is still not gone: %s", lastKillTarget);
}
}

updateMemoryPool(Iterables.size(runningQueries));
updateMemoryPool(queriesCount);
updateNodes();
}

private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
private synchronized void callOomKiller(Collection<QueryExecution> allQueries, Function<QueryId, Optional<QueryExecution>> queryExecutionSupplier)
{
List<RunningQueryInfo> runningQueryInfos = Streams.stream(runningQueries)
List<RunningQueryInfo> runningQueryInfos = allQueries
.stream()
.map(this::createQueryMemoryInfo)
.collect(toImmutableList());

Expand All @@ -257,7 +261,7 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
if (killTarget.get().isWholeQuery()) {
QueryId queryId = killTarget.get().getQuery();
log.debug("Low memory killer chose %s", queryId);
Optional<QueryExecution> chosenQuery = findRunningQuery(runningQueries, killTarget.get().getQuery());
Optional<QueryExecution> chosenQuery = queryExecutionSupplier.apply(killTarget.get().getQuery());
if (chosenQuery.isPresent()) {
// See comments in isQueryGone for why chosenQuery might be absent.
chosenQuery.get().fail(new TrinoException(CLUSTER_OUT_OF_MEMORY, "Query killed because the cluster is out of memory. Please try again in a few minutes."));
Expand All @@ -271,7 +275,7 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
log.debug("Low memory killer chose %s", tasks);
ImmutableSet.Builder<TaskId> killedTasksBuilder = ImmutableSet.builder();
for (TaskId task : tasks) {
Optional<QueryExecution> runningQuery = findRunningQuery(runningQueries, task.queryId());
Optional<QueryExecution> runningQuery = queryExecutionSupplier.apply(task.queryId());
if (runningQuery.isPresent()) {
runningQuery.get().failTask(task, new TrinoException(CLUSTER_OUT_OF_MEMORY, "Task killed because the cluster is out of memory."));
tasksKilledDueToOutOfMemory.incrementAndGet();
Expand Down Expand Up @@ -343,11 +347,6 @@ private Set<TaskId> getRunningTasks()
.collect(toImmutableSet());
}

/**
 * Searches the given running queries for the one with the requested id.
 *
 * @param runningQueries the executions to search
 * @param queryId the id to look for
 * @return the matching execution, or empty when none of the given queries has that id
 */
private Optional<QueryExecution> findRunningQuery(Iterable<QueryExecution> runningQueries, QueryId queryId)
{
    // toOptional() rejects more than one match, so a duplicated id surfaces as an error
    return Streams.stream(runningQueries)
            .filter(candidate -> queryId.equals(candidate.getQueryId()))
            .collect(toOptional());
}

private void logQueryKill(QueryId killedQueryId, Map<String, MemoryInfo> nodeMemoryInfosByNode)
{
if (!log.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import io.trino.spi.QueryId;
import io.trino.spi.memory.MemoryAllocation;
import io.trino.spi.memory.MemoryPoolInfo;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongMaps;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.weakref.jmx.Managed;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

@ThreadSafe
Expand All @@ -52,14 +55,11 @@ public class ClusterMemoryPool

// Does not include queries with zero memory usage
@GuardedBy("this")
private final Map<QueryId, Long> queryMemoryReservations = new HashMap<>();
private final Object2LongMap<QueryId> queryMemoryReservations = new Object2LongOpenHashMap<>();

@GuardedBy("this")
private final Map<QueryId, List<MemoryAllocation>> queryMemoryAllocations = new HashMap<>();

@GuardedBy("this")
private final Map<QueryId, Long> queryMemoryRevocableReservations = new HashMap<>();

public synchronized MemoryPoolInfo getInfo()
{
return new MemoryPoolInfo(
Expand All @@ -68,7 +68,6 @@ public synchronized MemoryPoolInfo getInfo()
reservedRevocableDistributedBytes,
ImmutableMap.copyOf(queryMemoryReservations),
ImmutableMap.copyOf(queryMemoryAllocations),
ImmutableMap.copyOf(queryMemoryRevocableReservations),
// not providing per-task memory info for cluster-wide pool
ImmutableMap.of(),
ImmutableMap.of());
Expand Down Expand Up @@ -116,14 +115,9 @@ public synchronized int getAssignedQueries()
return assignedQueries;
}

public synchronized Map<QueryId, Long> getQueryMemoryReservations()
{
return ImmutableMap.copyOf(queryMemoryReservations);
}

public synchronized Map<QueryId, Long> getQueryMemoryRevocableReservations()
public synchronized Object2LongMap<QueryId> getQueryMemoryReservations()
{
return ImmutableMap.copyOf(queryMemoryRevocableReservations);
return Object2LongMaps.unmodifiable(queryMemoryReservations);
}

public synchronized void update(List<MemoryInfo> memoryInfos, int assignedQueries)
Expand All @@ -136,7 +130,6 @@ public synchronized void update(List<MemoryInfo> memoryInfos, int assignedQuerie
this.assignedQueries = assignedQueries;
this.queryMemoryReservations.clear();
this.queryMemoryAllocations.clear();
this.queryMemoryRevocableReservations.clear();

for (MemoryInfo info : memoryInfos) {
MemoryPoolInfo poolInfo = info.getPool();
Expand All @@ -148,14 +141,11 @@ public synchronized void update(List<MemoryInfo> memoryInfos, int assignedQuerie
reservedDistributedBytes += poolInfo.getReservedBytes();
reservedRevocableDistributedBytes += poolInfo.getReservedRevocableBytes();
for (Map.Entry<QueryId, Long> entry : poolInfo.getQueryMemoryReservations().entrySet()) {
queryMemoryReservations.merge(entry.getKey(), entry.getValue(), Long::sum);
queryMemoryReservations.mergeLong(entry.getKey(), entry.getValue(), Long::sum);
}
for (Map.Entry<QueryId, List<MemoryAllocation>> entry : poolInfo.getQueryMemoryAllocations().entrySet()) {
queryMemoryAllocations.merge(entry.getKey(), entry.getValue(), this::mergeQueryAllocations);
}
for (Map.Entry<QueryId, Long> entry : poolInfo.getQueryMemoryRevocableReservations().entrySet()) {
queryMemoryRevocableReservations.merge(entry.getKey(), entry.getValue(), Long::sum);
}
}
}

Expand All @@ -164,20 +154,22 @@ private List<MemoryAllocation> mergeQueryAllocations(List<MemoryAllocation> left
requireNonNull(left, "left is null");
requireNonNull(right, "right is null");

Map<String, MemoryAllocation> mergedAllocations = new HashMap<>();
Object2LongMap<String> mergedAllocations = new Object2LongOpenHashMap<>();

for (MemoryAllocation allocation : left) {
mergedAllocations.put(allocation.getTag(), allocation);
mergedAllocations.put(allocation.tag(), allocation.allocation());
}

for (MemoryAllocation allocation : right) {
mergedAllocations.merge(
allocation.getTag(),
allocation,
(a, b) -> new MemoryAllocation(a.getTag(), a.getAllocation() + b.getAllocation()));
mergedAllocations.mergeLong(
allocation.tag(),
allocation.allocation(),
Long::sum);
}

return new ArrayList<>(mergedAllocations.values());
return mergedAllocations.object2LongEntrySet().stream()
.map(entry -> new MemoryAllocation(entry.getKey(), entry.getLongValue()))
.collect(toImmutableList());
}

@Override
Expand All @@ -193,7 +185,6 @@ public synchronized String toString()
.add("assignedQueries", assignedQueries)
.add("queryMemoryReservations", queryMemoryReservations)
.add("queryMemoryAllocations", queryMemoryAllocations)
.add("queryMemoryRevocableReservations", queryMemoryRevocableReservations)
.toString();
}
}
Loading
Loading