Conversation

@wendigo
Contributor

@wendigo wendigo commented Nov 17, 2025

Description

Additional context and related issues

Release notes

(x) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Nov 17, 2025
@github-actions github-actions bot added the jdbc Relates to Trino JDBC driver label Nov 17, 2025
@wendigo wendigo force-pushed the serafin/query-streams branch from 679f9c5 to 30ea39b Compare November 17, 2025 20:55
@wendigo wendigo force-pushed the serafin/query-streams branch from 30ea39b to 5f77773 Compare November 18, 2025 12:10
@wendigo wendigo force-pushed the serafin/query-streams branch 2 times, most recently from 3d41d08 to 3e46aad Compare November 18, 2025 15:51
@martint
Member

martint commented Nov 18, 2025

The PR does exactly what the title says, but what is the motivation for this change?

@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@martint we process the query lists in a streaming fashion. This PR avoids unnecessary copies and reduces memory allocations when working with long query lists.

@martint
Member

martint commented Nov 18, 2025

Where do we see those unnecessary allocations? Is that a problem in practice? A cluster will usually not be dealing with so many queries that this would be visible.

BTW, my concern with using Stream in a general-purpose API like this is that it forces a style of usage that we don't want to thrust upon every caller. It may also introduce hidden costs if you need to do something with the results twice, especially if the underlying stream has complex filters/transformations applied to it (or it may force the caller to materialize into a temporary collection).

@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@martint I was looking at these while running high concurrency/small queries benchmarks.

In the query enforcement code (scan, write, memory, and CPU limits), we process queries like this, effectively copying a list of queries every 1 s (x4):

        List<QueryExecution> runningQueries = queryTracker.getAllQueries().stream()
                .filter(query -> query.getState() == RUNNING)
                .collect(toImmutableList());

and then process the queries one by one. The only place where we work on the stream twice is the memory leak detector, where we run callOomKiller once again over a stream, but this happens only if we are out of memory (so not every time).
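
For readers following along, here is a minimal sketch of the two shapes being compared: collecting the running queries into a list first versus applying the check while streaming. `Query`, `State`, and both method names are hypothetical stand-ins, not Trino classes, and the sketch uses the JDK's `Stream.toList()` in place of Guava's `toImmutableList()` to stay dependency-free.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

final class EnforcementSketch
{
    enum State { QUEUED, RUNNING, FINISHED }

    // Hypothetical stand-in for QueryExecution
    record Query(String id, State state) {}

    // Shape discussed above: materialize the running queries, then walk the copy
    static void enforceWithCopy(Collection<Query> allQueries, Consumer<Query> limitCheck)
    {
        List<Query> running = allQueries.stream()
                .filter(query -> query.state() == State.RUNNING)
                .toList();
        running.forEach(limitCheck);
    }

    // Alternative: apply the check while streaming, with no intermediate list
    static void enforceStreaming(Collection<Query> allQueries, Consumer<Query> limitCheck)
    {
        allQueries.stream()
                .filter(query -> query.state() == State.RUNNING)
                .forEach(limitCheck);
    }

    public static void main(String[] args)
    {
        List<Query> all = List.of(
                new Query("q1", State.QUEUED),
                new Query("q2", State.RUNNING),
                new Query("q3", State.RUNNING));
        enforceWithCopy(all, query -> System.out.println("copy: " + query.id()));
        enforceStreaming(all, query -> System.out.println("stream: " + query.id()));
    }
}
```

Both variants visit the same queries; the only difference is whether a temporary list is allocated on each enforcement tick.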

@martint
Member

martint commented Nov 18, 2025

QueryTracker.getAllQueries() doesn't copy the queries:

public Collection<T> getAllQueries()
{
    return unmodifiableCollection(queries.values());
}

So, the copying is confined to the method you posted above. It sounds like we just need a better interaction between that method and ClusterQueryManager.
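
To illustrate why `getAllQueries()` itself does not copy, here is a small sketch, assuming the backing field is a `ConcurrentHashMap` (as described later in this thread). `LiveViewSketch` and its string values are hypothetical; only `unmodifiableCollection(queries.values())` mirrors the snippet above.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LiveViewSketch
{
    // Assumed backing structure; values are plain strings for brevity
    private final Map<String, String> queries = new ConcurrentHashMap<>();

    void addQuery(String id, String sql)
    {
        queries.put(id, sql);
    }

    // Same shape as QueryTracker.getAllQueries(): a read-only view, not a copy
    Collection<String> getAllQueries()
    {
        return Collections.unmodifiableCollection(queries.values());
    }

    public static void main(String[] args)
    {
        LiveViewSketch tracker = new LiveViewSketch();
        Collection<String> view = tracker.getAllQueries();
        System.out.println(view.size()); // prints 0

        // The view was obtained before this insert, yet it reflects it
        tracker.addQuery("q1", "SELECT 1");
        System.out.println(view.size()); // prints 1
    }
}
```

Because the wrapper only delegates to the map's value view, obtaining it allocates a small wrapper object rather than a copy of the elements.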

@wendigo wendigo force-pushed the serafin/query-streams branch from 3e46aad to 3d72131 Compare November 18, 2025 17:09
@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@martint what's wrong with this change if the elements are processed one by one and we only do filtering/mapping?

@wendigo wendigo force-pushed the serafin/query-streams branch from 3d72131 to ae76d8d Compare November 18, 2025 18:29
@martint
Member

martint commented Nov 18, 2025

It's too invasive (making SqlQueryTracker return Stream), when we're trying to solve how SqlQueryManager processes the queries and feeds them to the ClusterQueryManager.

From an abstraction perspective, there's no inherent "streaminess" or "laziness" to SqlQueryTracker. A Stream is also more constraining. It only allows simple one-pass computations, and forces callers to adopt a style of processing that may not be appropriate in all situations. Streams are more appropriate for representing computation pipelines, not a data model/data structure.

If the caller needs a Stream (for the purpose of filtering/transforming), it can just call stream() on the collection it returns.
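
The one-pass constraint can be seen with plain JDK types. This is a minimal sketch with hypothetical names (`OnePassSketch`, `secondPassFails`, `countTwice`): a `Stream` rejects a second terminal operation with `IllegalStateException`, while a collection can hand out a fresh stream for each pass.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;

final class OnePassSketch
{
    // Returns true if a second terminal operation on the same stream is rejected
    static boolean secondPassFails(Stream<String> stream)
    {
        stream.forEach(item -> {}); // first pass consumes the stream
        try {
            stream.forEach(item -> {}); // second pass on the same stream
            return false;
        }
        catch (IllegalStateException e) {
            return true;
        }
    }

    // A collection allows any number of passes: each stream() call is a new pipeline
    static long countTwice(Collection<String> items)
    {
        return items.stream().count() + items.stream().count();
    }

    public static void main(String[] args)
    {
        List<String> ids = List.of("q1", "q2", "q3");
        System.out.println(secondPassFails(ids.stream())); // prints true
        System.out.println(countTwice(ids)); // prints 6
    }
}
```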

@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@martint I'm not talking about some future applications but about what we have right now. I disagree that "streams are representing computational pipelines". In this case, we are exposing the ability to traverse the list of queries, rather than a container for these queries (given that this is a snapshot in time, since the list changes frequently). From the consumer's perspective, this states an intent: "walk through this structure" rather than "stash it/keep it somewhere". A Stream, with its "walk once" semantics, better reflects the fact that this is a heavily mutated structure and you can only peek into its current content, right here, right now.

@martint
Member

martint commented Nov 18, 2025

@dain, can you comment on the original intent and abstraction provided by QueryTracker from when you refactored that code a few years ago (if you recall)?

@dain
Member

dain commented Nov 18, 2025

> @dain, can you comment on the original intent and abstraction provided by QueryTracker from when you refactored that code a few years ago (if you recall)?

Quite simply, to have a master list of queries so we could handle all of the complex timeout logic in one easy-to-understand place. In the old days, we had abandoned-query detection, timeout, and history management spread throughout the even more complex query manager system, and we had tons of bugs (memory leaks). I pulled this part out so that we could test it independently and have a better chance of not getting more bugs.

For this PR, I'm not sure of the value of this change. It basically forces a .stream() call on every caller, and that is it. It doesn't change the underlying behavior of the collection, which is a live, unmodifiable view over a concurrent map, so it takes away flexibility from the users. That said, if I want a snapshot, I can call .toList()... Obviously, with this change you no longer get a live collection, but I'm not sure I'd ever want that either.

Is there some bad behavior users get into with the original design?

@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@dain no, just optimizing for memory usage by reducing the number of unnecessary memory copies/materializations

@dain
Member

dain commented Nov 18, 2025

@wendigo I don't see how this changes "memory copies/materialization"

@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@dain:

List<QueryExecution> runningQueries = queryTracker.getAllQueries().stream()
        .filter(query -> query.getState() == RUNNING)
        .collect(toImmutableList());

this copies running queries to a new list

@dain
Member

dain commented Nov 18, 2025

I assume you are talking about:

    private void enforceMemoryLimits()
    {
        List<QueryExecution> runningQueries = queryTracker.getAllQueries().stream()
                .filter(query -> query.getState() == RUNNING)
                .collect(toImmutableList());
        memoryManager.process(runningQueries, this::getQueries);
    }

Which has been updated to:

    private void enforceMemoryLimits()
    {
        memoryManager.process(queryTracker::getAllQueries, queryTracker::tryGetQuery);
    }

But you could have done the same thing with:

    private void enforceMemoryLimits()
    {
        memoryManager.process(queryTracker.getAllQueries()::stream, queryTracker::tryGetQuery);
    }

So my point is I don't see how changing getAllQueries to return Stream instead of Collection improves memory copies/materialization. I do see how changing memoryManager.process to take Supplier<Stream<QueryExecution>> does do this.
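
As a rough sketch of that last point, the consumer below takes a `Supplier<Stream<...>>` and the caller passes `getAllQueries()::stream`, so the tracker can keep returning a `Collection`. All names here (`SupplierSketch`, `Query`, `largestIfOverLimit`) are hypothetical and only loosely modeled on `memoryManager.process`; the real signature is not shown in this thread.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class SupplierSketch
{
    // Hypothetical stand-in for QueryExecution
    record Query(String id, long memoryBytes) {}

    private final Map<String, Query> queries = new ConcurrentHashMap<>();

    void add(Query query)
    {
        queries.put(query.id(), query);
    }

    // The tracker keeps exposing a Collection (a live, unmodifiable view)
    Collection<Query> getAllQueries()
    {
        return Collections.unmodifiableCollection(queries.values());
    }

    // Hypothetical consumer: the supplier yields a fresh stream for each pass.
    // Returns the largest reservation if the total exceeds the limit, else 0.
    static long largestIfOverLimit(Supplier<Stream<Query>> source, long limitBytes)
    {
        long total = source.get().mapToLong(Query::memoryBytes).sum(); // pass 1
        if (total <= limitBytes) {
            return 0;
        }
        return source.get().mapToLong(Query::memoryBytes).max().orElse(0); // pass 2
    }

    public static void main(String[] args)
    {
        SupplierSketch tracker = new SupplierSketch();
        // Bound method reference: getAllQueries() is evaluated once, here
        Supplier<Stream<Query>> source = tracker.getAllQueries()::stream;
        tracker.add(new Query("q1", 100));
        tracker.add(new Query("q2", 300));
        System.out.println(largestIfOverLimit(source, 350)); // prints 300
        System.out.println(largestIfOverLimit(source, 500)); // prints 0
    }
}
```

No intermediate list is built in either pass, and because the supplier wraps a live view, queries added after the supplier was created are still visible.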

@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@dain agreed but then on every usage site I'm doing ::stream so what's the point?

@dain
Member

dain commented Nov 18, 2025

> @dain agreed but then on every usage site I'm doing ::stream so what's the point?

There are lots of cases where I have collections and only interact with them as streams, but I don't change them to be Supplier<Stream<T>>. I'm not saying that it can't be done, or that there aren't cases where it should be done. I am saying I don't think you have made your case, and I don't see the need for that specific part of this PR. I don't feel super strongly about this, as this is an internal API and we can just switch back, so 🤷

@wendigo
Contributor Author

wendigo commented Nov 18, 2025

@dain yeah, as an internal API it's easy to change it back when the usage pattern changes

@wendigo wendigo force-pushed the serafin/query-streams branch from ae76d8d to f1d7e90 Compare November 19, 2025 10:38
@wendigo wendigo changed the title Return lazy streams from QueryManager, DispatchManager and QueryTracker Optimize memory usage around query tracking Nov 19, 2025
@wendigo wendigo force-pushed the serafin/query-streams branch from f1d7e90 to 4d9527f Compare November 19, 2025 16:48
Labels

cla-signed jdbc Relates to Trino JDBC driver

5 participants