spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-11335][STREAMING] update kafka direct python docs on how to get the offset ranges for a KafkaRDD
Date Wed, 11 Nov 2015 21:29:34 GMT
Repository: spark
Updated Branches:
  refs/heads/master a9a6b80c7 -> dd77e278b


[SPARK-11335][STREAMING] update kafka direct python docs on how to get the offset ranges for
a KafkaRDD

tdas koeninger

This updates the Spark Streaming + Kafka Integration Guide doc with a working method to access
the offsets of a `KafkaRDD` through Python.

Author: Nick Evans <me@nicolasevans.org>

Closes #9289 from manygrams/update_kafka_direct_python_docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd77e278
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd77e278
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd77e278

Branch: refs/heads/master
Commit: dd77e278b99e45c20fdefb1c795f3c5148d577db
Parents: a9a6b80
Author: Nick Evans <me@nicolasevans.org>
Authored: Wed Nov 11 13:29:30 2015 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Nov 11 13:29:30 2015 -0800

----------------------------------------------------------------------
 docs/streaming-kafka-integration.md | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd77e278/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index ab7f011..b00351b 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -181,7 +181,20 @@ Next, we discuss how to use this approach in your streaming application.
 		);
 	</div>
 	<div data-lang="python" markdown="1">
-		Not supported yet
+		offsetRanges = []
+
+		def storeOffsetRanges(rdd):
+		    global offsetRanges
+		    offsetRanges = rdd.offsetRanges()
+		    return rdd
+
+		def printOffsetRanges(rdd):
+		    for o in offsetRanges:
+		        print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
+
+		directKafkaStream\
+		    .transform(storeOffsetRanges)\
+		    .foreachRDD(printOffsetRanges)
 	</div>
    	</div>
 

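The Python snippet added by this diff needs a live `directKafkaStream` to run. As a rough illustration of the same pattern (capture the offset ranges in the `transform` step, read them back in the output step), here is a stand-alone sketch in Python 3. `StubOffsetRange`, `StubKafkaRDD`, `formatOffsetRanges` and the topic name `events` are hypothetical stand-ins invented for this example and are not part of PySpark. The sketch also uses the Python 3 `print()` function rather than the Python 2 `print` statement shown in the diff.

```python
from collections import namedtuple

# Hypothetical stand-in for an offset range object; only the four fields
# read by the docs snippet are modelled.
StubOffsetRange = namedtuple(
    "StubOffsetRange", ["topic", "partition", "fromOffset", "untilOffset"]
)


class StubKafkaRDD:
    """Hypothetical stand-in for a KafkaRDD: exposes offsetRanges() only."""

    def __init__(self, ranges):
        self._ranges = list(ranges)

    def offsetRanges(self):
        return list(self._ranges)


offsetRanges = []


def storeOffsetRanges(rdd):
    # Same shape as the docs snippet: remember the ranges, pass the RDD on.
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd


def formatOffsetRanges():
    # Assumed helper: returns the lines instead of printing them,
    # which makes the result easy to check.
    return [
        "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
        for o in offsetRanges
    ]


def printOffsetRanges(rdd):
    # The rdd argument is unused, as in the docs snippet; the ranges come
    # from the module-level variable filled in by storeOffsetRanges.
    for line in formatOffsetRanges():
        print(line)


# Simulate one batch flowing through transform(...) and then foreachRDD(...).
# The topic name and offsets are made-up sample values.
batch = StubKafkaRDD([
    StubOffsetRange("events", 0, 100, 150),
    StubOffsetRange("events", 1, 40, 75),
])
printOffsetRanges(storeOffsetRanges(batch))
```

In a real streaming job the two callbacks would instead be passed to `directKafkaStream.transform(storeOffsetRanges)` and `.foreachRDD(printOffsetRanges)`, as the diff above shows.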


