flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] git commit: [FLINK-629] getFieldNotNull added to Tuple and updated Aggregators and Comparators to use that where appropriate
Date Sat, 06 Sep 2014 23:01:07 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 679bdc17c -> 66c1263de


[FLINK-629] getFieldNotNull added to Tuple and updated Aggregators and Comparators to use
that where appropriate


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/761952c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/761952c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/761952c0

Branch: refs/heads/master
Commit: 761952c04084e3819d763bbcfb564ec69933dd92
Parents: 679bdc1
Author: gyfora <gyula.fora@gmail.com>
Authored: Wed Jul 16 19:09:04 2014 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Sat Sep 6 23:08:06 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/types/NullFieldException.java  | 74 ++++++++++++++++++++
 .../api/java/operators/AggregateOperator.java   | 11 ++-
 .../translation/TupleKeyExtractingMapper.java   | 14 ++--
 .../org/apache/flink/api/java/tuple/Tuple.java  | 19 +++++
 .../java/typeutils/runtime/TupleComparator.java | 24 ++-----
 .../runtime/TupleLeadingFieldComparator.java    | 10 +--
 .../TupleLeadingFieldPairComparator.java        |  6 +-
 .../typeutils/runtime/TuplePairComparator.java  |  6 +-
 .../apache/flink/api/java/tuple/Tuple2Test.java | 16 ++++-
 9 files changed, 144 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
