[SPARK-53636][CORE] Fix thread-safety issue in SortShuffleManager.unregisterShuffle #52386
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
  def this(conf: SparkConf) = {
-   this(conf, null, Collections.emptyMap())
+   this(conf, null, new ConcurrentHashMap[Int, OpenHashSet[Long]]())
class EmptyConcurrentMap<K, V> implements ConcurrentMap<K, V> {
private EmptyConcurrentMap() {
}
public V putIfAbsent(K key, V value) {
return null;
}
public boolean remove(Object key, Object value) {
return false;
}
public boolean replace(K key, V oldValue, V newValue) {
return false;
}
public V replace(K key, V value) {
return null;
}
public int size() {
return 0;
}
public boolean isEmpty() {
return true;
}
public boolean containsKey(Object key) {
return false;
}
public boolean containsValue(Object value) {
return false;
}
public V get(Object key) {
return null;
}
public V put(K key, V value) {
return null;
}
public V remove(Object key) {
return null;
}
public void putAll(Map<? extends K, ? extends V> m) {
}
public void clear() {
}
public Set<K> keySet() {
return Collections.emptySet();
}
public Collection<V> values() {
return Collections.emptySet();
}
public Set<Map.Entry<K, V>> entrySet() {
return Collections.emptySet();
}
}
Is it possible to define a similar EmptyConcurrentMap and also use a singleton pattern for it?
Hmm, it seems to be overkill to me.
I take back my previous comment.
Passing in Collections.emptyMap() in the original code seems problematic. When taskIdMapsForShuffle is an instance of EMPTY_MAP, calling taskIdMapsForShuffle.computeIfAbsent will throw an exception, although it seems this path was never actually hit before.
So it would indeed be inappropriate to pass in an EmptyConcurrentMap that behaves like Collections.emptyMap() here.
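To make the point above concrete, here is a minimal Java sketch (not code from the PR). The class name EmptyMapDemo and its helper method are hypothetical, and HashSet stands in for Spark's OpenHashSet. It assumes only standard JDK behavior: the immutable map returned by Collections.emptyMap() rejects computeIfAbsent with UnsupportedOperationException, while a fresh ConcurrentHashMap accepts it.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class EmptyMapDemo {
    // Hypothetical helper: returns true if computeIfAbsent on the given map
    // throws UnsupportedOperationException, false if the insert succeeds.
    public static boolean rejectsComputeIfAbsent(Map<Integer, Set<Long>> map) {
        try {
            map.computeIfAbsent(0, k -> new HashSet<>()).add(42L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The immutable empty map cannot be mutated through computeIfAbsent.
        Map<Integer, Set<Long>> immutable = Collections.emptyMap();
        // A real concurrent map supports the same call.
        Map<Integer, Set<Long>> concurrent = new ConcurrentHashMap<>();

        System.out.println(rejectsComputeIfAbsent(immutable));  // prints "true"
        System.out.println(rejectsComputeIfAbsent(concurrent)); // prints "false"
    }
}
```

A no-op EmptyConcurrentMap would not throw, but it would silently drop every insert, which is arguably just as surprising for a caller that expects the map to record task ids.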
Good point!
LGTM
What changes were proposed in this pull request?
This PR fixes the thread-safety issue in SortShuffleManager.unregisterShuffle by taking a synchronized lock on mapTaskIds while iterating over it. In addition, this PR enforces the type of taskIdMapsForShuffle as ConcurrentHashMap to ensure its thread-safety.
Why are the changes needed?
To fix the potential thread-safety issue pointed out at #52337 (comment), as well as the related concern.
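The locking pattern described above can be sketched in Java as follows. This is an illustration under stated assumptions, not the Spark implementation: the class ShuffleRegistrySketch and its method names are hypothetical, and HashSet stands in for Spark's non-thread-safe OpenHashSet. The idea is that the outer map is a ConcurrentHashMap, while each per-shuffle set is guarded by its own monitor for both writes and iteration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ShuffleRegistrySketch {
    // shuffleId -> map task ids; the outer map itself is thread-safe.
    private final ConcurrentMap<Integer, Set<Long>> taskIdMapsForShuffle =
        new ConcurrentHashMap<>();

    // Writers lock the per-shuffle set, because HashSet is not thread-safe.
    public void registerMapTask(int shuffleId, long mapTaskId) {
        Set<Long> ids = taskIdMapsForShuffle.computeIfAbsent(shuffleId, k -> new HashSet<>());
        synchronized (ids) {
            ids.add(mapTaskId);
        }
    }

    // Iteration takes the same lock, so a concurrent add cannot corrupt it.
    public List<Long> unregisterShuffle(int shuffleId) {
        List<Long> removed = new ArrayList<>();
        Set<Long> ids = taskIdMapsForShuffle.remove(shuffleId);
        if (ids != null) {
            synchronized (ids) {
                for (long id : ids) {
                    removed.add(id); // stand-in for per-task cleanup
                }
            }
        }
        return removed;
    }
}
```

Locking on the set rather than on the whole map keeps unrelated shuffles from contending with each other.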
Does this PR introduce any user-facing change?
No.
How was this patch tested?
N/A
Was this patch authored or co-authored using generative AI tooling?
No.