spark-commits mailing list archives

From pwend...@apache.org
Subject [05/50] [abbrv] Merge branch 'master' into scala-2.10
Date Sat, 14 Dec 2013 08:41:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c80545b,c6cd635..3ba37be
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@@ -37,8 -36,8 +37,8 @@@ import org.apache.spark.rdd.RD
  import org.apache.spark.rdd.PairRDDFunctions
  
  class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
-     implicit val kTag: ClassTag[K],
-     implicit val vTag: ClassTag[V])
 -    implicit val kManifest: ClassManifest[K],
 -    implicit val vManifest: ClassManifest[V])
++    implicit val kManifest: ClassTag[K],
++    implicit val vManifest: ClassTag[V])
      extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
  
    override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
@@@ -441,6 -446,8 +447,8 @@@
        updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
        numPartitions: Int)
    : JavaPairDStream[K, S] = {
 -    implicit val cm: ClassManifest[S] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
++    implicit val cm: ClassTag[S] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
      dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
    }
  
@@@ -457,12 -464,19 +465,19 @@@
        updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
        partitioner: Partitioner
    ): JavaPairDStream[K, S] = {
 -    implicit val cm: ClassManifest[S] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
++    implicit val cm: ClassTag[S] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
      dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
    }
  
+ 
+   /**
+    * Return a new DStream by applying a map function to the value of each key-value pairs in
+    * 'this' DStream without changing the key.
+    */
    def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
 -    implicit val cm: ClassManifest[U] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
 +    implicit val cm: ClassTag[U] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
      dstream.mapValues(f)
    }
  
@@@ -487,41 -504,150 +505,150 @@@
    }
  
    /**
-    * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
-    * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
-    * key in both RDDs. Partitioner is used to partition each generated RDD.
+    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
     */
-   def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-   : JavaPairDStream[K, (JList[V], JList[W])] = {
+   def cogroup[W](
+       other: JavaPairDStream[K, W],
+       numPartitions: Int
+     ): JavaPairDStream[K, (JList[V], JList[W])] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     dstream.cogroup(other.dstream, numPartitions)
+            .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+   }
+ 
+   /**
+    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+    */
+   def cogroup[W](
+       other: JavaPairDStream[K, W],
+       partitioner: Partitioner
+     ): JavaPairDStream[K, (JList[V], JList[W])] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
 +    implicit val cm: ClassTag[W] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
      dstream.cogroup(other.dstream, partitioner)
-         .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+            .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
    }
  
    /**
-    * Join `this` DStream with `other` DStream. HashPartitioner is used
-    * to partition each generated RDD into default number of partitions.
+    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
     */
    def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
 +    implicit val cm: ClassTag[W] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
      dstream.join(other.dstream)
    }
  
    /**
-    * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
-    * be generated by joining RDDs from `this` and other DStream. Uses the given
-    * Partitioner to partition each generated RDD.
+    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+    */
+   def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     dstream.join(other.dstream, numPartitions)
+   }
+ 
+   /**
+    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+    * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
     */
-   def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-   : JavaPairDStream[K, (V, W)] = {
+   def join[W](
+       other: JavaPairDStream[K, W],
+       partitioner: Partitioner
+     ): JavaPairDStream[K, (V, W)] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
 +    implicit val cm: ClassTag[W] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
      dstream.join(other.dstream, partitioner)
    }
  
    /**
+    * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+    * number of partitions.
+    */
+   def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     val joinResult = dstream.leftOuterJoin(other.dstream)
+     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+   }
+ 
+   /**
+    * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+    * partitions.
+    */
+   def leftOuterJoin[W](
+       other: JavaPairDStream[K, W],
+       numPartitions: Int
+     ): JavaPairDStream[K, (V, Optional[W])] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
+     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+   }
+ 
+   /**
+    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+    * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+    */
+   def leftOuterJoin[W](
+       other: JavaPairDStream[K, W],
+       partitioner: Partitioner
+     ): JavaPairDStream[K, (V, Optional[W])] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
+     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
+   }
+ 
+   /**
+    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+    * number of partitions.
+    */
+   def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     val joinResult = dstream.rightOuterJoin(other.dstream)
+     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+   }
+ 
+   /**
+    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+    * partitions.
+    */
+   def rightOuterJoin[W](
+       other: JavaPairDStream[K, W],
+       numPartitions: Int
+     ): JavaPairDStream[K, (Optional[V], W)] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
+     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+   }
+ 
+   /**
+    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+    * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+    * the partitioning of each RDD.
+    */
+   def rightOuterJoin[W](
+       other: JavaPairDStream[K, W],
+       partitioner: Partitioner
+     ): JavaPairDStream[K, (Optional[V], W)] = {
 -    implicit val cm: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cm: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
+     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
+   }
+ 
+   /**
     * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
     * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
     */
@@@ -591,20 -717,25 +718,25 @@@
      dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
    }
  
+   /** Convert to a JavaDStream */
+   def toJavaDStream(): JavaDStream[(K, V)] = {
+     new JavaDStream[(K, V)](dstream)
+   }
+ 
 -  override val classManifest: ClassManifest[(K, V)] =
 -    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
 +  override val classTag: ClassTag[(K, V)] =
 +    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
  }
  
  object JavaPairDStream {
-   implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)])
-   :JavaPairDStream[K, V] =
 -  implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = {
++  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = {
      new JavaPairDStream[K, V](dstream)
+   }
  
    def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
 -    implicit val cmk: ClassManifest[K] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
 -    implicit val cmv: ClassManifest[V] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
 +    implicit val cmk: ClassTag[K] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
 +    implicit val cmv: ClassTag[V] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
      new JavaPairDStream[K, V](dstream.dstream)
    }
  

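The hunks above swap every fabricated ClassManifest for a fabricated ClassTag while keeping the underlying trick: the Java caller supplies no type evidence, so the wrapper conjures a ClassTag[AnyRef] and casts it to ClassTag[T]. Below is a minimal, self-contained sketch of that idiom; the names fakeClassTag and duplicated are made up for illustration and are not part of the patch.

    import scala.reflect.ClassTag

    object FakeClassTagSketch {
      // The shape used throughout the Java API wrappers: fabricate a ClassTag
      // for T by reusing the one for AnyRef.
      def fakeClassTag[T]: ClassTag[T] =
        implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]

      // Safe as long as T stays generic, which is how the wrappers use it:
      // the array built underneath is an Object[], and it is never cast to a
      // more specific array type before being handed back as a Seq.
      def duplicated[T](item: T, times: Int): Seq[T] = {
        implicit val tag: ClassTag[T] = fakeClassTag[T]
        val arr = Array.fill(times)(item)   // needs a ClassTag[T]; really an Object[]
        arr.toSeq
      }

      def main(args: Array[String]): Unit =
        println(duplicated("dstream", 3).mkString(", "))   // dstream, dstream, dstream
    }
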
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 8242af6,cf30b54..ca0c905
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@@ -19,10 -19,9 +19,10 @@@ package org.apache.spark.streaming.api.
  
  import java.lang.{Long => JLong, Integer => JInt}
  import java.io.InputStream
- import java.util.{Map => JMap}
+ import java.util.{Map => JMap, List => JList}
  
  import scala.collection.JavaConversions._
 +import scala.reflect.ClassTag
  
  import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
  import twitter4j.Status
@@@ -144,12 -141,11 +144,12 @@@ class JavaStreamingContext(val ssc: Str
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt])
-   : JavaDStream[String] = {
+   : JavaPairDStream[String, String] = {
 -    implicit val cmt: ClassManifest[String] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
 +    implicit val cmt: ClassTag[String] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
      ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
        StorageLevel.MEMORY_ONLY_SER_2)
 +
    }
  
    /**
@@@ -166,9 -162,9 +166,9 @@@
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel)
-   : JavaDStream[String] = {
+   : JavaPairDStream[String, String] = {
 -    implicit val cmt: ClassManifest[String] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
 +    implicit val cmt: ClassTag[String] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
      ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
        storageLevel)
    }
@@@ -189,11 -189,16 +193,16 @@@
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel)
-   : JavaDStream[T] = {
-     implicit val cmt: ClassTag[T] =
-       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-     implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
-     ssc.kafkaStream[T, D](
+   : JavaPairDStream[K, V] = {
 -    implicit val keyCmt: ClassManifest[K] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
 -    implicit val valueCmt: ClassManifest[V] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
++    implicit val keyCmt: ClassTag[K] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
++    implicit val valueCmt: ClassTag[V] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ 
+     implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+     implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+ 
+     ssc.kafkaStream[K, V, U, T](
        kafkaParams.toMap,
        Map(topics.mapValues(_.intValue()).toSeq: _*),
        storageLevel)
@@@ -589,6 -594,77 +598,77 @@@
    }
  
    /**
+    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
+    */
+   def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
+     val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
 -    implicit val cm: ClassManifest[T] = first.classManifest
++    implicit val cm: ClassTag[T] = first.classTag
+     ssc.union(dstreams)(cm)
+   }
+ 
+   /**
+    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
+    */
+   def union[K, V](
+       first: JavaPairDStream[K, V],
+       rest: JList[JavaPairDStream[K, V]]
+     ): JavaPairDStream[K, V] = {
+     val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
 -    implicit val cm: ClassManifest[(K, V)] = first.classManifest
 -    implicit val kcm: ClassManifest[K] = first.kManifest
 -    implicit val vcm: ClassManifest[V] = first.vManifest
++    implicit val cm: ClassTag[(K, V)] = first.classTag
++    implicit val kcm: ClassTag[K] = first.kManifest
++    implicit val vcm: ClassTag[V] = first.vManifest
+     new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm)
+   }
+ 
+   /**
+    * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+    * the DStreams. The order of the JavaRDDs in the transform function parameter will be the
+    * same as the order of corresponding DStreams in the list. Note that for adding a
+    * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
+    * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+    * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
+    * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+    */
+   def transform[T](
+       dstreams: JList[JavaDStream[_]],
+       transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]]
+     ): JavaDStream[T] = {
 -    implicit val cmt: ClassManifest[T] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
++    implicit val cmt: ClassTag[T] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+     val scalaDStreams = dstreams.map(_.dstream).toSeq
+     val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+       val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+       transformFunc.call(jrdds, time).rdd
+     }
+     ssc.transform(scalaDStreams, scalaTransformFunc)
+   }
+ 
+   /**
+    * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+    * the DStreams. The order of the JavaRDDs in the transform function parameter will be the
+    * same as the order of corresponding DStreams in the list. Note that for adding a
+    * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
+    * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+    * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
+    * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+    */
+   def transform[K, V](
+       dstreams: JList[JavaDStream[_]],
+       transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
+     ): JavaPairDStream[K, V] = {
 -    implicit val cmk: ClassManifest[K] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
 -    implicit val cmv: ClassManifest[V] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
++    implicit val cmk: ClassTag[K] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
++    implicit val cmv: ClassTag[V] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+     val scalaDStreams = dstreams.map(_.dstream).toSeq
+     val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+       val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+       transformFunc.call(jrdds, time).rdd
+     }
+     ssc.transform(scalaDStreams, scalaTransformFunc)
+   }
+ 
+   /**
     * Sets the context to periodically checkpoint the DStream operations for master
     * fault-tolerance. The graph will be checkpointed every batch interval.
     * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored

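For context, here is a hedged usage sketch (not part of the patch) of the new union(first, rest) overload added to JavaStreamingContext above. The master string, application name, hostnames and ports are placeholders, and this is only one plausible way to construct the context in this era of the API.

    import java.util.{Arrays => JArrays}
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}

    object UnionSketch {
      def main(args: Array[String]): Unit = {
        val jssc = new JavaStreamingContext("local[2]", "union-sketch", Seconds(1))
        val a: JavaDStream[String] = jssc.socketTextStream("localhost", 9999)
        val b: JavaDStream[String] = jssc.socketTextStream("localhost", 9998)
        // The new overload: one stream plus a java.util.List carrying the rest.
        val merged: JavaDStream[String] = jssc.union(a, JArrays.asList(b))
        merged.print()
        jssc.start()   // runs until the process is stopped
      }
    }
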
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index 9613486,a5de5e1..ec0096c
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@@ -33,20 -31,22 +31,23 @@@ import kafka.utils.ZKStringSerialize
  import org.I0Itec.zkclient._
  
  import scala.collection.Map
- import scala.collection.mutable.HashMap
- import scala.collection.JavaConversions._
 +import scala.reflect.ClassTag
  
+ 
  /**
   * Input stream that pulls messages from a Kafka Broker.
 - * 
 + *
   * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   * in its own thread.
   * @param storageLevel RDD storage level.
   */
  private[streaming]
- class KafkaInputDStream[T: ClassTag, D <: Decoder[_]: Manifest](
+ class KafkaInputDStream[
 -  K: ClassManifest,
 -  V: ClassManifest,
++  K: ClassTag,
++  V: ClassTag,
+   U <: Decoder[_]: Manifest,
+   T <: Decoder[_]: Manifest](
      @transient ssc_ : StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
@@@ -61,10 -60,14 +61,14 @@@
  }
  
  private[streaming]
- class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest](
-   kafkaParams: Map[String, String],
-   topics: Map[String, Int],
-   storageLevel: StorageLevel
+ class KafkaReceiver[
 -  K: ClassManifest,
 -  V: ClassManifest,
++  K: ClassTag,
++  V: ClassTag,
+   U <: Decoder[_]: Manifest,
+   T <: Decoder[_]: Manifest](
+     kafkaParams: Map[String, String],
+     topics: Map[String, Int],
+     storageLevel: StorageLevel
    ) extends NetworkReceiver[Any] {
  
    // Handles pushing data into the BlockManager
@@@ -97,14 -100,21 +101,22 @@@
  
      // When autooffset.reset is defined, it is our responsibility to try and whack the
      // consumer group zk node.
-     if (kafkaParams.contains("autooffset.reset")) {
-       tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+     if (kafkaParams.contains("auto.offset.reset")) {
+       tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
      }
  
 -    // Create Threads for each Topic/Message Stream we are listening
 -    val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties])
++    val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+       .newInstance(consumerConfig.props)
+       .asInstanceOf[Decoder[K]]
 -    val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties])
++    val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+       .newInstance(consumerConfig.props)
+       .asInstanceOf[Decoder[V]]
+ 
 +    // Create Threads for each Topic/Message Stream we are listening
-     val decoder = manifest[D].runtimeClass.newInstance.asInstanceOf[Decoder[T]]
-     val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
+     val topicMessageStreams = consumerConnector.createMessageStreams(
+       topics, keyDecoder, valueDecoder)
+ 
 +
      // Start the messages handler for each partition
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
@@@ -112,7 -122,8 +124,8 @@@
    }
  
    // Handles Kafka Messages
-   private class MessageHandler[T: ClassTag](stream: KafkaStream[T]) extends Runnable {
 -  private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V])
++  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+     extends Runnable {
      def run() {
        logInfo("Starting MessageHandler.")
        for (msgAndMetadata <- stream) {

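The key and value decoders above are now built reflectively from the U and T type parameters via their Manifests. The following self-contained sketch shows the same reflection idiom with ClassTag and made-up Props/decoder types; it illustrates the construction pattern only and is not the Kafka API.

    import scala.reflect.ClassTag

    final case class Props(groupId: String)

    trait StringDecoderLike { def decode(bytes: Array[Byte]): String }

    // Stand-in for a Kafka decoder: a public single-argument constructor that
    // takes the consumer properties, which is what the reflective lookup expects.
    class Utf8Decoder(props: Props) extends StringDecoderLike {
      def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
    }

    object ReflectiveDecoderSketch {
      // Mirrors manifest[U].runtimeClass.getConstructor(...).newInstance(...)
      // from the patch, using ClassTag in place of Manifest.
      def buildDecoder[D <: StringDecoderLike : ClassTag](props: Props): D =
        implicitly[ClassTag[D]].runtimeClass
          .getConstructor(classOf[Props])
          .newInstance(props)
          .asInstanceOf[D]

      def main(args: Array[String]): Unit = {
        val decoder = buildDecoder[Utf8Decoder](Props("sketch-group"))
        println(decoder.decode("kafka message".getBytes("UTF-8")))
      }
    }
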
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
index 0000000,ac05282..ef4a737
mode 000000,100644..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
@@@ -1,0 -1,109 +1,110 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.streaming.dstream
+ 
+ import org.apache.spark.Logging
+ import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
+ 
+ import java.util.Properties
+ import java.util.concurrent.Executors
+ import java.io.IOException
+ 
+ import org.eclipse.paho.client.mqttv3.MqttCallback
+ import org.eclipse.paho.client.mqttv3.MqttClient
+ import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
+ import org.eclipse.paho.client.mqttv3.MqttException
+ import org.eclipse.paho.client.mqttv3.MqttMessage
+ import org.eclipse.paho.client.mqttv3.MqttTopic
+ 
+ import scala.collection.Map
+ import scala.collection.mutable.HashMap
+ import scala.collection.JavaConversions._
++import scala.reflect.ClassTag
+ 
+ /**
+  * Input stream that subscribe messages from a Mqtt Broker.
+  * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+  * @param brokerUrl Url of remote mqtt publisher
+  * @param topic topic name to subscribe to
+  * @param storageLevel RDD storage level.
+  */
+ 
+ private[streaming] 
 -class MQTTInputDStream[T: ClassManifest](
++class MQTTInputDStream[T: ClassTag](
+   @transient ssc_ : StreamingContext,
+   brokerUrl: String,
+   topic: String,
+   storageLevel: StorageLevel
+   ) extends NetworkInputDStream[T](ssc_) with Logging {
+   
+   def getReceiver(): NetworkReceiver[T] = {
+     new MQTTReceiver(brokerUrl, topic, storageLevel)
+       .asInstanceOf[NetworkReceiver[T]]
+   }
+ }
+ 
+ private[streaming] 
+ class MQTTReceiver(brokerUrl: String,
+   topic: String,
+   storageLevel: StorageLevel
+   ) extends NetworkReceiver[Any] {
+   lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+   
+   def onStop() {
+     blockGenerator.stop()
+   }
+   
+   def onStart() {
+ 
+     blockGenerator.start()
+ 
+     // Set up persistence for messages 
+     var peristance: MqttClientPersistence = new MemoryPersistence()
+ 
+     // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+     var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
+ 
+     // Connect to MqttBroker    
+     client.connect()
+ 
+     // Subscribe to Mqtt topic
+     client.subscribe(topic)
+ 
+     // Callback automatically triggers as and when new message arrives on specified topic
+     var callback: MqttCallback = new MqttCallback() {
+ 
+       // Handles Mqtt message 
+       override def messageArrived(arg0: String, arg1: MqttMessage) {
+         blockGenerator += new String(arg1.getPayload())
+       }
+ 
+       override def deliveryComplete(arg0: IMqttDeliveryToken) {
+       }
+ 
+       override def connectionLost(arg0: Throwable) {
+         logInfo("Connection lost " + arg0)
+       }
+     }
+ 
+     // Set up callback for MqttClient
+     client.setCallback(callback)
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index a4746f0,10ed4ef..dea0f26
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@@ -18,11 -18,9 +18,11 @@@
  package org.apache.spark.streaming.dstream
  
  import org.apache.spark.Logging
- import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.storage.{StorageLevel, StreamBlockId}
  import org.apache.spark.streaming.StreamingContext
  
 +import scala.reflect.ClassTag
 +
  import java.net.InetSocketAddress
  import java.nio.ByteBuffer
  import java.nio.channels.{ReadableByteChannel, SocketChannel}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 73e1ddf,71bcb2b..aeea060
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@@ -19,19 -19,24 +19,25 @@@ package org.apache.spark.streaming.dstr
  
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Duration, DStream, Time}
 +import scala.reflect.ClassTag
  
  private[streaming]
- class TransformedDStream[T: ClassTag, U: ClassTag] (
-     parent: DStream[T],
-     transformFunc: (RDD[T], Time) => RDD[U]
-   ) extends DStream[U](parent.ssc) {
 -class TransformedDStream[U: ClassManifest] (
++class TransformedDStream[U: ClassTag] (
+     parents: Seq[DStream[_]],
+     transformFunc: (Seq[RDD[_]], Time) => RDD[U]
+   ) extends DStream[U](parents.head.ssc) {
  
-   override def dependencies = List(parent)
+   require(parents.length > 0, "List of DStreams to transform is empty")
+   require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
+   require(parents.map(_.slideDuration).distinct.size == 1,
+     "Some of the DStreams have different slide durations")
  
-   override def slideDuration: Duration = parent.slideDuration
+   override def dependencies = parents.toList
+ 
+   override def slideDuration: Duration = parents.head.slideDuration
  
    override def compute(validTime: Time): Option[RDD[U]] = {
-     parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+     val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+     Some(transformFunc(parentRDDs, validTime))
    }
  }

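The generalized TransformedDStream now accepts a function over the RDDs of several parents for each batch. A hedged sketch of that function shape is below, exercised against plain RDDs rather than a streaming context; the local master, object name and sample data are assumptions for illustration.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    object MultiParentTransformSketch {
      // Shape expected by the new transformFunc: the parents' RDDs for one
      // batch, in list order, producing a single RDD for that batch.
      // Here: a per-batch join of two pair-RDD parents.
      val joinBatches: (Seq[RDD[_]], Time) => RDD[(String, (Int, Int))] =
        (rdds, time) => {
          val left  = rdds(0).asInstanceOf[RDD[(String, Int)]]
          val right = rdds(1).asInstanceOf[RDD[(String, Int)]]
          left.join(right)
        }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "transform-sketch")
        val a = sc.parallelize(Seq("a" -> 1, "b" -> 2))
        val b = sc.parallelize(Seq("a" -> 10, "b" -> 20))
        println(joinBatches(Seq(a, b), Time(0)).collect().toSeq)
        sc.stop()
      }
    }
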
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index ee087a1,ef0f85a..fdf5371
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@@ -20,12 -20,8 +20,12 @@@ package org.apache.spark.streaming.rece
  import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
  import akka.actor.{ actorRef2Scala, ActorRef }
  import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
 +import akka.actor.SupervisorStrategy._
 +
 +import scala.concurrent.duration._
 +import scala.reflect.ClassTag
  
- import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.storage.{StorageLevel, StreamBlockId}
  import org.apache.spark.streaming.dstream.NetworkReceiver
  
  import java.util.concurrent.atomic.AtomicInteger

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --cc streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 076fb53,ad4a8b9..daeb99f
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@@ -21,10 -21,9 +21,11 @@@ import com.google.common.base.Optional
  import com.google.common.collect.Lists;
  import com.google.common.collect.Maps;
  import com.google.common.io.Files;
 +
  import kafka.serializer.StringDecoder;
 +
  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+ import org.apache.spark.streaming.api.java.JavaDStreamLike;
  import org.junit.After;
  import org.junit.Assert;
  import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --cc streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index d5cdad4,5e384ee..42ab959
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@@ -33,15 -31,15 +33,15 @@@ trait JavaTestBase extends TestSuiteBas
    /**
     * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
     * The stream will be derived from the supplied lists of Java objects.
 -   **/
 +   */
    def attachTestInputStream[T](
-     ssc: JavaStreamingContext,
-     data: JList[JList[T]],
-     numPartitions: Int) = {
+       ssc: JavaStreamingContext,
+       data: JList[JList[T]],
+       numPartitions: Int) = {
      val seqData = data.map(Seq(_:_*))
  
 -    implicit val cm: ClassManifest[T] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
 +    implicit val cm: ClassTag[T] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
      val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
      ssc.ssc.registerInputStream(dstream)
      new JavaDStream[T](dstream)
@@@ -52,12 -50,11 +52,11 @@@
     * [[org.apache.spark.streaming.TestOutputStream]].
     **/
    def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
-     dstream: JavaDStreamLike[T, This, R]) =
+       dstream: JavaDStreamLike[T, This, R]) =
    {
 -    implicit val cm: ClassManifest[T] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
 +    implicit val cm: ClassTag[T] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-     val ostream = new TestOutputStream(dstream.dstream,
-       new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
+     val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
      dstream.dstream.ssc.registerOutputStream(ostream)
    }
  
@@@ -65,16 -62,39 +64,39 @@@
     * Process all registered streams for a numBatches batches, failing if
     * numExpectedOutput RDD's are not generated. Generated RDD's are collected
     * and returned, represented as a list for each batch interval.
+    *
+    * Returns a list of items for each RDD.
     */
    def runStreams[V](
-     ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+       ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
 -    implicit val cm: ClassManifest[V] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
 +    implicit val cm: ClassTag[V] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
      val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
      val out = new ArrayList[JList[V]]()
      res.map(entry => out.append(new ArrayList[V](entry)))
      out
    }
+ 
+   /**
+    * Process all registered streams for a numBatches batches, failing if
+    * numExpectedOutput RDD's are not generated. Generated RDD's are collected
+    * and returned, represented as a list for each batch interval.
+    *
+    * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
+    * representing one partition.
+    */
+   def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
+       numExpectedOutput: Int): JList[JList[JList[V]]] = {
 -    implicit val cm: ClassManifest[V] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
++    implicit val cm: ClassTag[V] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+     val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
+     val out = new ArrayList[JList[JList[V]]]()
+     res.map{entry =>
+       val lists = entry.map(new ArrayList[V](_))
+       out.append(new ArrayList[JList[V]](lists))
+     }
+     out
+   }
  }
  
  object JavaTestUtils extends JavaTestBase {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --cc streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index c91f9ba,be14069..126915a
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@@ -61,8 -60,11 +61,11 @@@ class TestInputStream[T: ClassTag](ssc
  /**
   * This is a output stream just for the testsuites. All the output is collected into a
   * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+  *
+  * The buffer contains a sequence of RDD's, each containing a sequence of items
   */
- class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
 -class TestOutputStream[T: ClassManifest](parent: DStream[T],
++class TestOutputStream[T: ClassTag](parent: DStream[T],
+     val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
    extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
      val collected = rdd.collect()
      output += collected
@@@ -77,6 -79,30 +80,30 @@@
  }
  
  /**
+  * This is a output stream just for the testsuites. All the output is collected into a
+  * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+  *
+  * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
+  * containing a sequence of items.
+  */
 -class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
++class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
+     val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
+   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+     val collected = rdd.glom().collect().map(_.toSeq)
+     output += collected
+   }) {
+ 
+   // This is to clear the output buffer every it is read from a checkpoint
+   @throws(classOf[IOException])
+   private def readObject(ois: ObjectInputStream) {
+     ois.defaultReadObject()
+     output.clear()
+   }
+ 
+   def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten))
+ }
+ 
+ /**
   * This is the base trait for Spark Streaming testsuites. This provides basic functionality
   * to run user-defined set of input on user-defined stream operations, and verify the output.
   */
@@@ -107,9 -133,10 +134,10 @@@ trait TestSuiteBase extends FunSuite wi
     * Set up required DStreams to test the DStream operation using the two sequences
     * of input collections.
     */
 -  def setupStreams[U: ClassManifest, V: ClassManifest](
 +  def setupStreams[U: ClassTag, V: ClassTag](
        input: Seq[Seq[U]],
-       operation: DStream[U] => DStream[V]
+       operation: DStream[U] => DStream[V],
+       numPartitions: Int = numInputPartitions
      ): StreamingContext = {
  
      // Create StreamingContext
@@@ -158,12 -187,31 +188,31 @@@
     * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
     * returns the collected output. It will wait until `numExpectedOutput` number of
     * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+    *
+    * Returns a sequence of items for each RDD.
     */
 -  def runStreams[V: ClassManifest](
 +  def runStreams[V: ClassTag](
        ssc: StreamingContext,
        numBatches: Int,
        numExpectedOutput: Int
      ): Seq[Seq[V]] = {
+     // Flatten each RDD into a single Seq
+     runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
+   }
+ 
+   /**
+    * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
+    * returns the collected output. It will wait until `numExpectedOutput` number of
+    * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+    *
+    * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
+    * representing one partition.
+    */
 -  def runStreamsWithPartitions[V: ClassManifest](
++  def runStreamsWithPartitions[V: ClassTag](
+       ssc: StreamingContext,
+       numBatches: Int,
+       numExpectedOutput: Int
+     ): Seq[Seq[Seq[V]]] = {
      assert(numBatches > 0, "Number of batches to run stream computation is zero")
      assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
      logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)

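A small standalone sketch (assumed local master and made-up data, not code from the patch) of what the new partition-aware test output records: rdd.glom() keeps the partition boundaries, and flattening recovers the old per-batch view that runStreams still exposes.

    import org.apache.spark.SparkContext

    object GlomSketch {
      def main(args: Array[String]): Unit = {
        val sc  = new SparkContext("local[2]", "glom-sketch")
        val rdd = sc.parallelize(1 to 6, 3)   // 3 partitions

        // What TestOutputStreamWithPartitions stores for one batch:
        val byPartition: Seq[Seq[Int]] = rdd.glom().collect().map(_.toSeq)
        println(byPartition)          // three inner groups, one per partition
        // What the flat runStreams view (and the old TestOutputStream) sees:
        println(byPartition.flatten)  // 1..6 without partition boundaries
        sc.stop()
      }
    }
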
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
----------------------------------------------------------------------
diff --cc tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index f824c47,f824c47..f670f65
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@@ -199,7 -199,7 +199,7 @@@ object JavaAPICompletenessChecker 
  
    private def toJavaMethod(method: SparkMethod): SparkMethod = {
      val params = method.parameters
--      .filterNot(_.name == "scala.reflect.ClassManifest")
++      .filterNot(_.name == "scala.reflect.ClassTag")
        .map(toJavaType(_, isReturnType = false))
      SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params)
    }
@@@ -212,7 -212,7 +212,7 @@@
      // internal Spark components.
      val excludedNames = Seq(
        "org.apache.spark.rdd.RDD.origin",
--      "org.apache.spark.rdd.RDD.elementClassManifest",
++      "org.apache.spark.rdd.RDD.elementClassTag",
        "org.apache.spark.rdd.RDD.checkpointData",
        "org.apache.spark.rdd.RDD.partitioner",
        "org.apache.spark.rdd.RDD.partitions",

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index d222f41,25da9aa..4beb522
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@@ -209,9 -209,9 +209,9 @@@ private[yarn] class YarnAllocationHandl
          else {
          // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
            val workerId = workerIdCounter.incrementAndGet().toString
 -          val driverUrl = "akka://spark@%s:%s/user/%s".format(
 +          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
              System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-             StandaloneSchedulerBackend.ACTOR_NAME)
+             CoarseGrainedSchedulerBackend.ACTOR_NAME)
  
            logInfo("launching container on " + containerId + " host " + workerHostname)
          // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..


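A trivial sketch of the changed driver URL format in the YARN hunk above, presumably needed because the Scala 2.10 branch's newer Akka remoting expects the "akka.tcp" scheme and the scheduler actor is now CoarseGrainedSchedulerBackend. The host, port and actor name below are placeholders standing in for the system properties and ACTOR_NAME used in the real code.

    object DriverUrlSketch {
      def main(args: Array[String]): Unit = {
        val host      = "driver-host.example.com"    // stand-in for spark.driver.host
        val port      = "35000"                      // stand-in for spark.driver.port
        val actorName = "CoarseGrainedScheduler"     // stand-in for CoarseGrainedSchedulerBackend.ACTOR_NAME
        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(host, port, actorName)
        println(driverUrl)
      }
    }
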