spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: [SPARK-2514] [mllib] Random RDD generator
Date Sun, 27 Jul 2014 23:16:42 GMT
Repository: spark
Updated Branches:
  refs/heads/master ecf30ee7e -> 81fcdd22c


[SPARK-2514] [mllib] Random RDD generator

Utilities for generating random RDDs.

RandomRDD and RandomVectorRDD are created instead of using `sc.parallelize(range:Range)` because
`Range` objects in Scala can only have `size <= Int.MaxValue`.

The object `RandomRDDGenerators` can be transformed into a generator class to reduce the number
of auxiliary methods for optional arguments.

Author: Doris Xin <doris.s.xin@gmail.com>

Closes #1520 from dorx/randomRDD and squashes the following commits:

01121ac [Doris Xin] reviewer comments
6bf27d8 [Doris Xin] Merge branch 'master' into randomRDD
a8ea92d [Doris Xin] Reviewer comments
063ea0b [Doris Xin] Merge branch 'master' into randomRDD
aec68eb [Doris Xin] newline
bc90234 [Doris Xin] units passed.
d56cacb [Doris Xin] impl with RandomRDD
92d6f1c [Doris Xin] solution for Cloneable
df5bcff [Doris Xin] Merge branch 'generator' into randomRDD
f46d928 [Doris Xin] WIP
49ed20d [Doris Xin] alternative poisson distribution generator
7cb0e40 [Doris Xin] fix for data inconsistency
8881444 [Doris Xin] RandomRDDGenerator: initial design


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81fcdd22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81fcdd22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81fcdd22

Branch: refs/heads/master
Commit: 81fcdd22c8ef52889ed51b3ec5c2747708505fc2
Parents: ecf30ee
Author: Doris Xin <doris.s.xin@gmail.com>
Authored: Sun Jul 27 16:16:39 2014 -0700
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Sun Jul 27 16:16:39 2014 -0700

----------------------------------------------------------------------
 .../mllib/random/DistributionGenerator.scala    | 101 ++++
 .../mllib/random/RandomRDDGenerators.scala      | 473 +++++++++++++++++++
 .../org/apache/spark/mllib/rdd/RandomRDD.scala  | 118 +++++
 .../random/DistributionGeneratorSuite.scala     |  90 ++++
 .../mllib/random/RandomRDDGeneratorsSuite.scala | 158 +++++++
 5 files changed, 940 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81fcdd22/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
