diff --git a/atomicfu/api/atomicfu.api b/atomicfu/api/atomicfu.api index a9497bd8..988cdc40 100644 --- a/atomicfu/api/atomicfu.api +++ b/atomicfu/api/atomicfu.api @@ -143,3 +143,15 @@ public final class kotlinx/atomicfu/locks/ParkingSupport { public final fun unpark (Ljava/lang/Thread;)V } +public final class kotlinx/atomicfu/locks/SynchronousMutex { + public fun ()V + public final fun lock ()V + public final fun tryLock ()Z + public final fun tryLock-LRDsOJo (J)Z + public final fun unlock ()V +} + +public final class kotlinx/atomicfu/locks/SynchronousMutexKt { + public static final fun withLock (Lkotlinx/atomicfu/locks/SynchronousMutex;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object; +} + diff --git a/atomicfu/build.gradle.kts b/atomicfu/build.gradle.kts index 131add18..f4389f4c 100644 --- a/atomicfu/build.gradle.kts +++ b/atomicfu/build.gradle.kts @@ -127,6 +127,7 @@ kotlin { jvmTest { dependencies { + implementation("org.jetbrains.kotlinx:lincheck:2.38") implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-test") implementation("org.jetbrains.kotlin:kotlin-test-junit") diff --git a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index 25a01442..00000000 --- a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.atomicfu.atomic -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.UnsafeNumber -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import platform.posix.* - -@OptIn(ExperimentalForeignApi::class, UnsafeNumber::class) -actual class NativeMutexNode { - actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_t = arena.alloc() - private val mutex: pthread_mutex_t = arena.alloc() - private val attr: pthread_mutexattr_tVar = arena.alloc() - - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK.toInt()) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun wait(lockOwner: Long) { - pthread_cond_wait(cond.ptr, mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} - -private val threadCounter = atomic(0L) - -actual fun createThreadId(): Long = threadCounter.incrementAndGet() diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index 1e1a0f79..00000000 --- a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import kotlinx.cinterop.IntVar -import kotlinx.cinterop.UIntVar -import kotlinx.cinterop.toCPointer -import kotlinx.cinterop.value -import platform.posix.pthread_cond_destroy -import platform.posix.pthread_cond_init -import platform.posix.pthread_cond_signal -import platform.posix.pthread_cond_t -import platform.posix.pthread_cond_wait -import platform.posix.pthread_mutex_destroy -import platform.posix.pthread_mutex_init -import platform.posix.pthread_mutex_lock -import platform.posix.pthread_mutex_t -import platform.posix.pthread_mutex_unlock -import platform.posix.pthread_mutexattr_destroy -import platform.posix.pthread_mutexattr_init -import platform.posix.pthread_mutexattr_settype -import platform.posix.pthread_mutexattr_t -import platform.posix.pthread_get_qos_class_np -import platform.posix.pthread_override_t -import platform.posix.pthread_override_qos_class_end_np -import platform.posix.pthread_override_qos_class_start_np -import platform.posix.qos_class_self - -import platform.posix.PTHREAD_MUTEX_ERRORCHECK - -@OptIn(ExperimentalForeignApi::class) -actual class NativeMutexNode { - actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_t = arena.alloc() - private val mutex: pthread_mutex_t = arena.alloc() - private val attr: pthread_mutexattr_t = arena.alloc() - private var qosOverride: pthread_override_t? = null - private var qosOverrideQosClass: UInt = 0U - - // Used locally as return parameters in donateQos - private val lockOwnerQosClass = arena.alloc() - private val lockOwnerRelPrio = arena.alloc() - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun wait(lockOwner: Long) { - donateQos(lockOwner) - require(pthread_cond_wait(cond.ptr, mutex.ptr) == 0) - clearDonation() - } - - private fun donateQos(lockOwner: Long) { - if (lockOwner == NO_OWNER) { - return - } - val ourQosClass = qos_class_self() - // Set up a new override if required: - if (qosOverride != null) { - // There is an existing override, but we need to go higher. - if (ourQosClass > qosOverrideQosClass) { - pthread_override_qos_class_end_np(qosOverride) - qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), qos_class_self(), 0) - qosOverrideQosClass = ourQosClass - } - } else { - // No existing override, check if we need to set one up. - pthread_get_qos_class_np(lockOwner.toCPointer(), lockOwnerQosClass.ptr, lockOwnerRelPrio.ptr) - if (ourQosClass > lockOwnerQosClass.value) { - qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), ourQosClass, 0) - qosOverrideQosClass = ourQosClass - } - } - } - - private fun clearDonation() { - if (qosOverride != null) { - pthread_override_qos_class_end_np(qosOverride) - qosOverride = null - } - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt deleted file mode 100644 index 1a019c8f..00000000 --- a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.atomicfu.locks - -import kotlinx.cinterop.toLong -import platform.posix.pthread_self - -internal actual fun createThreadId() = pthread_self().toLong() \ No newline at end of file diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt new file mode 100644 index 00000000..ab7623c4 --- /dev/null +++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt @@ -0,0 +1,89 @@ +package kotlinx.atomicfu.locks + +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract +import kotlin.time.Duration + +/** + * Mutual exclusion for Kotlin Multiplatform. + * + * It can protect a shared resource or critical section from multiple thread accesses. + * Threads can acquire the lock by calling [lock] and release the lock by calling [unlock]. + * + * When a thread calls [lock] while another thread is locked, it will suspend until the lock is released. + * When multiple threads are waiting for the lock, they will acquire it in a fair order (first in first out). + * On JVM, a [lock] call can skip the queue if it happens in between a thread releasing and the first in queue acquiring. + * + * It is reentrant, meaning the lock holding thread can call [lock] multiple times without suspending. + * To release the lock (after multiple [lock] calls) an equal number of [unlock] calls are required. + * + * This Mutex should not be used in combination with coroutines and `suspend` functions + * as it blocks the waiting thread. + * Use the `Mutex` from the coroutines library instead. + * + * ```Kotlin + * mutex.withLock { + * // Critical section only executed by + * // one thread at a time. + * } + * ``` + */ +expect class SynchronousMutex() { + /** + * Tries to lock this mutex, returning `false` if this mutex is already locked. + * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of your critical section, and [unlock] is never invoked before a successful + * lock acquisition. + * + * (JVM only) this call can potentially skip line. + */ + fun tryLock(): Boolean + + /** + * Tries to lock this mutex within the given [timeout] period, + * returning `false` if the duration passed without locking. + * + * Note: when [tryLock] succeeds the lock needs to be released by [unlock]. + * When [tryLock] does not succeed the lock does not have to be released. + * + * (JVM only) throws Interrupted exception when thread is interrupted while waiting for lock. + */ + fun tryLock(timeout: Duration): Boolean + + /** + * Locks the mutex, suspends the thread until the lock is acquired. + * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of your critical section, and [unlock] is never invoked before a successful + * lock acquisition. + */ + fun lock() + + /** + * Releases the lock. + * Throws [IllegalStateException] when the current thread is not holding the lock. + * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of the critical section, and [unlock] is never invoked before a successful + * lock acquisition. + */ + fun unlock() +} + +/** + * Executes the given code [block] under this mutex's lock. + * + * @return result of [block] + */ +@OptIn(ExperimentalContracts::class) +inline fun SynchronousMutex.withLock(block: () -> T): T { + contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } + lock() + return try { + block() + } finally { + unlock() + } +} diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt new file mode 100644 index 00000000..fda2fb44 --- /dev/null +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt @@ -0,0 +1,201 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.AtomicRef +import kotlinx.atomicfu.atomic +import kotlin.time.Duration +import kotlin.time.TimeSource + +/** + * Mutex implementation for Kotlin/Native. + * In concurrentMain sourceSet to be testable with Lincheck. + * [park] and [unpark] functions can be passed for testability. + */ +internal class NativeMutex( + val park : (Duration) -> Unit = { ParkingSupport.park(it) }, + val unpark : (ParkingHandle) -> Unit = ParkingSupport::unpark, +) { + /** + * The [state] variable stands for: 0 -> Lock is free + * 1 -> Lock is locked but no waiters + * 4 -> Lock is locked with 3 waiters + * + * The state.incrementAndGet() call makes my claim on the lock. + * The returned value either means I acquired it (when it is 1). + * Or I need to enqueue and park (when it is > 1). + * + * The [holdCount] variable is to enable reentrancy. + * + * Works by using a [parkingQueue]. + * When a thread tries to acquire the lock, but finds it is already locked it enqueues by appending to the [parkingQueue]. + * On enqueue the parking queue provides the second last node, this node is used to park on. + * When our thread is woken up that means that the thread parked on the thrid last node called unpark on the second last node. + * Since a woken up thread is first inline it means that it's node is the head and can therefore dequeue. + * + * Unlocking happens by calling state.decrementAndGet(). + * When the returned value is 0 it means the lock is free and we can simply return. + * If the new state is > 0, then there are waiters. We wake up the first by unparking the head of the queue. + * This even works when a thread is not parked yet, + * since the ThreadParker can be pre-unparked resulting in the parking call to return immediately. + */ + private val parkingQueue = ParkingQueue() + private val owningThread = atomic(null) + private val state = atomic(0) + private val holdCount = atomic(0) + + fun lock() { + tryLock(Duration.INFINITE) + } + + fun tryLock(duration: Duration): Boolean { + val currentParkingHandle = ParkingSupport.currentThreadHandle() + + // Has to be checked in this order! + if (holdCount.value > 0 && currentParkingHandle == owningThread.value) { + // Is reentring thread + holdCount.incrementAndGet() + return true + } + + // Otherwise try acquire lock + val newState = state.incrementAndGet() + // If new state 1 than I have acquired lock skipping queue. + if (newState == 1) { + owningThread.value = currentParkingHandle + holdCount.incrementAndGet() + return true + } + + // If state larger than 1 -> enqueue and park + // When woken up thread has acquired lock and his node in the queue is therefore at the head. + // Remove head + if (newState > 1) { + val prevNode = parkingQueue.enqueue() + // if timeout + if (!prevNode.nodeWait(duration)) return false + parkingQueue.dequeue() + owningThread.value = currentParkingHandle + holdCount.incrementAndGet() + return true + } + + return true + } + + fun unlock() { + val currentThreadId = ParkingSupport.currentThreadHandle() + val currentOwnerId = owningThread.value + if (currentThreadId != currentOwnerId) throw IllegalStateException("Thread is not holding the lock") + + // dec hold count + val newHoldCount = holdCount.decrementAndGet() + if (newHoldCount > 0) return + if (newHoldCount < 0) throw IllegalStateException("Thread unlocked more than it locked") + + // Lock is released by decrementing (only if decremented to 0) + val currentState = state.decrementAndGet() + if (currentState == 0) return + + // If waiters wake up the first in line. The woken up thread will dequeue the node. + if (currentState > 0) { + var nextParker = parkingQueue.getHead() + // If cancelled And there are other waiting nodes, go to next + while (!nextParker.nodeWake() && state.decrementAndGet() > 0) { + // We only dequeue here in case of timeoud out node. + // Dequeueing woken nodes can lead to issues when pre-unparked. + parkingQueue.dequeue() + nextParker = parkingQueue.getHead() + } + return + } + } + + fun tryLock(): Boolean { + val currentThreadId = ParkingSupport.currentThreadHandle() + if (holdCount.value > 0 && owningThread.value == currentThreadId || state.compareAndSet(0, 1)) { + owningThread.value = currentThreadId + holdCount.incrementAndGet() + return true + } + return false + } + + // Based on Micheal-Scott Queue + inner class ParkingQueue { + private val head: AtomicRef + private val tail: AtomicRef + + init { + val first = Node() + head = atomic(first) + tail = atomic(first) + } + + fun getHead(): Node { + return head.value + } + + fun enqueue(): Node { + while (true) { + val node = Node() + val curTail = tail.value + if (curTail.next.compareAndSet(null, node)) { + tail.compareAndSet(curTail, node) + return curTail + } + else tail.compareAndSet(curTail, curTail.next.value!!) + } + } + + fun dequeue() { + while (true) { + val currentHead = head.value + val currentHeadNext = currentHead.next.value ?: throw IllegalStateException("Dequeing parker but already empty, should not be possible") + if (head.compareAndSet(currentHead, currentHeadNext)) return + } + } + + } + + inner class Node { + val parker = atomic(Empty) + val next = atomic(null) + + fun nodeWait(duration: Duration): Boolean { + val deadline = TimeSource.Monotonic.markNow() + duration + while (true) { + when (parker.value) { + Empty -> if (parker.compareAndSet(Empty, ParkingSupport.currentThreadHandle())) { + park(deadline - TimeSource.Monotonic.markNow()) + if (deadline < TimeSource.Monotonic.markNow()) + parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled) + } + is ParkingHandle -> { + park(deadline - TimeSource.Monotonic.markNow()) + if (deadline < TimeSource.Monotonic.markNow()) + parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled) + } + Awoken -> return true + Cancelled -> return false + } + } + } + + fun nodeWake(): Boolean { + while (true) { + when (val currentState = parker.value) { + Empty -> if (parker.compareAndSet(Empty, Awoken)) return true + is ParkingHandle -> if (parker.compareAndSet(currentState, Awoken)) { + unpark(currentState) + return true + } + Awoken -> throw IllegalStateException("Node is already woken") + Cancelled -> return false + } + } + } + } + + private object Empty + private object Awoken + private object Cancelled +} diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LockWithTimoutTests.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LockWithTimoutTests.kt new file mode 100644 index 00000000..2b7a524c --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LockWithTimoutTests.kt @@ -0,0 +1,58 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds + +class LockWithTimeoutTests { + + // Helper class with atomic counter in constructor + class AtomicCounter(initialValue: Int = 0) { + val counter = atomic(initialValue) + + fun incrementAndGet(): Int = counter.incrementAndGet() + val value: Int get() = counter.value + } + + @Test + fun timeoutLockStressTest() { + val mutex = SynchronousMutex() + val counter = AtomicCounter(0) + val targetCount = 1000 + val threads = mutableListOf() + + // Create 5 test threads + repeat(5) { threadId -> + val thread = testThread { + while (counter.value < targetCount) { + // Try to acquire the lock with a timeout + if (mutex.tryLock((Random.nextInt(1, 10)).milliseconds)) { + try { + // Increment the counter if lock was acquired + if (counter.value < targetCount) { + counter.incrementAndGet() + } + // Random sleep to increase variation + sleepMillis(Random.nextInt(0, 5).toLong()) + } finally { + mutex.unlock() + } + } + + // Random sleep between attempts to increase variation + sleepMillis(Random.nextInt(0, 3).toLong()) + } + } + threads.add(thread) + } + + // Wait for all threads to complete + threads.forEach { it.join() } + + // Verify the counter reached the target + assertEquals(targetCount, counter.value) + } + +} diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt new file mode 100644 index 00000000..e472697d --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt @@ -0,0 +1,88 @@ +package kotlinx.atomicfu.locks + +import kotlin.test.Test +import kotlin.test.assertEquals + +class NativeMutexTest { + + + @Test + fun testNativeMutexSlow() { + val mutex = NativeMutex() + val resultList = mutableListOf() + + val fut1 = testThread { + repeat(30) { i -> + mutex.lock() + resultList.add("a$i") + sleepMillis(100) + resultList.add("a$i") + mutex.unlock() + } + } + + val fut2 = testThread { + repeat(30) { i -> + mutex.lock() + resultList.add("b$i") + sleepMillis(100) + resultList.add("b$i") + mutex.unlock() + } + } + + repeat(30) { i -> + mutex.lock() + resultList.add("c$i") + sleepMillis(100) + resultList.add("c$i") + mutex.unlock() + } + fut1.join() + fut2.join() + + resultList.filterIndexed { i, _ -> i % 2 == 0 } + .zip(resultList.filterIndexed {i, _ -> i % 2 == 1}) { a, b -> + assertEquals(a, b) + } + } + + @Test + fun testNativeMutexFast() { + val mutex = SynchronousMutex() + val resultList = mutableListOf() + + val fut1 = testThread { + repeat(30000) { i -> + mutex.lock() + resultList.add("a$i") + resultList.add("a$i") + mutex.unlock() + } + } + + val fut2 = testThread { + repeat(30000) { i -> + mutex.lock() + resultList.add("b$i") + resultList.add("b$i") + mutex.unlock() + } + } + + repeat(30000) { i -> + mutex.lock() + resultList.add("c$i") + resultList.add("c$i") + mutex.unlock() + } + fut1.join() + fut2.join() + + resultList + .filterIndexed { i, _ -> i % 2 == 0 } + .zip(resultList.filterIndexed {i, _ -> i % 2 == 1}) { a, b -> + assertEquals(a, b) + } + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ReentrancyTests.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ReentrancyTests.kt new file mode 100644 index 00000000..28fe7c78 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ReentrancyTests.kt @@ -0,0 +1,28 @@ +package kotlinx.atomicfu.locks + +import kotlin.test.Test +import kotlin.test.assertFails + +class ReentrancyTests { + + @Test + fun reentrantTestSuccess() { + val lock = NativeMutex() + lock.lock() + lock.lock() + lock.unlock() + lock.unlock() + } + + @Test + fun reentrantTestFail() { + val lock = NativeMutex() + lock.lock() + lock.lock() + lock.unlock() + lock.unlock() + assertFails { + lock.unlock() + } + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt new file mode 100644 index 00000000..067bb0fd --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt @@ -0,0 +1,68 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.test.Test +import kotlin.test.assertTrue + +class VaryingContentionTest { + + @Test + fun varyingContentionTest() { + val lockInt = LockInt() + multiTestLock(lockInt, 10, 100000) + println("Varying Contention Test 1") + multiTestLock(lockInt, 1, 200000) + println("Varying Contention Test 2") + multiTestLock(lockInt, 20, 300000) + println("Varying Contention Test 3") + multiTestLock(lockInt, 1, 400000) + println("Varying Contention Test 4") + multiTestLock(lockInt, 2, 1000000) + println("Varying Contention Test Done") + } + + + private fun multiTestLock(lockInt: LockInt, nThreads: Int, countTo: Int) { + val futureList = mutableListOf() + repeat(nThreads) { i -> + val test = LockIntTest(lockInt, countTo, nThreads, i) + futureList.add(testWithThread(test)) + } + Fut.waitAllAndThrow(futureList) + } + + private fun testWithThread(t: LockIntTest): Fut { + return Fut { + while (true) { + t.lockInt.lock() + if (t.lockInt.n % t.mod == t.id) t.lockInt.n++ + if (t.lockInt.n >= t.max) { + t.lockInt.unlock() + break + } + t.lockInt.unlock() + } + } + } + + data class LockIntTest( + val lockInt: LockInt, + val max: Int, + val mod: Int, + val id: Int, + ) + + class LockInt { + private val lock = NativeMutex() + private val check = atomic(0) + var n = 0 + fun lock() { + lock.lock() + assertTrue(check.incrementAndGet() == 1) + } + fun unlock() { + assertTrue(check.decrementAndGet() == 0) + lock.unlock() + } + } +} \ No newline at end of file diff --git a/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt new file mode 100644 index 00000000..9b31ddfd --- /dev/null +++ b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt @@ -0,0 +1,17 @@ +package kotlinx.atomicfu.locks + +import kotlin.time.Duration + +/** + * Part of multiplatform mutex. + * Since this mutex will run in a single threaded environment, it doesn't provide any real synchronization. + * + * It does keep track of reentrancy. + */ +actual class SynchronousMutex { + private var state = 0 + actual fun tryLock(): Boolean = true + actual fun tryLock(timeout: Duration): Boolean = true + actual fun lock(): Unit { state++ } + actual fun unlock(): Unit { if (state-- < 0) throw IllegalStateException("Mutex already unlocked") } +} \ No newline at end of file diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt new file mode 100644 index 00000000..ad670f8a --- /dev/null +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt @@ -0,0 +1,17 @@ +package kotlinx.atomicfu.locks + +import java.util.concurrent.TimeUnit +import kotlin.time.Duration + +/** + * This mutex uses a [ReentrantLock]. + * + * Construct with `Mutex(reentrantLock)` to create a [SynchronousMutex] that uses an existing instance of [ReentrantLock]. + */ +actual class SynchronousMutex { + private val reentrantLock = ReentrantLock() + actual fun tryLock(timeout: Duration): Boolean = reentrantLock.tryLock(timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS) + actual fun tryLock(): Boolean = reentrantLock.tryLock() + actual fun lock() = reentrantLock.lock() + actual fun unlock() = reentrantLock.unlock() +} \ No newline at end of file diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckReentrantTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckReentrantTest.kt new file mode 100644 index 00000000..2e294dc1 --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckReentrantTest.kt @@ -0,0 +1,66 @@ +import kotlinx.atomicfu.locks.NativeMutex +import kotlinx.atomicfu.locks.ParkingHandle +import kotlinx.atomicfu.locks.ParkingSupport +import kotlinx.atomicfu.locks.ThreadParker +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.check +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions +import org.jetbrains.kotlinx.lincheck.util.LoggingLevel +import java.util.concurrent.ConcurrentHashMap +import kotlin.test.Test + +class NativeMutexLincheckReentrantTest { + class Counter { + @Volatile + private var value = 0 + + fun inc(): Int = ++value + fun get() = value + } + + private val counter = Counter() + private val localParkers = ConcurrentHashMap() + + private val lock = NativeMutex( + park = { localParkers[ParkingSupport.currentThreadHandle()]!!.park() }, + unpark = { localParkers[it]!!.unpark() } + ) + + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(2) // Change to 300 for exhaustive testing + .invocationsPerIteration(5_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun inc(): Int { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + if (!lock.tryLock()) throw IllegalStateException("couldnt reent with trylock") + if (!lock.tryLock()) throw IllegalStateException("couldnt reent with trylock") + val result = counter.inc() + lock.unlock() + lock.unlock() + lock.unlock() + return result + } + + @Operation + fun get(): Int { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + if (!lock.tryLock()) throw IllegalStateException("couldnt reent with trylock") + if (!lock.tryLock()) throw IllegalStateException("couldnt reent with trylock") + val result = counter.get() + lock.unlock() + lock.unlock() + lock.unlock() + return result + } +} diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckTest.kt new file mode 100644 index 00000000..e639a7e7 --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckTest.kt @@ -0,0 +1,55 @@ +import kotlinx.atomicfu.locks.NativeMutex +import kotlinx.atomicfu.locks.ParkingHandle +import kotlinx.atomicfu.locks.ParkingSupport +import kotlinx.atomicfu.locks.ThreadParker +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.check +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions +import org.jetbrains.kotlinx.lincheck.util.LoggingLevel +import java.util.concurrent.ConcurrentHashMap +import kotlin.test.Test + +class NativeMutexLincheckTest { + class Counter { + @Volatile + private var value = 0 + + fun inc(): Int = ++value + fun get() = value + } + private val counter = Counter() + private val localParkers = ConcurrentHashMap() + + private val lock = NativeMutex( + park = { localParkers[ParkingSupport.currentThreadHandle()]!!.park() }, + unpark = { localParkers[it]!!.unpark() } + ) + + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(2) // Change to 300 for exhaustive testing + .invocationsPerIteration(5_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun inc() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.inc() + lock.unlock() + } + + @Operation + fun get() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.get() + lock.unlock() + } +} \ No newline at end of file diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTimeoutLincheckTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTimeoutLincheckTest.kt new file mode 100644 index 00000000..4469ed56 --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTimeoutLincheckTest.kt @@ -0,0 +1,63 @@ +package kotlinx.atomicfu.locks + +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.check +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions +import org.jetbrains.kotlinx.lincheck.util.LoggingLevel +import org.junit.Test +import java.util.concurrent.ConcurrentHashMap +import kotlin.time.Duration.Companion.nanoseconds + +class NativeMutexTimeoutLincheckTest { + class Counter { + @Volatile + private var value = 0 + + fun inc(): Int = ++value + fun get() = value + } + private val counter = Counter() + private val localParkers = ConcurrentHashMap() + + private val lock = NativeMutex( + park = { localParkers[ParkingSupport.currentThreadHandle()]!!.parkNanos(it.inWholeNanoseconds) }, + unpark = { localParkers[it]!!.unpark() } + ) + + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(2) // Change to 300 for exhaustive testing + .invocationsPerIteration(5_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun incNoTimeout() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.inc() + lock.unlock() + } + + @Operation + fun incTimeout() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + if (lock.tryLock(0.nanoseconds)) { + counter.inc() + lock.unlock() + } + } + + @Operation + fun get() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.get() + lock.unlock() + } +} \ No newline at end of file diff --git a/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index b13bce3b..00000000 --- a/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.atomicfu.atomic -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import platform.posix.* - -@OptIn(ExperimentalForeignApi::class) -actual class NativeMutexNode { - actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_t = arena.alloc() - private val mutex: pthread_mutex_t = arena.alloc() - private val attr: pthread_mutexattr_t = arena.alloc() - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK.toInt()) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun wait(lockOwner: Long) { - pthread_cond_wait(cond.ptr, mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} - -private val threadCounter = atomic(0L) - -internal actual fun createThreadId(): Long = threadCounter.incrementAndGet() diff --git a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index 591034ea..00000000 --- a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.atomicfu.atomic -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import platform.posix.* - -@OptIn(ExperimentalForeignApi::class) -actual class NativeMutexNode { - actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_tVar = arena.alloc() - private val mutex: pthread_mutex_tVar = arena.alloc() - private val attr: pthread_mutexattr_tVar = arena.alloc() - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun wait(lockOwner: Long) { - pthread_cond_wait(cond.ptr, mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} - -private val threadCounter = atomic(0L) - -actual fun createThreadId(): Long = threadCounter.incrementAndGet() \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index 3b6be80d..00000000 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - - -public expect class NativeMutexNode() { - - internal var next: NativeMutexNode? - - public fun lock() - - public fun unlock() - - // The lockOwner is used for qos donation on iOS - internal fun wait(lockOwner: Long) - - internal fun notify() - - internal fun dispose() -} diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt index 79e5e0ae..e112155b 100644 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -1,106 +1,13 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ package kotlinx.atomicfu.locks -import kotlin.native.ref.createCleaner -import kotlinx.atomicfu.* - - -import kotlin.native.concurrent.ThreadLocal - -internal const val NO_OWNER = 0L -private const val UNSET = 0L - -@ThreadLocal -private var currentThreadId = UNSET - -// Based on the compose-multiplatform-core implementation with added qos and the pool back-ported -// from the atomicfu implementation. public actual open class SynchronizedObject { - private val ownerThreadId: AtomicLong = atomic(NO_OWNER) - private var reEnterCount: Int = 0 - private val threadsOnLock: AtomicInt = atomic(0) - - private val monitor: MonitorWrapper by lazy { MonitorWrapper() } - - - public fun lock() { - var self = currentThreadId - if (self == UNSET) { - currentThreadId = createThreadId() - self = currentThreadId - } - if (ownerThreadId.value == self) { - reEnterCount += 1 - } else if (threadsOnLock.incrementAndGet() > 1) { - waitForUnlockAndLock(self) - } else { - if (!ownerThreadId.compareAndSet(NO_OWNER, self)) { - waitForUnlockAndLock(self) - } - } - } - - public fun tryLock(): Boolean { - var self = currentThreadId - if (self == 0L) { - currentThreadId = createThreadId() - self = currentThreadId - } - return if (ownerThreadId.value == self) { - reEnterCount += 1 - true - } else if (threadsOnLock.incrementAndGet() == 1 && ownerThreadId.compareAndSet(NO_OWNER, self)) { - true - } else { - threadsOnLock.decrementAndGet() - false - } - } - - - private fun waitForUnlockAndLock(self: Long) { - withMonitor(monitor) { - while (!ownerThreadId.compareAndSet(NO_OWNER, self)) { - monitor.nativeMutex.wait(ownerThreadId.value) - } - } - } - - public fun unlock() { - require (ownerThreadId.value == currentThreadId) - if (reEnterCount > 0) { - reEnterCount -= 1 - } else { - ownerThreadId.value = NO_OWNER - if (threadsOnLock.decrementAndGet() > 0) { - withMonitor(monitor) { - // We expect the highest priority thread to be woken up, but this should work - // in any case. - monitor.nativeMutex.notify() - } - } - } - } - - private inline fun withMonitor(monitor: MonitorWrapper, block: () -> Unit) { - monitor.nativeMutex.lock() - return try { - block() - } finally { - monitor.nativeMutex.unlock() - } - } - - @OptIn(kotlin.experimental.ExperimentalNativeApi::class) - private class MonitorWrapper { - val nativeMutex = mutexPool.allocate() - val cleaner = createCleaner(nativeMutex) { mutexPool.release(it) } - } + + private val nativeMutex = SynchronousMutex() + public fun lock() = nativeMutex.lock() + public fun tryLock(): Boolean = nativeMutex.tryLock() + public fun unlock() = nativeMutex.unlock() } - public actual fun reentrantLock() = ReentrantLock() public actual typealias ReentrantLock = SynchronizedObject @@ -121,55 +28,4 @@ public actual inline fun synchronized(lock: SynchronizedObject, block: () -> } finally { lock.unlock() } -} - - -private const val INITIAL_POOL_CAPACITY = 64 -private const val MAX_POOL_SIZE = 1024 - -internal val mutexPool by lazy { MutexPool() } - -internal class MutexPool() { - private val size = atomic(0) - private val top = atomic(null) - - init { - // Immediately form a stack - for (i in 0 until INITIAL_POOL_CAPACITY) { - release(NativeMutexNode()) - } - } - - private fun allocMutexNode() = NativeMutexNode() - - fun allocate(): NativeMutexNode = pop() ?: allocMutexNode() - - fun release(mutexNode: NativeMutexNode) { - if (size.value > MAX_POOL_SIZE) { - mutexNode.dispose() - } else { - while (true) { - val oldTop = top.value - mutexNode.next = oldTop - if (top.compareAndSet(oldTop, mutexNode)) { - size.incrementAndGet() - return - } - } - } - } - - private fun pop(): NativeMutexNode? { - while (true) { - val oldTop = top.value - if (oldTop == null) { - return null - } - val newHead = oldTop.next - if (top.compareAndSet(oldTop, newHead)) { - size.decrementAndGet() - return oldTop - } - } - } -} +} \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt new file mode 100644 index 00000000..ced631d5 --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt @@ -0,0 +1,11 @@ +package kotlinx.atomicfu.locks + +import kotlin.time.Duration + +actual class SynchronousMutex { + private val lock = NativeMutex() + actual fun tryLock() = lock.tryLock() + actual fun tryLock(timeout: Duration) = lock.tryLock(timeout) + actual fun lock() = lock.lock() + actual fun unlock() = lock.unlock() +} \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt deleted file mode 100644 index acb782bd..00000000 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt +++ /dev/null @@ -1,3 +0,0 @@ -package kotlinx.atomicfu.locks - -internal expect fun createThreadId(): Long \ No newline at end of file