diff --git a/src/hotspot/share/runtime/objectMonitor.cpp b/src/hotspot/share/runtime/objectMonitor.cpp index 8859f6e7f5f29..8a5ba16c9b8cc 100644 --- a/src/hotspot/share/runtime/objectMonitor.cpp +++ b/src/hotspot/share/runtime/objectMonitor.cpp @@ -117,6 +117,8 @@ OopStorage* ObjectMonitor::_oop_storage = nullptr; OopHandle ObjectMonitor::_vthread_list_head; ParkEvent* ObjectMonitor::_vthread_unparker_ParkEvent = nullptr; +static const jlong MAX_RECHECK_INTERVAL = 1000; + // ----------------------------------------------------------------------------- // Theory of operations -- Monitors lists, thread residency, etc: // @@ -294,6 +296,7 @@ ObjectMonitor::ObjectMonitor(oop object) : _succ(NO_OWNER), _SpinDuration(ObjectMonitor::Knob_SpinLimit), _contentions(0), + _unmounted_vthreads(0), _wait_set(nullptr), _waiters(0), _wait_set_lock(0) @@ -983,19 +986,18 @@ void ObjectMonitor::enter_internal(JavaThread* current) { // to defer the state transitions until absolutely necessary, // and in doing so avoid some transitions ... - // For virtual threads that are pinned, do a timed-park instead to - // alleviate some deadlocks cases where the succesor is an unmounted - // virtual thread that cannot run. This can happen in particular when - // this virtual thread is currently loading/initializing a class, and - // all other carriers have a vthread pinned to it waiting for said class - // to be loaded/initialized. - static int MAX_RECHECK_INTERVAL = 1000; - int recheck_interval = 1; - bool do_timed_parked = false; - ContinuationEntry* ce = current->last_continuation(); - if (ce != nullptr && ce->is_virtual_thread()) { - do_timed_parked = true; - } + // If there are unmounted virtual threads ahead in the _entry_list we want + // to do a timed-park instead to alleviate some deadlock cases where one + // of them is picked as the successor but cannot run due to having run out + // of carriers. This can happen, for example, if this is a pinned virtual + // thread currently loading or initializining a class, and all other carriers + // have a pinned vthread waiting for said class to be loaded/initialized. + // Read counter *after* adding this thread to the _entry_list. Adding to + // _entry_list uses Atomic::cmpxchg() which already provides a fence that + // prevents this load from floating up previous store. + // Note that we can have false positives where timed-park is not necessary. + bool do_timed_parked = has_unmounted_vthreads(); + jlong recheck_interval = 1; for (;;) { @@ -1006,7 +1008,7 @@ void ObjectMonitor::enter_internal(JavaThread* current) { // park self if (do_timed_parked) { - current->_ParkEvent->park((jlong) recheck_interval); + current->_ParkEvent->park(recheck_interval); // Increase the recheck_interval, but clamp the value. recheck_interval *= 8; if (recheck_interval > MAX_RECHECK_INTERVAL) { @@ -1090,6 +1092,22 @@ void ObjectMonitor::reenter_internal(JavaThread* current, ObjectWaiter* currentN assert(_waiters > 0, "invariant"); assert_mark_word_consistency(); + // If there are unmounted virtual threads ahead in the _entry_list we want + // to do a timed-park instead to alleviate some deadlock cases where one + // of them is picked as the successor but cannot run due to having run out + // of carriers. This can happen, for example, if a mixed of unmounted and + // pinned vthreads taking up all the carriers are waiting for a class to be + // initialized, and the selected successor is one of the unmounted vthreads. + // Although this method is used for the "notification" case, it could be + // that this thread reached here without been added to the _entry_list yet. + // This can happen if it was interrupted or the wait timed-out at the same + // time. In that case we rely on currentNode->_do_timed_park, which will be + // read on the next loop iteration, after consuming the park permit set by + // the notifier in notify_internal. + // Note that we can have false positives where timed-park is not necessary. + bool do_timed_parked = has_unmounted_vthreads(); + jlong recheck_interval = 1; + for (;;) { ObjectWaiter::TStates v = currentNode->TState; guarantee(v == ObjectWaiter::TS_ENTER, "invariant"); @@ -1114,7 +1132,16 @@ void ObjectMonitor::reenter_internal(JavaThread* current, ObjectWaiter* currentN { ClearSuccOnSuspend csos(this); ThreadBlockInVMPreprocess tbivs(current, csos, true /* allow_suspend */); - current->_ParkEvent->park(); + if (do_timed_parked) { + current->_ParkEvent->park(recheck_interval); + // Increase the recheck_interval, but clamp the value. + recheck_interval *= 8; + if (recheck_interval > MAX_RECHECK_INTERVAL) { + recheck_interval = MAX_RECHECK_INTERVAL; + } + } else { + current->_ParkEvent->park(); + } } } @@ -1134,6 +1161,9 @@ void ObjectMonitor::reenter_internal(JavaThread* current, ObjectWaiter* currentN // Invariant: after clearing _succ a contending thread // *must* retry _owner before parking. OrderAccess::fence(); + + // See comment in notify_internal + do_timed_parked |= currentNode->_do_timed_park; } // Current has acquired the lock -- Unlink current from the _entry_list. @@ -1161,9 +1191,16 @@ bool ObjectMonitor::vthread_monitor_enter(JavaThread* current, ObjectWaiter* wai oop vthread = current->vthread(); ObjectWaiter* node = waiter != nullptr ? waiter : new ObjectWaiter(vthread, this); + + // Increment counter *before* adding the vthread to the _entry_list. + // Adding to _entry_list uses Atomic::cmpxchg() which already provides + // a fence that prevents reordering of the stores. + inc_unmounted_vthreads(); + if (try_lock_or_add_to_entry_list(current, node)) { // We got the lock. if (waiter == nullptr) delete node; // for Object.wait() don't delete yet + dec_unmounted_vthreads(); return true; } // This thread is now added to the entry_list. @@ -1175,6 +1212,7 @@ bool ObjectMonitor::vthread_monitor_enter(JavaThread* current, ObjectWaiter* wai unlink_after_acquire(current, node); if (has_successor(current)) clear_successor(); if (waiter == nullptr) delete node; // for Object.wait() don't delete yet + dec_unmounted_vthreads(); return true; } @@ -1230,6 +1268,7 @@ bool ObjectMonitor::resume_operation(JavaThread* current, ObjectWaiter* node, Co void ObjectMonitor::vthread_epilog(JavaThread* current, ObjectWaiter* node) { assert(has_owner(current), "invariant"); add_to_contentions(-1); + dec_unmounted_vthreads(); if (has_successor(current)) clear_successor(); @@ -2004,6 +2043,10 @@ bool ObjectMonitor::notify_internal(JavaThread* current) { old_state == java_lang_VirtualThread::TIMED_WAIT) { java_lang_VirtualThread::cmpxchg_state(vthread, old_state, java_lang_VirtualThread::BLOCKED); } + // Increment counter *before* adding the vthread to the _entry_list. + // Adding to _entry_list uses Atomic::cmpxchg() which already provides + // a fence that prevents reordering of the stores. + inc_unmounted_vthreads(); } iterator->_notified = true; @@ -2021,6 +2064,24 @@ bool ObjectMonitor::notify_internal(JavaThread* current) { if (!iterator->is_vthread()) { iterator->wait_reenter_begin(this); + + // Read counter *after* adding the thread to the _entry_list. + // Adding to _entry_list uses Atomic::cmpxchg() which already provides + // a fence that prevents this load from floating up previous store. + if (has_unmounted_vthreads()) { + // Wake up the thread to alleviate some deadlock cases where the successor + // that will be picked up when this thread releases the monitor is an unmounted + // virtual thread that cannot run due to having run out of carriers. Upon waking + // up, the thread will call reenter_internal() which will use timed-park in case + // there is contention and there are still vthreads in the _entry_list. + // If the target was interrupted or the wait timed-out at the same time, it could + // have reached reenter_internal and read a false value of has_unmounted_vthreads() + // before we added it to the _entry_list above. To deal with that case, we set _do_timed_park + // which will be read by the target on the next loop iteration in reenter_internal. + iterator->_do_timed_park = true; + JavaThread* t = iterator->thread(); + t->_ParkEvent->unpark(); + } } } Thread::SpinRelease(&_wait_set_lock); @@ -2460,6 +2521,7 @@ ObjectWaiter::ObjectWaiter(JavaThread* current) { _is_wait = false; _at_reenter = false; _interrupted = false; + _do_timed_park = false; _active = false; } @@ -2632,6 +2694,7 @@ void ObjectMonitor::print_debug_style_on(outputStream* st) const { st->print_cr(" _succ = " INT64_FORMAT, successor()); st->print_cr(" _SpinDuration = %d", _SpinDuration); st->print_cr(" _contentions = %d", contentions()); + st->print_cr(" _unmounted_vthreads = " INT64_FORMAT, _unmounted_vthreads); st->print_cr(" _wait_set = " INTPTR_FORMAT, p2i(_wait_set)); st->print_cr(" _waiters = %d", _waiters); st->print_cr(" _wait_set_lock = %d", _wait_set_lock); diff --git a/src/hotspot/share/runtime/objectMonitor.hpp b/src/hotspot/share/runtime/objectMonitor.hpp index 3c925928be251..cbf6ace6cfe95 100644 --- a/src/hotspot/share/runtime/objectMonitor.hpp +++ b/src/hotspot/share/runtime/objectMonitor.hpp @@ -55,6 +55,7 @@ class ObjectWaiter : public CHeapObj { bool _is_wait; bool _at_reenter; bool _interrupted; + bool _do_timed_park; bool _active; // Contention monitoring is enabled public: ObjectWaiter(JavaThread* current); @@ -199,6 +200,8 @@ class ObjectMonitor : public CHeapObj { // along with other fields to determine if an ObjectMonitor can be // deflated. It is also used by the async deflation protocol. See // ObjectMonitor::deflate_monitor(). + int64_t _unmounted_vthreads; // Number of nodes in the _entry_list associated with unmounted vthreads. + // It might be temporarily more than the actual number but never less. ObjectWaiter* volatile _wait_set; // LL of threads waiting on the monitor - wait() volatile int _waiters; // number of waiting threads @@ -325,6 +328,9 @@ class ObjectMonitor : public CHeapObj { intx recursions() const { return _recursions; } void set_recursions(size_t recursions); void increment_recursions(JavaThread* current); + void inc_unmounted_vthreads(); + void dec_unmounted_vthreads(); + bool has_unmounted_vthreads() const; // JVM/TI GetObjectMonitorUsage() needs this: int waiters() const; diff --git a/src/hotspot/share/runtime/objectMonitor.inline.hpp b/src/hotspot/share/runtime/objectMonitor.inline.hpp index 52ef904b404d5..eeb451235dc34 100644 --- a/src/hotspot/share/runtime/objectMonitor.inline.hpp +++ b/src/hotspot/share/runtime/objectMonitor.inline.hpp @@ -143,6 +143,21 @@ inline void ObjectMonitor::increment_recursions(JavaThread* current) { _recursions++; } +inline void ObjectMonitor::inc_unmounted_vthreads() { + assert(_unmounted_vthreads >= 0, "invariant"); + AtomicAccess::inc(&_unmounted_vthreads, memory_order_relaxed); +} + +inline void ObjectMonitor::dec_unmounted_vthreads() { + assert(_unmounted_vthreads > 0, "invariant"); + AtomicAccess::dec(&_unmounted_vthreads, memory_order_relaxed); +} + +inline bool ObjectMonitor::has_unmounted_vthreads() const { + assert(_unmounted_vthreads >= 0, "invariant"); + return AtomicAccess::load(&_unmounted_vthreads) > 0; +} + // Clear _owner field; current value must match old_value. inline void ObjectMonitor::release_clear_owner(JavaThread* old_owner) { int64_t old_value = owner_id_from(old_owner); diff --git a/test/jdk/java/lang/Thread/virtual/RetryMonitorEnterWhenPinned.java b/test/jdk/java/lang/Thread/virtual/RetryMonitorEnterWhenPinned.java index 464259ecb869b..b402ec634a434 100644 --- a/test/jdk/java/lang/Thread/virtual/RetryMonitorEnterWhenPinned.java +++ b/test/jdk/java/lang/Thread/virtual/RetryMonitorEnterWhenPinned.java @@ -23,13 +23,13 @@ /* * @test - * @summary Test that a virtual thread waiting to enter a monitor, while pinning its - * carrier, will retry until it enters the monitor. This avoids starvation when the + * @summary Test that a virtual thread waiting to enter/reenter a monitor, while pinning + * its carrier, will retry until it enters the monitor. This avoids starvation when the * monitor is exited, an unmounted thread is the chosen successor, and the successor * can't continue because there are no carriers available. * @modules java.base/java.lang:+open * @library /test/lib - * @run main/othervm/native/timeout=480 --enable-native-access=ALL-UNNAMED RetryMonitorEnterWhenPinned + * @run junit/othervm/native/timeout=480 --enable-native-access=ALL-UNNAMED RetryMonitorEnterWhenPinned */ import java.time.Duration; @@ -38,18 +38,23 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import jdk.test.lib.thread.VThreadPinner; +import jdk.test.lib.thread.VThreadRunner; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class RetryMonitorEnterWhenPinned { - public static void main(String[] args) throws Exception { - int iterations = (args.length > 0) ? Integer.parseInt(args[0]) : 10; - for (int i = 1; i <= iterations; i++) { - System.out.printf("%s -- iteration %d --%n", Instant.now(), i); - run(); - System.out.println(); - } + + @BeforeAll + static void setup() { + // need >=2 carriers for testing pinning when main thread is a virtual thread + VThreadRunner.ensureParallelism(2); } - static void run() throws Exception { + @RepeatedTest(10) + void testMonitorEnter() throws Exception { var threads = new ArrayList(); Object lock = new Object(); @@ -116,6 +121,94 @@ static void run() throws Exception { System.out.printf("%s done%n", Instant.now()); } + @ParameterizedTest + @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE }) + void testMonitorReenter(int timeout) throws Exception { + var threads = new ArrayList(); + + Object lock = new Object(); + + // start virtual threads that wait on Object.wait + for (int i = 0; i < 100; i++) { + var started = new CountDownLatch(1); + Thread thread = Thread.startVirtualThread(() -> { + try { + synchronized (lock) { + started.countDown(); + if (timeout > 0) { + lock.wait(timeout); + } else { + lock.wait(); + } + } + } catch (InterruptedException e) { } + }); + + // wait for thread to start and wait + started.await(); + await(thread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING); + threads.add(thread); + } + + // start virtual threads that wait on Object.wait while pinnned + int carriersAvailable = Runtime.getRuntime().availableProcessors(); + if (Thread.currentThread().isVirtual()) { + carriersAvailable--; + } + for (int i = 0; i < 100; i++) { + var started = new CountDownLatch(1); + boolean hasCarrier = carriersAvailable > 0; + Thread thread = Thread.startVirtualThread(() -> { + VThreadPinner.runPinned(() -> { + try { + synchronized (lock) { + started.countDown(); + if (!hasCarrier) { + // This thread will run at the very + // end and won't be notified. + lock.wait(1); + } else if (timeout > 0) { + lock.wait(timeout); + } else { + lock.wait(); + } + } + } catch (InterruptedException e) { } + }); + }); + + // if there are carriers available when wait until the thread blocks. + if (hasCarrier) { + System.out.printf("%s waiting for thread #%d to block%n", + Instant.now(), thread.threadId()); + started.await(); + await(thread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING); + carriersAvailable--; + } + threads.add(thread); + } + + // wakeup all threads + synchronized (lock) { + lock.notifyAll(); + } + + // wait for all threads to terminate + int threadsRemaining = threads.size(); + while (threadsRemaining > 0) { + System.out.printf("%s waiting for %d threads to terminate%n", + Instant.now(), threadsRemaining); + int terminated = 0; + for (Thread t : threads) { + if (t.join(Duration.ofSeconds(1))) { + terminated++; + } + } + threadsRemaining = threads.size() - terminated; + } + System.out.printf("%s done%n", Instant.now()); + } + /** * Spin for the given number of milliseconds. */