new file mode 100644
index 0000000..7ecb409
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * :: Experimental ::
+ * Trait for random number generators that generate i.i.d. values from a distribution.
+ */
+@Experimental
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * Returns an i.i.d. sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the
+   * class when applicable for non-locking concurrent usage.
+   */
+  def copy(): DistributionGenerator
+}
+
+/**
+ * :: Experimental ::
+ * Generator of i.i.d. samples drawn from the uniform distribution U[0.0, 1.0].
+ */
+@Experimental
+class UniformGenerator extends DistributionGenerator {
+
+  // XORShiftRandom is used for speed; thread safety is not required here.
+  private val random = new XORShiftRandom()
+
+  override def nextValue(): Double = random.nextDouble()
+
+  override def setSeed(seed: Long): Unit = {
+    random.setSeed(seed)
+  }
+
+  override def copy(): UniformGenerator = new UniformGenerator()
+}
+
+/**
+ * :: Experimental ::
+ * Generator of i.i.d. samples drawn from the standard normal distribution.
+ */
+@Experimental
+class StandardNormalGenerator extends DistributionGenerator {
+
+  // XORShiftRandom is used for speed; thread safety is not required here.
+  private val random = new XORShiftRandom()
+
+  override def nextValue(): Double = random.nextGaussian()
+
+  override def setSeed(seed: Long): Unit = {
+    random.setSeed(seed)
+  }
+
+  override def copy(): StandardNormalGenerator = new StandardNormalGenerator()
+}
+
+/**
+ * :: Experimental ::
+ * Generates i.i.d. samples from the Poisson distribution with the given mean.
+ *
+ * @param mean mean for the Poisson distribution.
+ */
+@Experimental
+class PoissonGenerator(val mean: Double) extends DistributionGenerator {
+
+  private var rng = new Poisson(mean, new DRand)
+
+  override def nextValue(): Double = rng.nextDouble()
+
+  override def setSeed(seed: Long): Unit = {
+    rng = new Poisson(mean, new DRand(seed.toInt)) // seed.toInt keeps only the low 32 bits
+  }
+
+  override def copy(): PoissonGenerator = new PoissonGenerator(mean)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81fcdd22/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
new file mode 100644
index 0000000..d7ee2d3
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+/**
+ * :: Experimental ::
+ * Generator methods for creating RDDs comprised of i.i.d samples from some distribution.
+ */
+@Experimental
+object RandomRDDGenerators {
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+   */
+  @Experimental
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+    val uniform = new UniformGenerator()
+    randomRDD(sc, uniform,  size, numPartitions, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+   */
+  @Experimental
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
+    uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+   */
+  @Experimental
+  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
+    uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+   */
+  @Experimental
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+    val normal = new StandardNormalGenerator()
+    randomRDD(sc, normal, size, numPartitions, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+   */
+  @Experimental
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
+    normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+   */
+  @Experimental
+  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
+    normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or lambda, for the Poisson distribution.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+   */
+  @Experimental
+  def poissonRDD(sc: SparkContext,
+      mean: Double,
+      size: Long,
+      numPartitions: Int,
+      seed: Long): RDD[Double] = {
+    val poisson = new PoissonGenerator(mean)
+    randomRDD(sc, poisson, size, numPartitions, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or lambda, for the Poisson distribution.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+   */
+  @Experimental
+  def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = {
+    poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or lambda, for the Poisson distribution.
+   * @param size Size of the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+   */
+  @Experimental
+  def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = {
+    poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param generator DistributionGenerator used to populate the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+   */
+  @Experimental
+  def randomRDD(sc: SparkContext,
+      generator: DistributionGenerator,
+      size: Long,
+      numPartitions: Int,
+      seed: Long): RDD[Double] = {
+    new RandomRDD(sc, size, numPartitions, generator, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param generator DistributionGenerator used to populate the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+   */
+  @Experimental
+  def randomRDD(sc: SparkContext,
+      generator: DistributionGenerator,
+      size: Long,
+      numPartitions: Int): RDD[Double] = {
+    randomRDD(sc, generator, size, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param generator DistributionGenerator used to populate the RDD.
+   * @param size Size of the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+   */
+  @Experimental
+  def randomRDD(sc: SparkContext,
+      generator: DistributionGenerator,
+      size: Long): RDD[Double] = {
+    randomRDD(sc, generator, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  // TODO Generate RDD[Vector] from multivariate distributions.
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * uniform distribution on [0.0, 1.0].
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+   */
+  @Experimental
+  def uniformVectorRDD(sc: SparkContext,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int,
+      seed: Long): RDD[Vector] = {
+    val uniform = new UniformGenerator()
+    randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * uniform distribution on [0.0, 1.0].
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+   */
+  @Experimental
+  def uniformVectorRDD(sc: SparkContext,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int): RDD[Vector] = {
+    uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * uniform distribution on [0.0, 1.0].
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+   */
+  @Experimental
+  def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
+    uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * standard normal distribution.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+   */
+  @Experimental
+  def normalVectorRDD(sc: SparkContext,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int,
+      seed: Long): RDD[Vector] = {
+    val normal = new StandardNormalGenerator()
+    randomVectorRDD(sc, normal, numRows, numCols, numPartitions, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * standard normal distribution.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+   */
+  @Experimental
+  def normalVectorRDD(sc: SparkContext,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int): RDD[Vector] = {
+    normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * standard normal distribution.
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+   */
+  @Experimental
+  def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
+    normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * Poisson distribution with the input mean.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or lambda, for the Poisson distribution.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+   */
+  @Experimental
+  def poissonVectorRDD(sc: SparkContext,
+      mean: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int,
+      seed: Long): RDD[Vector] = {
+    val poisson = new PoissonGenerator(mean)
+    randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * Poisson distribution with the input mean.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or lambda, for the Poisson distribution.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+   */
+  @Experimental
+  def poissonVectorRDD(sc: SparkContext,
+      mean: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int): RDD[Vector] = {
+    poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+   * Poisson distribution with the input mean.
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or lambda, for the Poisson distribution.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+   */
+  @Experimental
+  def poissonVectorRDD(sc: SparkContext,
+      mean: Double,
+      numRows: Long,
+      numCols: Int): RDD[Vector] = {
+    poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+   * input DistributionGenerator.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param generator DistributionGenerator used to populate the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+   */
+  @Experimental
+  def randomVectorRDD(sc: SparkContext,
+      generator: DistributionGenerator,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int,
+      seed: Long): RDD[Vector] = {
+    new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+   * input DistributionGenerator.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param generator DistributionGenerator used to populate the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+   */
+  @Experimental
+  def randomVectorRDD(sc: SparkContext,
+      generator: DistributionGenerator,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int): RDD[Vector] = {
+    randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * :: Experimental ::
+   * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+   * input DistributionGenerator.
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param generator DistributionGenerator used to populate the RDD.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+   */
+  @Experimental
+  def randomVectorRDD(sc: SparkContext,
+      generator: DistributionGenerator,
+      numRows: Long,
+      numCols: Int): RDD[Vector] = {
+    randomVectorRDD(sc, generator, numRows, numCols,
+      sc.defaultParallelism, Utils.random.nextLong)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81fcdd22/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
new file mode 100644
index 0000000..f13282d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+import scala.util.Random
+
+private[mllib] class RandomRDDPartition(override val index: Int,
+    val size: Int,
+    val generator: DistributionGenerator,
+    val seed: Long) extends Partition {
+
+  require(size >= 0, "Non-negative partition size required.")
+}
+
+// RandomRDD and RandomVectorRDD exist since Scala Range objects cannot have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient sc: SparkContext,
+    size: Long,
+    numPartitions: Int,
+    @transient rng: DistributionGenerator,
+    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numPartitions > 0, "Positive number of partitions required")
+  require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
+    "Partition size cannot exceed Int.MaxValue")
+
+  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
+    val split = splitIn.asInstanceOf[RandomRDDPartition]
+    RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+    RandomRDD.getPartitions(size, numPartitions, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient sc: SparkContext,
+    size: Long,
+    vectorSize: Int,
+    numPartitions: Int,
+    @transient rng: DistributionGenerator,
+    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numPartitions > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+  require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
+    "Partition size cannot exceed Int.MaxValue")
+
+  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
+    val split = splitIn.asInstanceOf[RandomRDDPartition]
+    RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    RandomRDD.getPartitions(size, numPartitions, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  def getPartitions(size: Long,
+      numPartitions: Int,
+      rng: DistributionGenerator,
+      seed: Long): Array[Partition] = {
+    // Split `size` into numPartitions chunks, each with its own seed drawn from Random(seed).
+    val partitions = new Array[RandomRDDPartition](numPartitions)
+    var i = 0
+    var start: Long = 0
+    var end: Long = 0
+    val random = new Random(seed)
+    while (i < numPartitions) {
+      end = ((i + 1) * size) / numPartitions
+      partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong())
+      start = end
+      i += 1
+    }
+    partitions.asInstanceOf[Array[Partition]]
+  }
+
+  // The generator is copied and reseeded on every call so that the same data is produced
+  // every time the partition is computed; values are generated lazily, not buffered in an array.
+  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
+    val generator = partition.generator.copy()
+    generator.setSeed(partition.seed)
+    Iterator.fill(partition.size)(generator.nextValue())
+  }
+
+  // The generator is copied and reseeded on every call so that the same data is produced
+  // every time the partition is computed; vectors are generated lazily, one at a time.
+  def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = {
+    val generator = partition.generator.copy()
+    generator.setSeed(partition.seed)
+    Iterator.fill(partition.size)(new DenseVector(
+      Array.fill(vectorSize)(generator.nextValue())))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81fcdd22/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
new file mode 100644
index 0000000..974dec4
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.StatCounter
+
+// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+class DistributionGeneratorSuite extends FunSuite {
+
+  /** Draws `n` values from `gen`, one after another, and returns them in generation order. */
+  private def draw(gen: DistributionGenerator, n: Int): IndexedSeq[Double] =
+    (0 until n).map(_ => gen.nextValue())
+
+  /** Re-seeds `gen` with `seed` and then draws 1000 values from it. */
+  private def seededRun(gen: DistributionGenerator, seed: Long): IndexedSeq[Double] = {
+    gen.setSeed(seed)
+    draw(gen, 1000)
+  }
+
+  /** Checks the seeding and copying contract shared by all generators. */
+  def apiChecks(gen: DistributionGenerator): Unit = {
+
+    // Resetting the seed must replay the same sequence of random numbers.
+    val firstRun = seededRun(gen, 42L)
+    val secondRun = seededRun(gen, 42L)
+    assert(firstRun.equals(secondRun))
+
+    // copy() must return a different instance of the rng, i.e. setting different seeds on
+    // the two instances produces different sequences of random numbers.
+    val gen2 = gen.copy()
+    val fromOriginal = seededRun(gen, 0L)
+    val fromCopy = seededRun(gen2, 1L)
+    // Whole sequences are compared because individual elements can coincide by chance,
+    // whereas the sequences should differ given two different seeds.
+    assert(!fromOriginal.equals(fromCopy))
+
+    // Setting the same seed in the copied instance must produce the same sequence of numbers.
+    val originalAgain = seededRun(gen, 0L)
+    val copyAgain = seededRun(gen2, 0L)
+    assert(originalAgain.equals(copyAgain))
+  }
+
+  /**
+   * For seeds 0 to 4, draws 100000 values and checks that the sample mean and standard
+   * deviation are within `epsilon` of `mean` and `stddev`.
+   */
+  def distributionChecks(gen: DistributionGenerator,
+      mean: Double = 0.0,
+      stddev: Double = 1.0,
+      epsilon: Double = 0.01): Unit = {
+    (0 until 5).foreach { seed =>
+      gen.setSeed(seed.toLong)
+      val stats = new StatCounter(draw(gen, 100000))
+      assert(math.abs(stats.mean - mean) < epsilon)
+      assert(math.abs(stats.stdev - stddev) < epsilon)
+    }
+  }
+
+  test("UniformGenerator") {
+    val uniform = new UniformGenerator()
+    apiChecks(uniform)
+    // Stddev of uniform distribution = (ub - lb) / math.sqrt(12)
+    val expectedStddev = 1 / math.sqrt(12)
+    distributionChecks(uniform, 0.5, expectedStddev)
+  }
+
+  test("StandardNormalGenerator") {
+    val normal = new StandardNormalGenerator()
+    apiChecks(normal)
+    distributionChecks(normal, 0.0, 1.0)
+  }
+
+  test("PoissonGenerator") {
+    // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
+    List(1.0, 5.0, 100.0).foreach { mean =>
+      val poisson = new PoissonGenerator(mean)
+      apiChecks(poisson)
+      distributionChecks(poisson, mean, math.sqrt(mean), 0.1)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81fcdd22/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
new file mode 100644
index 0000000..6aa4f80
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+/*
+ * Note: avoid including APIs that do not set the seed for the RNG in unit tests
+ * in order to guarantee deterministic behavior.
+ *
+ * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+ */
+class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
+
+  /**
+   * Checks element count, partition count, mean and standard deviation of an RDD of doubles;
+   * mean and stddev must be within `epsilon` of the expected values.
+   */
+  def testGeneratedRDD(rdd: RDD[Double],
+      expectedSize: Long,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01): Unit = {
+    val stats = rdd.stats()
+    assert(expectedSize === stats.count)
+    assert(expectedNumPartitions === rdd.partitions.size)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  /**
+   * Vector counterpart of `testGeneratedRDD`: statistics are computed over all entries of all
+   * rows. The RDD is collected to the driver, so test RDDs are assumed to be small.
+   */
+  def testGeneratedVectorRDD(rdd: RDD[Vector],
+      expectedRows: Long,
+      expectedColumns: Int,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01): Unit = {
+    assert(expectedNumPartitions === rdd.partitions.size)
+    val values = new ArrayBuffer[Double]()
+    for (vector <- rdd.collect) {
+      assert(vector.size === expectedColumns)
+      values ++= vector.toArray
+    }
+    assert(expectedRows === values.size / expectedColumns)
+    val stats = new StatCounter(values)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  test("RandomRDD sizes") {
+
+    // some cases where size % numParts != 0 to test getPartitions behaves correctly
+    List((10000, 6), (12345, 1), (1000, 101)).foreach { case (size, numPartitions) =>
+      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+      assert(rdd.count() === size)
+      assert(rdd.partitions.size === numPartitions)
+
+      // partition sizes must be balanced: largest and smallest differ by at most one
+      val partSizes = rdd.partitions.map(p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
+      val partStats = new StatCounter(partSizes)
+      assert(partStats.max - partStats.min <= 1)
+    }
+
+    // size > Int.MaxValue: only partition metadata is inspected, no data is generated
+    val size = Int.MaxValue.toLong * 100L
+    val numPartitions = 101
+    val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+    assert(rdd.partitions.size === numPartitions)
+    val count = rdd.partitions.map(_.asInstanceOf[RandomRDDPartition].size.toLong).sum
+    assert(count === size)
+
+    // size needs to be positive
+    intercept[IllegalArgumentException] {
+      new RandomRDD(sc, 0, 10, new UniformGenerator, 0L)
+    }
+
+    // numPartitions needs to be positive
+    intercept[IllegalArgumentException] {
+      new RandomRDD(sc, 100, 0, new UniformGenerator, 0L)
+    }
+
+    // partition size needs to be <= Int.MaxValue
+    intercept[IllegalArgumentException] {
+      new RandomRDD(sc, Int.MaxValue.toLong * 100L, 99, new UniformGenerator, 0L)
+    }
+  }
+
+  test("randomRDD for different distributions") {
+    val size = 100000L
+    val numPartitions = 10
+    val poissonMean = 100.0
+
+    (0 until 5).foreach { seed =>
+      val uniform = RandomRDDGenerators.uniformRDD(sc, size, numPartitions, seed)
+      testGeneratedRDD(uniform, size, numPartitions, 0.5, 1 / math.sqrt(12))
+
+      val normal = RandomRDDGenerators.normalRDD(sc, size, numPartitions, seed)
+      testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0)
+
+      val poisson =
+        RandomRDDGenerators.poissonRDD(sc, poissonMean, size, numPartitions, seed)
+      testGeneratedRDD(
+        poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1)
+    }
+
+    // mock distribution to check that partitions have unique seeds
+    val random = RandomRDDGenerators.randomRDD(sc, new MockDistro(), 1000L, 1000, 0L)
+    assert(random.collect.size === random.collect.distinct.size)
+  }
+
+  test("randomVectorRDD for different distributions") {
+    val rows = 1000L
+    val cols = 100
+    val parts = 10
+    val poissonMean = 100.0
+
+    (0 until 5).foreach { seed =>
+      val uniform = RandomRDDGenerators.uniformVectorRDD(sc, rows, cols, parts, seed)
+      testGeneratedVectorRDD(uniform, rows, cols, parts, 0.5, 1 / math.sqrt(12))
+
+      val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, parts, seed)
+      testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0)
+
+      val poisson =
+        RandomRDDGenerators.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed)
+      testGeneratedVectorRDD(
+        poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1)
+    }
+  }
+}
+
+/**
+ * Test-only generator whose "random" value is simply its current seed, which lets a test
+ * observe the seed that was assigned to each partition.
+ */
+private[random] class MockDistro extends DistributionGenerator {
+
+  var seed = 0L
+
+  // Every value produced is the current seed, so equal seeds give equal values.
+  override def nextValue(): Double = {
+    seed.toDouble
+  }
+
+  override def setSeed(seed: Long): Unit = {
+    this.seed = seed
+  }
+
+  // The copy starts from the default seed; the current seed is not carried over.
+  override def copy(): MockDistro = {
+    new MockDistro
+  }
+}


Mime
View raw message