flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [01/30] git commit: Fixes bugs where the TypeExtractor throws an NPE instead of the operators
Date Mon, 09 Jun 2014 18:30:36 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master b746f452e -> 24d00598a


Fixes bugs where the TypeExtractor throws an NPE instead of the operators


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2b0baea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2b0baea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2b0baea9

Branch: refs/heads/master
Commit: 2b0baea9b8a6dd99052c2dfa98cae719a39d6bbc
Parents: b746f45
Author: twalthr <info@twalthr.com>
Authored: Mon Jun 2 14:39:36 2014 +0200
Committer: StephanEwen <stephan.ewen@tu-berlin.de>
Committed: Fri Jun 6 15:53:24 2014 +0200

----------------------------------------------------------------------
 .../main/java/eu/stratosphere/api/java/DataSet.java  | 15 +++++++++++++++
 .../api/java/operators/CoGroupOperator.java          |  3 +++
 .../api/java/operators/CrossOperator.java            |  3 +++
 .../api/java/operators/FilterOperator.java           |  4 ----
 .../api/java/operators/FlatMapOperator.java          |  4 ----
 .../api/java/operators/JoinOperator.java             |  3 +++
 .../stratosphere/api/java/operators/MapOperator.java |  4 ----
 .../api/java/operators/ReduceGroupOperator.java      |  8 --------
 .../api/java/operators/ReduceOperator.java           |  8 --------
 .../api/java/operators/SortedGrouping.java           |  3 +++
 .../api/java/operators/UnsortedGrouping.java         |  6 ++++++
 11 files changed, 33 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
index 66f8eeb..758cbf2 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
@@ -130,6 +130,9 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
+		if (mapper == null) {
+			throw new NullPointerException("Map function must not be null.");
+		}
 		return new MapOperator<T, R>(this, mapper);
 	}
 	
@@ -146,6 +149,9 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper)
{
+		if (flatMapper == null) {
+			throw new NullPointerException("FlatMap function must not be null.");
+		}
 		return new FlatMapOperator<T, R>(this, flatMapper);
 	}
 	
@@ -163,6 +169,9 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public FilterOperator<T> filter(FilterFunction<T> filter) {
+		if (filter == null) {
+			throw new NullPointerException("Filter function must not be null.");
+		}
 		return new FilterOperator<T>(this, filter);
 	}
 	
@@ -229,6 +238,9 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("Reduce function must not be null.");
+		}
 		return new ReduceOperator<T>(this, reducer);
 	}
 	
@@ -246,6 +258,9 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R>
reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
 		return new ReduceGroupOperator<T, R>(this, reducer);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
index 67b9e5c..ca4b1db 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
@@ -439,6 +439,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
I2, OU
 				 * @see DataSet
 				 */
 				public <R> CoGroupOperator<I1, I2, R> with(CoGroupFunction<I1, I2, R>
function) {
+					if (function == null) {
+						throw new NullPointerException("CoGroup function must not be null.");
+					}
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function,
input1.getType(), input2.getType());
 					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function,
returnType);
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
index 48da306..3566224 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
@@ -113,6 +113,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
I2, OUT,
 		 * @see DataSet
 		 */
 		public <R> CrossOperator<I1, I2, R> with(CrossFunction<I1, I2, R> function)
{
+			if (function == null) {
+				throw new NullPointerException("Cross function must not be null.");
+			}
 			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(),
input2.getType());
 			return new CrossOperator<I1, I2, R>(input1, input2, function, returnType);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FilterOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FilterOperator.java
index d9db757..adbe77f 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FilterOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FilterOperator.java
@@ -34,10 +34,6 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T,
T, FilterOperat
 	public FilterOperator(DataSet<T> input, FilterFunction<T> function) {
 		super(input, input.getType());
 		
-		if (function == null) {
-			throw new NullPointerException("Filter function must not be null.");
-		}
-		
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FlatMapOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FlatMapOperator.java
index 27025c7..32f2343 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FlatMapOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/FlatMapOperator.java
@@ -37,10 +37,6 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN,
OUT, Fl
 	public FlatMapOperator(DataSet<IN> input, FlatMapFunction<IN, OUT> function)
{
 		super(input, TypeExtractor.getFlatMapReturnTypes(function, input.getType()));
 		
-		if (function == null) {
-			throw new NullPointerException("FlatMap function must not be null.");
-		}
-		
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
index 578d68c..992cc0a 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
@@ -421,6 +421,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @see DataSet
 		 */
 		public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function)
{
+			if (function == null) {
+				throw new NullPointerException("Join function must not be null.");
+			}
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(),
getInput2Type());
 			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(),
function, returnType, getJoinHint());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapOperator.java
index e2c36bf..00bbb27 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapOperator.java
@@ -39,10 +39,6 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN,
OUT, MapOpe
 	public MapOperator(DataSet<IN> input, MapFunction<IN, OUT> function) {
 		super(input, TypeExtractor.getMapReturnTypes(function, input.getType()));
 		
-		if (function == null) {
-			throw new NullPointerException("Map function must not be null.");
-		}
-		
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceGroupOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceGroupOperator.java
index 9029b09..e001d91 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceGroupOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceGroupOperator.java
@@ -58,10 +58,6 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN,
OUT
 	public ReduceGroupOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> function)
{
 		super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType()));
 		
-		if (function == null) {
-			throw new NullPointerException("GroupReduce function must not be null.");
-		}
-		
 		this.function = function;
 		this.grouper = null;
 		checkCombinability();
@@ -76,10 +72,6 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN,
OUT
 	public ReduceGroupOperator(Grouping<IN> input, GroupReduceFunction<IN, OUT>
function) {
 		super(input != null ? input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function,
input.getDataSet().getType()));
 		
-		if (function == null) {
-			throw new NullPointerException("GroupReduce function must not be null.");
-		}
-		
 		this.function = function;
 		this.grouper = input;
 		checkCombinability();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceOperator.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceOperator.java
index bdf9436..6056e8b 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ReduceOperator.java
@@ -53,10 +53,6 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN,
IN, ReduceOpe
 	public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function) {
 		super(input, input.getType());
 		
-		if (function == null) {
-			throw new NullPointerException("Reduce function must not be null.");
-		}
-		
 		this.function = function;
 		this.grouper = null;
 		
@@ -67,10 +63,6 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN,
IN, ReduceOpe
 	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function) {
 		super(input.getDataSet(), input.getDataSet().getType());
 		
-		if (function == null) {
-			throw new NullPointerException("Reduce function must not be null.");
-		}
-		
 		this.function = function;
 		this.grouper = input;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/SortedGrouping.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/SortedGrouping.java
index 3edb05c..dc26a2b 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/SortedGrouping.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/SortedGrouping.java
@@ -72,6 +72,9 @@ public class SortedGrouping<T> extends Grouping<T> {
 	 * @see DataSet
 	 */
 	public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R>
reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
 		return new ReduceGroupOperator<T, R>(this, reducer);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2b0baea9/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
index 711bc24..95e40bc 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
@@ -64,6 +64,9 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see DataSet
 	 */
 	public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("Reduce function must not be null.");
+		}
 		return new ReduceOperator<T>(this, reducer);
 	}
 	
@@ -81,6 +84,9 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see DataSet
 	 */
 	public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R>
reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
 		return new ReduceGroupOperator<T, R>(this, reducer);
 	}
 


Mime
View raw message