spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/9] git commit: Fixed bug in Java transformWith, added more Java testcases for transform and transformWith, added missing variations of Java join and cogroup, updated various Scala and Java API docs.
Date Sat, 26 Oct 2013 00:26:20 GMT
Fixed bug in Java transformWith, added more Java testcases for transform and transformWith, added missing variations of Java join and cogroup, updated various Scala and Java API docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/72d2e1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/72d2e1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/72d2e1dd

Branch: refs/heads/master
Commit: 72d2e1dd777696640f64aaf92fecab64c6387df0
Parents: 0666498
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue Oct 22 23:35:51 2013 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Oct 22 23:35:51 2013 -0700

----------------------------------------------------------------------
 .../spark/api/java/function/Function3.java      |   2 +-
 .../api/java/function/WrappedFunction3.scala    |   4 +-
 .../org/apache/spark/streaming/DStream.scala    |  16 +-
 .../spark/streaming/PairDStreamFunctions.scala  |  98 ++++++---
 .../streaming/api/java/JavaDStreamLike.scala    |  64 +++---
 .../streaming/api/java/JavaPairDStream.scala    | 142 +++++++++---
 .../streaming/dstream/CoGroupedDStream.scala    |  58 -----
 .../apache/spark/streaming/JavaAPISuite.java    | 219 +++++++++++++++++--
 8 files changed, 424 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
index 530ee2e..2ce714c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
@@ -24,7 +24,7 @@ import scala.runtime.AbstractFunction2;
 import java.io.Serializable;
 
 /**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
  */
 public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
         implements Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
