Skip to content

Commit 5510b6d

Browse files
committed
Introduce mutex
1 parent 0260de2 commit 5510b6d

File tree

18 files changed

+780
-283
lines changed

18 files changed

+780
-283
lines changed

atomicfu/api/atomicfu.api

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,15 @@ public final class kotlinx/atomicfu/locks/ParkingSupport {
143143
public final fun unpark (Ljava/lang/Thread;)V
144144
}
145145

146+
public final class kotlinx/atomicfu/locks/SynchronousMutex {
147+
public fun <init> ()V
148+
public final fun lock ()V
149+
public final fun tryLock ()Z
150+
public final fun tryLock-LRDsOJo (J)Z
151+
public final fun unlock ()V
152+
}
153+
154+
public final class kotlinx/atomicfu/locks/SynchronousMutexKt {
155+
public static final fun withLock (Lkotlinx/atomicfu/locks/SynchronousMutex;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
156+
}
157+

atomicfu/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ kotlin {
127127

128128
jvmTest {
129129
dependencies {
130+
implementation("org.jetbrains.kotlinx:lincheck:2.38")
130131
implementation("org.jetbrains.kotlin:kotlin-reflect")
131132
implementation("org.jetbrains.kotlin:kotlin-test")
132133
implementation("org.jetbrains.kotlin:kotlin-test-junit")

atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt

Lines changed: 0 additions & 57 deletions
This file was deleted.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import kotlin.contracts.ExperimentalContracts
4+
import kotlin.contracts.InvocationKind
5+
import kotlin.contracts.contract
6+
import kotlin.time.Duration
7+
8+
/**
9+
* Mutual exclusion for Kotlin Multiplatform.
10+
*
11+
* It can protect a shared resource or critical section from multiple thread accesses.
12+
* Threads can acquire the lock by calling [lock] and release the lock by calling [unlock].
13+
*
14+
* When a thread calls [lock] while another thread is locked, it will suspend until the lock is released.
15+
* When multiple threads are waiting for the lock, they will acquire it in a fair order (first in first out).
16+
* On JVM, a [lock] call can skip the queue if it happens in between a thread releasing and the first in queue acquiring.
17+
*
18+
* It is reentrant, meaning the lock holding thread can call [lock] multiple times without suspending.
19+
* To release the lock (after multiple [lock] calls) an equal number of [unlock] calls are required.
20+
*
21+
* This Mutex should not be used in combination with coroutines and `suspend` functions
22+
* as it blocks the waiting thread.
23+
* Use the `Mutex` from the coroutines library instead.
24+
*
25+
* ```Kotlin
26+
* mutex.withLock {
27+
* // Critical section only executed by
28+
* // one thread at a time.
29+
* }
30+
* ```
31+
*/
32+
expect class SynchronousMutex() {
33+
/**
34+
* Tries to lock this mutex, returning `false` if this mutex is already locked.
35+
*
36+
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
37+
* released at the end of your critical section, and [unlock] is never invoked before a successful
38+
* lock acquisition.
39+
*
40+
* (JVM only) this call can potentially skip line.
41+
*/
42+
fun tryLock(): Boolean
43+
44+
/**
45+
* Tries to lock this mutex within the given [timeout] period,
46+
* returning `false` if the duration passed without locking.
47+
*
48+
* Note: when [tryLock] succeeds the lock needs to be released by [unlock].
49+
* When [tryLock] does not succeed the lock does not have to be released.
50+
*
51+
* (JVM only) throws Interrupted exception when thread is interrupted while waiting for lock.
52+
*/
53+
fun tryLock(timeout: Duration): Boolean
54+
55+
/**
56+
* Locks the mutex, suspends the thread until the lock is acquired.
57+
*
58+
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
59+
* released at the end of your critical section, and [unlock] is never invoked before a successful
60+
* lock acquisition.
61+
*/
62+
fun lock()
63+
64+
/**
65+
* Releases the lock.
66+
* Throws [IllegalStateException] when the current thread is not holding the lock.
67+
*
68+
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
69+
* released at the end of the critical section, and [unlock] is never invoked before a successful
70+
* lock acquisition.
71+
*/
72+
fun unlock()
73+
}
74+
75+
/**
76+
* Executes the given code [block] under this mutex's lock.
77+
*
78+
* @return result of [block]
79+
*/
80+
@OptIn(ExperimentalContracts::class)
81+
inline fun <T> SynchronousMutex.withLock(block: () -> T): T {
82+
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
83+
lock()
84+
return try {
85+
block()
86+
} finally {
87+
unlock()
88+
}
89+
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import kotlinx.atomicfu.AtomicRef
4+
import kotlinx.atomicfu.atomic
5+
import kotlin.time.Duration
6+
import kotlin.time.TimeSource
7+
8+
/**
9+
* Mutex implementation for Kotlin/Native.
10+
* In concurrentMain sourceSet to be testable with Lincheck.
11+
* [park] and [unpark] functions can be passed for testability.
12+
*/
13+
internal class NativeMutex(
14+
val park : (Duration) -> Unit = { ParkingSupport.park(it) },
15+
val unpark : (ParkingHandle) -> Unit = ParkingSupport::unpark,
16+
) {
17+
/**
18+
* The [state] variable stands for: 0 -> Lock is free
19+
* 1 -> Lock is locked but no waiters
20+
* 4 -> Lock is locked with 3 waiters
21+
*
22+
* The state.incrementAndGet() call makes my claim on the lock.
23+
* The returned value either means I acquired it (when it is 1).
24+
* Or I need to enqueue and park (when it is > 1).
25+
*
26+
* The [holdCount] variable is to enable reentrancy.
27+
*
28+
* Works by using a [parkingQueue].
29+
* When a thread tries to acquire the lock, but finds it is already locked it enqueues by appending to the [parkingQueue].
30+
* On enqueue the parking queue provides the second last node, this node is used to park on.
31+
* When our thread is woken up that means that the thread parked on the thrid last node called unpark on the second last node.
32+
* Since a woken up thread is first inline it means that it's node is the head and can therefore dequeue.
33+
*
34+
* Unlocking happens by calling state.decrementAndGet().
35+
* When the returned value is 0 it means the lock is free and we can simply return.
36+
* If the new state is > 0, then there are waiters. We wake up the first by unparking the head of the queue.
37+
* This even works when a thread is not parked yet,
38+
* since the ThreadParker can be pre-unparked resulting in the parking call to return immediately.
39+
*/
40+
private val parkingQueue = ParkingQueue()
41+
private val owningThread = atomic<ParkingHandle?>(null)
42+
private val state = atomic(0)
43+
private val holdCount = atomic(0)
44+
45+
fun lock() {
46+
tryLock(Duration.INFINITE)
47+
}
48+
49+
fun tryLock(duration: Duration): Boolean {
50+
val currentParkingHandle = ParkingSupport.currentThreadHandle()
51+
52+
// Has to be checked in this order!
53+
if (holdCount.value > 0 && currentParkingHandle == owningThread.value) {
54+
// Is reentring thread
55+
holdCount.incrementAndGet()
56+
return true
57+
}
58+
59+
// Otherwise try acquire lock
60+
val newState = state.incrementAndGet()
61+
// If new state 1 than I have acquired lock skipping queue.
62+
if (newState == 1) {
63+
owningThread.value = currentParkingHandle
64+
holdCount.incrementAndGet()
65+
return true
66+
}
67+
68+
// If state larger than 1 -> enqueue and park
69+
// When woken up thread has acquired lock and his node in the queue is therefore at the head.
70+
// Remove head
71+
if (newState > 1) {
72+
val prevNode = parkingQueue.enqueue()
73+
// if timeout
74+
if (!prevNode.nodeWait(duration)) return false
75+
parkingQueue.dequeue()
76+
owningThread.value = currentParkingHandle
77+
holdCount.incrementAndGet()
78+
return true
79+
}
80+
81+
return true
82+
}
83+
84+
fun unlock() {
85+
val currentThreadId = ParkingSupport.currentThreadHandle()
86+
val currentOwnerId = owningThread.value
87+
if (currentThreadId != currentOwnerId) throw IllegalStateException("Thread is not holding the lock")
88+
89+
// dec hold count
90+
val newHoldCount = holdCount.decrementAndGet()
91+
if (newHoldCount > 0) return
92+
if (newHoldCount < 0) throw IllegalStateException("Thread unlocked more than it locked")
93+
94+
// Lock is released by decrementing (only if decremented to 0)
95+
val currentState = state.decrementAndGet()
96+
if (currentState == 0) return
97+
98+
// If waiters wake up the first in line. The woken up thread will dequeue the node.
99+
if (currentState > 0) {
100+
var nextParker = parkingQueue.getHead()
101+
// If cancelled And there are other waiting nodes, go to next
102+
while (!nextParker.nodeWake() && state.decrementAndGet() > 0) {
103+
// We only dequeue here in case of timeoud out node.
104+
// Dequeueing woken nodes can lead to issues when pre-unparked.
105+
parkingQueue.dequeue()
106+
nextParker = parkingQueue.getHead()
107+
}
108+
return
109+
}
110+
}
111+
112+
fun tryLock(): Boolean {
113+
val currentThreadId = ParkingSupport.currentThreadHandle()
114+
if (holdCount.value > 0 && owningThread.value == currentThreadId || state.compareAndSet(0, 1)) {
115+
owningThread.value = currentThreadId
116+
holdCount.incrementAndGet()
117+
return true
118+
}
119+
return false
120+
}
121+
122+
// Based on Micheal-Scott Queue
123+
inner class ParkingQueue {
124+
private val head: AtomicRef<Node>
125+
private val tail: AtomicRef<Node>
126+
127+
init {
128+
val first = Node()
129+
head = atomic(first)
130+
tail = atomic(first)
131+
}
132+
133+
fun getHead(): Node {
134+
return head.value
135+
}
136+
137+
fun enqueue(): Node {
138+
while (true) {
139+
val node = Node()
140+
val curTail = tail.value
141+
if (curTail.next.compareAndSet(null, node)) {
142+
tail.compareAndSet(curTail, node)
143+
return curTail
144+
}
145+
else tail.compareAndSet(curTail, curTail.next.value!!)
146+
}
147+
}
148+
149+
fun dequeue() {
150+
while (true) {
151+
val currentHead = head.value
152+
val currentHeadNext = currentHead.next.value ?: throw IllegalStateException("Dequeing parker but already empty, should not be possible")
153+
if (head.compareAndSet(currentHead, currentHeadNext)) return
154+
}
155+
}
156+
157+
}
158+
159+
inner class Node {
160+
val parker = atomic<Any>(Empty)
161+
val next = atomic<Node?>(null)
162+
163+
fun nodeWait(duration: Duration): Boolean {
164+
val deadline = TimeSource.Monotonic.markNow() + duration
165+
while (true) {
166+
when (parker.value) {
167+
Empty -> if (parker.compareAndSet(Empty, ParkingSupport.currentThreadHandle())) {
168+
park(deadline - TimeSource.Monotonic.markNow())
169+
if (deadline < TimeSource.Monotonic.markNow())
170+
parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled)
171+
}
172+
is ParkingHandle -> {
173+
park(deadline - TimeSource.Monotonic.markNow())
174+
if (deadline < TimeSource.Monotonic.markNow())
175+
parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled)
176+
}
177+
Awoken -> return true
178+
Cancelled -> return false
179+
}
180+
}
181+
}
182+
183+
fun nodeWake(): Boolean {
184+
while (true) {
185+
when (val currentState = parker.value) {
186+
Empty -> if (parker.compareAndSet(Empty, Awoken)) return true
187+
is ParkingHandle -> if (parker.compareAndSet(currentState, Awoken)) {
188+
unpark(currentState)
189+
return true
190+
}
191+
Awoken -> throw IllegalStateException("Node is already woken")
192+
Cancelled -> return false
193+
}
194+
}
195+
}
196+
}
197+
198+
private object Empty
199+
private object Awoken
200+
private object Cancelled
201+
}

0 commit comments

Comments
 (0)