flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [10/12] flink git commit: [FLINK-3219] [java scala] Implement DataSet.count and DataSet.collect using a single operator
Date Fri, 15 Jan 2016 10:53:40 GMT
[FLINK-3219] [java scala] Implement DataSet.count and DataSet.collect using a single operator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a53587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a53587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a53587

Branch: refs/heads/master
Commit: e9a535877906bd75df3e633e3c5dad556b9c925d
Parents: 0937be0
Author: Greg Hogan <code@greghogan.com>
Authored: Mon Jan 11 17:04:31 2016 -0500
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Jan 15 11:44:21 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  7 ++---
 .../java/org/apache/flink/api/java/Utils.java   | 33 +++++++++++++-------
 .../org/apache/flink/api/scala/DataSet.scala    |  7 ++---
 .../jsonplan/JsonJobGraphGenerationTest.java    |  2 +-
 4 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index c5a636c..be84032 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -46,7 +46,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
@@ -387,8 +386,7 @@ public abstract class DataSet<T> {
 	public long count() throws Exception {
 		final String id = new AbstractID().toString();
 
-		flatMap(new Utils.CountHelper<T>(id)).name("count()")
-				.output(new DiscardingOutputFormat<Long>()).name("count() sink");
+		output(new Utils.CountHelper<T>(id)).name("count()");
 
 		JobExecutionResult res = getExecutionEnvironment().execute();
 		return res.<Long> getAccumulatorResult(id);
@@ -405,8 +403,7 @@ public abstract class DataSet<T> {
 		final String id = new AbstractID().toString();
 		final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
 		
-		this.flatMap(new Utils.CollectHelper<>(id, serializer)).name("collect()")
-				.output(new DiscardingOutputFormat<T>()).name("collect() sink");
+		this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
 		JobExecutionResult res = getExecutionEnvironment().execute();
 
 		ArrayList<byte[]> accResult = res.getAccumulatorResult(id);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 665f35f..cb10906 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -20,20 +20,19 @@ package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
 import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
 
 /**
@@ -78,7 +77,7 @@ public final class Utils {
 	}
 
 	@SkipCodeAnalysis
-	public static class CountHelper<T> extends RichFlatMapFunction<T, Long> {
+	public static class CountHelper<T> extends RichOutputFormat<T> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -91,18 +90,26 @@ public final class Utils {
 		}
 
 		@Override
-		public void flatMap(T value, Collector<Long> out) throws Exception {
+		public void configure(Configuration parameters) {
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+		}
+
+		@Override
+		public void writeRecord(T record) throws IOException {
 			counter++;
 		}
 
 		@Override
-		public void close() throws Exception {
+		public void close() throws IOException {
 			getRuntimeContext().getLongCounter(id).add(counter);
 		}
 	}
 
 	@SkipCodeAnalysis
-	public static class CollectHelper<T> extends RichFlatMapFunction<T, T> {
+	public static class CollectHelper<T> extends RichOutputFormat<T> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -117,17 +124,21 @@ public final class Utils {
 		}
 
 		@Override
-		public void open(Configuration parameters) throws Exception {
+		public void configure(Configuration parameters) {
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
 			this.accumulator = new SerializedListAccumulator<>();
 		}
 
 		@Override
-		public void flatMap(T value, Collector<T> out) throws Exception {
-			accumulator.add(value, serializer);
+		public void writeRecord(T record) throws IOException {
+			accumulator.add(record, serializer);
 		}
 
 		@Override
-		public void close() throws Exception {
+		public void close() throws IOException {
 			// Important: should only be added in close method to minimize traffic of accumulators
 			getRuntimeContext().addAccumulator(id, accumulator);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index a3ce53c..396ee90 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.Utils.CountHelper
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
-import org.apache.flink.api.java.io.{DiscardingOutputFormat, PrintingOutputFormat, TextOutputFormat}
+import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.operators.join.JoinType
@@ -521,7 +521,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   @throws(classOf[Exception])
   def count(): Long = {
     val id = new AbstractID().toString
-    javaSet.flatMap(new CountHelper[T](id)).output(new DiscardingOutputFormat[java.lang.Long])
+    javaSet.output(new CountHelper[T](id))
     val res = getExecutionEnvironment.execute()
     res.getAccumulatorResult[Long](id)
   }
@@ -539,8 +539,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val id = new AbstractID().toString
     val serializer = getType().createSerializer(getExecutionEnvironment.getConfig)
     
-    javaSet.flatMap(new Utils.CollectHelper[T](id, serializer))
-           .output(new DiscardingOutputFormat[T])
+    javaSet.output(new Utils.CollectHelper[T](id, serializer))
     
     val res = getExecutionEnvironment.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index a862242..a9ade6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -183,7 +183,7 @@ public class JsonJobGraphGenerationTest {
 			// without arguments
 			try {
 				final int parallelism = 1; // some ops have DOP 1 forced
-				JsonValidator validator = new GenericValidator(parallelism, 10);
+				JsonValidator validator = new GenericValidator(parallelism, 9);
 				TestingExecutionEnvironment.setAsNext(validator, parallelism);
 
 				ConnectedComponents.main();


Mime
View raw message