spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [2/3] [java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Date Tue, 04 Mar 2014 06:31:37 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 435a86e..64a3a04 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -35,7 +35,7 @@ import scala.Tuple2;
  */
 public final class JavaALS {
 
-  static class ParseRating extends Function<String, Rating> {
+  static class ParseRating implements Function<String, Rating> {
     private static final Pattern COMMA = Pattern.compile(",");
 
     @Override
@@ -48,7 +48,7 @@ public final class JavaALS {
     }
   }
 
-  static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
+  static class FeaturesToString implements Function<Tuple2<Object, double[]>, String> {
     @Override
     public String call(Tuple2<Object, double[]> element) {
       return element._1() + "," + Arrays.toString(element._2());

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index 4b2658f..76ebdcc 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -32,7 +32,7 @@ import java.util.regex.Pattern;
  */
 public final class JavaKMeans {
 
-  static class ParsePoint extends Function<String, double[]> {
+  static class ParsePoint implements Function<String, double[]> {
     private static final Pattern SPACE = Pattern.compile(" ");
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 21586ce..667c72f 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -34,7 +34,7 @@ import java.util.regex.Pattern;
  */
 public final class JavaLR {
 
-  static class ParsePoint extends Function<String, LabeledPoint> {
+  static class ParsePoint implements Function<String, LabeledPoint> {
     private static final Pattern COMMA = Pattern.compile(",");
     private static final Pattern SPACE = Pattern.compile(" ");
 

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 2ffd351..d704be0 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -89,7 +89,7 @@ public final class JavaKafkaWordCount {
       }
     });
 
-    JavaPairDStream<String, Integer> wordCounts = words.map(
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String, Integer> call(String s) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 7777c98..7f68d45 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -69,7 +69,7 @@ public final class JavaNetworkWordCount {
         return Lists.newArrayList(SPACE.split(x));
       }
     });
-    JavaPairDStream<String, Integer> wordCounts = words.map(
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String, Integer> call(String s) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index 26c4462..88ad341 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -63,7 +63,7 @@ public final class JavaQueueStream {
 
     // Create the QueueInputDStream and use it do some processing
     JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
-    JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
+    JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
         new PairFunction<Integer, Integer, Integer>() {
           @Override
           public Tuple2<Integer, Integer> call(Integer i) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index c989ec0..b254e00 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -75,7 +75,7 @@ object ZeroMQUtils {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
     createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
   }
 
@@ -99,7 +99,7 @@ object ZeroMQUtils {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
     createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
   }
 
@@ -122,7 +122,7 @@ object ZeroMQUtils {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
     createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/README.md
----------------------------------------------------------------------
diff --git a/extras/README.md b/extras/README.md
new file mode 100644
index 0000000..1b4174b
--- /dev/null
+++ b/extras/README.md
@@ -0,0 +1 @@
+This directory contains build components not included by default in Spark's build.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/README.md
----------------------------------------------------------------------
diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md
new file mode 100644
index 0000000..e95b73a
--- /dev/null
+++ b/extras/java8-tests/README.md
@@ -0,0 +1,24 @@
+# Java 8 Test Suites
+
+These tests require having Java 8 installed and are isolated from the main Spark build.
+If Java 8 is not your system's default Java version, you will need to point Spark's build
+to your Java location. The set-up depends a bit on the build system:
+
+* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
+  `-java-home` to the sbt launch script. If a Java 8 JDK is detected, sbt will automatically
+  include the Java 8 test project.
+
+  `$ JAVA_HOME=/opt/jdk1.8.0/ sbt/sbt clean "test-only org.apache.spark.Java8APISuite"`
+
+* Maven users can also point the build at their Java 8 JDK using JAVA_HOME.
+
+  However, Maven will not automatically detect the presence of a Java 8 JDK, so the
+  special build profile `-Pjava8-tests` must be passed explicitly when running the tests
+  (after an initial `install` of the other modules):
+
+  `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
+  `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite`
+
+  Note that the above commands can only be run from the project root directory, since this
+  module depends on core and the test-jars of core and streaming. This means an install step
+  is required to make the test dependencies visible to the Java 8 sub-project.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
new file mode 100644
index 0000000..602f66f
--- /dev/null
+++ b/extras/java8-tests/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>java8-tests_2.10</artifactId>
+  <packaging>pom</packaging>
+  <name>Spark Project Java8 Tests POM</name>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>com.novocode</groupId>
+      <artifactId>junit-interface</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>java8-tests</id>
+    </profile>
+  </profiles>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <systemPropertyVariables>
+            <!-- For some reason surefire isn't setting this log4j file on the
+                 test classpath automatically. So we add it manually. -->
+            <log4j.configuration>
+              file:src/test/resources/log4j.properties
+            </log4j.configuration>
+          </systemPropertyVariables>
+          <skipTests>false</skipTests>
+          <includes>
+            <include>**/Suite*.java</include>
+            <include>**/*Suite.java</include>
+          </includes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-compile-first</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <fork>true</fork>
+          <verbose>true</verbose>
+          <forceJavacCompilerUse>true</forceJavacCompilerUse>
+          <source>1.8</source>
+          <compilerVersion>1.8</compilerVersion>
+          <target>1.8</target>
+          <encoding>UTF-8</encoding>
+          <maxmem>1024m</maxmem>
+        </configuration>
+      </plugin>
+      <plugin>
+        <!-- Disabled: every Scala plugin execution below is unbound via phase "none" -->
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>scala-test-compile-first</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>attach-scaladocs</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
new file mode 100644
index 0000000..f672512
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+
+/**
+ * Most of these tests replicate org.apache.spark.JavaAPISuite using Java 8
+ * lambda syntax.
+ */
+public class Java8APISuite implements Serializable {
+  static int foreachCalls = 0;
+  private transient JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaAPISuite");
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port");
+  }
+
+  @Test
+  public void foreachWithAnonymousClass() {
+    foreachCalls = 0;
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+    rdd.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String s) {
+        foreachCalls++;
+      }
+    });
+    Assert.assertEquals(2, foreachCalls);
+  }
+
+  @Test
+  public void foreach() {
+    foreachCalls = 0;
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+    rdd.foreach((x) -> foreachCalls++);
+    Assert.assertEquals(2, foreachCalls);
+  }
+
+  @Test
+  public void groupBy() {  // groups a small RDD by parity and checks the size of each group
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function<Integer, Boolean> isEven = x -> x % 2 == 0;  // key is true for even elements
+    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isEven);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+    // Repeat the same checks using the two-argument groupBy overload.
+    oddsAndEvens = rdd.groupBy(isEven, 1);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+  }
+
+  @Test
+  public void leftOuterJoin() {
+    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(1, 2),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(3, 1)
+    ));
+    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Character>(1, 'x'),
+      new Tuple2<Integer, Character>(2, 'y'),
+      new Tuple2<Integer, Character>(2, 'z'),
+      new Tuple2<Integer, Character>(4, 'w')
+    ));
+    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+      rdd1.leftOuterJoin(rdd2).collect();
+    Assert.assertEquals(5, joined.size());
+    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
+    Assert.assertEquals(3, firstUnmatched._1().intValue());
+  }
+
+  @Test
+  public void foldReduce() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
+
+    int sum = rdd.fold(0, add);
+    Assert.assertEquals(33, sum);
+
+    sum = rdd.reduce(add);
+    Assert.assertEquals(33, sum);
+  }
+
+  @Test
+  public void foldByKey() {
+    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(3, 2),
+      new Tuple2<Integer, Integer>(3, 1)
+    );
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
+    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
+    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
+    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
+  }
+
+  @Test
+  public void reduceByKey() {
+    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(3, 2),
+      new Tuple2<Integer, Integer>(3, 1)
+    );
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
+    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
+    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
+    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
+
+    Map<Integer, Integer> localCounts = counts.collectAsMap();
+    Assert.assertEquals(1, localCounts.get(1).intValue());
+    Assert.assertEquals(2, localCounts.get(2).intValue());
+    Assert.assertEquals(3, localCounts.get(3).intValue());
+
+    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
+    Assert.assertEquals(1, localCounts.get(1).intValue());
+    Assert.assertEquals(2, localCounts.get(2).intValue());
+    Assert.assertEquals(3, localCounts.get(3).intValue());
+  }
+
+  @Test
+  public void map() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
+    doubles.collect();
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
+      .cache();
+    pairs.collect();
+    JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
+    strings.collect();
+  }
+
+  @Test
+  public void flatMap() {  // exercises flatMap, flatMapToPair and flatMapToDouble with lambdas
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+      "The quick brown fox jumps over the lazy dog."));
+    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
+
+    Assert.assertEquals("Hello", words.first());
+    Assert.assertEquals(11, words.count());  // 2 words + 9 words
+
+    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
+      List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
+      for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
+      return pairs2;
+    });
+
+    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+    Assert.assertEquals(11, pairs.count());
+
+    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+      List<Double> lengths = new LinkedList<Double>();
+      for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+      return lengths;
+    });
+
+    // "Hello" is the first word, so the first length is 5; there is one length per word.
+    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(11, doubles.count());
+  }
+
+  @Test
+  public void mapsFromPairsToPairs() {
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+    // Regression test for SPARK-668:
+    JavaPairRDD<String, Integer> swapped =
+      pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
+    swapped.collect();
+
+    // There was never a bug here, but it's worth testing:
+    pairRDD.map(item -> item.swap()).collect();
+  }
+
+  @Test
+  public void mapPartitions() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+      int sum = 0;
+      while (iter.hasNext()) {
+        sum += iter.next();
+      }
+      return Collections.singletonList(sum);
+    });
+
+    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+  }
+
+  @Test
+  public void sequenceFile() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.mapToPair(pair ->
+      new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    // Try reading the output back as a sequence file
+    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
+      .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
+    Assert.assertEquals(pairs, readRDD.collect());
+  }
+
+  @Test
+  public void zip() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
+    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+    zipped.count();
+  }
+
+  @Test
+  public void zipPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
+      (Iterator<Integer> i, Iterator<String> s) -> {
+        int sizeI = 0;
+        int sizeS = 0;
+        while (i.hasNext()) {
+          sizeI += 1;
+          i.next();
+        }
+        while (s.hasNext()) {
+          sizeS += 1;
+          s.next();
+        }
+        return Arrays.asList(sizeI, sizeS);
+      };
+    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
+    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+  }
+
+  @Test
+  public void accumulators() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
+    rdd.foreach(x -> intAccum.add(x));
+    Assert.assertEquals((Integer) 25, intAccum.value());
+
+    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+    rdd.foreach(x -> doubleAccum.add((double) x));
+    Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+    // Try a custom accumulator type
+    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+      public Float addInPlace(Float r, Float t) {
+        return r + t;
+      }
+
+      public Float addAccumulator(Float r, Float t) {
+        return r + t;
+      }
+
+      public Float zero(Float initialValue) {
+        return 0.0f;
+      }
+    };
+
+    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    rdd.foreach(x -> floatAccum.add((float) x));
+    Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+    // Test the setValue method
+    floatAccum.setValue(5.0f);
+    Assert.assertEquals((Float) 5.0f, floatAccum.value());
+  }
+
+  @Test
+  public void keyBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+    List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
+    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+  }
+
+  @Test
+  public void mapOnPairRDD() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    JavaPairRDD<Integer, Integer> rdd2 =
+      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+    JavaPairRDD<Integer, Integer> rdd3 =
+      rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
+    Assert.assertEquals(Arrays.asList(
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(0, 2),
+      new Tuple2<Integer, Integer>(1, 3),
+      new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+  }
+
+  @Test
+  public void collectPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+    JavaPairRDD<Integer, Integer> rdd2 =
+      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+    List[] parts = rdd1.collectPartitions(new int[]{0});
+    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+    parts = rdd1.collectPartitions(new int[]{1, 2});
+    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(2, 0)),
+      rdd2.collectPartitions(new int[]{0})[0]);
+
+    parts = rdd2.collectPartitions(new int[]{1, 2});
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+      new Tuple2<Integer, Integer>(4, 0)), parts[0]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+      new Tuple2<Integer, Integer>(6, 0),
+      new Tuple2<Integer, Integer>(7, 1)), parts[1]);
+  }
+
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
+    JavaPairRDD<Integer, int[]> pairRDD =
+      rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
+    pairRDD.collect();  // Works fine
+    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
new file mode 100644
index 0000000..43df0de
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+/**
+ * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using Java 8
+ * lambda syntax.
+ */
+public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
+
+  @Test
+  public void testMap() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("hello", "world"),
+      Arrays.asList("goodnight", "moon"));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(5, 5),
+      Arrays.asList(9, 4));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> letterCount = stream.map(s -> s.length());
+    JavaTestUtils.attachTestOutputStream(letterCount);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testFilter() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants", "dodgers"),
+      Arrays.asList("yankees", "red socks"));
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("giants"),
+      Arrays.asList("yankees"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
+    JavaTestUtils.attachTestOutputStream(filtered);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testMapPartitions() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants", "dodgers"),
+      Arrays.asList("yankees", "red socks"));
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("GIANTSDODGERS"),
+      Arrays.asList("YANKEESRED SOCKS"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> mapped = stream.mapPartitions(in -> {
+      String out = "";
+      while (in.hasNext()) {
+        out = out + in.next().toUpperCase();
+      }
+      return Lists.newArrayList(out);
+    });
+    JavaTestUtils.attachTestOutputStream(mapped);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduce() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3),
+      Arrays.asList(4, 5, 6),
+      Arrays.asList(7, 8, 9));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(6),
+      Arrays.asList(15),
+      Arrays.asList(24));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
+    JavaTestUtils.attachTestOutputStream(reduced);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByWindow() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3),
+      Arrays.asList(4, 5, 6),
+      Arrays.asList(7, 8, 9));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(6),
+      Arrays.asList(21),
+      Arrays.asList(39),
+      Arrays.asList(24));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
+      (x, y) -> x - y, new Duration(2000), new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reducedWindowed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testTransform() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3),
+      Arrays.asList(4, 5, 6),
+      Arrays.asList(7, 8, 9));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(3, 4, 5),
+      Arrays.asList(6, 7, 8),
+      Arrays.asList(9, 10, 11));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
+
+    JavaTestUtils.attachTestOutputStream(transformed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testVariousTransform() {
+    // tests whether all variations of transform can be called from Java
+
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData =
+      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+    JavaDStream<Integer> transformed1 = stream.transform(in -> null);
+    JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
+    JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
+    JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
+    JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
+    JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
+    JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
+    JavaPairDStream<String, String> pairTransformed4 =
+      pairStream.transformToPair((x, time) -> null);
+
+  }
+
+  @Test
+  public void testTransformWith() {
+    List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, String>("california", "dodgers"),
+        new Tuple2<String, String>("new york", "yankees")),
+      Arrays.asList(
+        new Tuple2<String, String>("california", "sharks"),
+        new Tuple2<String, String>("new york", "rangers")));
+
+    List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, String>("california", "giants"),
+        new Tuple2<String, String>("new york", "mets")),
+      Arrays.asList(
+        new Tuple2<String, String>("california", "ducks"),
+        new Tuple2<String, String>("new york", "islanders")));
+
+
+    List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+      Sets.newHashSet(
+        new Tuple2<String, Tuple2<String, String>>("california",
+          new Tuple2<String, String>("dodgers", "giants")),
+        new Tuple2<String, Tuple2<String, String>>("new york",
+          new Tuple2<String, String>("yankees", "mets"))),
+      Sets.newHashSet(
+        new Tuple2<String, Tuple2<String, String>>("california",
+          new Tuple2<String, String>("sharks", "ducks")),
+        new Tuple2<String, Tuple2<String, String>>("new york",
+          new Tuple2<String, String>("rangers", "islanders"))));
+
+    JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+      ssc, stringStringKVStream1, 1);
+    JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+    JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+      ssc, stringStringKVStream2, 1);
+    JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+    JavaPairDStream<String, Tuple2<String, String>> joined =
+      pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y));
+
+    JavaTestUtils.attachTestOutputStream(joined);
+    List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+    for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
+      unorderedResult.add(Sets.newHashSet(res));
+    }
+
+    Assert.assertEquals(expected, unorderedResult);
+  }
+
+
+  @Test
+  public void testVariousTransformWith() {
+    // Tests whether all variations of transformWith can be called from Java with lambdas
+
+    List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+    List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+    JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData1 =
+      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+    List<List<Tuple2<Double, Character>>> pairInputData2 =
+      Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+    JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+    JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+    JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
+    JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> transformed3 =
+      stream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> transformed4 =
+      stream1.transformWithToPair(pairStream1, (x, y, z) -> null);
+
+    JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2, (x, y, z) -> null);
+
+    JavaDStream<Double> pairTransformed2 =
+      pairStream1.transformWith(pairStream1, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> pairTransformed3 =
+      pairStream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> pairTransformed4 =
+      pairStream1.transformWithToPair(pairStream2, (x, y, z) -> null);
+  }
+
+  @Test
+  public void testStreamingContextTransform() {
+    List<List<Integer>> stream1input = Arrays.asList(
+      Arrays.asList(1),
+      Arrays.asList(2)
+    );
+
+    List<List<Integer>> stream2input = Arrays.asList(
+      Arrays.asList(3),
+      Arrays.asList(4)
+    );
+
+    List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+      Arrays.asList(new Tuple2<Integer, String>(1, "x")),
+      Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+    );
+
+    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
+      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+    );
+
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
+    JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
+    JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+    List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+    // This is just to test whether this transform to JavaDStream compiles
+    JavaDStream<Long> transformed1 = ssc.transform(
+      listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+      Assert.assertEquals(2, listOfRDDs.size());
+      return null;
+    });
+
+    List<JavaDStream<?>> listOfDStreams2 =
+      Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+
+    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
+      listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+      Assert.assertEquals(3, listOfRDDs.size());
+      JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
+      JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
+      JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
+      JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+      PairFunction<Integer, Integer, Integer> mapToTuple =
+        (Integer i) -> new Tuple2<Integer, Integer>(i, i);
+      return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
+    });
+    JavaTestUtils.attachTestOutputStream(transformed2);
+    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+      JavaTestUtils.runStreams(ssc, 2, 2);
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testFlatMap() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("go", "giants"),
+      Arrays.asList("boo", "dodgers"),
+      Arrays.asList("athletics"));
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
+      Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
+      Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testPairFlatMap() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants"),
+      Arrays.asList("dodgers"),
+      Arrays.asList("athletics"));
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(6, "g"),
+        new Tuple2<Integer, String>(6, "i"),
+        new Tuple2<Integer, String>(6, "a"),
+        new Tuple2<Integer, String>(6, "n"),
+        new Tuple2<Integer, String>(6, "t"),
+        new Tuple2<Integer, String>(6, "s")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(7, "d"),
+        new Tuple2<Integer, String>(7, "o"),
+        new Tuple2<Integer, String>(7, "d"),
+        new Tuple2<Integer, String>(7, "g"),
+        new Tuple2<Integer, String>(7, "e"),
+        new Tuple2<Integer, String>(7, "r"),
+        new Tuple2<Integer, String>(7, "s")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(9, "a"),
+        new Tuple2<Integer, String>(9, "t"),
+        new Tuple2<Integer, String>(9, "h"),
+        new Tuple2<Integer, String>(9, "l"),
+        new Tuple2<Integer, String>(9, "e"),
+        new Tuple2<Integer, String>(9, "t"),
+        new Tuple2<Integer, String>(9, "i"),
+        new Tuple2<Integer, String>(9, "c"),
+        new Tuple2<Integer, String>(9, "s")));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
+      List<Tuple2<Integer, String>> out = Lists.newArrayList();
+      for (String letter : s.split("(?!^)")) {
+        out.add(new Tuple2<Integer, String>(s.length(), letter));
+      }
+      return out;
+    });
+
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  /**
+   * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+   * for ordering variation within individual RDDs during windowing. Sorts inner lists in place.
+   */
+  public static <T extends Comparable<T>> void assertOrderInvariantEquals(
+    List<List<T>> expected, List<List<T>> actual) {
+    for (List<T> list : expected) {
+      Collections.sort(list);
+    }
+    for (List<T> list : actual) {
+      Collections.sort(list);
+    }
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testPairFilter() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants", "dodgers"),
+      Arrays.asList("yankees", "red socks"));
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
+      Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream =
+      stream.mapToPair(x -> new Tuple2<>(x, x.length()));
+    JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
+    JavaTestUtils.attachTestOutputStream(filtered);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+    Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+      new Tuple2<String, String>("california", "giants"),
+      new Tuple2<String, String>("new york", "yankees"),
+      new Tuple2<String, String>("new york", "mets")),
+    Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+      new Tuple2<String, String>("california", "ducks"),
+      new Tuple2<String, String>("new york", "rangers"),
+      new Tuple2<String, String>("new york", "islanders")));
+
+  List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+    Arrays.asList(
+      new Tuple2<String, Integer>("california", 1),
+      new Tuple2<String, Integer>("california", 3),
+      new Tuple2<String, Integer>("new york", 4),
+      new Tuple2<String, Integer>("new york", 1)),
+    Arrays.asList(
+      new Tuple2<String, Integer>("california", 5),
+      new Tuple2<String, Integer>("california", 5),
+      new Tuple2<String, Integer>("new york", 3),
+      new Tuple2<String, Integer>("new york", 1)));
+
+  @Test
+  public void testPairMap() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "california"),
+        new Tuple2<Integer, String>(3, "california"),
+        new Tuple2<Integer, String>(4, "new york"),
+        new Tuple2<Integer, String>(1, "new york")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(3, "new york"),
+        new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairMapPartitions() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "california"),
+        new Tuple2<Integer, String>(3, "california"),
+        new Tuple2<Integer, String>(4, "new york"),
+        new Tuple2<Integer, String>(1, "new york")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(3, "new york"),
+        new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+      LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      while (in.hasNext()) {
+        Tuple2<String, Integer> next = in.next();
+        out.add(next.swap());
+      }
+      return out;
+    });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairMap2() { // Maps pair -> single value (the pair's second element)
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(1, 3, 4, 1),
+      Arrays.asList(5, 5, 3, 1));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> values = pairStream.map(in -> in._2());
+    JavaTestUtils.attachTestOutputStream(values);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, Integer>("hi", 1),
+        new Tuple2<String, Integer>("ho", 2)),
+      Arrays.asList(
+        new Tuple2<String, Integer>("hi", 1),
+        new Tuple2<String, Integer>("ho", 2)));
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "h"),
+        new Tuple2<Integer, String>(1, "i"),
+        new Tuple2<Integer, String>(2, "h"),
+        new Tuple2<Integer, String>(2, "o")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "h"),
+        new Tuple2<Integer, String>(1, "i"),
+        new Tuple2<Integer, String>(2, "h"),
+        new Tuple2<Integer, String>(2, "o")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+      List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      for (char c : in._1().toCharArray()) {
+        out.add(new Tuple2<Integer, String>(in._2(), String.valueOf(c)));
+      }
+      return out;
+    });
+
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairReduceByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
+
+    JavaTestUtils.attachTestOutputStream(reduced);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testCombineByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+      (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
+
+    JavaTestUtils.attachTestOutputStream(combined);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByKeyAndWindow() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+      pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testUpdateStateByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+      int out = 0;
+      if (state.isPresent()) {
+        out = out + state.get();
+      }
+      for (Integer v : values) {
+        out = out + v;
+      }
+      return Optional.of(out);
+    });
+
+    JavaTestUtils.attachTestOutputStream(updated);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByKeyAndWindowWithInverse() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+      pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
+        new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairTransform() { // Sorts every batch by key through transformToPair
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(2, 5)),
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(1, 5)));
+
+    List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5)),
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5)));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
+
+    JavaTestUtils.attachTestOutputStream(sorted);
+    List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairToNormalRDDTransform() {
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(2, 5)),
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(1, 5)));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(3, 1, 4, 2),
+      Arrays.asList(2, 3, 4, 1));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
+    JavaTestUtils.attachTestOutputStream(firstParts);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testMapValues() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, String>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
+        new Tuple2<String, String>("california", "GIANTS"),
+        new Tuple2<String, String>("new york", "YANKEES"),
+        new Tuple2<String, String>("new york", "METS")),
+      Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
+        new Tuple2<String, String>("california", "DUCKS"),
+        new Tuple2<String, String>("new york", "RANGERS"),
+        new Tuple2<String, String>("new york", "ISLANDERS")));
+
+    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase());
+    JavaTestUtils.attachTestOutputStream(mapped);
+    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testFlatMapValues() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, String>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
+        new Tuple2<String, String>("california", "dodgers2"),
+        new Tuple2<String, String>("california", "giants1"),
+        new Tuple2<String, String>("california", "giants2"),
+        new Tuple2<String, String>("new york", "yankees1"),
+        new Tuple2<String, String>("new york", "yankees2"),
+        new Tuple2<String, String>("new york", "mets1"),
+        new Tuple2<String, String>("new york", "mets2")),
+      Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
+        new Tuple2<String, String>("california", "sharks2"),
+        new Tuple2<String, String>("california", "ducks1"),
+        new Tuple2<String, String>("california", "ducks2"),
+        new Tuple2<String, String>("new york", "rangers1"),
+        new Tuple2<String, String>("new york", "rangers2"),
+        new Tuple2<String, String>("new york", "islanders1"),
+        new Tuple2<String, String>("new york", "islanders2")));
+
+    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+
+    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
+      List<String> out = new ArrayList<String>();
+      out.add(in + "1");
+      out.add(in + "2");
+      return out;
+    });
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    Assert.assertEquals(expected, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..180beaa
--- /dev/null
+++ b/extras/java8-tests/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e28d7c..c59fada 100644
--- a/pom.xml
+++ b/pom.xml
@@ -711,6 +711,31 @@
       </modules>
 
     </profile>
+    <profile>
+      <id>java8-tests</id>
+      <build>
+        <plugins>
+          <!-- Publishes the test jars, which the java8-tests module depends on -->
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <version>2.4</version>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>test-jar</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+
+      <modules>
+        <module>extras/java8-tests</module>
+      </modules>
+
+    </profile>
 
     <profile>
       <id>yarn</id>

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d45f677..aa17848 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -90,6 +90,14 @@ object SparkBuild extends Build {
   }
   lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"
   val maybeAvro = if (hadoopVersion.startsWith("0.23.") && isYarnEnabled) Seq("org.apache.avro" % "avro" % "1.7.4") else Seq()
+
+  // Conditionally include the java 8 sub-project
+  lazy val javaVersion = System.getProperty("java.specification.version")
+  lazy val isJava8Enabled = javaVersion.toDouble >= "1.8".toDouble
+  val maybeJava8Tests = if (isJava8Enabled) Seq[ProjectReference](java8Tests) else Seq[ProjectReference]()
+  lazy val java8Tests = Project("java8-tests", file("extras/java8-tests"), settings = java8TestsSettings).
+    dependsOn(core) dependsOn(streaming % "compile->compile;test->test")
+
   // Conditionally include the yarn sub-project
   lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
   lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core)
@@ -118,10 +126,11 @@ object SparkBuild extends Build {
   lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
     .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
 
-  // Everything except assembly, tools and examples belong to packageProjects
+  // Everything except assembly, tools, java8Tests and examples belongs to packageProjects
   lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef
 
-  lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
+  lazy val allProjects = packageProjects ++ allExternalRefs ++
+    Seq[ProjectReference](examples, tools, assemblyProj) ++ maybeJava8Tests
 
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization       := "org.apache.spark",
@@ -132,6 +141,7 @@ object SparkBuild extends Build {
     javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
+    javaHome := Properties.envOrNone("JAVA_HOME").map(file),
     // This is to add convenience of enabling sbt -Dsbt.offline=true for making the build offline.
     offline := "true".equalsIgnoreCase(sys.props("sbt.offline")),
     retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
@@ -370,6 +380,12 @@ object SparkBuild extends Build {
     name := "spark-yarn"
   )
 
+  def java8TestsSettings = sharedSettings ++ Seq(
+    name := "java8-tests",
+    javacOptions := Seq("-target", "1.8", "-source", "1.8"),
+    testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
+  )
+
   // Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
   // if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
   def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/sbt/sbt-launch-lib.bash
----------------------------------------------------------------------
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index d65bbdc..00a6b41 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -16,7 +16,14 @@ declare -a residual_args
 declare -a java_args
 declare -a scalac_args
 declare -a sbt_commands
-declare java_cmd=java
+
+if test -x "$JAVA_HOME/bin/java"; then
+    echo -e "Using $JAVA_HOME as default JAVA_HOME."
+    echo "Note, this will be overridden by -java-home if it is set."
+    declare java_cmd="$JAVA_HOME/bin/java"
+else
+    declare java_cmd=java
+fi
 
 echoerr () {
   echo 1>&2 "$@"
@@ -131,7 +138,7 @@ process_args () {
 
        -sbt-jar) require_arg path "$1" "$2" && sbt_jar="$2" && shift 2 ;;
    -sbt-version) require_arg version "$1" "$2" && sbt_version="$2" && shift 2 ;;
-     -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && shift 2 ;;
+     -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && export JAVA_HOME=$2 && shift 2 ;;
 
             -D*) addJava "$1" && shift ;;
             -J*) addJava "${1:2}" && shift ;;

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index e23b725..721d502 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -41,7 +41,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
 
   /** Return a new DStream containing only the elements that satisfy a predicate. */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
-    dstream.filter((x => f(x).booleanValue()))
+    dstream.filter((x => f.call(x).booleanValue()))
 
   /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
   def cache(): JavaDStream[T] = dstream.cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 7aa7ead..a85cd04 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,19 +17,20 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.util.{List => JList}
+import java.util
 import java.lang.{Long => JLong}
+import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import org.apache.spark.streaming._
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
-import java.util
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
 import org.apache.spark.rdd.RDD
-import JavaDStream._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.java.JavaDStream._
 import org.apache.spark.streaming.dstream.DStream
 
 trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
@@ -123,23 +124,23 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
    * an array.
    */
-  def glom(): JavaDStream[JList[T]] = {
+  def glom(): JavaDStream[JList[T]] =
     new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
-  }
+
 
 
   /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
-  def context(): StreamingContext = dstream.context()
+  def context(): StreamingContext = dstream.context
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
   def map[R](f: JFunction[T, R]): JavaDStream[R] = {
-    new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+    new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag)
   }
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
-  def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
-    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
-    new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+    def cm: ClassTag[(K2, V2)] = fakeClassTag
+    new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
@@ -148,19 +149,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+    def fn = (x: T) => f.call(x).asScala
+    new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
    * Return a new DStream by applying a function to all elements of this DStream,
    * and then flattening the results
    */
-  def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+  def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
-    new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+    def fn = (x: T) => f.call(x).asScala
+    def cm: ClassTag[(K2, V2)] = fakeClassTag
+    new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
     /**
@@ -169,8 +170,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of the RDD.
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
@@ -178,10 +179,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
    * of the RDD.
    */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
   : JavaPairDStream[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
@@ -283,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * on each RDD of 'this' DStream.
    */
   def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
-    implicit val cm: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val cm: ClassTag[U] = fakeClassTag
+
     def scalaTransform (in: RDD[T]): RDD[U] =
       transformFunc.call(wrapRDD(in)).rdd
     dstream.transform(scalaTransform(_))
@@ -295,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * on each RDD of 'this' DStream.
    */
   def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
-    implicit val cm: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val cm: ClassTag[U] = fakeClassTag
+
     def scalaTransform (in: RDD[T], time: Time): RDD[U] =
       transformFunc.call(wrapRDD(in), time).rdd
     dstream.transform(scalaTransform(_, _))
@@ -306,12 +307,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
-  def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+  def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
   JavaPairDStream[K2, V2] = {
-    implicit val cmk: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+    implicit val cmk: ClassTag[K2] = fakeClassTag
+    implicit val cmv: ClassTag[V2] = fakeClassTag
+
     def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
       transformFunc.call(wrapRDD(in)).rdd
     dstream.transform(scalaTransform(_))
@@ -321,12 +321,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
-  def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+  def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
   JavaPairDStream[K2, V2] = {
-    implicit val cmk: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+    implicit val cmk: ClassTag[K2] = fakeClassTag
+    implicit val cmv: ClassTag[V2] = fakeClassTag
+
     def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
       transformFunc.call(wrapRDD(in), time).rdd
     dstream.transform(scalaTransform(_, _))
@@ -340,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
       other: JavaDStream[U],
       transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
     ): JavaDStream[W] = {
-    implicit val cmu: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
-    implicit val cmv: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cmu: ClassTag[U] = fakeClassTag
+    implicit val cmv: ClassTag[W] = fakeClassTag
+
     def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
@@ -353,16 +351,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream and 'other' DStream.
    */
-  def transformWith[U, K2, V2](
+  def transformWithToPair[U, K2, V2](
       other: JavaDStream[U],
       transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
     ): JavaPairDStream[K2, V2] = {
-    implicit val cmu: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
-    implicit val cmk2: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv2: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+    implicit val cmu: ClassTag[U] = fakeClassTag
+    implicit val cmk2: ClassTag[K2] = fakeClassTag
+    implicit val cmv2: ClassTag[V2] = fakeClassTag
     def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
@@ -376,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
       other: JavaPairDStream[K2, V2],
       transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
     ): JavaDStream[W] = {
-    implicit val cmk2: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv2: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
-    implicit val cmw: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cmk2: ClassTag[K2] = fakeClassTag
+    implicit val cmv2: ClassTag[V2] = fakeClassTag
+    implicit val cmw: ClassTag[W] = fakeClassTag
+
     def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
@@ -391,18 +384,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream and 'other' DStream.
    */
-  def transformWith[K2, V2, K3, V3](
+  def transformWithToPair[K2, V2, K3, V3](
       other: JavaPairDStream[K2, V2],
       transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
     ): JavaPairDStream[K3, V3] = {
-    implicit val cmk2: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv2: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
-    implicit val cmk3: ClassTag[K3] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
-    implicit val cmv3: ClassTag[V3] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
+    implicit val cmk2: ClassTag[K2] = fakeClassTag
+    implicit val cmv2: ClassTag[V2] = fakeClassTag
+    implicit val cmk3: ClassTag[K3] = fakeClassTag
+    implicit val cmv3: ClassTag[V3] = fakeClassTag
     def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))


Mime
View raw message