spark-issues mailing list archives

From "Tathagata Das (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-10122) AttributeError: 'RDD' object has no attribute 'offsetRanges'
Date Mon, 24 Aug 2015 19:29:46 GMT

     [ https://issues.apache.org/jira/browse/SPARK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das updated SPARK-10122:
----------------------------------
    Fix Version/s:     (was: 1.5.1)
                   1.5.0

> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> ------------------------------------------------------------
>
>                 Key: SPARK-10122
>                 URL: https://issues.apache.org/jira/browse/SPARK-10122
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>            Reporter: Amit Ramesh
>            Assignee: Saisai Shao
>              Labels: kafka
>             Fix For: 1.5.0
>
>
> SPARK-8389 added the offsetRanges interface to Kafka direct streams. However, this appears
> to break when operations are chained after a transform operation. The following example code
> results in an error (stack trace below). Note that if the 'count()' operation is removed from
> the example code, the error no longer occurs and the Kafka data is printed.
> {code:title=kafka_test.py|collapse=true}
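> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
> def attach_kafka_metadata(kafka_rdd):
>     # Fails here when another operation (e.g. count()) is chained after transform()
>     offset_ranges = kafka_rdd.offsetRanges()
>     return kafka_rdd
> if __name__ == "__main__":
>     # Hypothetical placeholders: substitute a real topic name and broker list
>     TOPIC = 'my-topic'
>     BROKERS = 'localhost:9092'
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)  # 10-second batch interval
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     # Chaining count() after transform() triggers the AttributeError
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
>     ssc.start()
>     ssc.awaitTermination()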
> {code}
> {code:title=Stack trace|collapse=true}
> Traceback (most recent call last):
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>     r = self.func(t, *rdds)
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
>     func = lambda t, rdd: oldfunc(rdd)
>   File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
>     offset_ranges = kafka_rdd.offsetRanges()
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> {code}
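
The stack trace hints at the mechanism: the repeated dstream.py frames show consecutive transforms being folded into one composed function (`self.func = lambda t, rdd: func(t, prev_func(t, rdd))`). The sketch below is a simplified, hypothetical pure-Python model, not PySpark's real classes. It assumes that the Kafka stream's transform returns a subclass of `TransformedDStream` whose job is to hand the user function a Kafka-aware RDD, and that `count()` is itself built from a further transform. Under those assumptions, an `isinstance` check lets the chained `count()` fold into the Kafka stream and bypass that wrapping, so `attach_kafka_metadata` receives a plain `RDD`; folding only when `type(prev) is TransformedDStream` avoids it. The names `KafkaSourceStream`, `compute_kafka`, and `strict_fold` are illustrative only, and the change actually shipped in 1.5.0 may differ in detail.

```python
# Simplified, hypothetical model of why a chained count() broke offsetRanges().
# These classes only mimic pyspark.streaming; they are NOT PySpark's real API.


class RDD:
    """Stand-in for a plain Python-side RDD (no Kafka metadata)."""

    def __init__(self, records):
        self.records = list(records)

    def count(self):
        return len(self.records)


class KafkaRDD(RDD):
    """Stand-in for a Kafka-aware RDD that exposes offsetRanges()."""

    def __init__(self, records, offset_ranges):
        super().__init__(records)
        self._offset_ranges = offset_ranges

    def offsetRanges(self):
        return self._offset_ranges


class TransformedDStream:
    def __init__(self, prev, func, strict_fold=False):
        self.strict_fold = strict_fold
        # Folding merges this stream's function into its parent's, mirroring
        # "lambda t, rdd: func(t, prev_func(t, rdd))" from the stack trace.
        if strict_fold:
            can_fold = type(prev) is TransformedDStream  # exact class only
        else:
            can_fold = isinstance(prev, TransformedDStream)  # subclasses too
        if can_fold:
            prev_func = prev.func
            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
            self.prev = prev.prev  # prev's own compute() is skipped entirely
        else:
            self.func = func
            self.prev = prev

    def compute(self, t):
        return self.func(t, self.prev.compute(t))

    def count(self):
        # Simplified: yields the count directly instead of a one-element RDD.
        return TransformedDStream(self, lambda t, rdd: rdd.count(), self.strict_fold)


class KafkaTransformedDStream(TransformedDStream):
    def compute(self, t):
        # Kafka-specific step: hand the user function a KafkaRDD.
        return self.func(t, self.prev.compute_kafka(t))


class KafkaSourceStream:
    """Stand-in for the direct Kafka stream, producing one fixed batch."""

    def __init__(self, records, offset_ranges):
        self.records = records
        self.offset_ranges = offset_ranges

    def compute(self, t):
        # Generic path: only a plain RDD, so the offsets are lost.
        return RDD(self.records)

    def compute_kafka(self, t):
        # Kafka-aware path: same records, plus their offset ranges.
        return KafkaRDD(self.records, self.offset_ranges)

    def transform(self, func, strict_fold=False):
        return KafkaTransformedDStream(self, lambda t, rdd: func(rdd), strict_fold)


def attach_kafka_metadata(kafka_rdd):
    offset_ranges = kafka_rdd.offsetRanges()  # AttributeError on a plain RDD
    return kafka_rdd


if __name__ == "__main__":
    # Hypothetical (topic, partition, fromOffset, untilOffset) tuple
    source = KafkaSourceStream(["a", "b"], [("my-topic", 0, 0, 2)])
    try:
        source.transform(attach_kafka_metadata).count().compute(t=0)
    except AttributeError as exc:
        print("isinstance folding:", exc)
    fixed = source.transform(attach_kafka_metadata, strict_fold=True).count()
    print("type() folding: count =", fixed.compute(t=0))
```

With `isinstance` folding, the model raises the same message as the report (`'RDD' object has no attribute 'offsetRanges'`); with exact-type folding, the Kafka-specific `compute()` stays in the chain and the count of 2 is returned.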



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