new file mode 100755
index 0000000..085660d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types;
+
+
+/**
+ * An exception specifying that a required field was not set in a record, i.e. was <code>null</code>.
+ */
+public class NullFieldException extends RuntimeException
+{
+	/**
+	 * UID for serialization interoperability. 
+	 */
+	private static final long serialVersionUID = -8820467525772321173L;
+	
+	private final int fieldNumber;
+
+	/**
+	 * Constructs an {@code NullFieldException} with {@code null}
+	 * as its error detail message.
+	 */
+	public NullFieldException() {
+		super();
+		this.fieldNumber = -1;
+	}
+
+	/**
+	 * Constructs an {@code NullFieldException} with the specified detail message.
+	 *
+	 * @param message The detail message.
+	 */
+	public NullFieldException(String message) {
+		super(message);
+		this.fieldNumber = -1;
+	}
+	
+	/**
+	 * Constructs an {@code NullFieldException} with a default message, referring to
+	 * given field number as the null field.
+	 *
+	 * @param fieldNumber The index of the field that was null, bit expected to hold a value.
+	 */
+	public NullFieldException(int fieldNumber) {
+		super("Field " + fieldNumber + " is null, but expected to hold a value.");
+		this.fieldNumber = fieldNumber;
+	}
+	
+	/**
+	 * Gets the field number that was attempted to access. If the number is not set, this method
returns
+	 * {@code -1}.
+	 * 
+	 * @return The field number that was attempted to access, or {@code -1}, if not set.
+	 */
+	public int getFieldNumber() {
+		return this.fieldNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 1ceb8c8..6073a1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -37,8 +37,9 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.util.Collector;
-
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -283,8 +284,12 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN,
IN, Aggregate
 				current = values.next();
 				
 				for (int i = 0; i < fieldPositions.length; i++) {
-					Object val = current.getField(fieldPositions[i]);
-					aggFunctions[i].aggregate(val);
+					try {
+						Object val = current.getFieldNotNull(fieldPositions[i]);
+						aggFunctions[i].aggregate(val);
+					} catch (NullKeyFieldException e) {
+						throw new NullFieldException(fieldPositions[i]);
+					}
 				}
 			}
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
index ecac775..97e67ca 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
 
 
 public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K,
T>> {
@@ -42,10 +44,14 @@ public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T,
Tup
 		
 		Tuple v = (Tuple) value;
 		
-		K key = v.getField(pos);
-		tuple.f0 = key;
-		tuple.f1 = value;
-		
+		try {
+			K key = v.getFieldNotNull(pos);
+			tuple.f0 = key;
+			tuple.f1 = value;
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+
 		return tuple;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 4738b02..2966830 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.java.tuple;
 
+import org.apache.flink.types.NullKeyFieldException;
+
 /**
  * The base class of all tuples. Tuples have a fix length and contain a set of fields,
  * which may all be of different types. Because Tuples are strongly typed, each distinct
@@ -47,6 +49,23 @@ public abstract class Tuple implements java.io.Serializable {
 	public abstract <T> T getField(int pos);
 	
 	/**
+	 * Gets the field at the specified position, throws NullKeyFieldException if the field is
null. Used for comparing key fields.
+	 * 
+	 * @param pos The position of the field, zero indexed. 
+	 * @returnThe field at the specified position.
+	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or
larger than the number of fields.
+	 * @throws NullKeyFieldException Thrown, if the field at pos is null.
+	 */
+	public <T> T getFieldNotNull(int pos){
+		T field = getField(pos);
+		if (field != null) {
+			return field;
+		} else {
+			throw new NullKeyFieldException(pos);
+		}
+	}
+
+	/**
 	 * Sets the field at the specified position.
 	 *
 	 * @param value The value to be assigned to the field at the specified position.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index e7dd25a..4472949 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -156,17 +156,14 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T>
im
 	public int hash(T value) {
 		int i = 0;
 		try {
-			int code = this.comparators[0].hash(value.getField(keyPositions[0]));
+			int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
 			
 			for (i = 1; i < this.keyPositions.length; i++) {
 				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
-				code += this.comparators[i].hash(value.getField(keyPositions[i]));
+				code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));
 			}
 			return code;
 		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -177,12 +174,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T>
im
 		int i = 0;
 		try {
 			for (; i < this.keyPositions.length; i++) {
-				this.comparators[i].setReference(toCompare.getField(this.keyPositions[i]));
+				this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
 			}
 		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -193,15 +187,12 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T>
im
 		int i = 0;
 		try {
 			for (; i < this.keyPositions.length; i++) {
-				if (!this.comparators[i].equalToReference(candidate.getField(this.keyPositions[i])))
{
+				if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i])))
{
 					return false;
 				}
 			}
 			return true;
 		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -236,15 +227,14 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T>
im
 			for (; i < keyPositions.length; i++) {
 				int keyPos = keyPositions[i];
 				@SuppressWarnings("unchecked")
-				int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
+				int cmp = comparators[i].compare((T)first.getFieldNotNull(keyPos), (T)second.getFieldNotNull(keyPos));
+
 				if (cmp != 0) {
 					return cmp;
 				}
 			}
 			
 			return 0;
-		} catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
 		} catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -304,7 +294,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T>
im
 			{
 				int len = this.normalizedKeyLengths[i]; 
 				len = numBytes >= len ? len : numBytes;
-				this.comparators[i].putNormalizedKey(value.getField(this.keyPositions[i]), target, offset,
len);
+				this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target,
offset, len);
 				numBytes -= len;
 				offset += len;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
index 43906f8..7a9aa43 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
@@ -46,18 +46,18 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K>
extends TypeC
 	
 	@Override
 	public int hash(T value) {
-		return comparator.hash(value.<K>getField(0));
+		return comparator.hash(value.<K>getFieldNotNull(0));
 		
 	}
 
 	@Override
 	public void setReference(T toCompare) {
-		this.comparator.setReference(toCompare.<K>getField(0));
+		this.comparator.setReference(toCompare.<K>getFieldNotNull(0));
 	}
 
 	@Override
 	public boolean equalToReference(T candidate) {
-		return this.comparator.equalToReference(candidate.<K>getField(0));
+		return this.comparator.equalToReference(candidate.<K>getFieldNotNull(0));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -68,7 +68,7 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K>
extends TypeC
 	
 	@Override
 	public int compare(T first, T second) {
-		return this.comparator.compare(first.<K>getField(0), second.<K>getField(0));
+		return this.comparator.compare(first.<K>getFieldNotNull(0), second.<K>getFieldNotNull(0));
 	}
 
 	@Override
@@ -98,7 +98,7 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K>
extends TypeC
 
 	@Override
 	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		this.comparator.putNormalizedKey(record.<K>getField(0), target, offset, numBytes);
+		this.comparator.putNormalizedKey(record.<K>getFieldNotNull(0), target, offset, numBytes);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
index cb7eef5..749c38d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
@@ -39,17 +39,17 @@ public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2
extends Tup
 	
 	@Override
 	public void setReference(T1 reference) {
-		this.comparator1.setReference(reference.<K>getField(0));
+		this.comparator1.setReference(reference.<K>getFieldNotNull(0));
 	}
 
 	@Override
 	public boolean equalToReference(T2 candidate) {
-		return this.comparator1.equalToReference(candidate.<K>getField(0));
+		return this.comparator1.equalToReference(candidate.<K>getFieldNotNull(0));
 	}
 
 	@Override
 	public int compareToReference(T2 candidate) {
-		this.comparator2.setReference(candidate.<K>getField(0));
+		this.comparator2.setReference(candidate.<K>getFieldNotNull(0));
 		return this.comparator1.compareToReference(this.comparator2);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
index 796799b..43f46e4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
@@ -59,14 +59,14 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple>
extends Typ
 	@Override
 	public void setReference(T1 reference) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			this.comparators1[i].setReference(reference.getField(keyFields1[i]));
+			this.comparators1[i].setReference(reference.getFieldNotNull(keyFields1[i]));
 		}
 	}
 
 	@Override
 	public boolean equalToReference(T2 candidate) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			if (!this.comparators1[i].equalToReference(candidate.getField(keyFields2[i]))) {
+			if (!this.comparators1[i].equalToReference(candidate.getFieldNotNull(keyFields2[i])))
{
 				return false;
 			}
 		}
@@ -76,7 +76,7 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple>
extends Typ
 	@Override
 	public int compareToReference(T2 candidate) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			this.comparators2[i].setReference(candidate.getField(keyFields2[i]));
+			this.comparators2[i].setReference(candidate.getFieldNotNull(keyFields2[i]));
 			int res = this.comparators1[i].compareToReference(this.comparators2[i]);
 			if(res != 0) {
 				return res;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
index 7379a88..caf98fd 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.java.tuple;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.NullKeyFieldException;
 import org.junit.Test;
 
 public class Tuple2Test {
@@ -35,4 +35,18 @@ public class Tuple2Test {
 
 		Assert.assertEquals(swapped.f1, toSwap.f0);
 	}
+	
+	@Test
+	public void testGetFieldNotNull() {
+		Tuple2<String, Integer> tuple = new Tuple2<String, Integer>(new String("Test
case"), null);
+
+		Assert.assertEquals("Test case", tuple.getFieldNotNull(0));
+
+		try {
+			tuple.getFieldNotNull(1);
+			Assert.fail();
+		} catch (NullKeyFieldException e) {
+			// right
+		}
+	}
 }


Mime
View raw message