Integrating Apache Kafka with Apache Spark Streaming using Python

I am trying to integrate Apache Kafka with Apache spark streaming using Python (I am new to all these).

For this I have done the following steps

  1. Started Zookeeper
  2. Started Apache Kafka
  3. Added topic in Apache Kafka
  4. Managed to list available topics using this command

bin/ –list –zookeeper localhost:2181

  1. I have taken the Kafka word count code from here

and the code is

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: <zk> <topic>", file=sys.stderr)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) 
        .map(lambda word: (word, 1)) 
        .reduceByKey(lambda a, b: a+b)

  1. I executed the code using the command

./spark-submit /root/girish/python/ localhost:2181

and I got this error

Traceback (most recent call last):
  File "/root/girish/python/", line 28, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/root/spark-", line 72, in createStream
    raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o23.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
        at Method)
        at java.lang.ClassLoader.loadClass(
        at java.lang.ClassLoader.loadClass(
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(
        at java.lang.reflect.Method.invoke(
        at py4j.reflection.MethodInvoker.invoke(
        at py4j.reflection.ReflectionEngine.invoke(
        at py4j.Gateway.invoke(
        at py4j.commands.AbstractCommand.invokeMethod(
        at py4j.commands.CallCommand.execute(
  1. I have updated the execution code using the answer from this question

spark submit failed with spark streaming workdcount python code


 ./spark-submit --jars /root/spark-,/usr/hdp/,/usr/hdp/,/usr/hdp/  /root/girish/python/ localhost:2181 <topic name>

Now I am getting this error

File "/root/girish/python/", line 28, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/root/spark-", line 67, in createStream
    jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
  File "/root/spark-", line 529, in __call__
  File "/root/spark-", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'

Please help to solve this issue.

Thanks in advance

PS: I am using Apache Spark 1.2

Source: apache

Leave a Reply