spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/2] spark git commit: [SPARK-8389] [STREAMING] [KAFKA] Example of getting offset ranges out o…
Date Sat, 20 Jun 2015 00:37:10 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 2248ad8b7 -> a7b773a8b


[SPARK-8389] [STREAMING] [KAFKA] Example of getting offset ranges out of the existing java direct stream api

Author: cody koeninger <cody@koeninger.org>

Closes #6846 from koeninger/SPARK-8389 and squashes the following commits:

3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78d0ceea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78d0ceea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78d0ceea

Branch: refs/heads/branch-1.4
Commit: 78d0ceea82794d97f38d3e177e57b7e46ec98afc
Parents: 2248ad8
Author: cody koeninger <cody@koeninger.org>
Authored: Fri Jun 19 14:51:19 2015 +0200
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Fri Jun 19 17:36:54 2015 -0700

----------------------------------------------------------------------
 .../streaming/kafka/JavaDirectKafkaStreamSuite.java  | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78d0ceea/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index c0669fb..3913b71 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -65,8 +66,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
 
   @Test
   public void testKafkaStream() throws InterruptedException {
-    String topic1 = "topic1";
-    String topic2 = "topic2";
+    final String topic1 = "topic1";
+    final String topic2 = "topic2";
 
     String[] topic1data = createTopicAndSendData(topic1);
     String[] topic2data = createTopicAndSendData(topic2);
@@ -87,6 +88,16 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
         StringDecoder.class,
         kafkaParams,
         topicToSet(topic1)
+    ).transformToPair(
+        // Make sure you can get offset ranges from the rdd
+        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
+          @Override
+          public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
+            OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
+            Assert.assertEquals(offsets[0].topic(), topic1);
+            return rdd;
+          }
+        }
     ).map(
         new Function<Tuple2<String, String>, String>() {
           @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message