From: meng@apache.org
To: commits@spark.apache.org
Subject: git commit: [SPARK-3250] Implement Gap Sampling optimization for random sampling
Date: Fri, 31 Oct 2014 05:31:02 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 872fc669b -> ad3bd0dff


[SPARK-3250] Implement Gap Sampling optimization for random sampling

More efficient sampling, based on the Gap Sampling optimization:
http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/

Author: Erik Erlandson

Closes #2455 from erikerlandson/spark-3250-pr and squashes the following commits:

72496bc [Erik Erlandson] [SPARK-3250] Implement Gap Sampling optimization for random sampling


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3bd0df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3bd0df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3bd0df

Branch: refs/heads/master
Commit: ad3bd0dff8997861c5a04438145ba6f91c57a849
Parents: 872fc66
Author: Erik Erlandson
Authored: Thu Oct 30 22:30:52 2014 -0700
Committer: Xiangrui Meng
Committed: Thu Oct 30 22:30:52 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   6 +-
 .../spark/util/random/RandomSampler.scala       | 286 ++++++++-
 .../java/org/apache/spark/JavaAPISuite.java     |   9 +-
 .../spark/util/random/RandomSamplerSuite.scala  | 606 ++++++++++++++++---
 .../org/apache/spark/mllib/util/MLUtils.scala   |   4 +-
 5 files changed, 790 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
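In brief: a conventional Bernoulli sampler draws one uniform variate per input element and keeps the element when the draw falls below the sampling fraction f. Gap sampling instead draws the size of the gap to the next kept element from the geometric distribution P(k) = f(1-f)^k, so it spends one RNG call (and one log) per kept element rather than per input element, which is a large win for small f. A minimal standalone Scala sketch of the idea (illustrative only: gapSample is a hypothetical name, and the patch's actual implementation is GapSamplingIterator in the diff below):

    import scala.util.Random

    // For a sampling fraction f on (0, 1), skip a geometric number of elements
    // between kept ones: k = floor(log(u) / log(1 - f)), with u ~ Uniform(0, 1).
    def gapSample[T](data: Iterator[T], f: Double,
                     rng: Random = new Random()): Iterator[T] = {
      require(f > 0.0 && f < 1.0)
      val lnq = math.log1p(-f)  // log(1 - f), computed stably for small f
      new Iterator[T] {
        private var it = data
        private def advance(): Unit = {
          val u = math.max(rng.nextDouble(), 1e-10)  // guard against log(0)
          it = it.drop((math.log(u) / lnq).toInt)    // skip the sampled gap
        }
        advance()  // position on the first kept element
        def hasNext: Boolean = it.hasNext
        def next(): T = { val r = it.next(); advance(); r }
      }
    }

At f = 0.01, for example, this costs roughly one nextDouble() per hundred input elements instead of one per element.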
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b7f125d..c169b2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -43,7 +43,8 @@ import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
+  SamplingUtils}

 /**
  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -375,7 +376,8 @@ abstract class RDD[T: ClassTag](
       val sum = weights.sum
       val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
       normalizedCumWeights.sliding(2).map { x =>
-        new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
+        new PartitionwiseSampledRDD[T, T](
+          this, new BernoulliCellSampler[T](x(0), x(1)), true, seed)
       }.toArray
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
index ee389de..76e7a27 100644
--- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
@@ -19,6 +19,9 @@ package org.apache.spark.util.random

 import java.util.Random

+import scala.reflect.ClassTag
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.commons.math3.distribution.PoissonDistribution

 import org.apache.spark.annotation.DeveloperApi
@@ -38,13 +41,47 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+private[spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers. */
+  def newDefaultRNG: Random = new XORShiftRandom
+
+  /**
+   * Default maximum gap-sampling fraction.
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster.  The
+   * optimal value for this will depend on the RNG.  More expensive RNGs will tend to make
+   * the optimal value higher.  The most reliable way to determine this value for a new RNG
+   * is to experiment.  When tuning for a new RNG, I expect 0.5 to be a good initial guess
+   * in most cases.
+   */
+  val defaultMaxGapSamplingFraction = 0.4
+
+  /**
+   * Default epsilon for floating point numbers sampled from the RNG.
+   * The gap-sampling compute logic requires taking log(x), where x is sampled from an RNG.
+   * To guard against errors from taking log(0), a positive epsilon lower bound is applied.
+   * A good value for this parameter is at or near the minimum positive floating
+   * point value returned by "nextDouble()" (or equivalent), for the RNG being used.
+   */
+  val rngEpsilon = 5e-11
+
+  /**
+   * Sampling fraction arguments may be results of computation, and subject to floating
+   * point jitter.  I check the arguments with this epsilon slop factor to prevent spurious
+   * warnings for cases such as a set of weights summing to a sampling fraction of 1.000000001.
+   */
+  val roundingEpsilon = 1e-6
+}
+
 /**
  * :: DeveloperApi ::
- * A sampler based on Bernoulli trials.
+ * A sampler based on Bernoulli trials for partitioning a data sequence.
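+ * An item is kept when the uniform draw x falls inside the half-open cell [lb, ub)
+ * (or outside it, when complement is set), so samplers built over complementary
+ * ranges partition their input exactly; randomSplit above and kFold below rely on this.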
 *
 * @param lb lower bound of the acceptance range
 * @param ub upper bound of the acceptance range
@@ -52,57 +89,262 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter. */
+  require(
+    lb <= (ub + RandomSampler.roundingEpsilon),
+    s"Lower bound ($lb) must be <= upper bound ($ub)")
+  require(
+    lb >= (0.0 - RandomSampler.roundingEpsilon),
+    s"Lower bound ($lb) must be >= 0.0")
+  require(
+    ub <= (1.0 + RandomSampler.roundingEpsilon),
+    s"Upper bound ($ub) must be <= 1.0")

-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom

   override def setSeed(seed: Long) = rng.setSeed(seed)

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter { item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        }}
+      } else {
+        items.filter { item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        }}
+      }
     }
   }

   /**
    * Return a sampler that is the complement of the range specified by the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliCellSampler[T] =
+    new BernoulliCellSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliCellSampler[T](lb, ub, complement)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  require(
+    fraction >= (0.0 - RandomSampler.roundingEpsilon)
+      && fraction <= (1.0 + RandomSampler.roundingEpsilon),
+    s"Sampling fraction ($fraction) must be on interval [0, 1]")

-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+  private val rng: Random = RandomSampler.newDefaultRNG
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.rngEpsilon)
+    } else {
+      items.filter { _ => rng.nextDouble() <= fraction }
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }

+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from a Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** Epsilon slop to avoid failure from floating point jitter.
+   */
+  require(
+    fraction >= (0.0 - RandomSampler.roundingEpsilon),
+    s"Sampling fraction ($fraction) must be >= 0")

-  private[random] var rng = new PoissonDistribution(mean)
+  // PoissonDistribution throws an exception when fraction <= 0.
+  // If fraction is <= 0, Iterator.empty is used below, so we can use any placeholder value.
+  private val rng = new PoissonDistribution(if (fraction > 0.0) fraction else 1.0)
+  private val rngGap = RandomSampler.newDefaultRNG

   override def setSeed(seed: Long) {
-    rng = new PoissonDistribution(mean)
     rng.reseedRandomGenerator(seed)
+    rngGap.setSeed(seed)
   }

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.sample()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
-      }
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+      new GapSamplingReplacementIterator(items, fraction, rngGap, RandomSampler.rngEpsilon)
+    } else {
+      items.flatMap { item => {
+        val count = rng.sample()
+        if (count == 0) Iterator.empty else Iterator.fill(count)(item)
+      }}
     }
   }
+
+  override def clone = new PoissonSampler[T](fraction)
+}
+
+
+private[spark]
+class GapSamplingIterator[T: ClassTag](
+    var data: Iterator[T],
+    f: Double,
+    rng: Random = RandomSampler.newDefaultRNG,
+    epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] {
+
+  require(f > 0.0 && f < 1.0, s"Sampling fraction ($f) must reside on open interval (0, 1)")
+  require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+  /** implement efficient linear-sequence drop until Scala includes fix for jira SI-8835. */
+  private val iterDrop: Int => Unit = {
+    val arrayClass = Array.empty[T].iterator.getClass
+    val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
+    data.getClass match {
+      case `arrayClass` => ((n: Int) => { data = data.drop(n) })
+      case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
+      case _ => ((n: Int) => {
+          var j = 0
+          while (j < n && data.hasNext) {
+            data.next()
+            j += 1
+          }
+        })
+    }
+  }
+
+  override def hasNext: Boolean = data.hasNext
+
+  override def next(): T = {
+    val r = data.next()
+    advance
+    r
+  }
+
+  private val lnq = math.log1p(-f)
+
+  /** skip elements that won't be sampled, according to geometric dist P(k) = (f)(1-f)^k. */
+  private def advance: Unit = {
+    val u = math.max(rng.nextDouble(), epsilon)
+    val k = (math.log(u) / lnq).toInt
+    iterDrop(k)
+  }
+
+  /** advance to first sample as part of object construction. */
+  advance
+  // Attempting to invoke this closer to the top with other object initialization
+  // was causing it to break in strange ways, so I'm invoking it last, which seems to
+  // work reliably.
+}
+
+private[spark]
+class GapSamplingReplacementIterator[T: ClassTag](
+    var data: Iterator[T],
+    f: Double,
+    rng: Random = RandomSampler.newDefaultRNG,
+    epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] {
+
+  require(f > 0.0, s"Sampling fraction ($f) must be > 0")
+  require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+  /** implement efficient linear-sequence drop until Scala includes fix for jira SI-8835.
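+   * (Same approach as iterDrop in GapSamplingIterator above: array- and ArrayBuffer-backed
+   * iterators get the collection's cheap drop(n), while any other iterator is advanced by
+   * calling next() n times.)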
+   */
+  private val iterDrop: Int => Unit = {
+    val arrayClass = Array.empty[T].iterator.getClass
+    val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
+    data.getClass match {
+      case `arrayClass` => ((n: Int) => { data = data.drop(n) })
+      case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
+      case _ => ((n: Int) => {
+          var j = 0
+          while (j < n && data.hasNext) {
+            data.next()
+            j += 1
+          }
+        })
+    }
+  }
+
+  /** current sampling value, and its replication factor, as we are sampling with replacement. */
+  private var v: T = _
+  private var rep: Int = 0
+
+  override def hasNext: Boolean = data.hasNext || rep > 0
+
+  override def next(): T = {
+    val r = v
+    rep -= 1
+    if (rep <= 0) advance
+    r
+  }
+
+  /**
+   * Skip elements with replication factor zero (i.e. elements that won't be sampled).
+   * Samples 'k' from the geometric distribution P(k) = (1-q)(q)^k, where q = e^(-f); that is,
+   * q is the probability of Poisson(0; f).
+   */
+  private def advance: Unit = {
+    val u = math.max(rng.nextDouble(), epsilon)
+    val k = (math.log(u) / (-f)).toInt
+    iterDrop(k)
+    // set the value and replication factor for the next value
+    if (data.hasNext) {
+      v = data.next()
+      rep = poissonGE1
+    }
+  }
+
+  private val q = math.exp(-f)
+
+  /**
+   * Sample from a Poisson distribution, conditioned such that the sampled value is >= 1.
+   * This is an adaptation of the algorithm for generating Poisson-distributed random variables:
+   * http://en.wikipedia.org/wiki/Poisson_distribution
+   */
+  private def poissonGE1: Int = {
+    // simulate a standard Poisson sampling run that has already produced
+    // at least one arrival, so that the sample is >= 1
+    var pp = q + ((1.0 - q) * rng.nextDouble())
+    var r = 1
+
+    // now continue with the standard Poisson sampling algorithm
+    pp *= rng.nextDouble()
+    while (pp > q) {
+      r += 1
+      pp *= rng.nextDouble()
+    }
+    r
+  }

-  override def clone = new PoissonSampler[T](mean)
+  /** advance to first sample as part of object construction. */
+  advance
+  // Attempting to invoke this closer to the top with other object initialization
+  // was causing it to break in strange ways, so I'm invoking it last, which seems to
+  // work reliably.
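+
+  // Worked example with illustrative numbers (not from the patch): for f = 0.1,
+  // q = e^(-0.1) ~= 0.905, so advance() skips a geometric number of elements with
+  // mean q / (1 - q) ~= 9.5, and each element it lands on is emitted poissonGE1
+  // times, i.e. Poisson(0.1) conditioned on being >= 1.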
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 0172876..c21a4b3 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -140,11 +140,10 @@ public class JavaAPISuite implements Serializable {
   public void sample() {
     List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
     JavaRDD<Integer> rdd = sc.parallelize(ints);
-    JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 11);
-    // expected 2 but of course result varies randomly a bit
-    Assert.assertEquals(1, sample20.count());
-    JavaRDD<Integer> sample20NoReplacement = rdd.sample(false, 0.2, 11);
-    Assert.assertEquals(2, sample20NoReplacement.count());
+    JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 3);
+    Assert.assertEquals(2, sample20.count());
+    JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 5);
+    Assert.assertEquals(2, sample20WithoutReplacement.count());
   }

   @Test
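(The expected counts under fixed seeds changed here, presumably because the new sampler implementations consume the RNG differently than the old ones; the updated test picks seeds whose sampled counts match the expected value of 2.)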
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index ba67d76..20944b6 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -18,97 +18,523 @@ package org.apache.spark.util.random

 import java.util.Random
-
+import scala.collection.mutable.ArrayBuffer
 import org.apache.commons.math3.distribution.PoissonDistribution
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.mock.EasyMockSugar
-
-class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
-
-  val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
-
-  var random: Random = _
-  var poisson: PoissonDistribution = _
-
-  before {
-    random = mock[Random]
-    poisson = mock[PoissonDistribution]
-  }
-
-  test("BernoulliSamplerWithRange") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(3, 4, 5))
-    }
-  }
-
-  test("BernoulliSamplerWithRangeInverse") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9))
-    }
-  }
-
-  test("BernoulliSamplerWithRatio") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.35)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(1, 2, 3))
-    }
-  }
-
-  test("BernoulliSamplerWithComplement") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9))
-    }
-  }
-
-  test("BernoulliSamplerSetSeed") {
-    expecting {
-      random.setSeed(10L)
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.2)
-      sampler.rng = random
-      sampler.setSeed(10L)
-    }
-  }
-
-  test("PoissonSampler") {
-    expecting {
-      for(x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) {
-        poisson.sample().andReturn(x)
-      }
-    }
-    whenExecuting(poisson) {
-      val sampler = new PoissonSampler[Int](0.2)
-      sampler.rng = poisson
-      assert(sampler.sample(a.iterator).toList == List(2, 3, 3, 5, 6))
-    }
+import org.scalatest.{FunSuite, Matchers}
+
+class RandomSamplerSuite extends FunSuite with Matchers {
+  /**
+   * My statistical testing methodology is to run a Kolmogorov-Smirnov (KS) test
+   * between the random samplers and simple reference samplers (known to work correctly).
+   * If things are working properly, the gap sizes between chosen samples should have the
+   * same distribution for the test and reference samplers; that is, the KS test will fail
+   * to strongly reject the null hypothesis that the distributions of sampling gaps are
+   * the same.
+   * There is no KS test implemented for Scala (that I can find), so what I have done here
+   * is pre-compute "D", the KS statistic that corresponds to a "weak" p-value for a
+   * particular sample size, and then test that my measured KS stats are less than D.
+   * Computing D-values is easy, and implemented below.
+   *
+   * I used the scipy 'kstwobign' distribution to pre-compute my D value:
+   *
+   * def ksdval(q=0.1, n=1000):
+   *     en = np.sqrt(float(n) / 2.0)
+   *     return stats.kstwobign.isf(float(q)) / (en + 0.12 + 0.11 / en)
+   *
+   * When comparing KS stats I take the median of a small number of independent test runs
+   * to compensate for the fact that any sampled statistic will show a "false positive"
+   * with some probability: even when two distributions are the same, they will register
+   * as different 10% of the time at a p-value of 0.1.
+   */
+
+  // This D value is the precomputed KS statistic for p-value 0.1, sample size 1000:
+  val sampleSize = 1000
+  val D = 0.0544280747619
+
+  // I'm not a big fan of fixing seeds, but unit testing based on running statistical tests
+  // will always fail with some nonzero probability, so I'll fix the seed to prevent these
+  // tests from generating random failure noise in CI testing, etc.
+  val rngSeed: Random = RandomSampler.newDefaultRNG
+  rngSeed.setSeed(235711)
+
+  // Reference implementation of sampling without replacement (bernoulli)
+  def sample[T](data: Iterator[T], f: Double): Iterator[T] = {
+    val rng: Random = RandomSampler.newDefaultRNG
+    rng.setSeed(rngSeed.nextLong)
+    data.filter(_ => (rng.nextDouble <= f))
+  }
+
+  // Reference implementation of sampling with replacement
+  def sampleWR[T](data: Iterator[T], f: Double): Iterator[T] = {
+    val rng = new PoissonDistribution(f)
+    rng.reseedRandomGenerator(rngSeed.nextLong)
+    data.flatMap { v => {
+      val rep = rng.sample()
+      if (rep == 0) Iterator.empty else Iterator.fill(rep)(v)
+    }}
+  }
+
+  // Returns an iterator over gap lengths between samples.
+  // This function assumes the input data are integers sampled from the sequence of
+  // increasing integers {0, 1, 2, ...}.  This works because that is how I generate them,
+  // and the samplers preserve their input order.
+  def gaps(data: Iterator[Int]): Iterator[Int] = {
+    data.sliding(2).withPartial(false).map { x => x(1) - x(0) }
+  }
+
+  // Returns the cumulative distribution from a histogram
+  def cumulativeDist(hist: Array[Int]): Array[Double] = {
+    val n = hist.sum.toDouble
+    assert(n > 0.0)
+    hist.scanLeft(0)(_ + _).drop(1).map { _.toDouble / n }
+  }
+
+  // Returns aligned cumulative distributions from two arrays of data
+  def cumulants(d1: Array[Int], d2: Array[Int],
+      ss: Int = sampleSize): (Array[Double], Array[Double]) = {
+    assert(math.min(d1.length, d2.length) > 0)
+    assert(math.min(d1.min, d2.min) >= 0)
+    val m = 1 + math.max(d1.max, d2.max)
+    val h1 = Array.fill[Int](m)(0)
+    val h2 = Array.fill[Int](m)(0)
+    for (v <- d1) { h1(v) += 1 }
+    for (v <- d2) { h2(v) += 1 }
+    assert(h1.sum == h2.sum)
+    assert(h1.sum == ss)
+    (cumulativeDist(h1), cumulativeDist(h2))
+  }
+
+  // Computes the Kolmogorov-Smirnov 'D' statistic from two cumulative distributions
+  def KSD(cdf1: Array[Double], cdf2: Array[Double]): Double = {
+    assert(cdf1.length == cdf2.length)
+    val n = cdf1.length
+    assert(n > 0)
+    assert(cdf1(n-1) == 1.0)
+    assert(cdf2(n-1) == 1.0)
+    cdf1.zip(cdf2).map { x => Math.abs(x._1 - x._2) }.max
+  }
+
+  // Returns the median KS 'D' statistic between two samples, over (m) sampling trials
+  def medianKSD(data1: => Iterator[Int], data2: => Iterator[Int], m: Int = 5): Double = {
+    val t = Array.fill[Double](m) {
+      val (c1, c2) = cumulants(data1.take(sampleSize).toArray,
+        data2.take(sampleSize).toArray)
+      KSD(c1, c2)
+    }.sorted
+    // return the median KS statistic
+    t(m / 2)
+  }
+
+  test("utilities") {
+    val s1 = Array(0, 1, 1, 0, 2)
+    val s2 = Array(1, 0, 3, 2, 1)
+    val (c1, c2) = cumulants(s1, s2, ss = 5)
+    c1 should be (Array(0.4, 0.8, 1.0, 1.0))
+    c2 should be (Array(0.2, 0.6, 0.8, 1.0))
+    KSD(c1, c2) should be (0.2 +- 0.000001)
+    KSD(c2, c1) should be (KSD(c1, c2))
+    gaps(List(0, 1, 1, 2, 4, 11).iterator).toArray should be (Array(1, 0, 1, 2, 7))
+  }
+
+  test("sanity check medianKSD against references") {
+    var d: Double = 0.0
+
+    // should be statistically the same, i.e. fail to reject the null hypothesis strongly
+    d = medianKSD(gaps(sample(Iterator.from(0), 0.5)), gaps(sample(Iterator.from(0), 0.5)))
+    d should be < D
+
+    // should be statistically different - the null hypothesis will have a high D value,
+    // corresponding to a low p-value that rejects the null hypothesis
+    d = medianKSD(gaps(sample(Iterator.from(0), 0.4)), gaps(sample(Iterator.from(0), 0.5)))
+    d should be > D
+
+    // same!
+    d = medianKSD(gaps(sampleWR(Iterator.from(0), 0.5)), gaps(sampleWR(Iterator.from(0), 0.5)))
+    d should be < D
+
+    // different!
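+    // (with-replacement gap distributions at f = 0.5 vs f = 0.6 should diverge past D)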
+    d = medianKSD(gaps(sampleWR(Iterator.from(0), 0.5)), gaps(sampleWR(Iterator.from(0), 0.6)))
+    d should be > D
+  }
+
+  test("bernoulli sampling") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.5)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.7)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.7)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.9)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new BernoulliSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.6)))
+    d should be > D
+  }
+
+  test("bernoulli sampling with gap sampling optimization") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.01)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.01)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.3)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new BernoulliSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.4)))
+    d should be > D
+  }
+
+  test("bernoulli boundary cases") {
+    val data = (1 to 100).toArray
+
+    var sampler = new BernoulliSampler[Int](0.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliSampler[Int](1.0)
+    sampler.sample(data.iterator).toArray should be (data)
+
+    sampler = new BernoulliSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliSampler[Int](1.0 + (RandomSampler.roundingEpsilon / 2.0))
+    sampler.sample(data.iterator).toArray should be (data)
+  }
+
+  test("bernoulli data types") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler = new BernoulliSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+
+    // Array iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toArray.iterator)),
+      gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    // ArrayBuffer iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+    // List iterator (non-indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toList.iterator)),
+      gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+  }
+
+  test("bernoulli clone") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d = 0.0
+    var sampler = new BernoulliSampler[Int](0.1).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.9).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+    d should be < D
+  }
+
+  test("bernoulli set seed") {
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler1 = new BernoulliSampler[Int](0.2)
+    var sampler2 = new BernoulliSampler[Int](0.2)
+
+    // distributions should be identical if seeds are set the same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+
+    sampler1 = new BernoulliSampler[Int](0.8)
+    sampler2 = new BernoulliSampler[Int](0.8)
+
+    // distributions should be identical if seeds are set the same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+  }
+
+  test("replacement sampling") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler = new PoissonSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.5)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.7)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.7)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.9)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.9)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new PoissonSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.6)))
+    d should be > D
+  }
+
+  test("replacement sampling with gap sampling") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler = new PoissonSampler[Int](0.01)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.01)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.3)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new PoissonSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.4)))
+    d should be > D
+  }
+
+  test("replacement boundary cases") {
+    val data = (1 to 100).toArray
+
+    var sampler = new PoissonSampler[Int](0.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new PoissonSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    // sampling with replacement has no upper bound on sampling fraction
+    sampler = new PoissonSampler[Int](2.0)
+    sampler.sample(data.iterator).length should be > (data.length)
+  }
+
+  test("replacement data types") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler = new PoissonSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+
+    // Array iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toArray.iterator)),
+      gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    // ArrayBuffer iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    // List iterator (non-indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toList.iterator)),
+      gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+  }
+
+  test("replacement clone") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d = 0.0
+    var sampler = new PoissonSampler[Int](0.1).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.9).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.9)))
+    d should be < D
+  }
+
+  test("replacement set seed") {
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler1 = new PoissonSampler[Int](0.2)
+    var sampler2 = new PoissonSampler[Int](0.2)
+
+    // distributions should be identical if seeds are set the same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+
+    sampler1 = new PoissonSampler[Int](0.8)
+    sampler2 = new PoissonSampler[Int](0.8)
+
+    // distributions should be identical if seeds are set the same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+  }
+
+  test("bernoulli partitioning sampling") {
+    var d: Double = 0.0
+
+    var sampler = new BernoulliCellSampler[Int](0.1, 0.2)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new BernoulliCellSampler[Int](0.1, 0.2, true)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+    d should be < D
+  }
+
+  test("bernoulli partitioning boundary cases") {
+    val data = (1 to 100).toArray
+    val d = RandomSampler.roundingEpsilon / 2.0
+
+    var sampler = new BernoulliCellSampler[Int](0.0, 0.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](1.0, 1.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](0.0, 1.0)
+    sampler.sample(data.iterator).toArray should be (data)
+
+    sampler = new BernoulliCellSampler[Int](0.0 - d, 1.0 + d)
+    sampler.sample(data.iterator).toArray should be (data)
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5 - d)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+  }
+
+  test("bernoulli partitioning data") {
+    val seed = rngSeed.nextLong
+    val data = (1 to 100).toArray
+
+    var sampler = new BernoulliCellSampler[Int](0.4, 0.6)
+    sampler.setSeed(seed)
+    val s1 = sampler.sample(data.iterator).toArray
+    s1.length should be > 0
+
+    sampler = new BernoulliCellSampler[Int](0.4, 0.6, true)
+    sampler.setSeed(seed)
+    val s2 = sampler.sample(data.iterator).toArray
+    s2.length should be > 0
+
+    (s1 ++ s2).sorted should be (data)
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5, true)
+    sampler.sample(data.iterator).toArray should be (data)
+  }
+
+  test("bernoulli partitioning clone") {
+    val seed = rngSeed.nextLong
+    val data = (1 to 100).toArray
+    val base = new BernoulliCellSampler[Int](0.35, 0.65)
+
+    var sampler = base.clone
+    sampler.setSeed(seed)
+    val s1 = sampler.sample(data.iterator).toArray
+    s1.length should be > 0
+
+    sampler = base.cloneComplement
+    sampler.setSeed(seed)
+    val s2 = sampler.sample(data.iterator).toArray
+    s2.length should be > 0
+
+    (s1 ++ s2).sorted should be (data)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index b88e08b..9353351 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
-import org.apache.spark.util.random.BernoulliSampler
+import org.apache.spark.util.random.BernoulliCellSampler
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
@@ -244,7 +244,7 @@ object MLUtils {
   def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = {
     val numFoldsF = numFolds.toFloat
     (1 to numFolds).map { fold =>
-      val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
+      val sampler = new BernoulliCellSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
         complement = false)
       val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
       val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), true, seed)
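For context: kFold gives each fold the cell [(fold-1)/k, fold/k) as its validation slice and the complement as training, so every element lands in exactly one validation set. A minimal usage sketch (hypothetical values; assumes an existing SparkContext named sc):

    import org.apache.spark.mllib.util.MLUtils

    val rdd = sc.parallelize(1 to 1000)
    val folds = MLUtils.kFold(rdd, numFolds = 3, seed = 11)
    folds.foreach { case (training, validation) =>
      // the two complementary cell samplers partition the data exactly
      assert(training.count() + validation.count() == rdd.count())
    }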