flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-1901] [core] Create sample operator for Dataset.
Date Fri, 21 Aug 2015 13:41:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master 58421b848 -> c9cfb17cb


[FLINK-1901] [core] Create sample operator for Dataset.

[FLINK-1901] [core] enable sample with fixed size on the whole dataset.

[FLINK-1901] [core] add more comments for RandomSamplerTest.

[FLINK-1901] [core] refactor PoissonSampler output Iterator.

[FLINK-1901] [core] move sample/sampleWithSize operator to DataSetUtils.

Adds notes for commons-math3 to LICENSE and NOTICE file

This closes #949.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9cfb17c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9cfb17c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9cfb17c

Branch: refs/heads/master
Commit: c9cfb17cb095def8b8ea0ed1b598fc78b890b874
Parents: 58421b8
Author: chengxiang li <chengxiang.li@intel.com>
Authored: Wed Jul 22 11:38:13 2015 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Aug 21 15:41:27 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/LICENSE           |   1 +
 flink-dist/src/main/flink-bin/NOTICE            |  15 +
 flink-java/pom.xml                              |   6 +
 .../java/org/apache/flink/api/java/DataSet.java |   2 +-
 .../java/org/apache/flink/api/java/Utils.java   |   4 +
 .../api/java/functions/SampleInCoordinator.java |  71 +++
 .../api/java/functions/SampleInPartition.java   |  71 +++
 .../api/java/functions/SampleWithFraction.java  |  68 +++
 .../api/java/sampling/BernoulliSampler.java     | 117 +++++
 .../java/sampling/DistributedRandomSampler.java | 125 +++++
 .../java/sampling/IntermediateSampleData.java   |  47 ++
 .../flink/api/java/sampling/PoissonSampler.java | 122 +++++
 .../flink/api/java/sampling/RandomSampler.java  |  63 +++
 .../ReservoirSamplerWithReplacement.java        | 110 +++++
 .../ReservoirSamplerWithoutReplacement.java     | 106 +++++
 .../flink/api/java/utils/DataSetUtils.java      |  95 ++++
 .../api/java/sampling/RandomSamplerTest.java    | 452 +++++++++++++++++++
 .../apache/flink/api/scala/DataSetUtils.scala   |  40 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  31 ++
 .../test/javaApiOperators/SampleITCase.java     | 167 +++++++
 .../api/scala/operators/SampleITCase.scala      | 167 +++++++
 pom.xml                                         |   6 +
 22 files changed, 1884 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index 281b8f0..e79ff71 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -277,6 +277,7 @@ under the Apache License (v 2.0):
  - Uncommons Math (org.uncommons.maths:uncommons-maths:1.2.2a - https://github.com/dwdyer/uncommons-maths)
  - Jansi (org.fusesource.jansi:jansi:1.4 - https://github.com/fusesource/jansi)
  - Apache Camel Core (org.apache.camel:camel-core:2.10.3 - http://camel.apache.org/camel-core.html)
+ - Apache Commons Math (org.apache.commons:commons-math3:3.5 - http://commons.apache.org/proper/commons-math/index.html)
 
 
 -----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-dist/src/main/flink-bin/NOTICE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/NOTICE b/flink-dist/src/main/flink-bin/NOTICE
index a71e61d..7b0fe72 100644
--- a/flink-dist/src/main/flink-bin/NOTICE
+++ b/flink-dist/src/main/flink-bin/NOTICE
@@ -69,6 +69,21 @@ Copyright (c) 2002 JSON.org
 
 
 -----------------------------------------------------------------------
+                           Apache Commons Math
+-----------------------------------------------------------------------
+
+Apache Commons Math
+Copyright 2001-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This product includes software developed for Orekit by
+CS Systèmes d'Information (http://www.c-s.fr/)
+Copyright 2010-2012 CS Systèmes d'Information
+
+
+-----------------------------------------------------------------------
                            Akka
 -----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 683304f..d777048 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -92,6 +92,12 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-math3</artifactId>
+			<!-- managed version -->
+		</dependency>
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 81ba279..98a94c6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1057,7 +1057,7 @@ public abstract class DataSet<T> {
 	public UnionOperator<T> union(DataSet<T> other){
 		return new UnionOperator<T>(this, other, Utils.getCallLocationName());
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Partitioning
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index a1e3d25..785f3ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.List;
+import java.util.Random;
+
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 
 import org.apache.flink.configuration.Configuration;
@@ -36,6 +38,8 @@ import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAna
 
 
 public class Utils {
+	
+	public static final Random RNG = new Random();
 
 	public static String getCallLocationName() {
 		return getCallLocationName(4);

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
new file mode 100644
index 0000000..528d746
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.sampling.IntermediateSampleData;
+import org.apache.flink.api.java.sampling.DistributedRandomSampler;
+import org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement;
+import org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * SampleInCoordinator wraps the sample logic of the coordinator side (the second phase of the
+ * distributed sample algorithm). It executes the coordinator side sample logic in an all-reduce
+ * function. The user needs to make sure that the operator parallelism of this function is 1, so
+ * that there is a single central coordinator. For the same reason, no task index is mixed into
+ * the random generator seed here (unlike the partition-side functions).
+ *
+ * @param <T> the data type wrapped in IntermediateSampleData as input.
+ */
+public class SampleInCoordinator<T> implements GroupReduceFunction<IntermediateSampleData<T>, T> {
+
+	private final boolean withReplacement;
+	private final int numSample;
+	private final long seed;
+
+	/**
+	 * Create a function instance of SampleInCoordinator.
+	 *
+	 * @param withReplacement Whether element can be selected more than once.
+	 * @param numSample       Fixed sample size.
+	 * @param seed            Random generator seed.
+	 */
+	public SampleInCoordinator(boolean withReplacement, int numSample, long seed) {
+		this.withReplacement = withReplacement;
+		this.numSample = numSample;
+		this.seed = seed;
+	}
+
+	@Override
+	public void reduce(Iterable<IntermediateSampleData<T>> values, Collector<T> out) throws Exception {
+		DistributedRandomSampler<T> sampler;
+		if (withReplacement) {
+			sampler = new ReservoirSamplerWithReplacement<>(numSample, seed);
+		} else {
+			sampler = new ReservoirSamplerWithoutReplacement<>(numSample, seed);
+		}
+
+		Iterator<T> sampled = sampler.sampleInCoordinator(values.iterator());
+		while (sampled.hasNext()) {
+			out.collect(sampled.next());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
new file mode 100644
index 0000000..295fb44
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.sampling.IntermediateSampleData;
+import org.apache.flink.api.java.sampling.DistributedRandomSampler;
+import org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement;
+import org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * First phase of distributed fixed-size sampling: every parallel instance runs this
+ * mapPartition function over its own partition and emits weighted candidates, which the
+ * second (coordinator) phase reduces to the final sample.
+ *
+ * @param <T> The type of input data
+ */
+public class SampleInPartition<T> extends RichMapPartitionFunction<T, IntermediateSampleData<T>> {
+
+	private boolean withReplacement;
+	private int numSample;
+	private long seed;
+
+	/**
+	 * Create a function instance of SampleInPartition.
+	 *
+	 * @param withReplacement Whether element can be selected more than once.
+	 * @param numSample       Fixed sample size.
+	 * @param seed            Random generator seed.
+	 */
+	public SampleInPartition(boolean withReplacement, int numSample, long seed) {
+		this.withReplacement = withReplacement;
+		this.numSample = numSample;
+		this.seed = seed;
+	}
+
+	@Override
+	public void mapPartition(Iterable<T> values, Collector<IntermediateSampleData<T>> out) throws Exception {
+		// Offset the seed by the subtask index so parallel partitions do not share one random stream.
+		final long partitionSeed = seed + getRuntimeContext().getIndexOfThisSubtask();
+		final DistributedRandomSampler<T> sampler = withReplacement
+			? new ReservoirSamplerWithReplacement<T>(numSample, partitionSeed)
+			: new ReservoirSamplerWithoutReplacement<T>(numSample, partitionSeed);
+
+		// Forward every weighted candidate; the coordinator keeps the globally heaviest ones.
+		final Iterator<IntermediateSampleData<T>> candidates = sampler.sampleInPartition(values.iterator());
+		while (candidates.hasNext()) {
+			out.collect(candidates.next());
+		}
+	}
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
new file mode 100644
index 0000000..4ef9aa0
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.sampling.BernoulliSampler;
+import org.apache.flink.api.java.sampling.PoissonSampler;
+import org.apache.flink.api.java.sampling.RandomSampler;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * A map partition function wrapper for sampling algorithms with fraction. The sampler consumes
+ * the partition iterator directly: PoissonSampler with replacement, BernoulliSampler without.
+ *
+ * @param <T> The type of the elements to sample.
+ */
+public class SampleWithFraction<T> extends RichMapPartitionFunction<T, T> {
+
+	private final boolean withReplacement;
+	private final double fraction;
+	private final long seed;
+
+	/**
+	 * Create a function instance of SampleWithFraction.
+	 *
+	 * @param withReplacement Whether element can be selected more than once.
+	 * @param fraction        Selection probability per element, or expected count per element with replacement.
+	 * @param seed            random number generator seed.
+	 */
+	public SampleWithFraction(boolean withReplacement, double fraction, long seed) {
+		this.withReplacement = withReplacement;
+		this.fraction = fraction;
+		this.seed = seed;
+	}
+
+	@Override
+	public void mapPartition(Iterable<T> values, Collector<T> out) throws Exception {
+		// Offset the seed by the subtask index so parallel partitions do not share one random stream.
+		long seedAndIndex = seed + getRuntimeContext().getIndexOfThisSubtask();
+		RandomSampler<T> sampler;
+		if (withReplacement) {
+			sampler = new PoissonSampler<>(fraction, seedAndIndex);
+		} else {
+			sampler = new BernoulliSampler<>(fraction, seedAndIndex);
+		}
+		Iterator<T> sampled = sampler.sample(values.iterator());
+		while (sampled.hasNext()) {
+			out.collect(sampled.next());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
new file mode 100644
index 0000000..0f5ecc6
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon a Bernoulli trial. This sampler is used to sample with
+ * fraction and without replacement. Whether an element is sampled or not is determined by a
+ * Bernoulli experiment.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+	
+	private final double fraction;
+	private final Random random;
+	
+	/**
+	 * Create a Bernoulli sampler with sample fraction and default random number generator.
+	 *
+	 * @param fraction Sample fraction, aka the Bernoulli sampler probability.
+	 */
+	public BernoulliSampler(double fraction) {
+		this(fraction, new Random());
+	}
+	
+	/**
+	 * Create a Bernoulli sampler with sample fraction and random number generator seed.
+	 *
+	 * @param fraction Sample fraction, aka the Bernoulli sampler probability.
+	 * @param seed     Random number generator seed.
+	 */
+	public BernoulliSampler(double fraction, long seed) {
+		this(fraction, new Random(seed));
+	}
+	
+	/**
+	 * Create a Bernoulli sampler with sample fraction and random number generator.
+	 *
+	 * @param fraction Sample fraction, aka the Bernoulli sampler probability; must be in [0, 1].
+	 * @param random   The random number generator.
+	 */
+	public BernoulliSampler(double fraction, Random random) {
+		Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction must be between [0, 1].");
+		this.fraction = fraction;
+		this.random = random;
+	}
+	
+	/**
+	 * Sample the input elements, for each input element, take a Bernoulli trial for sampling.
+	 *
+	 * @param input Elements to be sampled.
+	 * @return The sampled result which is lazy computed upon input elements.
+	 */
+	@Override
+	public Iterator<T> sample(final Iterator<T> input) {
+		if (fraction == 0) {
+			return EMPTY_ITERABLE;
+		}
+		
+		return new SampledIterator<T>() {
+			// A flag, not a null check, marks the buffered element so null inputs are not lost.
+			T current = null;
+			boolean hasCurrent = false;
+
+			@Override
+			public boolean hasNext() {
+				if (!hasCurrent) {
+					hasCurrent = advance();
+				}
+				return hasCurrent;
+			}
+
+			@Override
+			public T next() {
+				if (!hasNext()) {
+					return null;
+				}
+				T result = current;
+				current = null;
+				hasCurrent = false;
+				return result;
+			}
+
+			// Buffers the next accepted element. nextDouble() lies in [0, 1): P(< fraction) == fraction.
+			private boolean advance() {
+				while (input.hasNext()) {
+					current = input.next();
+					if (random.nextDouble() < fraction) {
+						return true;
+					}
+				}
+				current = null;
+				return false;
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java
new file mode 100644
index 0000000..e5a719f
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for fixed-size samplers. Sampling with a fraction is natively distributed, but
+ * drawing a fixed number of elements needs two phases (in the current implementation). First,
+ * every partition is sampled independently by {@link #sampleInPartition}, which emits weighted
+ * {@link IntermediateSampleData} candidates. Second, a single central coordinator combines all
+ * candidates in {@link #sampleInCoordinator} and keeps the {@code numSamples} heaviest as the
+ * final result.
+ *
+ * @param <T> The input data type.
+ */
+public abstract class DistributedRandomSampler<T> extends RandomSampler<T> {
+
+	/** Number of elements the coordinator keeps in the final sample. */
+	protected final int numSamples;
+
+	/** An intermediate iterator over nothing: hasNext() is always false and next() yields null. */
+	protected final Iterator<IntermediateSampleData<T>> EMPTY_INTERMEDIATE_ITERABLE =
+		new SampledIterator<IntermediateSampleData<T>>() {
+			@Override
+			public IntermediateSampleData<T> next() {
+				return null;
+			}
+
+			@Override
+			public boolean hasNext() {
+				return false;
+			}
+		};
+
+	public DistributedRandomSampler(int numSamples) {
+		this.numSamples = numSamples;
+	}
+
+	/**
+	 * First phase: samples a single partition independently of all other partitions. Every
+	 * emitted element carries the weight the coordinator later ranks it by.
+	 *
+	 * @param input The DataSet input of each partition.
+	 * @return Intermediate sample output which will be used as the input of the second phase.
+	 */
+	public abstract Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input);
+
+	/**
+	 * Second phase: merges the weighted candidates of all partitions and keeps the
+	 * {@code numSamples} elements with the largest weights. Meant to run as the UDF of an
+	 * all-reduce operation, i.e. in a single place.
+	 *
+	 * @param input The intermediate sample output generated in the first phase.
+	 * @return The sampled output.
+	 */
+	public Iterator<T> sampleInCoordinator(Iterator<IntermediateSampleData<T>> input) {
+		if (numSamples == 0) {
+			return EMPTY_ITERABLE;
+		}
+
+		// Min-heap on weight: the head is always the lightest kept candidate, so once the heap is
+		// full it is the one a heavier newcomer replaces.
+		PriorityQueue<IntermediateSampleData<T>> topK = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
+		IntermediateSampleData<T> lightest = null;
+		for (int seen = 0; input.hasNext(); seen++) {
+			IntermediateSampleData<T> candidate = input.next();
+			if (seen >= numSamples) {
+				// Heap is full: anything not strictly heavier than the lightest kept one is dropped.
+				if (!(candidate.getWeight() > lightest.getWeight())) {
+					continue;
+				}
+				topK.remove();
+			}
+			topK.add(candidate);
+			lightest = topK.peek();
+		}
+
+		// Expose the kept elements without their weights; remove() is forwarded to the heap.
+		final Iterator<IntermediateSampleData<T>> kept = topK.iterator();
+		return new Iterator<T>() {
+			@Override
+			public boolean hasNext() {
+				return kept.hasNext();
+			}
+
+			@Override
+			public T next() {
+				return kept.next().getElement();
+			}
+
+			@Override
+			public void remove() {
+				kept.remove();
+			}
+		};
+	}
+
+	/**
+	 * Runs both phases back to back on one input; implemented for test purpose only.
+	 *
+	 * @param input Source data.
+	 * @return Sample result in sequence.
+	 */
+	@Override
+	public Iterator<T> sample(Iterator<T> input) {
+		return sampleInCoordinator(sampleInPartition(input));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java
new file mode 100644
index 0000000..1d70f19
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+/**
+ * The data structure which is transferred between partitions and the coordinator for distributed
+ * random sampling. Instances are ordered by ascending weight.
+ *
+ * @param <T> The type of sample data.
+ */
+public class IntermediateSampleData<T> implements Comparable<IntermediateSampleData<T>> {
+	private final double weight;
+	private final T element;
+
+	public IntermediateSampleData(double weight, T element) {
+		this.weight = weight;
+		this.element = element;
+	}
+
+	public double getWeight() {
+		return weight;
+	}
+
+	public T getElement() {
+		return element;
+	}
+
+	@Override
+	public int compareTo(IntermediateSampleData<T> other) {
+		return Double.compare(this.weight, other.getWeight()); // equal weights compare as 0, sign is antisymmetric
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
new file mode 100644
index 0000000..3834d24
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on the Poisson Distribution. While sampling elements with fraction
+ * and replacement, the selected count of each element follows a Poisson distribution with mean fraction.
+ *
+ * @param <T> The type of sample.
+ * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ */
+public class PoissonSampler<T> extends RandomSampler<T> {
+	
+	private PoissonDistribution poissonDistribution;
+	private final double fraction;
+	
+	/**
+	 * Create a Poisson sampler with a fixed seed, which can sample elements with replacement.
+	 *
+	 * @param fraction The expected count of each element; must be non-negative.
+	 * @param seed     Random number generator seed for internal PoissonDistribution.
+	 */
+	public PoissonSampler(double fraction, long seed) {
+		// Share validation and setup with the unseeded constructor, then fix the seed.
+		this(fraction);
+		// No distribution exists for fraction 0, since sample() never draws from it then.
+		if (this.poissonDistribution != null) {
+			this.poissonDistribution.reseedRandomGenerator(seed);
+		}
+	}
+	
+	/**
+	 * Create a Poisson sampler which can sample elements with replacement.
+	 *
+	 * @param fraction The expected count of each element; must be non-negative.
+	 */
+	public PoissonSampler(double fraction) {
+		Preconditions.checkArgument(fraction >= 0, "fraction should be non-negative.");
+		this.fraction = fraction;
+		if (this.fraction > 0) {
+			this.poissonDistribution = new PoissonDistribution(fraction);
+		}
+	}
+	
+	/**
+	 * Sample the input elements, for each input element, generate its count following a Poisson
+	 * distribution.
+	 *
+	 * @param input Elements to be sampled.
+	 * @return The sampled result which is lazy computed upon input elements.
+	 */
+	@Override
+	public Iterator<T> sample(final Iterator<T> input) {
+		if (fraction == 0) {
+			return EMPTY_ITERABLE;
+		}
+		
+		return new SampledIterator<T>() {
+			// The element being emitted and how many more copies of it are still to be returned.
+			T currentElement;
+			int currentCount = 0;
+
+			@Override
+			public boolean hasNext() {
+				if (currentCount == 0) {
+					moveToNextElement();
+				}
+				return currentCount > 0;
+			}
+
+			// Advances to the next input element whose Poisson draw is positive; elements that
+			// draw 0 are skipped. If the input runs out first, currentCount is left at 0 (the last
+			// skipped element may remain in currentElement but is never returned).
+			private void moveToNextElement() {
+				while (input.hasNext()) {
+					currentElement = input.next();
+					currentCount = poissonDistribution.sample();
+					if (currentCount > 0) {
+						break;
+					}
+				}
+			}
+
+			/**
+			 * Returns one copy of the current element, or null once the input is exhausted.
+			 */
+			@Override
+			public T next() {
+				if (currentCount == 0) {
+					moveToNextElement();
+				}
+
+				if (currentCount == 0) {
+					return null;
+				} else {
+					currentCount--;
+					return currentElement;
+				}
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
new file mode 100644
index 0000000..5fe2920
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+import java.util.Iterator;
+
+/**
+ * A data sample is a set of data selected from a statistical population by a defined procedure.
+ * RandomSampler is the base class for samplers that create such data samples randomly.
+ *
+ * @param <T> The type of sampler data.
+ */
+public abstract class RandomSampler<T> {
+	
+	/**
+	 * An iterator that never yields any element. Subclasses return it when their configuration
+	 * cannot select anything (for example, a sampling fraction of 0).
+	 * Despite its name this is an {@link Iterator}, not an {@link Iterable}; the name is kept so
+	 * existing subclasses continue to compile.
+	 */
+	protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() {
+		@Override
+		public boolean hasNext() {
+			return false;
+		}
+		
+		@Override
+		public T next() {
+			// The Iterator contract requires an exhausted iterator to throw rather than return null.
+			throw new java.util.NoSuchElementException();
+		}
+	};
+	
+	/**
+	 * Randomly sample the elements from input in sequence, and return the result iterator.
+	 *
+	 * @param input Source data
+	 * @return The sample result.
+	 */
+	public abstract Iterator<T> sample(Iterator<T> input);
+
+}
+
+/**
+ * Base class for the iterators handed out by the samplers in this package. Sampled output is
+ * read-only, so {@link #remove()} always rejects the call; subclasses only provide
+ * {@code hasNext()} and {@code next()}.
+ *
+ * @param <T> The type of iterator data.
+ */
+abstract class SampledIterator<T> implements Iterator<T> {
+
+	/**
+	 * Removal is not supported on sampled output.
+	 *
+	 * @throws UnsupportedOperationException always.
+	 */
+	@Override
+	public void remove() {
+		final String reason = "Do not support this operation.";
+		throw new UnsupportedOperationException(reason);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
new file mode 100644
index 0000000..9c37154
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with replacement and with only one pass
+ * through the input iteration whose size is unpredictable. The basic idea behind this sampler
+ * implementation is quite similar to {@link ReservoirSamplerWithoutReplacement}. The main
+ * difference is that, in the first phase, we generate weights for each element K times, so that
+ * each element can get selected multiple times.
+ *
+ * This implementation refers to the algorithm described in <a href="http://researcher.ibm.com/files/us-dpwoodru/tw11.pdf">
+ * "Optimal Random Sampling from Distributed Streams Revisited"</a>.
+ *
+ * @param <T> The type of sample.
+ */
+public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler<T> {
+
+	private final Random random;
+
+	/**
+	 * Create a sampler with fixed sample size and default random number generator.
+	 *
+	 * @param numSamples Number of selected elements, must be non-negative.
+	 */
+	public ReservoirSamplerWithReplacement(int numSamples) {
+		this(numSamples, new Random());
+	}
+	
+	/**
+	 * Create a sampler with fixed sample size and random number generator seed.
+	 *
+	 * @param numSamples Number of selected elements, must be non-negative.
+	 * @param seed       Random number generator seed
+	 */
+	public ReservoirSamplerWithReplacement(int numSamples, long seed) {
+		this(numSamples, new Random(seed));
+	}
+	
+	/**
+	 * Create a sampler with fixed sample size and random number generator.
+	 *
+	 * @param numSamples Number of selected elements, must be non-negative.
+	 * @param random     Random number generator
+	 * @throws IllegalArgumentException if numSamples is negative.
+	 */
+	public ReservoirSamplerWithReplacement(int numSamples, Random random) {
+		super(numSamples);
+		Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative.");
+		this.random = random;
+	}
+
+	/**
+	 * First-phase sampling for one partition. Every input element draws numSamples independent
+	 * uniform weights. Only the numSamples highest weights seen so far are retained, so a single
+	 * element may occupy several slots of the result.
+	 *
+	 * @param input The elements of the current partition.
+	 * @return Exactly numSamples weighted entries when the input is non-empty, none otherwise. The
+	 *         order is whatever {@link PriorityQueue#iterator()} yields, i.e. unspecified.
+	 */
+	@Override
+	public Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input) {
+		if (numSamples == 0) {
+			return EMPTY_INTERMEDIATE_ITERABLE;
+		}
+
+		// This queue holds a fixed number of elements with the top K weight for current partition.
+		// NOTE(review): the eviction logic below assumes IntermediateSampleData orders by ascending
+		// weight, so that the queue head is the smallest retained weight.
+		PriorityQueue<IntermediateSampleData<T>> queue = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
+
+		if (input.hasNext()) {
+			T element = input.next();
+			// Initiate the queue with the first element and K random weights.
+			for (int i = 0; i < numSamples; i++) {
+				queue.add(new IntermediateSampleData<T>(random.nextDouble(), element));
+			}
+		}
+		// Only dereferenced inside the loop below, which runs only if the input was non-empty;
+		// in that case the queue has just been filled, so this is not null.
+		IntermediateSampleData<T> smallest = queue.peek();
+
+		while (input.hasNext()) {
+			T element = input.next();
+			// To sample with replacement, we generate K random weights for each element, so that it's
+			// possible to be selected multiple times.
+			for (int i = 0; i < numSamples; i++) {
+				// If current element weight is larger than the smallest one in queue, remove the element
+				// with the smallest weight, and append current element into the queue.
+				double rand = random.nextDouble();
+				if (rand > smallest.getWeight()) {
+					queue.remove();
+					queue.add(new IntermediateSampleData<T>(rand, element));
+					smallest = queue.peek();
+				}
+			}
+		}
+		return queue.iterator();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
new file mode 100644
index 0000000..b953bff
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling without replacement, and with only one
+ * pass through the input iteration whose size is unpredictable. The basic idea behind this sampler
+ * implementation is to generate a random number for each input element as its weight, select the
+ * top K elements with max weight. As the weights are generated randomly, so are the selected
+ * top K elements. The algorithm is implemented using the {@link DistributedRandomSampler}
+ * interface. In the first phase, we generate random numbers as the weights for each element and
+ * select top K elements as the output of each partition. In the second phase, we select top K
+ * elements from all the outputs of the first phase.
+ *
+ * This implementation refers to the algorithm described in <a href="http://researcher.ibm.com/files/us-dpwoodru/tw11.pdf">
+ * "Optimal Random Sampling from Distributed Streams Revisited"</a>.
+ *
+ * @param <T> The type of the sampler.
+ */
+public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSampler<T> {
+	
+	private final Random random;
+
+	/**
+	 * Create a new sampler with reservoir size and a supplied random number generator.
+	 *
+	 * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative.
+	 * @param random     Instance of random number generator for sampling.
+	 * @throws IllegalArgumentException if numSamples is negative.
+	 */
+	public ReservoirSamplerWithoutReplacement(int numSamples, Random random) {
+		super(numSamples);
+		Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative.");
+		this.random = random;
+	}
+	
+	/**
+	 * Create a new sampler with reservoir size and a default random number generator.
+	 *
+	 * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative.
+	 */
+	public ReservoirSamplerWithoutReplacement(int numSamples) {
+		this(numSamples, new Random());
+	}
+	
+	/**
+	 * Create a new sampler with reservoir size and the seed for random number generator.
+	 *
+	 * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative.
+	 * @param seed       Random number generator seed.
+	 */
+	public ReservoirSamplerWithoutReplacement(int numSamples, long seed) {
+		this(numSamples, new Random(seed));
+	}
+	
+	/**
+	 * First-phase sampling for one partition. Each input element draws exactly one uniform
+	 * weight, and the elements with the numSamples highest weights are retained.
+	 *
+	 * @param input The elements of the current partition.
+	 * @return At most numSamples weighted entries (fewer if the partition is smaller). The order
+	 *         is whatever {@link PriorityQueue#iterator()} yields, i.e. unspecified.
+	 */
+	@Override
+	public Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input) {
+		if (numSamples == 0) {
+			return EMPTY_INTERMEDIATE_ITERABLE;
+		}
+
+		// This queue holds fixed number elements with the top K weight for current partition.
+		// NOTE(review): the eviction logic below assumes IntermediateSampleData orders by ascending
+		// weight, so that the queue head is the smallest retained weight.
+		PriorityQueue<IntermediateSampleData<T>> queue = new PriorityQueue<IntermediateSampleData<T>>(numSamples);
+		IntermediateSampleData<T> smallest = null;
+		while (input.hasNext()) {
+			T element = input.next();
+			// Exactly one weight is drawn per element, whichever branch is taken below.
+			double weight = random.nextDouble();
+			if (queue.size() < numSamples) {
+				// Fill the queue with first K elements from input.
+				queue.add(new IntermediateSampleData<T>(weight, element));
+				smallest = queue.peek();
+			} else if (weight > smallest.getWeight()) {
+				// Remove the element with the smallest weight, and append current element into the queue.
+				queue.remove();
+				queue.add(new IntermediateSampleData<T>(weight, element));
+				smallest = queue.peek();
+			}
+		}
+		return queue.iterator();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 142e7cf..d268925 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -19,7 +19,14 @@
 package org.apache.flink.api.java.utils;
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.sampling.IntermediateSampleData;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.SampleInCoordinator;
+import org.apache.flink.api.java.functions.SampleInPartition;
+import org.apache.flink.api.java.functions.SampleWithFraction;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
@@ -142,6 +149,94 @@ public class DataSetUtils {
 		});
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Sample
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Generate a sample of DataSet by the probability fraction of each element. The random seed is
+	 * drawn from {@code Utils.RNG} on every call.
+	 *
+	 * @param input           The DataSet to sample from.
+	 * @param withReplacement Whether element can be selected more than once.
+	 * @param fraction        Probability that each element is chosen, should be [0,1] without replacement,
+	 *                        and [0, ∞) with replacement. While fraction is larger than 1, the elements are
+	 *                        expected to be selected multi times into sample on average.
+	 * @param <T>             The type of the DataSet elements.
+	 * @return The sampled DataSet
+	 */
+	public static <T> MapPartitionOperator<T, T> sample(
+		DataSet <T> input,
+		final boolean withReplacement,
+		final double fraction) {
+
+		final long randomSeed = Utils.RNG.nextLong();
+		return sample(input, withReplacement, fraction, randomSeed);
+	}
+
+	/**
+	 * Generate a sample of DataSet by the probability fraction of each element, using a fixed seed
+	 * so that the sample is reproducible.
+	 *
+	 * @param input           The DataSet to sample from.
+	 * @param withReplacement Whether element can be selected more than once.
+	 * @param fraction        Probability that each element is chosen, should be [0,1] without replacement,
+	 *                        and [0, ∞) with replacement. While fraction is larger than 1, the elements are
+	 *                        expected to be selected multi times into sample on average.
+	 * @param seed            random number generator seed.
+	 * @param <T>             The type of the DataSet elements.
+	 * @return The sampled DataSet
+	 */
+	public static <T> MapPartitionOperator<T, T> sample(
+		DataSet <T> input,
+		final boolean withReplacement,
+		final double fraction,
+		final long seed) {
+
+		// Each partition is sampled independently by a single mapPartition pass.
+		SampleWithFraction<T> sampleFunction = new SampleWithFraction<T>(withReplacement, fraction, seed);
+		return input.mapPartition(sampleFunction);
+	}
+
+	/**
+	 * Generate a sample of DataSet which contains fixed size elements. The random seed is drawn from
+	 * {@code Utils.RNG} on every call.
+	 * <p>
+	 * <strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction, use sample with
+	 * fraction unless you need exact precision.
+	 *
+	 * @param input           The DataSet to sample from.
+	 * @param withReplacement Whether element can be selected more than once.
+	 * @param numSample       The expected sample size.
+	 * @param <T>             The type of the DataSet elements.
+	 * @return The sampled DataSet
+	 */
+	public static <T> DataSet<T> sampleWithSize(
+		DataSet <T> input,
+		final boolean withReplacement,
+		final int numSample) {
+
+		final long randomSeed = Utils.RNG.nextLong();
+		return sampleWithSize(input, withReplacement, numSample, randomSeed);
+	}
+
+	/**
+	 * Generate a sample of DataSet which contains fixed size elements, using a fixed seed so that the
+	 * sample is reproducible.
+	 * <p>
+	 * <strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction, use sample with
+	 * fraction unless you need exact precision.
+	 * <p>
+	 * The sample is computed in two phases. First, every partition turns its elements into
+	 * {@link IntermediateSampleData} through {@link SampleInPartition}. Then a
+	 * {@link GroupReduceOperator} running {@link SampleInCoordinator} merges those intermediate
+	 * results into the final sample.
+	 *
+	 * @param input           The DataSet to sample from.
+	 * @param withReplacement Whether element can be selected more than once.
+	 * @param numSample       The expected sample size.
+	 * @param seed            Random number generator seed.
+	 * @param <T>             The type of the DataSet elements.
+	 * @return The sampled DataSet
+	 */
+	public static <T> DataSet<T> sampleWithSize(
+		DataSet <T> input,
+		final boolean withReplacement,
+		final int numSample,
+		final long seed) {
+
+		// Phase 1: per-partition sampling into intermediate weighted data (fully typed, no raw types).
+		SampleInPartition<T> sampleInPartition = new SampleInPartition<T>(withReplacement, numSample, seed);
+		MapPartitionOperator<T, IntermediateSampleData<T>> mapPartitionOperator = input.mapPartition(sampleInPartition);
+
+		// Phase 2: there is no previous group, so the parallelism of GroupReduceOperator is always 1.
+		String callLocation = Utils.getCallLocationName();
+		SampleInCoordinator<T> sampleInCoordinator = new SampleInCoordinator<T>(withReplacement, numSample, seed);
+		return new GroupReduceOperator<IntermediateSampleData<T>, T>(mapPartitionOperator,
+			input.getType(), sampleInCoordinator, callLocation);
+	}
+
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
new file mode 100644
index 0000000..83e5b41
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sampling;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test suite tries to verify whether all the random samplers work as expected. It mainly focuses on:
+ * <ul>
+ * <li>Does the sampled result fit the input parameters? We check parameters like sample fraction, sample size,
+ * with/without replacement, and so on.</li>
+ * <li>Is the sampled result randomly selected? We verify this by measuring how it is distributed over the source
+ * data. We run a Kolmogorov-Smirnov (KS) test between the random samplers and default reference samplers whose
+ * elements are spread evenly over the source data. If a random sampler selects elements randomly from the source,
+ * its output is spread evenly over the source data as well, so the KS test should fail to reject the null
+ * hypothesis that the sampled values and the reference sample come from the same distribution.
+ * </li>
+ * </ul>
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">Kolmogorov Smirnov test</a>
+ */
+public class RandomSamplerTest {
+	private final static int SOURCE_SIZE = 10000;
+	private static KolmogorovSmirnovTest ksTest;
+	private static List<Double> source;
+	private final static int DEFFAULT_PARTITION_NUMBER=10;
+	private List<Double>[] sourcePartitions = new List[DEFFAULT_PARTITION_NUMBER];
+
+	@BeforeClass
+	public static void init() {
+		// initiate source data set.
+		source = new ArrayList<Double>(SOURCE_SIZE);
+		for (int i = 0; i < SOURCE_SIZE; i++) {
+			source.add((double) i);
+		}
+		
+		ksTest = new KolmogorovSmirnovTest();
+	}
+
+	private void initSourcePartition() {
+		for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) {
+			sourcePartitions[i] = new LinkedList<Double>();
+		}
+		for (int i = 0; i< SOURCE_SIZE; i++) {
+			int index = i % DEFFAULT_PARTITION_NUMBER;
+			sourcePartitions[index].add((double)i);
+		}
+	}
+	
+	@Test(expected = java.lang.IllegalArgumentException.class)
+	public void testBernoulliSamplerWithUnexpectedFraction1() {
+		verifySamplerFraction(-1, false);
+	}
+	
+	@Test(expected = java.lang.IllegalArgumentException.class)
+	public void testBernoulliSamplerWithUnexpectedFraction2() {
+		verifySamplerFraction(2, false);
+	}
+	
+	@Test
+	public void testBernoulliSamplerFraction() {
+		verifySamplerFraction(0.01, false);
+		verifySamplerFraction(0.05, false);
+		verifySamplerFraction(0.1, false);
+		verifySamplerFraction(0.3, false);
+		verifySamplerFraction(0.5, false);
+		verifySamplerFraction(0.854, false);
+		verifySamplerFraction(0.99, false);
+	}
+	
+	@Test
+	public void testBernoulliSamplerDuplicateElements() {
+		verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.01));
+		verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.1));
+		verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.5));
+	}
+	
+	@Test(expected = java.lang.IllegalArgumentException.class)
+	public void testPoissonSamplerWithUnexpectedFraction1() {
+		verifySamplerFraction(-1, true);
+	}
+	
+	@Test
+	public void testPoissonSamplerFraction() {
+		verifySamplerFraction(0.01, true);
+		verifySamplerFraction(0.05, true);
+		verifySamplerFraction(0.1, true);
+		verifySamplerFraction(0.5, true);
+		verifySamplerFraction(0.854, true);
+		verifySamplerFraction(0.99, true);
+		verifySamplerFraction(1.5, true);
+	}
+	
+	@Test(expected = java.lang.IllegalArgumentException.class)
+	public void testReservoirSamplerUnexpectedSize1() {
+		verifySamplerFixedSampleSize(-1, true);
+	}
+	
+	@Test(expected = java.lang.IllegalArgumentException.class)
+	public void testReservoirSamplerUnexpectedSize2() {
+		verifySamplerFixedSampleSize(-1, false);
+	}
+	
+	@Test
+	public void testBernoulliSamplerDistribution() {
+		verifyBernoulliSampler(0.01d);
+		verifyBernoulliSampler(0.05d);
+		verifyBernoulliSampler(0.1d);
+		verifyBernoulliSampler(0.5d);
+	}
+	
+	@Test
+	public void testPoissonSamplerDistribution() {
+		verifyPoissonSampler(0.01d);
+		verifyPoissonSampler(0.05d);
+		verifyPoissonSampler(0.1d);
+		verifyPoissonSampler(0.5d);
+	}
+	
+	@Test
+	public void testReservoirSamplerSampledSize() {
+		verifySamplerFixedSampleSize(1, true);
+		verifySamplerFixedSampleSize(10, true);
+		verifySamplerFixedSampleSize(100, true);
+		verifySamplerFixedSampleSize(1234, true);
+		verifySamplerFixedSampleSize(9999, true);
+		verifySamplerFixedSampleSize(20000, true);
+		
+		verifySamplerFixedSampleSize(1, false);
+		verifySamplerFixedSampleSize(10, false);
+		verifySamplerFixedSampleSize(100, false);
+		verifySamplerFixedSampleSize(1234, false);
+		verifySamplerFixedSampleSize(9999, false);
+	}
+	
+	@Test
+	public void testReservoirSamplerSampledSize2() {
+		RandomSampler<Double> sampler = new ReservoirSamplerWithoutReplacement<Double>(20000);
+		Iterator<Double> sampled = sampler.sample(source.iterator());
+		assertTrue("ReservoirSamplerWithoutReplacement sampled output size should not beyond the source size.", getSize(sampled) == SOURCE_SIZE);
+	}
+	
+	@Test
+	public void testReservoirSamplerDuplicateElements() {
+		verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(100));
+		verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(1000));
+		verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(5000));
+	}
+	
+	@Test
+	public void testReservoirSamplerWithoutReplacement() {
+		verifyReservoirSamplerWithoutReplacement(100, false);
+		verifyReservoirSamplerWithoutReplacement(500, false);
+		verifyReservoirSamplerWithoutReplacement(1000, false);
+		verifyReservoirSamplerWithoutReplacement(5000, false);
+	}
+	
+	@Test
+	public void testReservoirSamplerWithReplacement() {
+		verifyReservoirSamplerWithReplacement(100, false);
+		verifyReservoirSamplerWithReplacement(500, false);
+		verifyReservoirSamplerWithReplacement(1000, false);
+		verifyReservoirSamplerWithReplacement(5000, false);
+	}
+
+	@Test
+	public void testReservoirSamplerWithMultiSourcePartitions1() {
+		initSourcePartition();
+
+		verifyReservoirSamplerWithoutReplacement(100, true);
+		verifyReservoirSamplerWithoutReplacement(500, true);
+		verifyReservoirSamplerWithoutReplacement(1000, true);
+		verifyReservoirSamplerWithoutReplacement(5000, true);
+	}
+
+	@Test
+	public void testReservoirSamplerWithMultiSourcePartitions2() {
+		initSourcePartition();
+
+		verifyReservoirSamplerWithReplacement(100, true);
+		verifyReservoirSamplerWithReplacement(500, true);
+		verifyReservoirSamplerWithReplacement(1000, true);
+		verifyReservoirSamplerWithReplacement(5000, true);
+	}
+
+	/*
+	 * Sample with fixed size, verify whether the sampled result size equals to input size.
+	 */
+	private void verifySamplerFixedSampleSize(int numSample, boolean withReplacement) {
+		RandomSampler<Double> sampler;
+		if (withReplacement) {
+			sampler = new ReservoirSamplerWithReplacement<Double>(numSample);
+		} else {
+			sampler = new ReservoirSamplerWithoutReplacement<Double>(numSample);
+		}
+		Iterator<Double> sampled = sampler.sample(source.iterator());
+		assertEquals(numSample, getSize(sampled));
+	}
+
+	/*
+	 * Sample with fraction, and verify whether the sampled result close to input fraction.
+	 */
+	private void verifySamplerFraction(double fraction, boolean withReplacement) {
+		RandomSampler<Double> sampler;
+		if (withReplacement) {
+			sampler = new PoissonSampler<Double>(fraction);
+		} else {
+			sampler = new BernoulliSampler<Double>(fraction);
+		}
+		
+		// take 5 times sample, and take the average result size for next step comparison.
+		int totalSampledSize = 0;
+		double sampleCount = 5;
+		for (int i = 0; i < sampleCount; i++) {
+			totalSampledSize += getSize(sampler.sample(source.iterator()));
+		}
+		double resultFraction = totalSampledSize / ((double) SOURCE_SIZE * sampleCount);
+		assertTrue(String.format("expected fraction: %f, result fraction: %f", fraction, resultFraction), Math.abs((resultFraction - fraction) / fraction) < 0.1);
+	}
+
+	/*
+	 * Test sampler without replacement, and verify that there should not exist any duplicate element in sampled result.
+	 */
+	private void verifyRandomSamplerDuplicateElements(final RandomSampler<Double> sampler) {
+		List<Double> list = Lists.newLinkedList(new Iterable<Double>() {
+			@Override
+			public Iterator<Double> iterator() {
+				return sampler.sample(source.iterator());
+			}
+		});
+		Set<Double> set = Sets.newHashSet(list);
+		assertTrue("There should not have duplicate element for sampler without replacement.", list.size() == set.size());
+	}
+	
+	/*
+	 * Drain the iterator and count how many elements it yielded.
+	 */
+	private int getSize(Iterator<?> iterator) {
+		int size = 0;
+		while (iterator.hasNext()) {
+			iterator.next();
+			size++;
+		}
+		return size;
+	}
+	
+	private void verifyBernoulliSampler(double fraction) {
+		BernoulliSampler<Double> sampler = new BernoulliSampler<Double>(fraction);
+		verifyRandomSamplerWithFraction(fraction, sampler, true);
+		verifyRandomSamplerWithFraction(fraction, sampler, false);
+	}
+	
+	private void verifyPoissonSampler(double fraction) {
+		PoissonSampler<Double> sampler = new PoissonSampler<Double>(fraction);
+		verifyRandomSamplerWithFraction(fraction, sampler, true);
+		verifyRandomSamplerWithFraction(fraction, sampler, false);
+	}
+	
+	private void verifyReservoirSamplerWithReplacement(int numSamplers, boolean sampleOnPartitions) {
+		ReservoirSamplerWithReplacement<Double> sampler = new ReservoirSamplerWithReplacement<Double>(numSamplers);
+		verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, sampleOnPartitions);
+		verifyRandomSamplerWithSampleSize(numSamplers, sampler, false, sampleOnPartitions);
+	}
+	
+	private void verifyReservoirSamplerWithoutReplacement(int numSamplers, boolean sampleOnPartitions) {
+		ReservoirSamplerWithoutReplacement<Double> sampler = new ReservoirSamplerWithoutReplacement<Double>(numSamplers);
+		verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, sampleOnPartitions);
+		verifyRandomSamplerWithSampleSize(numSamplers, sampler, false, sampleOnPartitions);
+	}
+
+	/*
+	 * Verify whether random sampler sample with fraction from source data randomly. There are two default sample, one is
+	 * sampled from source data with certain interval, the other is sampled only from the first half region of source data,
+	 * If random sampler select elements randomly from source, it would distributed well-proportioned on source data as well,
+	 * so the K-S Test result would accept the first one, while reject the second one.
+	 */
+	private void verifyRandomSamplerWithFraction(double fraction, RandomSampler sampler, boolean withDefaultSampler) {
+		double[] baseSample;
+		if (withDefaultSampler) {
+			baseSample = getDefaultSampler(fraction);
+		} else {
+			baseSample = getWrongSampler(fraction);
+		}
+		
+		verifyKSTest(sampler, baseSample, withDefaultSampler);
+	}
+
+	/*
+	 * Verify whether random sampler sample with fixed size from source data randomly. There are two default sample, one is
+	 * sampled from source data with certain interval, the other is sampled only from the first half region of source data,
+	 * If random sampler select elements randomly from source, it would distributed well-proportioned on source data as well,
+	 * so the K-S Test result would accept the first one, while reject the second one.
+	 */
+	private void verifyRandomSamplerWithSampleSize(int sampleSize, RandomSampler sampler, boolean withDefaultSampler, boolean sampleWithPartitions) {
+		double[] baseSample;
+		if (withDefaultSampler) {
+			baseSample = getDefaultSampler(sampleSize);
+		} else {
+			baseSample = getWrongSampler(sampleSize);
+		}
+		
+		verifyKSTest(sampler, baseSample, withDefaultSampler, sampleWithPartitions);
+	}
+
+	private void verifyKSTest(RandomSampler sampler, double[] defaultSampler, boolean expectSuccess) {
+		verifyKSTest(sampler, defaultSampler, expectSuccess, false);
+	}
+
+	/*
+	 * Run a two-sample K-S test between the sampler output and the reference sample. expectSuccess means the two
+	 * samples are expected to be indistinguishable at significance level 0.05.
+	 */
+	private void verifyKSTest(RandomSampler sampler, double[] defaultSampler, boolean expectSuccess, boolean sampleOnPartitions) {
+		double[] sampled = getSampledOutput(sampler, sampleOnPartitions);
+		// kolmogorovSmirnovStatistic returns the KS statistic D (the maximum distance between the two empirical
+		// CDFs), not a p-value, so it is compared against the critical value of D rather than against 0.05.
+		double ksStatistic = ksTest.kolmogorovSmirnovStatistic(sampled, defaultSampler);
+		double criticalValue = getDValue(sampled.length, defaultSampler.length);
+		String message = String.format("KS test statistic(%f), critical value(%f)", ksStatistic, criticalValue);
+		if (expectSuccess) {
+			assertTrue(message, ksStatistic <= criticalValue);
+		} else {
+			assertTrue(message, ksStatistic > criticalValue);
+		}
+	}
+	
+	private double[] getSampledOutput(RandomSampler<Double> sampler, boolean sampleOnPartitions) {
+		Iterator<Double> sampled = null;
+		if (sampleOnPartitions) {
+			DistributedRandomSampler<Double> reservoirRandomSampler = (DistributedRandomSampler<Double>)sampler;
+			List<IntermediateSampleData<Double>> intermediateResult = Lists.newLinkedList();
+			for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) {
+				Iterator<IntermediateSampleData<Double>> partialIntermediateResult = reservoirRandomSampler.sampleInPartition(sourcePartitions[i].iterator());
+				while (partialIntermediateResult.hasNext()) {
+					intermediateResult.add(partialIntermediateResult.next());
+				}
+			}
+			sampled = reservoirRandomSampler.sampleInCoordinator(intermediateResult.iterator());
+		} else {
+			sampled = sampler.sample(source.iterator());
+		}
+		List<Double> list = Lists.newArrayList();
+		while (sampled.hasNext()) {
+			list.add(sampled.next());
+		}
+		double[] result = transferFromListToArrayWithOrder(list);
+		return result;
+	}
+
+	/*
+	 * Some sample result may not order by the input sequence, we should make it in order to do K-S test.
+	 */
+	private double[] transferFromListToArrayWithOrder(List<Double> list) {
+		// Double.compare returns 0 for equal values. Samples taken with replacement contain duplicates, and a
+		// comparator that never returns 0 breaks the Comparator contract, which lets TimSort throw.
+		Collections.sort(list, new Comparator<Double>() {
+			@Override
+			public int compare(Double o1, Double o2) {
+				return Double.compare(o1, o2);
+			}
+		});
+		double[] result = new double[list.size()];
+		for (int i = 0; i < list.size(); i++) {
+			result[i] = list.get(i);
+		}
+		return result;
+	}
+
+	private double[] getDefaultSampler(double fraction) {
+		Preconditions.checkArgument(fraction > 0, "Sample fraction should be positive.");
+		int size = (int) (SOURCE_SIZE * fraction);
+		double step = 1 / fraction;
+		double[] defaultSampler = new double[size];
+		for (int i = 0; i < size; i++) {
+			defaultSampler[i] = Math.round(step * i);
+		}
+		
+		return defaultSampler;
+	}
+	
+	private double[] getDefaultSampler(int fixSize) {
+		Preconditions.checkArgument(fixSize > 0, "Sample fraction should be positive.");
+		int size = fixSize;
+		double step = SOURCE_SIZE / (double) fixSize;
+		double[] defaultSampler = new double[size];
+		for (int i = 0; i < size; i++) {
+			defaultSampler[i] = Math.round(step * i);
+		}
+		
+		return defaultSampler;
+	}
+	
+	/*
+	 * Build a failed sample distribution which only contains elements in the first half of source data.
+	 */
+	private double[] getWrongSampler(double fraction) {
+		Preconditions.checkArgument(fraction > 0, "Sample size should be positive.");
+		int size = (int) (SOURCE_SIZE * fraction);
+		int halfSourceSize = SOURCE_SIZE / 2;
+		double[] wrongSampler = new double[size];
+		for (int i = 0; i < size; i++) {
+			wrongSampler[i] = (double) i % halfSourceSize;
+		}
+		
+		return wrongSampler;
+	}
+	
+	/*
+	 * Build a failed sample distribution which only contains elements in the first half of source data.
+	 */
+	private double[] getWrongSampler(int fixSize) {
+		Preconditions.checkArgument(fixSize > 0, "Sample size be positive.");
+		int halfSourceSize = SOURCE_SIZE / 2;
+		int size = fixSize;
+		double[] wrongSampler = new double[size];
+		for (int i = 0; i < size; i++) {
+			wrongSampler[i] = (double) i % halfSourceSize;
+		}
+		
+		return wrongSampler;
+	}
+	
+	/*
+	 * Calculate the critical value of the two-sample K-S statistic D at significance level 0.05 (coefficient 1.36),
+	 * where m and n are the two sample sizes.
+	 */
+	private double getDValue(int m, int n) {
+		Preconditions.checkArgument(m > 0, "input sample size should be positive.");
+		Preconditions.checkArgument(n > 0, "input sample size should be positive.");
+		double first = (double) m;
+		double second = (double) n;
+		return 1.36 * Math.sqrt((first + second) / (first * second));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
index b1a1ab6..2663754 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{utils => jutils}
+import org.apache.flink.api.java.{Utils, utils => jutils}
 
 import _root_.scala.language.implicitConversions
 import _root_.scala.reflect.ClassTag
@@ -53,6 +53,44 @@ class DataSetUtils[T](val self: DataSet[T]) extends AnyVal {
     wrap(jutils.DataSetUtils.zipWithUniqueId(self.javaSet))
       .map { t => (t.f0.toLong, t.f1) }
   }
+
+  // --------------------------------------------------------------------------------------------
+  //  Sample
+  // --------------------------------------------------------------------------------------------
+  /**
+   * Samples this DataSet, selecting elements according to the given probability `fraction`.
+   *
+   * @param withReplacement whether an element may be selected more than once
+   * @param fraction        without replacement: the probability that an element is chosen, in
+   *                        [0, 1]; with replacement: in [0, ∞), where values above 1 mean an
+   *                        element is expected to appear several times in the sample on average
+   * @param seed            seed for the random number generator; defaults to a random value
+   * @return the sampled DataSet
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong())
+            (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
+    val sampledJavaSet = jutils.DataSetUtils.sample(self.javaSet, withReplacement, fraction, seed)
+    wrap(sampledJavaSet)
+  }
+
+  /**
+   * Samples a fixed number (`numSample`) of elements from this DataSet.
+   *
+   * '''Note:''' sampling with a fixed size is less efficient than sampling with a fraction; prefer
+   * `sample` unless the exact sample size matters.
+   *
+   * @param withReplacement whether an element may be selected more than once
+   * @param numSample       the requested sample size
+   * @param seed            seed for the random number generator; defaults to a random value
+   * @return the sampled DataSet
+   */
+  def sampleWithSize(withReplacement: Boolean, numSample: Int, seed: Long = Utils.RNG.nextLong())
+                    (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
+    val sampledJavaSet =
+      jutils.DataSetUtils.sampleWithSize(self.javaSet, withReplacement, numSample, seed)
+    wrap(sampledJavaSet)
+  }
 }
 
 object DataSetUtils {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index c28347c..ce02267 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -19,11 +19,14 @@
 package org.apache.flink.test.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -451,6 +454,34 @@ public class TestBaseUtils extends TestLogger {
 			assertEquals(extectedStrings[i], resultStrings[i]);
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Comparison methods for tests using sample
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Checks that every element of {@code result} is one of the expected values. Elements are compared by
+	 * their {@code toString()} form ({@code "null"} for null elements). This is a subset check: duplicates
+	 * in the result are allowed, and expected values that never occur in the result are ignored.
+	 *
+	 * @param result The test result.
+	 * @param expected The expected values, separated by line breaks.
+	 * @param <T> The result type.
+	 */
+	public static <T> void containsResultAsText(List<T> result, String expected) {
+		// Hash set: O(1) membership per result element instead of scanning the expected list each time.
+		java.util.Set<String> expectedStrings = new java.util.HashSet<String>(Arrays.asList(expected.split("\n")));
+		List<String> unexpected = Lists.newArrayList();
+
+		// Iterate rather than call result.get(i), which is O(n) per call on linked lists.
+		for (T val : result) {
+			String str = (val == null) ? "null" : val.toString();
+			if (!expectedStrings.contains(str)) {
+				unexpected.add(str);
+			}
+		}
+
+		assertTrue("Result contains elements that are not among the expected values: " + unexpected,
+			unexpected.isEmpty());
+	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Miscellaneous helper methods

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java
new file mode 100644
index 0000000..a9c75e5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class SampleITCase extends MultipleProgramsTestBase {
+
+	private static final Random RNG = new Random();
+
+	/** The strings emitted by {@link #getSourceDataSet}, one per line. */
+	private static final String SOURCE_STRINGS =
+		"Hi\n" +
+		"Hello\n" +
+		"Hello world\n" +
+		"Hello world, how are you?\n" +
+		"I am fine.\n" +
+		"Luke Skywalker\n" +
+		"Comment#1\n" +
+		"Comment#2\n" +
+		"Comment#3\n" +
+		"Comment#4\n" +
+		"Comment#5\n" +
+		"Comment#6\n" +
+		"Comment#7\n" +
+		"Comment#8\n" +
+		"Comment#9\n" +
+		"Comment#10\n" +
+		"Comment#11\n" +
+		"Comment#12\n" +
+		"Comment#13\n" +
+		"Comment#14\n" +
+		"Comment#15\n";
+
+	public SampleITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void initiate() {
+		ExecutionEnvironment.getExecutionEnvironment().setParallelism(5);
+	}
+
+	@Test
+	public void testSamplerWithFractionWithoutReplacement() throws Exception {
+		for (double fraction : new double[] {0d, 0.2d, 1.0d}) {
+			verifySamplerWithFraction(false, fraction, RNG.nextLong());
+		}
+	}
+
+	@Test
+	public void testSamplerWithFractionWithReplacement() throws Exception {
+		for (double fraction : new double[] {0d, 0.2d, 1.0d, 2.0d}) {
+			verifySamplerWithFraction(true, fraction, RNG.nextLong());
+		}
+	}
+
+	@Test
+	public void testSamplerWithSizeWithoutReplacement() throws Exception {
+		for (int numSamples : new int[] {0, 2, 21}) {
+			verifySamplerWithFixedSize(false, numSamples, RNG.nextLong());
+		}
+	}
+
+	@Test
+	public void testSamplerWithSizeWithReplacement() throws Exception {
+		for (int numSamples : new int[] {0, 2, 21}) {
+			verifySamplerWithFixedSize(true, numSamples, RNG.nextLong());
+		}
+	}
+
+	/**
+	 * Samples the source data by fraction and checks that every sampled element comes from the source.
+	 */
+	private void verifySamplerWithFraction(boolean withReplacement, double fraction, long seed) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		MapPartitionOperator<String, String> sampled =
+			DataSetUtils.sample(getSourceDataSet(env), withReplacement, fraction, seed);
+		containsResultAsText(sampled.collect(), SOURCE_STRINGS);
+	}
+
+	/**
+	 * Samples a fixed number of elements and checks both the sample size and that every element comes
+	 * from the source.
+	 */
+	private void verifySamplerWithFixedSize(boolean withReplacement, int numSamples, long seed) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> sampled =
+			DataSetUtils.sampleWithSize(getSourceDataSet(env), withReplacement, numSamples, seed);
+		List<String> collected = sampled.collect();
+		assertEquals(numSamples, collected.size());
+		containsResultAsText(collected, SOURCE_STRINGS);
+	}
+
+	/** Projects the standard 3-tuple test data set onto its String field. */
+	private FlatMapOperator<Tuple3<Integer, Long, String>, String> getSourceDataSet(ExecutionEnvironment env) {
+		FlatMapFunction<Tuple3<Integer, Long, String>, String> extractText =
+			new FlatMapFunction<Tuple3<Integer, Long, String>, String>() {
+				@Override
+				public void flatMap(Tuple3<Integer, Long, String> tuple, Collector<String> collector) throws Exception {
+					collector.collect(tuple.f2);
+				}
+			};
+		return CollectionDataSets.get3TupleDataSet(env).flatMap(extractText);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
new file mode 100644
index 0000000..86b0818
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
+import scala.collection.JavaConverters._
+
+/**
+ * Integration tests for the Scala `sample` / `sampleWithSize` extensions on DataSet.
+ *
+ * Each sampling run is validated right after it executes, so every run in a test is checked, not
+ * only the last one. Every sampled element must be one of the source strings, and fixed-size
+ * samples must have exactly the requested number of elements.
+ */
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  @Before
+  def initiate(): Unit = {
+    ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement(): Unit = {
+    verifySamplerWithFractionWithoutReplacement(0d)
+    verifySamplerWithFractionWithoutReplacement(0.2d)
+    verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement(): Unit = {
+    verifySamplerWithFractionWithReplacement(0d)
+    verifySamplerWithFractionWithReplacement(0.2d)
+    verifySamplerWithFractionWithReplacement(1.0d)
+    verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement(): Unit = {
+    verifySamplerWithFixedSizeWithoutReplacement(0)
+    verifySamplerWithFixedSizeWithoutReplacement(2)
+    verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement(): Unit = {
+    verifySamplerWithFixedSizeWithReplacement(0)
+    verifySamplerWithFixedSizeWithReplacement(2)
+    verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: Double): Unit = {
+    verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long): Unit = {
+    verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double): Unit = {
+    verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long): Unit = {
+    verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  /** Samples by fraction and checks that every sampled element comes from the source data. */
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(
+      withReplacement: Boolean,
+      fraction: Double,
+      seed: Long): Unit = {
+    val sampled = getSourceDataSet().sample(withReplacement, fraction, seed)
+    val result: JavaList[String] = sampled.collect().asJava
+    // Validate this run immediately so a later run in the same test cannot mask a failure.
+    TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFixedSizeWithoutReplacement(numSamples: Int): Unit = {
+    verifySamplerWithFixedSizeWithoutReplacement(numSamples, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFixedSizeWithoutReplacement(numSamples: Int, seed: Long): Unit = {
+    verifySamplerWithFixedSize(false, numSamples, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFixedSizeWithReplacement(numSamples: Int): Unit = {
+    verifySamplerWithFixedSizeWithReplacement(numSamples, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFixedSizeWithReplacement(numSamples: Int, seed: Long): Unit = {
+    verifySamplerWithFixedSize(true, numSamples, seed)
+  }
+
+  /** Samples a fixed number of elements and checks both the size and the element origin. */
+  @throws(classOf[Exception])
+  private def verifySamplerWithFixedSize(
+      withReplacement: Boolean,
+      numSamples: Int,
+      seed: Long): Unit = {
+    val sampled = getSourceDataSet().sampleWithSize(withReplacement, numSamples, seed)
+    val result: JavaList[String] = sampled.collect().asJava
+    assertEquals(numSamples, result.size)
+    TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  /** Projects the standard 3-tuple test data set onto its String field. */
+  private def getSourceDataSet(): DataSet[String] = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDataSet = CollectionDataSets.get3TupleDataSet(env)
+    tupleDataSet.map(x => x._3)
+  }
+
+  /** The strings produced by `getSourceDataSet`, one per line (with a trailing line break). */
+  private def getSourceStrings: String = {
+    val lines = Seq(
+      "Hi",
+      "Hello",
+      "Hello world",
+      "Hello world, how are you?",
+      "I am fine.",
+      "Luke Skywalker",
+      "Comment#1",
+      "Comment#2",
+      "Comment#3",
+      "Comment#4",
+      "Comment#5",
+      "Comment#6",
+      "Comment#7",
+      "Comment#8",
+      "Comment#9",
+      "Comment#10",
+      "Comment#11",
+      "Comment#12",
+      "Comment#13",
+      "Comment#14",
+      "Comment#15")
+    lines.mkString("", "\n", "\n")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cfb17c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f215fe4..6af0355 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,12 @@ under the License.
 				<version>3.2.1</version>
 			</dependency>
 
+			<dependency>
+				<groupId>org.apache.commons</groupId>
+				<artifactId>commons-math3</artifactId>
+				<version>3.5</version>
+			</dependency>
+
 			<!-- Managed dependency required for HBase in flink-hbase -->
 			<dependency>
 				<groupId>org.javassist</groupId>


Mime
View raw message