Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 78 additions & 15 deletions src/hotspot/share/runtime/objectMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ OopStorage* ObjectMonitor::_oop_storage = nullptr;
OopHandle ObjectMonitor::_vthread_list_head;
ParkEvent* ObjectMonitor::_vthread_unparker_ParkEvent = nullptr;

static const jlong MAX_RECHECK_INTERVAL = 1000;

// -----------------------------------------------------------------------------
// Theory of operations -- Monitors lists, thread residency, etc:
//
Expand Down Expand Up @@ -294,6 +296,7 @@ ObjectMonitor::ObjectMonitor(oop object) :
_succ(NO_OWNER),
_SpinDuration(ObjectMonitor::Knob_SpinLimit),
_contentions(0),
_unmounted_vthreads(0),
_wait_set(nullptr),
_waiters(0),
_wait_set_lock(0)
Expand Down Expand Up @@ -983,19 +986,18 @@ void ObjectMonitor::enter_internal(JavaThread* current) {
// to defer the state transitions until absolutely necessary,
// and in doing so avoid some transitions ...

// For virtual threads that are pinned, do a timed-park instead to
// alleviate some deadlocks cases where the succesor is an unmounted
// virtual thread that cannot run. This can happen in particular when
// this virtual thread is currently loading/initializing a class, and
// all other carriers have a vthread pinned to it waiting for said class
// to be loaded/initialized.
static int MAX_RECHECK_INTERVAL = 1000;
int recheck_interval = 1;
bool do_timed_parked = false;
ContinuationEntry* ce = current->last_continuation();
if (ce != nullptr && ce->is_virtual_thread()) {
do_timed_parked = true;
}
// If there are unmounted virtual threads ahead in the _entry_list we want
// to do a timed-park instead to alleviate some deadlock cases where one
// of them is picked as the successor but cannot run due to having run out
// of carriers. This can happen, for example, if this is a pinned virtual
// thread currently loading or initializining a class, and all other carriers
// have a pinned vthread waiting for said class to be loaded/initialized.
// Read counter *after* adding this thread to the _entry_list. Adding to
// _entry_list uses Atomic::cmpxchg() which already provides a fence that
// prevents this load from floating up previous store.
// Note that we can have false positives where timed-park is not necessary.
bool do_timed_parked = has_unmounted_vthreads();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we still only need the timed-park if the current thread is a pinned vthread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, except if the monitor is also used in the context of a carrier thread. Currently there are only very few such cases and we disable preemption for them (e.g. interruptLock), so it’s likely not needed. With the upcoming changes to preempt on klass initialization, we could also have this situation if a class can be initialized both in the context of a carrier and a vthread. Since code executed in the context of the carriers is limited to library code there will also be very few cases of this, but I’ve seen at least one such case with LockSupport.

jlong recheck_interval = 1;

for (;;) {

Expand All @@ -1006,7 +1008,7 @@ void ObjectMonitor::enter_internal(JavaThread* current) {

// park self
if (do_timed_parked) {
current->_ParkEvent->park((jlong) recheck_interval);
current->_ParkEvent->park(recheck_interval);
// Increase the recheck_interval, but clamp the value.
recheck_interval *= 8;
if (recheck_interval > MAX_RECHECK_INTERVAL) {
Expand Down Expand Up @@ -1090,6 +1092,22 @@ void ObjectMonitor::reenter_internal(JavaThread* current, ObjectWaiter* currentN
assert(_waiters > 0, "invariant");
assert_mark_word_consistency();

// If there are unmounted virtual threads ahead in the _entry_list we want
// to do a timed-park instead to alleviate some deadlock cases where one
// of them is picked as the successor but cannot run due to having run out
// of carriers. This can happen, for example, if a mixed of unmounted and
// pinned vthreads taking up all the carriers are waiting for a class to be
// initialized, and the selected successor is one of the unmounted vthreads.
// Although this method is used for the "notification" case, it could be
// that this thread reached here without been added to the _entry_list yet.
// This can happen if it was interrupted or the wait timed-out at the same
// time. In that case we rely on currentNode->_do_timed_park, which will be
// read on the next loop iteration, after consuming the park permit set by
// the notifier in notify_internal.
// Note that we can have false positives where timed-park is not necessary.
bool do_timed_parked = has_unmounted_vthreads();
jlong recheck_interval = 1;

for (;;) {
ObjectWaiter::TStates v = currentNode->TState;
guarantee(v == ObjectWaiter::TS_ENTER, "invariant");
Expand All @@ -1114,7 +1132,16 @@ void ObjectMonitor::reenter_internal(JavaThread* current, ObjectWaiter* currentN
{
ClearSuccOnSuspend csos(this);
ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true /* allow_suspend */);
current->_ParkEvent->park();
if (do_timed_parked) {
current->_ParkEvent->park(recheck_interval);
// Increase the recheck_interval, but clamp the value.
recheck_interval *= 8;
if (recheck_interval > MAX_RECHECK_INTERVAL) {
recheck_interval = MAX_RECHECK_INTERVAL;
}
} else {
current->_ParkEvent->park();
}
}
}

Expand All @@ -1134,6 +1161,9 @@ void ObjectMonitor::reenter_internal(JavaThread* current, ObjectWaiter* currentN
// Invariant: after clearing _succ a contending thread
// *must* retry _owner before parking.
OrderAccess::fence();

// See comment in notify_internal
do_timed_parked |= currentNode->_do_timed_park;
}

// Current has acquired the lock -- Unlink current from the _entry_list.
Expand Down Expand Up @@ -1161,9 +1191,16 @@ bool ObjectMonitor::vthread_monitor_enter(JavaThread* current, ObjectWaiter* wai

oop vthread = current->vthread();
ObjectWaiter* node = waiter != nullptr ? waiter : new ObjectWaiter(vthread, this);

// Increment counter *before* adding the vthread to the _entry_list.
// Adding to _entry_list uses Atomic::cmpxchg() which already provides
// a fence that prevents reordering of the stores.
inc_unmounted_vthreads();

if (try_lock_or_add_to_entry_list(current, node)) {
// We got the lock.
if (waiter == nullptr) delete node; // for Object.wait() don't delete yet
dec_unmounted_vthreads();
return true;
}
// This thread is now added to the entry_list.
Expand All @@ -1175,6 +1212,7 @@ bool ObjectMonitor::vthread_monitor_enter(JavaThread* current, ObjectWaiter* wai
unlink_after_acquire(current, node);
if (has_successor(current)) clear_successor();
if (waiter == nullptr) delete node; // for Object.wait() don't delete yet
dec_unmounted_vthreads();
return true;
}

Expand Down Expand Up @@ -1230,6 +1268,7 @@ bool ObjectMonitor::resume_operation(JavaThread* current, ObjectWaiter* node, Co
void ObjectMonitor::vthread_epilog(JavaThread* current, ObjectWaiter* node) {
assert(has_owner(current), "invariant");
add_to_contentions(-1);
dec_unmounted_vthreads();

if (has_successor(current)) clear_successor();

Expand Down Expand Up @@ -2004,6 +2043,10 @@ bool ObjectMonitor::notify_internal(JavaThread* current) {
old_state == java_lang_VirtualThread::TIMED_WAIT) {
java_lang_VirtualThread::cmpxchg_state(vthread, old_state, java_lang_VirtualThread::BLOCKED);
}
// Increment counter *before* adding the vthread to the _entry_list.
// Adding to _entry_list uses Atomic::cmpxchg() which already provides
// a fence that prevents reordering of the stores.
inc_unmounted_vthreads();
}

iterator->_notified = true;
Expand All @@ -2021,6 +2064,24 @@ bool ObjectMonitor::notify_internal(JavaThread* current) {

if (!iterator->is_vthread()) {
iterator->wait_reenter_begin(this);

// Read counter *after* adding the thread to the _entry_list.
// Adding to _entry_list uses Atomic::cmpxchg() which already provides
// a fence that prevents this load from floating up previous store.
if (has_unmounted_vthreads()) {
// Wake up the thread to alleviate some deadlock cases where the successor
// that will be picked up when this thread releases the monitor is an unmounted
// virtual thread that cannot run due to having run out of carriers. Upon waking
// up, the thread will call reenter_internal() which will use timed-park in case
// there is contention and there are still vthreads in the _entry_list.
// If the target was interrupted or the wait timed-out at the same time, it could
// have reached reenter_internal and read a false value of has_unmounted_vthreads()
// before we added it to the _entry_list above. To deal with that case, we set _do_timed_park
// which will be read by the target on the next loop iteration in reenter_internal.
iterator->_do_timed_park = true;
JavaThread* t = iterator->thread();
t->_ParkEvent->unpark();
}
}
}
Thread::SpinRelease(&_wait_set_lock);
Expand Down Expand Up @@ -2460,6 +2521,7 @@ ObjectWaiter::ObjectWaiter(JavaThread* current) {
_is_wait = false;
_at_reenter = false;
_interrupted = false;
_do_timed_park = false;
_active = false;
}

Expand Down Expand Up @@ -2632,6 +2694,7 @@ void ObjectMonitor::print_debug_style_on(outputStream* st) const {
st->print_cr(" _succ = " INT64_FORMAT, successor());
st->print_cr(" _SpinDuration = %d", _SpinDuration);
st->print_cr(" _contentions = %d", contentions());
st->print_cr(" _unmounted_vthreads = " INT64_FORMAT, _unmounted_vthreads);
st->print_cr(" _wait_set = " INTPTR_FORMAT, p2i(_wait_set));
st->print_cr(" _waiters = %d", _waiters);
st->print_cr(" _wait_set_lock = %d", _wait_set_lock);
Expand Down
6 changes: 6 additions & 0 deletions src/hotspot/share/runtime/objectMonitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ObjectWaiter : public CHeapObj<mtThread> {
bool _is_wait;
bool _at_reenter;
bool _interrupted;
bool _do_timed_park;
bool _active; // Contention monitoring is enabled
public:
ObjectWaiter(JavaThread* current);
Expand Down Expand Up @@ -199,6 +200,8 @@ class ObjectMonitor : public CHeapObj<mtObjectMonitor> {
// along with other fields to determine if an ObjectMonitor can be
// deflated. It is also used by the async deflation protocol. See
// ObjectMonitor::deflate_monitor().
int64_t _unmounted_vthreads; // Number of nodes in the _entry_list associated with unmounted vthreads.
// It might be temporarily more than the actual number but never less.

ObjectWaiter* volatile _wait_set; // LL of threads waiting on the monitor - wait()
volatile int _waiters; // number of waiting threads
Expand Down Expand Up @@ -325,6 +328,9 @@ class ObjectMonitor : public CHeapObj<mtObjectMonitor> {
intx recursions() const { return _recursions; }
void set_recursions(size_t recursions);
void increment_recursions(JavaThread* current);
void inc_unmounted_vthreads();
void dec_unmounted_vthreads();
bool has_unmounted_vthreads() const;

// JVM/TI GetObjectMonitorUsage() needs this:
int waiters() const;
Expand Down
15 changes: 15 additions & 0 deletions src/hotspot/share/runtime/objectMonitor.inline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ inline void ObjectMonitor::increment_recursions(JavaThread* current) {
_recursions++;
}

inline void ObjectMonitor::inc_unmounted_vthreads() {
assert(_unmounted_vthreads >= 0, "");
AtomicAccess::inc(&_unmounted_vthreads, memory_order_relaxed);
}

inline void ObjectMonitor::dec_unmounted_vthreads() {
assert(_unmounted_vthreads > 0, "");
AtomicAccess::dec(&_unmounted_vthreads, memory_order_relaxed);
}

inline bool ObjectMonitor::has_unmounted_vthreads() const {
assert(_unmounted_vthreads >= 0, "");
return AtomicAccess::load(&_unmounted_vthreads) > 0;
}

// Clear _owner field; current value must match old_value.
inline void ObjectMonitor::release_clear_owner(JavaThread* old_owner) {
int64_t old_value = owner_id_from(old_owner);
Expand Down
54 changes: 54 additions & 0 deletions test/jdk/java/lang/Thread/virtual/MonitorWaitNotify.java
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,60 @@ void testParkingPermitNotOffered() throws Exception {
assertTrue(completed.get());
}

/**
* Test no deadlock happens when Object.wait is called from a mix of pinned and non-pinned
* paths and notification is done using notifyAll.
*/
@Test
void testMixedPinnedUnmounted() throws Exception {
assumeTrue(VThreadScheduler.supportsCustomScheduler(), "No support for custom schedulers");
try (ExecutorService scheduler = Executors.newFixedThreadPool(1)) {
ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler);
var lock = new Object();

var startedNotPinned = new CountDownLatch(1);
var vthreadNotPinned = factory.newThread(() -> {
synchronized (lock) {
try {
startedNotPinned.countDown();
lock.wait();
} catch (InterruptedException e) {
fail("wait interrupted");
}
}
});
vthreadNotPinned.start();
startedNotPinned.await();
await(vthreadNotPinned, Thread.State.WAITING);

var startedPinned = new CountDownLatch(1);
var vthreadPinned = factory.newThread(() -> {
synchronized (lock) {
try {
startedPinned.countDown();
VThreadPinner.runPinned(() -> {
lock.wait();
});
} catch (InterruptedException e) {
fail("wait interrupted");
}
}
});
vthreadPinned.start();
startedPinned.await();
await(vthreadPinned, Thread.State.WAITING);

// wakeup threads
synchronized (lock) {
lock.notifyAll();
}

// thread should terminate
vthreadNotPinned.join();
vthreadPinned.join();
}
}

/**
* Test that Object.wait releases the carrier. This test uses a custom scheduler
* with one carrier thread.
Expand Down