
Commit 0561c45

Davies Liu authored and tdas committed
[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python
This PR brings the Python API for Spark Streaming Kafka data source.

```
class KafkaUtils(__builtin__.object)
 |  Static methods defined here:
 |
 |  createStream(ssc, zkQuorum, groupId, topics, storageLevel=StorageLevel(True, True, False, False, 2),
 |               keyDecoder=<function utf8_decoder>, valueDecoder=<function utf8_decoder>)
 |      Create an input stream that pulls messages from a Kafka Broker.
 |
 |      :param ssc: StreamingContext object
 |      :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
 |      :param groupId: The group id for this consumer.
 |      :param topics: Dict of (topic_name -> numPartitions) to consume.
 |                     Each partition is consumed in its own thread.
 |      :param storageLevel: RDD storage level.
 |      :param keyDecoder: A function used to decode key
 |      :param valueDecoder: A function used to decode value
 |      :return: A DStream object
```

Run the example:

```
bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
```

Author: Davies Liu <[email protected]>
Author: Tathagata Das <[email protected]>

Closes apache#3715 from davies/kafka and squashes the following commits:

d93bfe0 [Davies Liu] Update make-distribution.sh
4280d04 [Davies Liu] address comments
e6d0427 [Davies Liu] Merge branch 'master' of github.com:apache/spark into kafka
f257071 [Davies Liu] add tests for null in RDD
23b039a [Davies Liu] address comments
9af51c4 [Davies Liu] Merge branch 'kafka' of github.com:davies/spark into kafka
a74da87 [Davies Liu] address comments
dc1eed0 [Davies Liu] Update kafka_wordcount.py
31e2317 [Davies Liu] Update kafka_wordcount.py
370ba61 [Davies Liu] Update kafka.py
97386b3 [Davies Liu] address comment
2c567a5 [Davies Liu] update logging and comment
33730d1 [Davies Liu] Merge branch 'master' of github.com:apache/spark into kafka
adeeb38 [Davies Liu] Merge pull request apache#3 from tdas/kafka-python-api
aea8953 [Tathagata Das] Kafka-assembly for Python API
eea16a7 [Davies Liu] refactor
f6ce899 [Davies Liu] add example and fix bugs
98c8d17 [Davies Liu] fix python style
5697a01 [Davies Liu] bypass decoder in scala
048dbe6 [Davies Liu] fix python style
75d485e [Davies Liu] add mqtt
07923c4 [Davies Liu] support kafka in Python
1 parent 554403f commit 0561c45
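For reference, a minimal consumer of the API documented above might look like the sketch below; the ZooKeeper address, group id, and topic are placeholders, and the full word-count example lives in the file tree that follows.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaStreamSketch")
ssc = StreamingContext(sc, 2)  # 2-second batches

# topics maps topic name -> number of partitions, each consumed in its own thread
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "my-consumer-group", {"test": 1})
kvs.map(lambda kv: kv[1]).pprint()  # keys and values are decoded by utf8_decoder by default

ssc.start()
ssc.awaitTermination()
```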

File tree

10 files changed: +313, -58 lines


core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+19 -47

@@ -316,6 +316,7 @@ private object SpecialLengths {
   val PYTHON_EXCEPTION_THROWN = -2
   val TIMING_DATA = -3
   val END_OF_STREAM = -4
+  val NULL = -5
 }
 
 private[spark] object PythonRDD extends Logging {
@@ -374,54 +375,25 @@ private[spark] object PythonRDD extends Logging {
   }
 
   def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
-    // The right way to implement this would be to use TypeTags to get the full
-    // type of T. Since I don't want to introduce breaking changes throughout the
-    // entire Spark API, I have to use this hacky approach:
-    if (iter.hasNext) {
-      val first = iter.next()
-      val newIter = Seq(first).iterator ++ iter
-      first match {
-        case arr: Array[Byte] =>
-          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
-          }
-        case string: String =>
-          newIter.asInstanceOf[Iterator[String]].foreach { str =>
-            writeUTF(str, dataOut)
-          }
-        case stream: PortableDataStream =>
-          newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
-            val bytes = stream.toArray()
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
-          }
-        case (key: String, stream: PortableDataStream) =>
-          newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
-            case (key, stream) =>
-              writeUTF(key, dataOut)
-              val bytes = stream.toArray()
-              dataOut.writeInt(bytes.length)
-              dataOut.write(bytes)
-          }
-        case (key: String, value: String) =>
-          newIter.asInstanceOf[Iterator[(String, String)]].foreach {
-            case (key, value) =>
-              writeUTF(key, dataOut)
-              writeUTF(value, dataOut)
-          }
-        case (key: Array[Byte], value: Array[Byte]) =>
-          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
-            case (key, value) =>
-              dataOut.writeInt(key.length)
-              dataOut.write(key)
-              dataOut.writeInt(value.length)
-              dataOut.write(value)
-          }
-        case other =>
-          throw new SparkException("Unexpected element type " + first.getClass)
-      }
+
+    def write(obj: Any): Unit = obj match {
+      case null =>
+        dataOut.writeInt(SpecialLengths.NULL)
+      case arr: Array[Byte] =>
+        dataOut.writeInt(arr.length)
+        dataOut.write(arr)
+      case str: String =>
+        writeUTF(str, dataOut)
+      case stream: PortableDataStream =>
+        write(stream.toArray())
+      case (key, value) =>
+        write(key)
+        write(value)
+      case other =>
+        throw new SparkException("Unexpected element type " + other.getClass)
     }
+
+    iter.foreach(write)
   }
 
   /**

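The rewritten writeIteratorToStream frames every element as a 4-byte big-endian length followed by that many bytes, with negative lengths reserved as control codes; the new NULL = -5 code is how a missing Kafka key or value crosses the JVM-to-Python boundary. A minimal Python sketch of the reading side of this framing (the function name and layout here are illustrative; PySpark's real reader is in serializers.py, patched below):

```python
import struct

END_OF_DATA_SECTION = -1  # existing control code in PySpark's protocol
NULL = -5                 # new control code added by this commit

def read_element(stream):
    # Length prefix written by java.io.DataOutputStream.writeInt (big-endian).
    length = struct.unpack("!i", stream.read(4))[0]
    if length == END_OF_DATA_SECTION:
        raise EOFError
    if length == NULL:
        return None  # a null element arrives in Python as None
    return stream.read(length)
```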
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+5

@@ -22,6 +22,7 @@ import java.io.{File, InputStream, IOException, OutputStream}
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
 
 private[spark] object PythonUtils {
   /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
@@ -39,4 +40,8 @@ private[spark] object PythonUtils {
   def mergePythonPaths(paths: String*): String = {
     paths.filter(_ != "").mkString(File.pathSeparator)
   }
+
+  def generateRDDWithNull(sc: JavaSparkContext): JavaRDD[String] = {
+    sc.parallelize(List("a", null, "b"))
+  }
 }

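generateRDDWithNull gives the Python test suite a JVM-side RDD[String] that really contains a null, so the new framing can be exercised end to end. The Python half of that test is not in the hunks shown here; a plausible sketch, assuming the Py4J gateway access pattern PySpark tests normally use:

```python
from pyspark import SparkContext
from pyspark.rdd import RDD
from pyspark.serializers import UTF8Deserializer

sc = SparkContext(appName="NullRoundTrip")

# Reach the Scala helper through the Py4J gateway (fully qualified path),
# then wrap the returned JavaRDD[String] as a Python RDD.
jrdd = sc._jvm.org.apache.spark.api.python.PythonUtils.generateRDDWithNull(sc._jsc)
rdd = RDD(jrdd, sc, UTF8Deserializer())

# The null written by writeIteratorToStream should come back as None.
assert rdd.collect() == [u"a", None, u"b"]
```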
core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
+17 -6

@@ -23,11 +23,22 @@ import org.scalatest.FunSuite
 
 class PythonRDDSuite extends FunSuite {
 
-    test("Writing large strings to the worker") {
-      val input: List[String] = List("a"*100000)
-      val buffer = new DataOutputStream(new ByteArrayOutputStream)
-      PythonRDD.writeIteratorToStream(input.iterator, buffer)
-    }
+  test("Writing large strings to the worker") {
+    val input: List[String] = List("a"*100000)
+    val buffer = new DataOutputStream(new ByteArrayOutputStream)
+    PythonRDD.writeIteratorToStream(input.iterator, buffer)
+  }
 
+  test("Handle nulls gracefully") {
+    val buffer = new DataOutputStream(new ByteArrayOutputStream)
+    // Should not have NPE when write an Iterator with null in it
+    // The correctness will be tested in Python
+    PythonRDD.writeIteratorToStream(Iterator("a", null), buffer)
+    PythonRDD.writeIteratorToStream(Iterator(null, "a"), buffer)
+    PythonRDD.writeIteratorToStream(Iterator("a".getBytes, null), buffer)
+    PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes), buffer)
+    PythonRDD.writeIteratorToStream(Iterator((null, null), ("a", null), (null, "b")), buffer)
+    PythonRDD.writeIteratorToStream(
+      Iterator((null, null), ("a".getBytes, null), (null, "b".getBytes)), buffer)
+  }
 }
-
examples/src/main/python/streaming/kafka_wordcount.py (new file)
+54

@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from Kafka every second.
+ Usage: kafka_wordcount.py <zk> <topic>
+
+ To run this on your local machine, you need to setup Kafka and create a producer first, see
+ http://kafka.apache.org/documentation.html#quickstart
+
+ and then run the example
+    `$ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/\
+      spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
+      localhost:2181 test`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kafka import KafkaUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    zkQuorum, topic = sys.argv[1:]
+    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
+    lines = kvs.map(lambda x: x[1])
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()

external/kafka-assembly/pom.xml (new file)
+106

@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-assembly_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External Kafka Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-kafka-assembly</sbt.project.name>
+    <spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
+    <spark.jar.basename>spark-streaming-kafka-assembly-${project.version}.jar</spark.jar.basename>
+    <spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${spark.jar}</outputFile>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                  <resource>log4j.properties</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

pom.xml
+1

@@ -1629,6 +1629,7 @@
       </properties>
       <modules>
         <module>external/kafka</module>
+        <module>external/kafka-assembly</module>
       </modules>
     </profile>

project/SparkBuild.scala
+11 -3

@@ -44,8 +44,9 @@ object BuildCommons {
     sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
     "kinesis-asl").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(assembly, examples, networkYarn) =
-    Seq("assembly", "examples", "network-yarn").map(ProjectRef(buildLocation, _))
+  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) =
+    Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly")
+      .map(ProjectRef(buildLocation, _))
 
   val tools = ProjectRef(buildLocation, "tools")
   // Root project.
@@ -300,7 +301,14 @@ object Assembly {
       sys.props.get("hadoop.version")
        .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
    },
-    jarName in assembly := s"${moduleName.value}-${version.value}-hadoop${hadoopVersion.value}.jar",
+    jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
+      if (mName.contains("streaming-kafka-assembly")) {
+        // This must match the same name used in maven (see external/kafka-assembly/pom.xml)
+        s"${mName}-${v}.jar"
+      } else {
+        s"${mName}-${v}-hadoop${hv}.jar"
+      }
+    },
    mergeStrategy in assembly := {
      case PathList("org", "datanucleus", xs @ _*) => MergeStrategy.discard
      case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard

python/pyspark/serializers.py
+8 -1

@@ -70,6 +70,7 @@ class SpecialLengths(object):
     PYTHON_EXCEPTION_THROWN = -2
     TIMING_DATA = -3
     END_OF_STREAM = -4
+    NULL = -5
 
 
 class Serializer(object):
@@ -133,6 +134,8 @@ def load_stream(self, stream):
 
     def _write_with_length(self, obj, stream):
         serialized = self.dumps(obj)
+        if serialized is None:
+            raise ValueError("serialized value should not be None")
         if len(serialized) > (1 << 31):
             raise ValueError("can not serialize object larger than 2G")
         write_int(len(serialized), stream)
@@ -145,8 +148,10 @@ def _read_with_length(self, stream):
         length = read_int(stream)
         if length == SpecialLengths.END_OF_DATA_SECTION:
             raise EOFError
+        elif length == SpecialLengths.NULL:
+            return None
         obj = stream.read(length)
-        if obj == "":
+        if len(obj) < length:
             raise EOFError
         return self.loads(obj)
 
@@ -484,6 +489,8 @@ def loads(self, stream):
         length = read_int(stream)
         if length == SpecialLengths.END_OF_DATA_SECTION:
             raise EOFError
+        elif length == SpecialLengths.NULL:
+            return None
         s = stream.read(length)
         return s.decode("utf-8") if self.use_unicode else s

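These serializer changes are what let a missing Kafka key or value reach user code: the NULL marker is converted back to None before any decoder runs. The default keyDecoder/valueDecoder named in the commit message is utf8_decoder; its source is not in the hunks shown here, but a decoder compatible with this behavior has to pass None through, roughly like the sketch below. KafkaUtils.createStream applies these decoders to each (key, value) pair before handing the DStream to user code.

```python
def utf8_decoder(s):
    """Decode Kafka key/value bytes as UTF-8, passing a null through as None."""
    if s is None:
        return None
    return s.decode("utf-8")
```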