flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3657] [dataSet] Change access of DataSetUtils.countElements() to 'public'
Date Mon, 18 Apr 2016 10:43:42 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 c1eb247e9 -> 5987eb6fd


[FLINK-3657] [dataSet] Change access of DataSetUtils.countElements() to 'public'

This closes #1829


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5987eb6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5987eb6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5987eb6f

Branch: refs/heads/release-1.0
Commit: 5987eb6fdcbb50fdae0bcde631e424662b234c31
Parents: c1eb247
Author: smarthi <smarthi@apache.org>
Authored: Fri Apr 15 21:44:00 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Apr 18 12:41:59 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/utils/DataSetUtils.java      |  5 ++---
 .../apache/flink/api/scala/utils/package.scala  | 14 ++++++++++++++
 .../flink/test/util/DataSetUtilsITCase.java     | 20 ++++++++++++++++----
 .../api/scala/util/DataSetUtilsITCase.scala     | 19 +++++++++++++++++--
 4 files changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5987eb6f/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 78e5231..c1e0819 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -53,7 +53,7 @@ public final class DataSetUtils {
 	 * @param input the DataSet received as input
 	 * @return a data set containing tuples of subtask index, number of elements mappings.
 	 */
-	private static <T> DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
+	public static <T> DataSet<Tuple2<Integer, Long>> countElementsPerPartition(DataSet<T> input) {
 		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer, Long>>() {
 			@Override
 			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
@@ -61,7 +61,6 @@ public final class DataSetUtils {
 				for (T value : values) {
 					counter++;
 				}
-
 				out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), counter));
 			}
 		});
@@ -76,7 +75,7 @@ public final class DataSetUtils {
 	 */
 	public static <T> DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
 
-		DataSet<Tuple2<Integer, Long>> elementCount = countElements(input);
+		DataSet<Tuple2<Integer, Long>> elementCount = countElementsPerPartition(input);
 
 		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5987eb6f/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index 6407093..6d116e9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -41,6 +41,20 @@ package object utils {
   implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
 
     /**
+      * Method that goes over all the elements in each partition in order to retrieve
+      * the total number of elements.
+      *
+      * @return a data set of tuple2 consisting of (subtask index, number of elements mappings)
+      */
+    def countElementsPerPartition: DataSet[(Int, Long)] = {
+      implicit val typeInfo = createTuple2TypeInformation[Int, Long](
+        BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]],
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+      )
+      wrap(jutils.countElementsPerPartition(self.javaSet)).map { t => (t.f0.toInt, t.f1.toLong)}
+    }
+
+    /**
      * Method that takes a set of subtask index, total number of elements mappings
      * and assigns ids to all the elements from the input data set.
      *

http://git-wip-us.apache.org/repos/asf/flink/blob/5987eb6f/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index 4ccc6e2..afbcb89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,8 +30,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -45,12 +45,24 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testCountElementsPerPartition() throws Exception {
+	 	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	 	long expectedSize = 100L;
+	 	DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
+
+	 	DataSet<Tuple2<Integer, Long>> ds = DataSetUtils.countElementsPerPartition(numbers);
+
+	 	Assert.assertEquals(env.getParallelism(), ds.count());
+	 	Assert.assertEquals(expectedSize, ds.sum(1).collect().get(0).f1.longValue());
+	}
+
+	@Test
 	public void testZipWithIndex() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		long expectedSize = 100L;
 		DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
 
-		List<Tuple2<Long, Long>> result = Lists.newArrayList(DataSetUtils.zipWithIndex(numbers).collect());
+		List<Tuple2<Long, Long>> result = new ArrayList<>(DataSetUtils.zipWithIndex(numbers).collect());
 
 		Assert.assertEquals(expectedSize, result.size());
 		// sort result by created index
@@ -79,7 +91,7 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 			}
 		});
 
-		Set<Long> result = Sets.newHashSet(ids.collect());
+		Set<Long> result = new HashSet<>(ids.collect());
 
 		Assert.assertEquals(expectedSize, result.size());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5987eb6f/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
index 25ecc9c..83dd2a4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
@@ -34,7 +34,7 @@ class DataSetUtilsITCase (
   @Test
   @throws(classOf[Exception])
   def testZipWithIndex(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val env = ExecutionEnvironment.getExecutionEnvironment
 
     val expectedSize = 100L
 
@@ -52,7 +52,7 @@ class DataSetUtilsITCase (
   @Test
   @throws(classOf[Exception])
   def testZipWithUniqueId(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val env = ExecutionEnvironment.getExecutionEnvironment
 
     val expectedSize = 100L
 
@@ -73,4 +73,19 @@ class DataSetUtilsITCase (
     Assert.assertEquals(checksum.getCount, 15)
     Assert.assertEquals(checksum.getChecksum, 55)
   }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCountElementsPerPartition(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val expectedSize = 100L
+
+    val numbers = env.generateSequence(0, expectedSize - 1)
+
+    val ds = numbers.countElementsPerPartition
+
+    Assert.assertEquals(env.getParallelism, ds.collect().size)
+    Assert.assertEquals(expectedSize, ds.sum(1).collect().head._2)
+  }
 }


Mime
View raw message