flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/6] flink git commit: [FLINK-2158] Add support for null to the DateSerializer
Date Wed, 01 Jul 2015 15:56:04 GMT
[FLINK-2158] Add support for null to the DateSerializer

This closes #780


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e9e0d68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e9e0d68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e9e0d68

Branch: refs/heads/master
Commit: 4e9e0d6839ba1a817432169bf6ed7b777e3096d6
Parents: f5c1768
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu Jun 4 13:19:32 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jul 1 16:14:06 2015 +0200

----------------------------------------------------------------------
 .../common/typeutils/base/DateSerializer.java   | 26 ++++++++++++++--
 .../apache/flink/types/NullFieldException.java  | 12 ++++++++
 .../common/typeutils/SerializerTestBase.java    |  3 +-
 .../java/typeutils/runtime/TupleSerializer.java |  2 +-
 .../javaApiOperators/GroupReduceITCase.java     | 32 ++++++++++++++++++++
 5 files changed, 70 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 6aa11eb..d427918 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -43,11 +43,18 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
 
 	@Override
 	public Date copy(Date from) {
+		if(from == null) {
+			return null;
+		}
 		return new Date(from.getTime());
 	}
+
 	
 	@Override
 	public Date copy(Date from, Date reuse) {
+		if(from == null) {
+			return null;
+		}
 		reuse.setTime(from.getTime());
 		return reuse;
 	}
@@ -59,17 +66,30 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
 
 	@Override
 	public void serialize(Date record, DataOutputView target) throws IOException {
-		target.writeLong(record.getTime());
+		if(record == null) {
+			target.writeLong(-1L);
+		} else {
+			target.writeLong(record.getTime());
+		}
 	}
 
 	@Override
 	public Date deserialize(DataInputView source) throws IOException {
-		return new Date(source.readLong());
+		long v = source.readLong();
+		if(v == -1L) {
+			return null;
+		} else {
+			return new Date(v);
+		}
 	}
 	
 	@Override
 	public Date deserialize(Date reuse, DataInputView source) throws IOException {
-		reuse.setTime(source.readLong());
+		long v = source.readLong();
+		if(v == -1L) {
+			return null;
+		}
+		reuse.setTime(v);
 		return reuse;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
index c192ec2..5c48bf4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
@@ -59,6 +59,18 @@ public class NullFieldException extends RuntimeException
 		super("Field " + fieldIdx + " is null, but expected to hold a value.");
 		this.fieldPos = fieldIdx;
 	}
+
+	/**
+	 * Constructs an {@code NullFieldException} with a default message, referring to
+	 * given field number as the null field and a cause (Throwable)
+	 *
+	 * @param fieldIdx The index of the field that was null, but expected to hold a value.
+	 * @param cause Pass the root cause of the error
+	 */
+	public NullFieldException(int fieldIdx, Throwable cause) {
+		super("Field " + fieldIdx + " is null, but expected to hold a value.", cause);
+		this.fieldPos = fieldIdx;
+	}
 	
 	/**
 	 * Gets the field number that was attempted to access. If the number is not set, this method returns

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 59bea0c..998ae12 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -182,7 +182,8 @@ public abstract class SerializerTestBase<T> {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
+
 	@Test
 	public void testSerializeIndividuallyReusingValues() {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 231486d..2b330c2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -115,7 +115,7 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 			try {
 				fieldSerializers[i].serialize(o, target);
 			} catch (NullPointerException npex) {
-				throw new NullFieldException(i);
+				throw new NullFieldException(i, npex);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9e0d68/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 4061195..d52055d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -52,6 +52,7 @@ import org.junit.runners.Parameterized;
 import scala.math.BigInt;
 
 import java.util.Collection;
+import java.util.Date;
 import java.util.Iterator;
 
 @SuppressWarnings("serial")
@@ -1101,6 +1102,37 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		expected = "(1)\n";
 	}
 
+	/**
+	 * Fix for FLINK-2158.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testDateNullException() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, Date>> in = env.fromElements(new Tuple2<Integer, Date>(0, new Date(1230000000)),
+				new Tuple2<Integer, Date>(1, null),
+				new Tuple2<Integer, Date>(2, new Date(1230000000))
+		);
+
+		DataSet<String> r = in.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, Date>, String>() {
+			@Override
+			public void reduce(Iterable<Tuple2<Integer, Date>> values, Collector<String> out) throws Exception {
+				for (Tuple2<Integer, Date> e : values) {
+					out.collect(Integer.toString(e.f0));
+				}
+			}
+		});
+
+		r.writeAsText(resultPath);
+		env.execute();
+
+		expected = "0\n1\n2\n";
+	}
+
+
+
 	public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
 		@Override
 		public void reduce(


Mime
View raw message