Skip to content

Commit eb5f2b6

Browse files
kanzhang authored and pwendell committed
SPARK-1407 drain event queue before stopping event logger
Author: Kan Zhang <[email protected]>

Closes #366 from kanzhang/SPARK-1407 and squashes the following commits:

cd0629f [Kan Zhang] code refactoring and adding test
b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger
1 parent bde9cc1 commit eb5f2b6

File tree

4 files changed

+67
-16
lines changed

4 files changed

+67
-16
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
931931
/** Shut down the SparkContext. */
932932
def stop() {
933933
ui.stop()
934-
eventLogger.foreach(_.stop())
935934
// Do this only if not stopped already - best case effort.
936935
// prevent NPE if stopped more than once.
937936
val dagSchedulerCopy = dagScheduler
@@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
940939
metadataCleaner.cancel()
941940
cleaner.foreach(_.stop())
942941
dagSchedulerCopy.stop()
943-
listenerBus.stop()
944942
taskScheduler = null
945943
// TODO: Cache.stop()?
946944
env.stop()
947945
SparkEnv.set(null)
948946
ShuffleMapTask.clearCache()
949947
ResultTask.clearCache()
948+
listenerBus.stop()
949+
eventLogger.foreach(_.stop())
950950
logInfo("Successfully stopped SparkContext")
951951
} else {
952952
logInfo("SparkContext already stopped")

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

+19-13
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
3636
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
3737
private var queueFullErrorMessageLogged = false
3838
private var started = false
39+
private val listenerThread = new Thread("SparkListenerBus") {
40+
setDaemon(true)
41+
override def run() {
42+
while (true) {
43+
val event = eventQueue.take
44+
if (event == SparkListenerShutdown) {
45+
// Get out of the while loop and shutdown the daemon thread
46+
return
47+
}
48+
postToAll(event)
49+
}
50+
}
51+
}
52+
53+
// Exposed for testing
54+
@volatile private[spark] var stopCalled = false
3955

4056
/**
4157
* Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
4864
if (started) {
4965
throw new IllegalStateException("Listener bus already started!")
5066
}
67+
listenerThread.start()
5168
started = true
52-
new Thread("SparkListenerBus") {
53-
setDaemon(true)
54-
override def run() {
55-
while (true) {
56-
val event = eventQueue.take
57-
if (event == SparkListenerShutdown) {
58-
// Get out of the while loop and shutdown the daemon thread
59-
return
60-
}
61-
postToAll(event)
62-
}
63-
}
64-
}.start()
6569
}
6670

6771
def post(event: SparkListenerEvent) {
@@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
9397
}
9498

9599
def stop() {
100+
stopCalled = true
96101
if (!started) {
97102
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
98103
}
99104
post(SparkListenerShutdown)
105+
listenerThread.join()
100106
}
101107
}

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

+45
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.scheduler
1919

20+
import java.util.concurrent.Semaphore
21+
2022
import scala.collection.mutable
2123

2224
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
@@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
7274
}
7375
}
7476

77+
test("bus.stop() waits for the event queue to completely drain") {
78+
@volatile var drained = false
79+
80+
// Tells the listener to stop blocking
81+
val listenerWait = new Semaphore(1)
82+
83+
// When stop has returned
84+
val stopReturned = new Semaphore(1)
85+
86+
class BlockingListener extends SparkListener {
87+
override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
88+
listenerWait.acquire()
89+
drained = true
90+
}
91+
}
92+
93+
val bus = new LiveListenerBus
94+
val blockingListener = new BlockingListener
95+
96+
bus.addListener(blockingListener)
97+
bus.start()
98+
bus.post(SparkListenerJobEnd(0, JobSucceeded))
99+
100+
// the queue should not drain immediately
101+
assert(!drained)
102+
103+
new Thread("ListenerBusStopper") {
104+
override def run() {
105+
// stop() will block until notify() is called below
106+
bus.stop()
107+
stopReturned.release(1)
108+
}
109+
}.start()
110+
111+
while (!bus.stopCalled) {
112+
Thread.sleep(10)
113+
}
114+
115+
listenerWait.release()
116+
stopReturned.acquire()
117+
assert(drained)
118+
}
119+
75120
test("basic creation of StageInfo") {
76121
val listener = new SaveStageAndTaskInfo
77122
sc.addSparkListener(listener)

examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,6 @@ object SparkHdfsLR {
7373
}
7474

7575
println("Final w: " + w)
76-
System.exit(0)
76+
sc.stop()
7777
}
7878
}

0 commit comments

Comments (0)