flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-1820] Consistent behavior of numeric value parsers.
Date Tue, 19 May 2015 07:51:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6403dbdb4 -> ea23f28c4


[FLINK-1820] Consistent behavior of numeric value parsers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea23f28c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea23f28c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea23f28c

Branch: refs/heads/master
Commit: ea23f28c449a1a11e509ca62a36f770a6940d40a
Parents: 39d526e
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue May 19 00:37:30 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue May 19 00:52:29 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/types/parser/ByteParser.java   |  6 +++++-
 .../apache/flink/types/parser/DoubleParser.java | 21 ++++++++++----------
 .../flink/types/parser/DoubleValueParser.java   |  8 +++++---
 .../apache/flink/types/parser/FieldParser.java  |  5 +----
 .../apache/flink/types/parser/FloatParser.java  | 20 +++++++++----------
 .../flink/types/parser/FloatValueParser.java    |  8 +++++---
 .../apache/flink/types/parser/IntParser.java    |  6 +++++-
 .../apache/flink/types/parser/LongParser.java   |  6 +++++-
 .../apache/flink/types/parser/ShortParser.java  |  6 +++++-
 .../flink/types/parser/ByteParserTest.java      |  5 +++++
 .../flink/types/parser/ByteValueParserTest.java |  5 +++++
 .../flink/types/parser/DoubleParserTest.java    |  5 +++++
 .../types/parser/DoubleValueParserTest.java     |  5 +++++
 .../flink/types/parser/FloatParserTest.java     |  5 +++++
 .../types/parser/FloatValueParserTest.java      |  5 +++++
 .../flink/types/parser/IntParserTest.java       |  5 +++++
 .../flink/types/parser/IntValueParserTest.java  |  5 +++++
 .../flink/types/parser/LongParserTest.java      |  5 +++++
 .../flink/types/parser/LongValueParserTest.java |  5 +++++
 .../flink/types/parser/ParserTestBase.java      | 19 +++++-------------
 .../types/parser/QuotedStringParserTest.java    |  5 +++++
 .../parser/QuotedStringValueParserTest.java     |  5 +++++
 .../flink/types/parser/ShortParserTest.java     |  5 +++++
 .../types/parser/ShortValueParserTest.java      |  5 +++++
 .../types/parser/UnquotedStringParserTest.java  |  5 +++++
 .../parser/UnquotedStringValueParserTest.java   |  5 +++++
 26 files changed, 137 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index 09e517a..9810fea 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -112,6 +112,10 @@ public class ByteParser extends FieldParser<Byte> {
 		long val = 0;
 		boolean neg = false;
 
+		if(bytes[startPos] == delimiter) {
+			throw new NumberFormatException("Empty field.");
+		}
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
@@ -123,7 +127,7 @@ public class ByteParser extends FieldParser<Byte> {
 
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				throw new NumberFormatException("Empty field.");
+				return (byte) (neg ? -val : val);
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 086c1f5..17bb028 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -41,12 +41,13 @@ public class DoubleParser extends FieldParser<Double> {
 			i++;
 		}
 
-		String str = new String(bytes, startPos, i - startPos);
-		int len = str.length();
-		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
-			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+		if (i > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
+
+		String str = new String(bytes, startPos, i - startPos);
 		try {
 			this.result = Double.parseDouble(str);
 			return (i == limit) ? limit : i + delimiter.length;
@@ -102,16 +103,16 @@ public class DoubleParser extends FieldParser<Double> {
 		int i = 0;
 		final byte delByte = (byte) delimiter;
 
-		while (i < length && bytes[i] != delByte) {
+		while (i < length && bytes[startPos + i] != delByte) {
 			i++;
 		}
 
-		String str = new String(bytes, startPos, i - startPos);
-		int len = str.length();
-		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
-			throw new NumberFormatException("There is leading or trailing whitespace in the " +
-				"numeric field: " + str);
+		if (i > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
+
+		String str = new String(bytes, startPos, i);
 		return Double.parseDouble(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 7751831..5a1199a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -42,11 +42,13 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
 			i++;
 		}
 		
-		String str = new String(bytes, startPos, i - startPos);
-		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
-			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+		if (i > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
+
+		String str = new String(bytes, startPos, i - startPos);
 		try {
 			double value = Double.parseDouble(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 55e9915..52faf32 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -67,10 +67,7 @@ public abstract class FieldParser<T> {
 		UNQUOTED_CHARS_AFTER_QUOTED_STRING,
 		
 		/** The string is empty. */
-		EMPTY_STRING,
-
-		/** There is whitespace in a numeric field. */
-		WHITESPACE_IN_NUMERIC_FIELD
+		EMPTY_STRING
 	}
 	
 	private ParseErrorState errorState = ParseErrorState.NONE;

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index be98aa1..5868632 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -41,13 +41,13 @@ public class FloatParser extends FieldParser<Float> {
 			i++;
 		}
 
-		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
-			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+		if (i > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
 
 		String str = new String(bytes, startPos, i - startPos);
-		int len = str.length();
 		try {
 			this.result = Float.parseFloat(str);
 			return (i == limit) ? limit : i + delimiter.length;
@@ -103,16 +103,16 @@ public class FloatParser extends FieldParser<Float> {
 		int i = 0;
 		final byte delByte = (byte) delimiter;
 
-		while (i < length && bytes[i] != delByte) {
+		while (i < length && bytes[startPos + i] != delByte) {
 			i++;
 		}
-		
-		String str = new String(bytes, startPos, i - startPos);
-		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
-			throw new NumberFormatException("There is leading or trailing whitespace in the " +
-				"numeric field: " + str);
+
+		if (i > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
-		int len = str.length();
+
+		String str = new String(bytes, startPos, i);
 		return Float.parseFloat(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index e8caac2..3e17823 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -42,11 +42,13 @@ public class FloatValueParser extends FieldParser<FloatValue> {
 			i++;
 		}
 		
-		String str = new String(bytes, startPos, i - startPos);
-		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
-			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+		if (i > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
+
+		String str = new String(bytes, startPos, i - startPos);
 		try {
 			float value = Float.parseFloat(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index dcd2ec2..8a40055 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -120,6 +120,10 @@ public class IntParser extends FieldParser<Integer> {
 		long val = 0;
 		boolean neg = false;
 
+		if (bytes[startPos] == delimiter) {
+			throw new NumberFormatException("Empty field.");
+		}
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
@@ -131,7 +135,7 @@ public class IntParser extends FieldParser<Integer> {
 
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				throw new NumberFormatException("Empty field.");
+				return (int) (neg ? -val : val);
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index bb6c7c9..556274b 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -130,6 +130,10 @@ public class LongParser extends FieldParser<Long> {
 		long val = 0;
 		boolean neg = false;
 
+		if (bytes[startPos] == delimiter) {
+			throw new NumberFormatException("Empty field.");
+		}
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
@@ -141,7 +145,7 @@ public class LongParser extends FieldParser<Long> {
 
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				throw new NumberFormatException("Empty field.");
+				return neg ? -val : val;
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index 6e04d60..32df214 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -118,6 +118,10 @@ public class ShortParser extends FieldParser<Short> {
 		long val = 0;
 		boolean neg = false;
 
+		if (bytes[startPos] == delimiter) {
+			throw new NumberFormatException("Empty field.");
+		}
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
@@ -129,7 +133,7 @@ public class ShortParser extends FieldParser<Short> {
 
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				throw new NumberFormatException("Empty field.");
+				return (short) (neg ? -val : val);
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index ac49783..3676e62 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -52,6 +52,11 @@ public class ByteParserTest extends ParserTestBase<Byte> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<Byte> getParser() {
 		return new ByteParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
index 1df3429..31b60d4 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
@@ -50,6 +50,11 @@ public class ByteValueParserTest extends ParserTestBase<ByteValue> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<ByteValue> getParser() {
 		return new ByteValueParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
index c68dd43..bda8252 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
@@ -55,6 +55,11 @@ public class DoubleParserTest extends ParserTestBase<Double> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<Double> getParser() {
 		return new DoubleParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
index 7908180..fbbb5f2 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
@@ -57,6 +57,11 @@ public class DoubleValueParserTest extends ParserTestBase<DoubleValue> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<DoubleValue> getParser() {
 		return new DoubleValueParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
index 012e353..d05557f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
@@ -55,6 +55,11 @@ public class FloatParserTest extends ParserTestBase<Float> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<Float> getParser() {
 		return new FloatParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
index 2b85de0..5c6e1c3 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
@@ -57,6 +57,11 @@ public class FloatValueParserTest extends ParserTestBase<FloatValue> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<FloatValue> getParser() {
 		return new FloatValueParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
index 0f11fbd..1d33b51 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
@@ -48,6 +48,11 @@ public class IntParserTest extends ParserTestBase<Integer> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<Integer> getParser() {
 		return new IntParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
index 2b6d72e..eb0403e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
@@ -51,6 +51,11 @@ public class IntValueParserTest extends ParserTestBase<IntValue> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<IntValue> getParser() {
 		return new IntValueParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
index 2f7ac8f..b17de9d 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
@@ -50,6 +50,11 @@ public class LongParserTest extends ParserTestBase<Long> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<Long> getParser() {
 		return new LongParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
index 2000907..f4d82a0 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
@@ -52,6 +52,11 @@ public class LongValueParserTest extends ParserTestBase<LongValue> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<LongValue> getParser() {
 		return new LongValueParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index dabac6f..94fe327 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -25,10 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Arrays;
 
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.parser.FieldParser;
 import org.junit.Test;
 
 
@@ -42,6 +39,8 @@ public abstract class ParserTestBase<T> {
 	public abstract T[] getValidTestResults();
 	
 	public abstract String[] getInvalidTestValues();
+
+	public abstract boolean allowsEmptyField();
 	
 	public abstract FieldParser<T> getParser();
 	
@@ -414,19 +413,11 @@ public abstract class ParserTestBase<T> {
 				byte[] bytes = emptyString.getBytes();
 				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
 
-				if (getTypeClass() == String.class) {
-					assertTrue("Parser declared the empty string as invalid.", numRead != -1);
-					assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
-
-					T result = parser.getLastResult();
-					assertEquals("Parser parsed wrong.", "", result);
-				} else if(getTypeClass() == StringValue.class) {
+				if(this.allowsEmptyField()) {
 					assertTrue("Parser declared the empty string as invalid.", numRead != -1);
 					assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
-
-					T result = parser.getLastResult();
-					assertEquals("Parser parsed wrong.", new StringValue(""), result);
-				} else {
+				}
+				else {
 					assertTrue("Parser accepted the empty string.", numRead == -1);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
index 89d5ac8..3c23192 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
@@ -47,6 +47,11 @@ public class QuotedStringParserTest extends ParserTestBase<String> {
         };
     }
 
+	@Override
+	public boolean allowsEmptyField() {
+		return true;
+	}
+
     @Override
     public FieldParser<String> getParser() {
         StringParser p = new StringParser();

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
index 66097ac..2708c79 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
@@ -51,6 +51,11 @@ public class QuotedStringValueParserTest extends ParserTestBase<StringValue> {
         };
     }
 
+	@Override
+	public boolean allowsEmptyField() {
+		return true;
+	}
+
     @Override
     public FieldParser<StringValue> getParser() {
         StringValueParser p = new StringValueParser();

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
index baea30f..201714b 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
@@ -48,6 +48,11 @@ public class ShortParserTest extends ParserTestBase<Short> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<Short> getParser() {
 		return new ShortParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
index c56df83..59e9c52 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
@@ -51,6 +51,11 @@ public class ShortValueParserTest extends ParserTestBase<ShortValue> {
 	}
 
 	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
 	public FieldParser<ShortValue> getParser() {
 		return new ShortValueParser();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
index cadd021..8e75192 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
@@ -39,6 +39,11 @@ public class UnquotedStringParserTest extends ParserTestBase<String> {
         };
     }
 
+	@Override
+	public boolean allowsEmptyField() {
+		return true;
+	}
+
     @Override
     public String[] getInvalidTestValues() {
         return new String[] { };

http://git-wip-us.apache.org/repos/asf/flink/blob/ea23f28c/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
index d66e852..e3ddb36 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
@@ -44,6 +44,11 @@ public class UnquotedStringValueParserTest extends ParserTestBase<StringValue> {
         return new String[] { };
     }
 
+	@Override
+	public boolean allowsEmptyField() {
+		return true;
+	}
+
     @Override
     public FieldParser<StringValue> getParser() {
         return new StringValueParser();


Mime
View raw message