AsymmetricJoinSizer passes wrong buildSize to JoinInfo #12354
base: branch-25.04
Conversation
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
build
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala
Some tests failed
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
build
@firestarman CI fixed, please take another look.
@binmahone can we add a bit more detail to the description of this PR? It's not obvious how the code change closes the issue.
@abellina PR description refined, please take another look.
// We should provide the actual buildSize instead of the truncated one.
// By calling fetchProbeTargetSize again, we'll move all batches to the queue, and
// at the same time we'll get the actual buildSize.
val (_, remainingBytes) = closeOnExcept(buildQueue) { _ =>
I am not a big fan of this solution, which moves all the remaining batches to the queue after a call to `setupForJoin`, since it changes the queue in a way `setupForJoin` cannot observe. This then forces a constraint on `setupForJoin` that it must refer to the input queue directly. Shall we file a follow-up issue to eliminate this limitation?
Hi @firestarman, the reason I'm not draining the iterator after `setupForJoin` is that the iterator returned by `setupForJoin` is unspillable, whereas what we have in the queue is still spillable. The current implementation, as you pointed out, might be a little tricky and imposes some constraints on how we should use the queue, but it saves the cost of registering the batches again. I can add some comments to remind people to be careful about the queue, what do you think?
Filed #12355 for this.
Note that the queue here is not spillable (`HostHostAsymmetricJoinSizer`). It is host memory, afaik, but it still adds another place where we need to go back later and try to control the memory usage.
OK, I think we need to resolve that these are not spillable for the host. Kudo isn't going to be on by default in 25.04, and this change will be on by default for all users.

I think we are going to have to solve the issue of the `queue` not holding spillable elements. We will need to inspect each batch as we pop it from the iterator and decide: if it is a host batch, we need it to be a `SpillableHostColumnarBatch`; if it is a GPU batch, we need to hold a `SpillableColumnarBatch`. If it is Kudo, we do that differently, but it should be consistent: all `ColumnarBatch` instances in that case would be Kudo. (See the sketch below.)

If we don't solve the above, we run the risk of running out of memory on the host where we didn't before.
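A minimal, self-contained sketch of that per-batch decision. `BatchLike` and the wrapper classes below are hypothetical stand-ins for `ColumnarBatch`, `SpillableHostColumnarBatch`, and `SpillableColumnarBatch`, not the real spark-rapids API:

```scala
// Hypothetical batch types standing in for host vs. device ColumnarBatch.
sealed trait BatchLike
final case class HostBatch(sizeBytes: Long) extends BatchLike
final case class GpuBatch(sizeBytes: Long) extends BatchLike

// Hypothetical spillable holders standing in for SpillableHostColumnarBatch
// and SpillableColumnarBatch.
sealed trait SpillableBatch
final case class SpillableHostBatch(b: HostBatch) extends SpillableBatch
final case class SpillableGpuBatch(b: GpuBatch) extends SpillableBatch

// Inspect each batch as it is popped from the iterator and wrap it in the
// matching spillable holder before it goes on the queue.
def makeSpillable(batch: BatchLike): SpillableBatch = batch match {
  case h: HostBatch => SpillableHostBatch(h) // host memory: host-side holder
  case g: GpuBatch  => SpillableGpuBatch(g)  // device memory: device-side holder
  // A Kudo-serialized batch would take a third, Kudo-specific path; in that
  // mode every batch in the queue is expected to be Kudo-serialized.
}
```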
LGTM, but it'd be better to get more reviews.
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
more comments added
LGTM
build
  fetchProbeTargetSize(probeStreamIter, streamQueue, gpuBatchSizeBytes)
}
val streamIter = setupForJoin(streamQueue, rawStreamIter, exprs.streamTypes,
  gpuBatchSizeBytes, metrics)
- if (streamRows <= Int.MaxValue && streamSize <= gpuBatchSizeBytes) {
+ if (mayTruncatedStreamRows <= Int.MaxValue && mayTruncatedStreamSize <= gpuBatchSizeBytes) {
  assert(!probeStreamIter.hasNext, "stream side not exhausted")
I think if we have a stream side that is exactly `gpuBatchSizeBytes` or exactly `Int.MaxValue`, then this assert will trigger. Not specific to your PR, but the question is: is that possible, and what happens in that case? It seems our metric (`BUILD_DATA_SIZE`) would be wrong, at least.
I'm afraid not. In the case you described, `probeStreamIter` will already be drained in `fetchProbeTargetSize`, so the assert won't fire.
The changes look okay to me, but I don't see any benchmark results. An artificial benchmark would be fine.

The reason I am asking is that this code change makes it much more likely that we are going to spill the build side of the join twice. It limits that spilling to twice, but I fear the common case may be much more expensive than before.

If I remember correctly, we have two different ways of partitioning the build side of the join:

- `BuildSidePartitioner` assumes it knows the size up front, partitions the data according to that size, and does one pass through the build/stream side to partition the data.
- `GpuSubPartitionPairIterator` assumes it has no knowledge about the size of the build table and partitions it a configurable 16 ways recursively. In theory it could do multiple passes through the data, but in practice it will do one pass plus possibly some partial passes.

If we don't fully know the build size, I think `GpuSubPartitionPairIterator` is the better choice.

Long term I would like a single partitioning class that optionally takes a build-side size and uses that information to make proper decisions about how to do the join; see the sketch after this comment. That might even let us play games with AQE data to try to get the right partition number without ever touching the data. In the short term I am fine with something less clean, but I want to see some numbers about how this impacts the common case.

Please correct me if I am wrong about this.
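A hedged sketch of the single partitioning class suggested above; the class name and shape are invented for illustration and do not exist in spark-rapids today:

```scala
// Hypothetical sketch of a unified partitioner: one class that optionally
// takes a build-side size estimate and picks its strategy accordingly.
class UnifiedBuildPartitioner(
    targetBatchSizeBytes: Long,
    knownBuildSizeBytes: Option[Long]) {

  // Fixed fan-out used when the build size is unknown, mirroring the
  // configurable 16-way recursive sub-partitioning mentioned above.
  private val defaultFanOut = 16

  /** Number of partitions to use for the build side. */
  def partitionCount: Int = knownBuildSizeBytes match {
    // Size known up front (BuildSidePartitioner-style): size partitions so
    // each fits in one target batch, enabling a single pass over the data.
    case Some(size) =>
      math.max(1, math.ceil(size.toDouble / targetBatchSizeBytes).toInt)
    // Size unknown (GpuSubPartitionPairIterator-style): fixed fan-out,
    // recursing later on partitions that are still too large.
    case None => defaultFanOut
  }
}
```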
Hi @revans2, a few questions:
I was thinking of something closer to a worst case, something where we would hit this code, because I doubt NDS will ever exercise it. Scale factor 3k is just too small.

I am fine if we put this in as is, but then we need a follow-on issue early in 25.06 to look at the performance of it.
There are lots of cases where it is possible. We could have two cutoffs for pulling in data: right now we go up to one target batch size per side, but it might be better to pull in up to 2x the target batch size, because that covers something like 99% of all the join cases we see; then we would know the exact size (see the toy sketch below). Another possibility is to use metrics from AQE shuffles to estimate the size of the build table. Then we would have some "knowledge" even if it is not perfect.
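A toy illustration of the two-cutoff idea; the 2x factor and the helper name come only from the comment above, not from the code:

```scala
// Toy sketch: keep fetching until 2x the target batch size, so that in the
// ~99% of joins that fit within that bound we know the exact side size
// instead of a truncated estimate.
def shouldKeepFetching(fetchedBytes: Long, targetBatchSizeBytes: Long): Boolean =
  fetchedBytes < 2 * targetBatchSizeBytes
```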
Sorry, I forgot about the comment from @abellina. I'm not sure how I feel about us pulling the entire build side into host memory, especially in the case where we have no limits on the amount of host memory being used. I am going to be out for a few days, but if you can convince @abellina that the fix is okay, then I am fine with it.
The follow-up issue is #12387.

Hi @abellina, need your input on this.

@binmahone please see #12354 (comment)
This PR closes #12353.

Today, `buildSize` is inaccurate because `fetchProbeTargetSize` stops fetching once the accumulated size already exceeds the GPU batch size. We add a parameter named `truncateIfNecessary` to `fetchProbeTargetSize`, so that when it is false, the method exhausts the input `iter` and puts all of the retrieved batches into the input `queue`; the byte size of the remaining batches can then be calculated, returned, and added to the original `buildSize` to correct it. When we are sure one side is going to be used as the build side, and we also know for sure that side is going to be large, we call `fetchProbeTargetSize` with `truncateIfNecessary = false`, so that `buildSize` is accurate.
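A simplified, hypothetical sketch of the behavior described above; `SizedBatch` and this signature are stand-ins, not the actual method in `GpuShuffledSizedHashJoinExec.scala`:

```scala
import scala.collection.mutable

// Illustrative stand-in for the batches flowing through the sizer; only the
// size in bytes matters here.
final case class SizedBatch(sizeBytes: Long)

// Simplified model of fetchProbeTargetSize with the new truncateIfNecessary
// flag. When the flag is false, the iterator is fully drained into the queue,
// so the returned byte count covers everything fetched by this call rather
// than a truncated prefix.
def fetchProbeTargetSize(
    iter: Iterator[SizedBatch],
    queue: mutable.Queue[SizedBatch],
    targetSizeBytes: Long,
    truncateIfNecessary: Boolean): Long = {
  var fetchedBytes = 0L
  while (iter.hasNext && (!truncateIfNecessary || fetchedBytes < targetSizeBytes)) {
    val batch = iter.next()
    queue.enqueue(batch) // the real code would keep these batches spillable
    fetchedBytes += batch.sizeBytes
  }
  fetchedBytes
}
```

Under this model, a first call with `truncateIfNecessary = true` returns the possibly truncated size, and a second call on the same iterator with `truncateIfNecessary = false` drains the remainder into the queue and returns the remaining bytes, which the caller adds to the original `buildSize`.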