flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/4] flink git commit: [FLINK-2533] [java-api] Gap based random sample optimization.
Date Tue, 15 Sep 2015 12:50:37 GMT
[FLINK-2533] [java-api] Gap based random sample optimization.

This closes #1110


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c923fb3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c923fb3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c923fb3c

Branch: refs/heads/master
Commit: c923fb3c1c1d61462e1079198ae9fb735bb0acf2
Parents: a75dd62
Author: gallenvara <gallenvara@126.com>
Authored: Mon Sep 7 14:55:11 2015 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Sep 15 12:20:41 2015 +0200

----------------------------------------------------------------------
 .../api/java/sampling/BernoulliSampler.java     | 36 ++++++++--
 .../flink/api/java/sampling/PoissonSampler.java | 74 +++++++++++++++-----
 .../flink/api/java/sampling/RandomSampler.java  |  2 +
 3 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
index 0f5ecc6..99ea5de 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
@@ -28,12 +28,16 @@ import java.util.Random;
  * Bernoulli experiment.
  *
  * @param <T> The type of sample.
+ * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
  */
 public class BernoulliSampler<T> extends RandomSampler<T> {
 	
 	private final double fraction;
 	private final Random random;
 	
+	// THRESHOLD is a tuning parameter for choosing sampling method according to the fraction.
+	private final static double THRESHOLD = 0.33;
+	
 	/**
 	 * Create a Bernoulli sampler with sample fraction and default random number generator.
 	 *
@@ -102,15 +106,35 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 			}
 
 			private T getNextSampledElement() {
-				while (input.hasNext()) {
-					T element = input.next();
+				if (fraction <= THRESHOLD) {
+					double rand = random.nextDouble();
+					double u = Math.max(rand, EPSILON);
+					int gap = (int) (Math.log(u) / Math.log(1 - fraction));
+					int elementCount = 0;
+					if (input.hasNext()) {
+						T element = input.next();
+						while (input.hasNext() && elementCount < gap) {
+							element = input.next();
+							elementCount++;
+						}
+						if (elementCount < gap) {
+							return null;
+						} else {
+							return element;
+						}
+					} else {
+						return null;
+					}
+				} else {
+					while (input.hasNext()) {
+						T element = input.next();
 
-					if (random.nextDouble() <= fraction) {
-						return element;
+						if (random.nextDouble() <= fraction) {
+							return element;
+						}
 					}
+					return null;
 				}
-
-				return null;
 			}
 		};
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
index 3834d24..8701167 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.math3.distribution.PoissonDistribution;
 
 import java.util.Iterator;
+import java.util.Random;
 
 /**
  * A sampler implementation based on the Poisson Distribution. While sampling elements with fraction
@@ -28,11 +29,16 @@ import java.util.Iterator;
  *
  * @param <T> The type of sample.
  * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
  */
 public class PoissonSampler<T> extends RandomSampler<T> {
 	
 	private PoissonDistribution poissonDistribution;
 	private final double fraction;
+	private final Random random;
+	
+	// THRESHOLD is a tuning parameter for choosing sampling method according to the fraction.
+	private final static double THRESHOLD = 0.4;
 	
 	/**
 	 * Create a poisson sampler which can sample elements with replacement.
@@ -47,6 +53,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 			this.poissonDistribution.reseedRandomGenerator(seed);
 		}
+		this.random = new Random(seed);
 	}
 	
 	/**
@@ -60,6 +67,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 		if (this.fraction > 0) {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 		}
+		this.random = new Random();
 	}
 	
 	/**
@@ -84,8 +92,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 				if (currentCount > 0) {
 					return true;
 				} else {
-					moveToNextElement();
-
+					samplingProcess();
 					if (currentCount > 0) {
 						return true;
 					} else {
@@ -93,28 +100,57 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 					}
 				}
 			}
-
-			private void moveToNextElement() {
-				while (input.hasNext()) {
-					currentElement = input.next();
-					currentCount = poissonDistribution.sample();
-					if (currentCount > 0) {
-						break;
-					}
-				}
-			}
 			
 			@Override
 			public T next() {
-				if (currentCount == 0) {
-					moveToNextElement();
+				if (currentCount <= 0) {
+					samplingProcess();
 				}
-
-				if (currentCount == 0) {
-					return null;
+				currentCount--;
+				return currentElement;
+			}
+			
+			public int poisson_ge1(double p){
+				// sample 'k' from Poisson(p), conditioned to k >= 1.
+				double q = Math.pow(Math.E, -p);
+				// simulate a poisson trial such that k >= 1.
+				double t = q + (1 - q) * random.nextDouble();
+				int k = 1;
+				// continue standard poisson generation trials.
+				t = t * random.nextDouble();
+				while (t > q) {
+					k++;
+					t = t * random.nextDouble();
+				}
+				return k;
+			}
+			
+			private void skipGapElements(int num) {
+				// skip the elements that occurrence number is zero.
+				int elementCount = 0;
+				while (input.hasNext() && elementCount < num){
+					currentElement = input.next();
+					elementCount++;
+				}
+			}
+			
+			private void samplingProcess(){
+				if (fraction <= THRESHOLD) {
+					double u = Math.max(random.nextDouble(), EPSILON);
+					int gap = (int) (Math.log(u) / -fraction);
+					skipGapElements(gap);
+					if (input.hasNext()) {
+						currentElement = input.next();
+						currentCount = poisson_ge1(fraction);
+					}
 				} else {
-					currentCount--;
-					return currentElement;
+					while (input.hasNext()){
+						currentElement = input.next();
+						currentCount = poissonDistribution.sample();
+						if (currentCount > 0) {
+							break;
+						}
+					}
 				}
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
index 5fe2920..7d74897 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
  * @param <T> The type of sampler data.
  */
 public abstract class RandomSampler<T> {
+
+	protected final static double EPSILON = 1e-5;
 	
 	protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() {
 		@Override


Mime
View raw message