flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/6] flink git commit: [FLINK-2743] Add XORShfitRandom and use it in RandomSamplers.
Date Wed, 21 Oct 2015 12:13:30 GMT
Repository: flink
Updated Branches:
  refs/heads/master 22510f0e2 -> 4c1cffd9d


[FLINK-2743] Add XORShfitRandom and use it in RandomSamplers.

This closes #1170


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fb1c479
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fb1c479
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fb1c479

Branch: refs/heads/master
Commit: 5fb1c479f5e3f00917ebc2c623126b8966e46315
Parents: 22510f0
Author: chengxiang li <chengxiang.li@intel.com>
Authored: Wed Sep 23 18:08:06 2015 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Oct 21 11:43:58 2015 +0200

----------------------------------------------------------------------
 .../flink/util/RandomGeneratorBenchmark.java    | 82 ++++++++++++++++++++
 flink-core/pom.xml                              |  2 +-
 .../org/apache/flink/util/XORShiftRandom.java   | 61 +++++++++++++++
 .../api/java/sampling/BernoulliSampler.java     |  5 +-
 .../flink/api/java/sampling/PoissonSampler.java |  5 +-
 .../ReservoirSamplerWithReplacement.java        |  5 +-
 .../ReservoirSamplerWithoutReplacement.java     |  5 +-
 7 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java b/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java
new file mode 100644
index 0000000..65b2434
--- /dev/null
+++ b/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+public class RandomGeneratorBenchmark {
+
+	@BenchmarkMode(Mode.Throughput)
+	@Fork(1)
+	@State(Scope.Thread)
+	@OutputTimeUnit(TimeUnit.SECONDS)
+	public static abstract class AbstractRandomBench {
+		private final static long ITERATOR_NUMBER = 10000000;
+		protected Random random;
+
+		@Setup
+		public abstract void init();
+
+		@Benchmark
+		@Warmup(iterations = 5)
+		@Measurement(iterations = 5)
+		public void bench() {
+			for (int i = 0; i < ITERATOR_NUMBER; i++) {
+				random.nextInt();
+			}
+		}
+	}
+
+	public static class RandomBench extends AbstractRandomBench {
+		@Override
+		public void init() {
+			this.random = new Random(11);
+		}
+	}
+
+	public static class XORShiftRandomBench extends AbstractRandomBench {
+
+		@Override
+		public void init() {
+			this.random = new XORShiftRandom(11);
+		}
+	}
+
+	public static void main(String[] args) throws RunnerException {
+		Options opt = new OptionsBuilder().include(".*" + RandomGeneratorBenchmark.class.getSimpleName() +
+			".*").build();
+		new Runner(opt).run();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 7ff0fbc..94a4082 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -70,7 +70,7 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
new file mode 100644
index 0000000..fa68442
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import com.google.common.hash.Hashing;
+
+import java.util.Random;
+
+/**
+ * Implement a random number generator based on the XORShift algorithm discovered by George Marsaglia.
+ * This RNG is observed 4.5 times faster than {@link java.util.Random} in benchmark, with the cost
+ * that abandon thread-safety. So it's recommended to create a new {@link XORShiftRandom} for each
+ * thread.
+ *
+ * @see <a href="http://www.jstatsoft.org/v08/i14/paper">XORShift Algorithm Paper</a>
+ */
+public class XORShiftRandom extends Random {
+
+	private long seed;
+
+	public XORShiftRandom() {
+		this(System.nanoTime());
+	}
+
+	public XORShiftRandom(long input) {
+		super(input);
+		this.seed = Hashing.murmur3_128().hashLong(input).asLong();
+	}
+
+	/**
+	 * All other methods like nextInt()/nextDouble()... depends on this, so we just need to overwrite
+	 * this.
+	 *
+	 * @param bits Random bits
+	 * @return The next pseudorandom value from this random number
+	 * generator's sequence
+	 */
+	@Override
+	public int next(int bits) {
+		long nextSeed = seed ^ (seed << 21);
+		nextSeed ^= (nextSeed >>> 35);
+		nextSeed ^= (nextSeed << 4);
+		seed = nextSeed;
+		return (int) (nextSeed & ((1L << bits) - 1));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
index 99ea5de..b9aef66 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.Random;
@@ -44,7 +45,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 	 * @param fraction Sample fraction, aka the Bernoulli sampler possibility.
 	 */
 	public BernoulliSampler(double fraction) {
-		this(fraction, new Random());
+		this(fraction, new XORShiftRandom());
 	}
 	
 	/**
@@ -54,7 +55,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 	 * @param seed     Random number generator seed.
 	 */
 	public BernoulliSampler(double fraction, long seed) {
-		this(fraction, new Random(seed));
+		this(fraction, new XORShiftRandom(seed));
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
index 8701167..e132882 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
@@ -19,6 +19,7 @@ package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.math3.distribution.PoissonDistribution;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.Random;
@@ -53,7 +54,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 			this.poissonDistribution.reseedRandomGenerator(seed);
 		}
-		this.random = new Random(seed);
+		this.random = new XORShiftRandom(seed);
 	}
 	
 	/**
@@ -67,7 +68,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 		if (this.fraction > 0) {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 		}
-		this.random = new Random();
+		this.random = new XORShiftRandom();
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
index 9c37154..634f60d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.PriorityQueue;
@@ -45,7 +46,7 @@ public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler
 	 * @param numSamples Number of selected elements, must be non-negative.
 	 */
 	public ReservoirSamplerWithReplacement(int numSamples) {
-		this(numSamples, new Random());
+		this(numSamples, new XORShiftRandom());
 	}
 	
 	/**
@@ -55,7 +56,7 @@ public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler
 	 * @param seed       Random number generator seed
 	 */
 	public ReservoirSamplerWithReplacement(int numSamples, long seed) {
-		this(numSamples, new Random(seed));
+		this(numSamples, new XORShiftRandom(seed));
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
index b953bff..139859b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.PriorityQueue;
@@ -60,7 +61,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSamp
 	 * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative.
 	 */
 	public ReservoirSamplerWithoutReplacement(int numSamples) {
-		this(numSamples, new Random());
+		this(numSamples, new XORShiftRandom());
 	}
 	
 	/**
@@ -71,7 +72,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSamp
 	 */
 	public ReservoirSamplerWithoutReplacement(int numSamples, long seed) {
 		
-		this(numSamples, new Random(seed));
+		this(numSamples, new XORShiftRandom(seed));
 	}
 	
 	@Override


Mime
View raw message