Commit bebf4c4

[SPARK-5549] Define TaskContext interface in Scala.
So the interface documentation shows up in ScalaDoc.

Author: Reynold Xin <[email protected]>

Closes apache#4324 from rxin/TaskContext-scala and squashes the following commits:

2480a17 [Reynold Xin] comment
573756f [Reynold Xin] style fixes and javadoc fixes.
87dd537 [Reynold Xin] [SPARK-5549] Define TaskContext interface in Scala.
1 parent 523a935 commit bebf4c4

File tree: 5 files changed, +183 −131 lines

core/src/main/java/org/apache/spark/TaskContext.java (−126): this file was deleted.
core/src/main/scala/org/apache/spark/TaskContext.scala (new file, +136)

@@ -0,0 +1,136 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.Serializable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.TaskCompletionListener


object TaskContext {
  /**
   * Return the currently active TaskContext. This can be called inside of
   * user functions to access contextual information about running tasks.
   */
  def get(): TaskContext = taskContext.get

  private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]

  // Note: protected[spark] instead of private[spark] to prevent the following two from
  // showing up in JavaDoc.
  /**
   * Set the thread local TaskContext. Internal to Spark.
   */
  protected[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)

  /**
   * Unset the thread local TaskContext. Internal to Spark.
   */
  protected[spark] def unset(): Unit = taskContext.remove()
}


/**
 * Contextual information about a task which can be read or mutated during
 * execution. To access the TaskContext for a running task, use:
 * {{{
 *   org.apache.spark.TaskContext.get()
 * }}}
 */
abstract class TaskContext extends Serializable {
  // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
  // from generating a static get method (based on the companion object's get method).

  // Note: Update JavaTaskContextCompileCheck when new methods are added to this class.

  // Note: getters in this class are defined with parentheses to maintain backward compatibility.

  /**
   * Returns true if the task has completed.
   */
  def isCompleted(): Boolean

  /**
   * Returns true if the task has been killed.
   */
  def isInterrupted(): Boolean

  @deprecated("1.2.0", "use isRunningLocally")
  def runningLocally(): Boolean

  /**
   * Returns true if the task is running locally in the driver program.
   * @return
   */
  def isRunningLocally(): Boolean

  /**
   * Adds a (Java friendly) listener to be executed on task completion.
   * This will be called in all situation - success, failure, or cancellation.
   * An example use is for HadoopRDD to register a callback to close the input stream.
   */
  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext

  /**
   * Adds a listener in the form of a Scala closure to be executed on task completion.
   * This will be called in all situations - success, failure, or cancellation.
   * An example use is for HadoopRDD to register a callback to close the input stream.
   */
  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext

  /**
   * Adds a callback function to be executed on task completion. An example use
   * is for HadoopRDD to register a callback to close the input stream.
   * Will be called in any situation - success, failure, or cancellation.
   *
   * @param f Callback function.
   */
  @deprecated("1.2.0", "use addTaskCompletionListener")
  def addOnCompleteCallback(f: () => Unit)

  /**
   * The ID of the stage that this task belong to.
   */
  def stageId(): Int

  /**
   * The ID of the RDD partition that is computed by this task.
   */
  def partitionId(): Int

  /**
   * How many times this task has been attempted. The first task attempt will be assigned
   * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
   */
  def attemptNumber(): Int

  @deprecated("1.3.0", "use attemptNumber")
  def attemptId(): Long

  /**
   * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
   * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
   */
  def taskAttemptId(): Long

  /** ::DeveloperApi:: */
  @DeveloperApi
  def taskMetrics(): TaskMetrics
}
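As a usage sketch (not part of this commit), this is roughly how the interface above is meant to be used from a job: TaskContext.get() is called inside a user function to reach the thread-local context, and the Scala-closure overload of addTaskCompletionListener registers cleanup that runs whether the task succeeds, fails, or is cancelled. The local-mode SparkContext, app name, and per-partition computation below are assumed purely for illustration.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object TaskContextUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("task-context-sketch").setMaster("local[2]"))

    val sizes = sc.parallelize(1 to 100, 4).mapPartitions { iter =>
      // Thread-local context of the task computing this partition.
      val tc = TaskContext.get()
      // Scala-closure overload; invoked on success, failure, or cancellation.
      tc.addTaskCompletionListener { ctx =>
        println(s"partition ${ctx.partitionId()} of stage ${ctx.stageId()} finished")
      }
      Iterator.single(iter.size)
    }.collect()

    println(sizes.mkString(", "))
    sc.stop()
  }
}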

core/src/main/scala/org/apache/spark/TaskContextImpl.scala (+4, −4)

@@ -33,7 +33,7 @@ private[spark] class TaskContextImpl(
   with Logging {
 
   // For backwards-compatibility; this method is now deprecated as of 1.3.0.
-  override def attemptId: Long = taskAttemptId
+  override def attemptId(): Long = taskAttemptId
 
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -87,10 +87,10 @@ private[spark] class TaskContextImpl(
     interrupted = true
   }
 
-  override def isCompleted: Boolean = completed
+  override def isCompleted(): Boolean = completed
 
-  override def isRunningLocally: Boolean = runningLocally
+  override def isRunningLocally(): Boolean = runningLocally
 
-  override def isInterrupted: Boolean = interrupted
+  override def isInterrupted(): Boolean = interrupted
 }
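The parentheses added above are a consistency and source-compatibility detail: the abstract getters in TaskContext are declared with empty parameter lists (see the note in TaskContext.scala), so the overrides in TaskContextImpl now mirror them. A minimal sketch of the same pattern in plain Scala 2, not Spark code:

// Declaring the getter with () keeps call sites like s.isCompleted() valid,
// matching how the old Java-defined API was called; the override repeats the
// empty parameter list so declaration and implementation stay consistent.
abstract class Status {
  def isCompleted(): Boolean
}

class StatusImpl(completed: Boolean) extends Status {
  override def isCompleted(): Boolean = completed
}

object StatusDemo extends App {
  val s: Status = new StatusImpl(true)
  println(s.isCompleted())  // parenthesized call, as a Java caller would write it
  println(s.isCompleted)    // Scala 2 also allows omitting the parentheses here
}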

core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java renamed to core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java (+2, −1)

@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util;
+package test.org.apache.spark;
 
 import org.apache.spark.TaskContext;
+import org.apache.spark.util.TaskCompletionListener;
 
 
 /**
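For context on the renamed test class, a hedged sketch of what such a listener looks like when written in Scala; the class name LoggingTaskCompletionListener is made up for illustration, while TaskCompletionListener and its onTaskCompletion callback are the interface imported above.

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

// Hypothetical listener: logs which partition of which stage has finished.
class LoggingTaskCompletionListener extends TaskCompletionListener {
  // Invoked once the task completes, whether it succeeded, failed, or was cancelled.
  override def onTaskCompletion(context: TaskContext): Unit = {
    println(s"stage ${context.stageId()}, partition ${context.partitionId()} completed")
  }
}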
core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java (new file, +41)

@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package test.org.apache.spark;

import org.apache.spark.TaskContext;

/**
 * Something to make sure that TaskContext can be used in Java.
 */
public class JavaTaskContextCompileCheck {

  public static void test() {
    TaskContext tc = TaskContext.get();

    tc.isCompleted();
    tc.isInterrupted();
    tc.isRunningLocally();

    tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());

    tc.attemptNumber();
    tc.partitionId();
    tc.stageId();
    tc.taskAttemptId();
  }
}