index 8e8bbeb..d314dbd 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
@@ -20,8 +20,8 @@ package org.apache.spark.api.java.function
 import scala.runtime.AbstractFunction3
 
 /**
- * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
+ * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
  * isn't marked to allow that).
  */
 private[spark] abstract class WrappedFunction3[T1, T2, T3, R]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index ee351da..38e3479 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -479,7 +479,7 @@ abstract class DStream[T: ClassManifest] (
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
-   * this DStream will be registered as an output stream and therefore materialized.
+   * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreach(foreachFunc: RDD[T] => Unit) {
     this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
@@ -487,7 +487,7 @@ abstract class DStream[T: ClassManifest] (
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
-   * this DStream will be registered as an output stream and therefore materialized.
+   * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreach(foreachFunc: (RDD[T], Time) => Unit) {
     val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
@@ -497,7 +497,7 @@ abstract class DStream[T: ClassManifest] (
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
+   * on each RDD of 'this' DStream.
    */
   def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
     transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
@@ -505,7 +505,7 @@ abstract class DStream[T: ClassManifest] (
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
+   * on each RDD of 'this' DStream.
    */
   def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
     //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
@@ -518,8 +518,8 @@ abstract class DStream[T: ClassManifest] (
   }
 
   /**
-   * Return a new DStream in which each RDD is generated by applying a function on RDDs
-   * of DStreams stream1 and stream2.
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of 'this' DStream and 'other' DStream.
    */
   def transformWith[U: ClassManifest, V: ClassManifest](
       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
@@ -529,8 +529,8 @@ abstract class DStream[T: ClassManifest] (
   }
 
   /**
-   * Return a new DStream in which each RDD is generated by applying a function on RDDs
-   * of DStreams stream1 and stream2.
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of 'this' DStream and 'other' DStream.
    */
   def transformWith[U: ClassManifest, V: ClassManifest](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index c319433..8c12fd1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
-import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import org.apache.spark.streaming.dstream.{ShuffledDStream}
 import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
 
 import org.apache.spark.{Partitioner, HashPartitioner}
@@ -359,7 +359,7 @@ extends Serializable {
   }
 
   /**
-   * Create a new "state" DStream where the state for each key is updated by applying
+   * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of the key.
    * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
@@ -398,11 +398,18 @@ extends Serializable {
      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
   }
 
-
+  /**
+   * Return a new DStream by applying a map function to the value of each key-value pair in
+   * 'this' DStream without changing the key.
+   */
   def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
     new MapValuedDStream[K, V, U](self, mapValuesFunc)
   }
 
+  /**
+   * Return a new DStream by applying a flatmap function to the value of each key-value pair in
+   * 'this' DStream without changing the key.
+   */
   def flatMapValues[U: ClassManifest](
       flatMapValuesFunc: V => TraversableOnce[U]
     ): DStream[(K, U)] = {
@@ -410,9 +417,8 @@ extends Serializable {
   }
 
   /**
-   * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
-   * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
-   * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with Spark's default number
    * of partitions.
    */
   def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -420,31 +426,29 @@ extends Serializable {
   }
 
   /**
-   * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
-   * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
-   * key in both RDDs. Partitioner is used to partition each generated RDD.
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   */
+  def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
    */
   def cogroup[W: ClassManifest](
       other: DStream[(K, W)],
       partitioner: Partitioner
     ): DStream[(K, (Seq[V], Seq[W]))] = {
-
-    val cgd = new CoGroupedDStream[K](
-      Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
-      partitioner
-    )
-    val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
-      classManifest[K],
-      Manifests.seqSeqManifest
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
     )
-    pdfs.mapValues {
-      case Seq(vs, ws) =>
-        (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
-    }
   }
 
   /**
-   * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream..
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    */
   def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
@@ -452,7 +456,15 @@ extends Serializable {
   }
 
   /**
-   * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   */
+  def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+    join[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
    * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    */
   def join[W: ClassManifest](
@@ -466,7 +478,7 @@ extends Serializable {
   }
 
   /**
-   * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
    * number of partitions.
    */
@@ -475,7 +487,19 @@ extends Serializable {
   }
 
   /**
-   * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def leftOuterJoin[W: ClassManifest](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (V, Option[W]))] = {
+    leftOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
    * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
    * the partitioning of each RDD.
    */
@@ -490,7 +514,7 @@ extends Serializable {
   }
 
   /**
-   * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
    * number of partitions.
    */
@@ -499,7 +523,19 @@ extends Serializable {
   }
 
   /**
-   * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def rightOuterJoin[W: ClassManifest](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], W))] = {
+    rightOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
    * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
    * the partitioning of each RDD.
    */
@@ -514,8 +550,8 @@ extends Serializable {
   }
 
   /**
-   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
-   * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
    */
   def saveAsHadoopFiles[F <: OutputFormat[K, V]](
       prefix: String,
@@ -525,8 +561,8 @@ extends Serializable {
   }
 
   /**
-   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
-   * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
    */
   def saveAsHadoopFiles(
       prefix: String,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 1110d77..09189ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -121,10 +121,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
    * an array.
    */
-  def glom(): JavaDStream[JList[T]] =
+  def glom(): JavaDStream[JList[T]] = {
     new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+  }
+
 
-  /** Return the StreamingContext associated with this DStream */
+  /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
   def context(): StreamingContext = dstream.context()
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
@@ -239,7 +241,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
-   * this DStream will be registered as an output stream and therefore materialized.
+   * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreach(foreachFunc: JFunction[R, Void]) {
     dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
@@ -247,7 +249,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
-   * this DStream will be registered as an output stream and therefore materialized.
+   * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreach(foreachFunc: JFunction2[R, Time, Void]) {
     dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
@@ -255,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
+   * on each RDD of 'this' DStream.
    */
   def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
     implicit val cm: ClassManifest[U] =
@@ -267,7 +269,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
+   * on each RDD of 'this' DStream.
    */
   def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
     implicit val cm: ClassManifest[U] =
@@ -279,7 +281,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
+   * on each RDD of 'this' DStream.
    */
   def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
   JavaPairDStream[K2, V2] = {
@@ -294,7 +296,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
+   * on each RDD of 'this' DStream.
    */
   def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
   JavaPairDStream[K2, V2] = {
@@ -309,7 +311,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this and other DStreams.
+   * on each RDD of 'this' DStream and 'other' DStream.
    */
   def transformWith[U, W](
       other: JavaDStream[U],
@@ -326,7 +328,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this and other DStreams.
+   * on each RDD of 'this' DStream and 'other' DStream.
    */
   def transformWith[U, K2, V2](
       other: JavaDStream[U],
@@ -345,42 +347,42 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this and other DStreams.
+   * on each RDD of 'this' DStream and 'other' DStream.
    */
-  def transformWith[K, V, W](
-      other: JavaPairDStream[K, V],
-      transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]]
+  def transformWith[K2, V2, W](
+      other: JavaPairDStream[K2, V2],
+      transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
     ): JavaDStream[W] = {
-    implicit val cmk: ClassManifest[K] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
-    implicit val cmv: ClassManifest[V] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+    implicit val cmk2: ClassManifest[K2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+    implicit val cmv2: ClassManifest[V2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
     implicit val cmw: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
-    def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] =
+    def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
-    dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _))
+    dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
   }
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this and other DStreams.
+   * on each RDD of 'this' DStream and 'other' DStream.
    */
-  def transformWith[K, V, K2, V2](
-      other: JavaPairDStream[K, V],
-      transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]
-    ): JavaPairDStream[K2, V2] = {
-    implicit val cmk: ClassManifest[K] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
-    implicit val cmv: ClassManifest[V] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+  def transformWith[K2, V2, K3, V3](
+      other: JavaPairDStream[K2, V2],
+      transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
+    ): JavaPairDStream[K3, V3] = {
     implicit val cmk2: ClassManifest[K2] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
     implicit val cmv2: ClassManifest[V2] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
-    def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
+    implicit val cmk3: ClassManifest[K3] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]]
+    implicit val cmv3: ClassManifest[V3] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]]
+    def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
-    dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _))
+    dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 821db46..309c0fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
 
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3}
 import org.apache.spark.Partitioner
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@@ -148,7 +148,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[PairRDDFunctions]] for more
+   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
    * information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
@@ -413,7 +413,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Create a new "state" DStream where the state for each key is updated by applying
+   * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of each key.
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    * @param updateFunc State update function. If `this` function returns None, then
@@ -428,7 +428,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Create a new "state" DStream where the state for each key is updated by applying
+   * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of each key.
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    * @param updateFunc State update function. If `this` function returns None, then
@@ -436,15 +436,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param numPartitions Number of partitions of each RDD in the new DStream.
    * @tparam S State type
    */
-  def updateStateByKey[S: ClassManifest](
+  def updateStateByKey[S](
       updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
       numPartitions: Int)
   : JavaPairDStream[K, S] = {
+    implicit val cm: ClassManifest[S] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
   }
 
   /**
-   * Create a new "state" DStream where the state for each key is updated by applying
+   * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of the key.
    * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
@@ -452,19 +454,30 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
    * @tparam S State type
    */
-  def updateStateByKey[S: ClassManifest](
+  def updateStateByKey[S](
       updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
       partitioner: Partitioner
   ): JavaPairDStream[K, S] = {
+    implicit val cm: ClassManifest[S] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
   }
 
+
+  /**
+   * Return a new DStream by applying a map function to the value of each key-value pair in
+   * 'this' DStream without changing the key.
+   */
   def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
     implicit val cm: ClassManifest[U] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
     dstream.mapValues(f)
   }
 
+  /**
+   * Return a new DStream by applying a flatmap function to the value of each key-value pair in
+   * 'this' DStream without changing the key.
+   */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
     import scala.collection.JavaConverters._
     def fn = (x: V) => f.apply(x).asScala
@@ -474,9 +487,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
-   * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
-   * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with Spark's default number
    * of partitions.
    */
   def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
@@ -486,20 +498,35 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
-   * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
-   * key in both RDDs. Partitioner is used to partition each generated RDD.
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    */
-  def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-  : JavaPairDStream[K, (JList[V], JList[W])] = {
+  def cogroup[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (JList[V], JList[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    dstream.cogroup(other.dstream, numPartitions)
+           .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+  }
+
+  /**
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+   */
+  def cogroup[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (JList[V], JList[W])] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     dstream.cogroup(other.dstream, partitioner)
-        .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+           .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
 
   /**
-   * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream..
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    */
   def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
@@ -509,19 +536,32 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   */
+  def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    dstream.join(other.dstream, numPartitions)
+  }
+
+  /**
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
    * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    */
-  def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-  : JavaPairDStream[K, (V, W)] = {
+  def join[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (V, W)] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     dstream.join(other.dstream, partitioner)
   }
 
   /**
-   * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream..
-   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
    */
   def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
     implicit val cm: ClassManifest[W] =
@@ -531,11 +571,28 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def leftOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (V, Optional[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+  }
+
+  /**
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` and `other` DStreams.
    * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    */
-  def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-  : JavaPairDStream[K, (V, Optional[W])] = {
+  def leftOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (V, Optional[W])] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
@@ -543,8 +600,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream..
-   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
    */
   def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
     implicit val cm: ClassManifest[W] =
@@ -554,11 +612,29 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def rightOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], W)] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+  }
+
+  /**
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
    */
-  def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-  : JavaPairDStream[K, (Optional[V], W)] = {
+  def rightOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], W)] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
@@ -640,9 +716,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 }
 
 object JavaPairDStream {
-  implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
-  :JavaPairDStream[K, V] =
+  implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = {
     new JavaPairDStream[K, V](dstream)
+  }
 
   def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
     implicit val cmk: ClassManifest[K] =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 4eddc75..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-
-private[streaming]
-class CoGroupedDStream[K : ClassManifest](
-    parents: Seq[DStream[(K, _)]],
-    partitioner: Partitioner
-  ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
-  if (parents.length == 0) {
-    throw new IllegalArgumentException("Empty array of parents")
-  }
-
-  if (parents.map(_.ssc).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different StreamingContexts")
-  }
-
-  if (parents.map(_.slideDuration).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different slide times")
-  }
-
-  override def dependencies = parents.toList
-
-  override def slideDuration: Duration = parents.head.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
-    val part = partitioner
-    val rdds = parents.flatMap(_.getOrCompute(validTime))
-    if (rdds.size > 0) {
-      val q = new CoGroupedRDD[K](rdds, part)
-      Some(q)
-    } else {
-      None
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/72d2e1dd/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 9f885f0..16622a3 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -320,17 +320,20 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(9,10,11));
 
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> transformed =
-        stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
-      @Override
-      public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
-        return in.map(new Function<Integer, Integer>() {
-          @Override
-          public Integer call(Integer i) throws Exception {
-            return i + 2;
-          }
-        });
-      }});
+    JavaDStream<Integer> transformed = stream.transform(
+      new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+        @Override
+        public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+          return in.map(new Function<Integer, Integer>() {
+            @Override
+            public Integer call(Integer i) throws Exception {
+              return i + 2;
+            }
+          });
+        }
+      }
+    );
+
     JavaTestUtils.attachTestOutputStream(transformed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -338,6 +341,84 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void testVariousTransform() {
+    // tests whether all variations of transform can be called from Java
+
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData =
+        Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+        JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+    JavaDStream<Integer> transformed1 = stream.transform(
+        new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+          @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaDStream<Integer> transformed2 = stream.transform(
+      new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
+        @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+          return null;
+        }
+      }
+    );
+
+    JavaPairDStream<String, Integer> transformed3 = stream.transform(
+        new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
+          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaPairDStream<String, Integer> transformed4 = stream.transform(
+        new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
+          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaDStream<Integer> pairTransformed1 = pairStream.transform(
+        new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
+          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaDStream<Integer> pairTransformed2 = pairStream.transform(
+        new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
+          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+        new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
+          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+        new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
+          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+  }
+
+  @Test
   public void testTransformWith() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
           Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
@@ -374,10 +455,18 @@ public class JavaAPISuite implements Serializable {
 
     JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
         pairStream2,
-        new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() {
-          @Override
-          public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception {
-            return stringStringJavaPairRDD.join(stringStringJavaPairRDD2);
+        new Function3 <
+            JavaPairRDD<String, String>,
+            JavaPairRDD<String, String>,
+            Time,
+            JavaPairRDD<String, Tuple2<String, String>>
+          >() {
+          @Override public JavaPairRDD<String, Tuple2<String, String>> call(
+              JavaPairRDD<String, String> rdd1,
+              JavaPairRDD<String, String> rdd2,
+              Time time
+            ) throws Exception {
+            return rdd1.join(rdd2);
           }
         }
     );
@@ -389,6 +478,106 @@ public class JavaAPISuite implements Serializable {
 
   }
 
+  @Test
+  public void testVariousTransformWith() {
+    // tests whether all variations of transformWith can be called from Java
+
+    List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+    List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+    JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData1 =
+        Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+    List<List<Tuple2<Double, Character>>> pairInputData2 =
+        Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+    JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+        JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+    JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+        JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+    JavaDStream<Double> transformed1 = stream1.transformWith(
+        stream2,
+        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaDStream<Double> transformed2 = stream1.transformWith(
+        pairStream1,
+        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+        stream2,
+        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+        pairStream1,
+        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(
+        stream2,
+        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith(
+        pairStream1,
+        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+    JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+        stream2,
+        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+
+
+    JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+        pairStream2,
+        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception {
+            return null;
+          }
+        }
+    );
+  }
+
     @Test
   public void testFlatMap() {
     List<List<String>> inputData = Arrays.asList(


Mime
View raw message