Skip to content

Commit 3442d57

Browse files
committed
Refactor event listener management in Task and EventFlow
Removed redundant listener management methods and replaced them with a streamlined `unsubscribe` approach. Cleaned up code to improve maintainability and consistency across Task state transitions, ensuring proper cleanup of listeners tied to task instances.
1 parent 1de3f6f commit 3442d57

File tree

4 files changed

+27
-68
lines changed

4 files changed

+27
-68
lines changed

common/src/main/kotlin/com/lambda/event/EventFlow.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ object EventFlow {
8787
*/
8888
val concurrentListeners = Subscriber()
8989

90+
fun Any.unsubscribe() {
91+
syncListeners.unsubscribe(this)
92+
concurrentListeners.unsubscribe(this)
93+
}
94+
9095
init {
9196
// parallel event execution on dedicated threads
9297
runConcurrent {

common/src/main/kotlin/com/lambda/event/Subscriber.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ class Subscriber : ConcurrentHashMap<KClass<out Event>, ConcurrentSkipListSet<Li
5454
inline fun <reified T : Event> unsubscribe(listener: Listener<T>) =
5555
getOrElse(T::class) { defaultListenerSet }.remove(listener)
5656

57+
/**
58+
* Unsubscribes all listeners associated with the current instance (the caller object).
59+
* This method iterates over all values in the `Subscriber`'s map and removes listeners
60+
* whose `owner` property matches the caller object.
61+
*
62+
* Use this method when you want to clean up listeners that were registered with the
63+
* current instance, preventing further event notifications.
64+
*/
65+
fun unsubscribe(owner: Any) {
66+
values.forEach { it.removeAll { listener -> listener.owner == owner } }
67+
}
68+
5769
/** Allows a [Subscriber] to stop receiving all [Event]s of another [Subscriber] */
5870
infix fun unsubscribe(subscriber: Subscriber) {
5971
subscriber.forEach { (eventType, listeners) ->

common/src/main/kotlin/com/lambda/event/listener/SafeListener.kt

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -130,51 +130,6 @@ class SafeListener<T : Event>(
130130
return listener
131131
}
132132

133-
/**
134-
* Registers a new [SafeListener] for a generic [Event] type [T] within the context of a [Task].
135-
* The [function] is executed on the same thread where the [Event] was dispatched.
136-
* The [function] will only be executed when the context satisfies certain safety conditions.
137-
* These conditions are met when none of the following [SafeContext] properties are null:
138-
* - [SafeContext.world]
139-
* - [SafeContext.player]
140-
* - [SafeContext.interaction]
141-
* - [SafeContext.connection]
142-
*
143-
* Usage:
144-
* ```kotlin
145-
* myTask.listen<MyEvent> { event ->
146-
* player.sendMessage("Event received: $event")
147-
* }
148-
*
149-
* myTask.listen<MyEvent>(priority = 1) { event ->
150-
* player.sendMessage("Event received before the previous listener: $event")
151-
* }
152-
* ```
153-
*
154-
* @param T The type of the event to listen for.
155-
* This should be a subclass of Event.
156-
* @param priority The priority of the listener.
157-
* Listeners with higher priority will be executed first.
158-
* The Default value is 0.
159-
* @param alwaysListen If true, the listener will be executed even if it is muted. The Default value is false.
160-
* @param function The function to be executed when the event is posted.
161-
* This function should take a SafeContext and an event of type T as parameters.
162-
* @return The newly created and registered [SafeListener].
163-
*/
164-
inline fun <reified T : Event> Task<*>.listen(
165-
priority: Int = 0,
166-
alwaysListen: Boolean = false,
167-
noinline function: SafeContext.(T) -> Unit = {},
168-
): SafeListener<T> {
169-
val listener = SafeListener<T>(priority, this, alwaysListen) { event ->
170-
function(event) // ToDo: run function always on game thread
171-
}
172-
173-
syncListeners.subscribe<T>(listener)
174-
175-
return listener
176-
}
177-
178133
/**
179134
* This function registers a new [SafeListener] for a generic [Event] type [T].
180135
* The [transform] is executed on the same thread where the [Event] was dispatched.

common/src/main/kotlin/com/lambda/task/Task.kt

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ package com.lambda.task
1919

2020
import com.lambda.Lambda.LOG
2121
import com.lambda.context.SafeContext
22-
import com.lambda.event.EventFlow
23-
import com.lambda.event.Subscriber
22+
import com.lambda.event.EventFlow.unsubscribe
23+
import com.lambda.event.Muteable
2424
import com.lambda.event.events.TickEvent
2525
import com.lambda.event.listener.SafeListener.Companion.listen
2626
import com.lambda.module.modules.client.TaskFlowModule
@@ -33,10 +33,11 @@ typealias TaskGenerator<R> = SafeContext.(R) -> Task<*>
3333
typealias TaskGeneratorOrNull<R> = SafeContext.(R) -> Task<*>?
3434
typealias TaskGeneratorUnit<R> = SafeContext.(R) -> Unit
3535

36-
abstract class Task<Result> : Nameable {
36+
abstract class Task<Result> : Nameable, Muteable {
3737
private var parent: Task<*>? = null
3838
private val subTasks = mutableListOf<Task<*>>()
39-
private var state = State.RUNNING
39+
private var state = State.INIT
40+
override val isMuted: Boolean get() = state == State.PAUSED || state == State.INIT
4041
var age = 0
4142
private val depth: Int get() = parent?.depth?.plus(1) ?: 0
4243
val isCompleted get() = state == State.COMPLETED
@@ -48,10 +49,8 @@ abstract class Task<Result> : Nameable {
4849
private var nextTaskOrNull: TaskGeneratorOrNull<Result>? = null
4950
private var onFinish: TaskGeneratorUnit<Result>? = null
5051

51-
val syncListeners = Subscriber()
52-
private val concurrentListeners = Subscriber()
53-
5452
enum class State {
53+
INIT,
5554
RUNNING,
5655
PAUSED,
5756
CANCELLED,
@@ -118,14 +117,14 @@ abstract class Task<Result> : Nameable {
118117
LOG.info("$name deactivating parent ${owner.name}")
119118
if (owner !is TaskFlow) owner.deactivate()
120119
}
120+
state = State.RUNNING
121121
runSafe { runCatching { onStart() }.onFailure { failure(it) } }
122-
startListening()
123122
return this
124123
}
125124

126125
@Ta5kBuilder
127126
fun success(result: Result) {
128-
stopListening()
127+
unsubscribe()
129128
state = State.COMPLETED
130129
runSafe {
131130
executeNextTask(result)
@@ -141,15 +140,13 @@ abstract class Task<Result> : Nameable {
141140
fun activate() {
142141
if (state != State.PAUSED) return
143142
state = State.RUNNING
144-
startListening()
145143
}
146144

147145
@Ta5kBuilder
148146
fun deactivate() {
149147
if (state != State.RUNNING) return
150148
if (unpausable) return
151149
state = State.PAUSED
152-
stopListening()
153150
}
154151

155152
private fun SafeContext.executeNextTask(result: Result) {
@@ -174,7 +171,7 @@ abstract class Task<Result> : Nameable {
174171
if (this is TaskFlow) return
175172
if (state == State.COMPLETED || state == State.CANCELLED) return
176173
state = State.CANCELLED
177-
stopListening()
174+
unsubscribe()
178175
}
179176

180177
@Ta5kBuilder
@@ -198,7 +195,7 @@ abstract class Task<Result> : Nameable {
198195
stacktrace: MutableList<Task<*>> = mutableListOf()
199196
) {
200197
state = State.FAILED
201-
stopListening()
198+
unsubscribe()
202199
stacktrace.add(this)
203200
parent?.failure(e, stacktrace) ?: run {
204201
val message = buildString {
@@ -214,16 +211,6 @@ abstract class Task<Result> : Nameable {
214211
}
215212
}
216213

217-
private fun startListening() {
218-
EventFlow.syncListeners.subscribe(syncListeners)
219-
EventFlow.concurrentListeners.subscribe(concurrentListeners)
220-
}
221-
222-
private fun stopListening() {
223-
EventFlow.syncListeners.unsubscribe(syncListeners)
224-
EventFlow.concurrentListeners.unsubscribe(concurrentListeners)
225-
}
226-
227214
/**
228215
* Specifies the next task to execute after the current task completes successfully.
229216
*

0 commit comments

Comments
 (0)