flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinch...@apache.org
Subject flink git commit: [FLINK-8331][core] FieldParser does not correctly set EMPTY_COLUMN error state.
Date Fri, 05 Jan 2018 16:06:34 GMT
Repository: flink
Updated Branches:
  refs/heads/master fcdd56e54 -> 9dd3a859b


[FLINK-8331][core] FieldParser does not correctly set EMPTY_COLUMN error state.

This closes #5218


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dd3a859
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dd3a859
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dd3a859

Branch: refs/heads/master
Commit: 9dd3a859b1609d27ccc80a3da86456e533895b7a
Parents: fcdd56e
Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
Authored: Sat Dec 30 11:10:18 2017 +0800
Committer: sunjincheng121 <sunjincheng121@gmail.com>
Committed: Sat Jan 6 00:05:59 2018 +0800

----------------------------------------------------------------------
 .../flink/types/parser/BooleanParser.java       |  21 ++--
 .../apache/flink/types/parser/ByteParser.java   |   6 ++
 .../flink/types/parser/ByteValueParser.java     |   8 +-
 .../apache/flink/types/parser/FieldParser.java  |   8 +-
 .../apache/flink/types/parser/IntParser.java    |   9 +-
 .../flink/types/parser/IntValueParser.java      |   8 +-
 .../apache/flink/types/parser/LongParser.java   |   6 ++
 .../flink/types/parser/LongValueParser.java     |   6 ++
 .../apache/flink/types/parser/ShortParser.java  |   7 ++
 .../flink/types/parser/ShortValueParser.java    |   8 +-
 .../apache/flink/types/parser/StringParser.java |  10 +-
 .../flink/types/parser/StringValueParser.java   |  10 +-
 .../flink/types/parser/FieldParserTest.java     | 105 ++++++++++++++++++-
 .../flink/types/parser/ParserTestBase.java      |  27 ++---
 14 files changed, 194 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index 908c05f..3a6178a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -37,34 +37,25 @@ public class BooleanParser extends FieldParser<Boolean> {
 	};
 
 	@Override
-	public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, Boolean reuse) {
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Boolean reuse) {
 
-		final int delimLimit = limit - delim.length + 1;
+		final int i = nextStringEndPos(bytes, startPos, limit, delimiter);
 
-		int i = startPos;
-
-		while (i < limit) {
-			if (i < delimLimit && delimiterNext(bytes, i, delim)) {
-				if (i == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
-				break;
-			}
-			i++;
+		if (i < 0) {
+			return -1;
 		}
 
 		for (byte[] aTRUE : TRUE) {
 			if (byteArrayEquals(bytes, startPos, i - startPos, aTRUE)) {
 				result = true;
-				return (i == limit) ? limit : i + delim.length;
+				return (i == limit) ? limit : i + delimiter.length;
 			}
 		}
 
 		for (byte[] aFALSE : FALSE) {
 			if (byteArrayEquals(bytes, startPos, i - startPos, aFALSE)) {
 				result = false;
-				return (i == limit) ? limit : i + delim.length;
+				return (i == limit) ? limit : i + delimiter.length;
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index 7ee257e..79e9f1f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -28,6 +28,12 @@ public class ByteParser extends FieldParser<Byte> {
 
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		int val = 0;
 		boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index c79f5d4..e5b8e47 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -33,6 +33,12 @@ public class ByteValueParser extends FieldParser<ByteValue> {
 	
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ByteValue reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		int val = 0;
 		boolean neg = false;
 		
@@ -73,7 +79,7 @@ public class ByteValueParser extends FieldParser<ByteValue> {
 				return -1;
 			}
 		}
-		
+
 		reusable.setValue((byte) (neg ? -val : val));
 		return limit;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index c45f820..59cc2e7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -210,15 +210,15 @@ public abstract class FieldParser<T> {
 
 		while (endPos < limit) {
 			if (endPos < delimLimit && delimiterNext(bytes, endPos, delimiter)) {
-				if (endPos == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
 				break;
 			}
 			endPos++;
 		}
 
+		if (endPos == startPos) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
 		return endPos;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index 4e5d43f..5135a99 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -35,8 +35,13 @@ public class IntParser extends FieldParser<Integer> {
 	private int result;
 
 	@Override
-	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer 
-		reusable) {
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		long val = 0;
 		boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index 0229bc7..0c15f90 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -36,6 +36,12 @@ public class IntValueParser extends FieldParser<IntValue> {
 	
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, IntValue reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		long val = 0;
 		boolean neg = false;
 
@@ -75,7 +81,7 @@ public class IntValueParser extends FieldParser<IntValue> {
 				return -1;
 			}
 		}
-		
+
 		reusable.setValue((int) (neg ? -val : val));
 		return limit;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index 79eb080..11ed1a4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -32,6 +32,12 @@ public class LongParser extends FieldParser<Long> {
 
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		long val = 0;
 		boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 5ddd40c..91b8aab 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -33,6 +33,12 @@ public class LongValueParser extends FieldParser<LongValue> {
 	
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, LongValue reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		long val = 0;
 		boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index c458a3f..20f727d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -36,6 +36,12 @@ public class ShortParser extends FieldParser<Short> {
 
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Short reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		int val = 0;
 		boolean neg = false;
 
@@ -148,6 +154,7 @@ public class ShortParser extends FieldParser<Short> {
 				throw new NumberFormatException("Value overflow/underflow");
 			}
 		}
+
 		return (short) (neg ? -val : val);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index 47471a3..0332ba8 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -36,6 +36,12 @@ public class ShortValueParser extends FieldParser<ShortValue> {
 
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ShortValue reusable) {
+
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			return -1;
+		}
+
 		int val = 0;
 		boolean neg = false;
 
@@ -75,7 +81,7 @@ public class ShortValueParser extends FieldParser<ShortValue> {
 				return -1;
 			}
 		}
-		
+
 		reusable.setValue((short) (neg ? -val : val));
 		return limit;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 7b46a7e..bb1c04f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -42,6 +42,12 @@ public class StringParser extends FieldParser<String> {
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, String reusable) {
 
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			this.result = "";
+			return limit;
+		}
+
 		int i = startPos;
 
 		final int delimLimit = limit - delimiter.length + 1;
@@ -83,10 +89,6 @@ public class StringParser extends FieldParser<String> {
 			}
 
 			if (i >= delimLimit) {
-				// no delimiter found. Take the full string
-				if (limit == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
-				}
 				this.result = new String(bytes, startPos, limit - startPos, getCharset());
 				return limit;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
index c72b029..193babb 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
@@ -45,6 +45,12 @@ public class StringValueParser extends FieldParser<StringValue> {
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, StringValue reusable) {
 
+		if (startPos == limit) {
+			setErrorState(ParseErrorState.EMPTY_COLUMN);
+			reusable.setValueAscii(bytes, startPos, 0);
+			return limit;
+		}
+
 		this.result = reusable;
 		int i = startPos;
 
@@ -89,10 +95,6 @@ public class StringValueParser extends FieldParser<StringValue> {
 			}
 
 			if (i >= delimLimit) {
-				// no delimiter found. Take the full string
-				if (limit == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
-				}
 				reusable.setValueAscii(bytes, startPos, limit - startPos);
 				return limit;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
index bcb2bfb..ad46742 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.types.parser;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -43,4 +46,104 @@ public class FieldParserTest {
 		assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
 	}
 
-}
\ No newline at end of file
+	@Test
+	public void testNextStringEndPos() throws Exception {
+
+		FieldParser parser = new TestFieldParser<String>();
+		// single-char delimiter
+		byte[] singleCharDelim = "|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+		byte[] bytes1 = "a|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		assertEquals(1, parser.nextStringEndPos(bytes1, 0, bytes1.length, singleCharDelim));
+		assertEquals(-1, parser.nextStringEndPos(bytes1, 1, bytes1.length, singleCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		parser.resetParserState();
+		assertEquals(-1, parser.nextStringEndPos(bytes1, 1, 1, singleCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		parser.resetParserState();
+		assertEquals(-1, parser.nextStringEndPos(bytes1, 2, bytes1.length, singleCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		byte[] bytes2 = "a||".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		parser.resetParserState();
+		assertEquals(-1, parser.nextStringEndPos(bytes2, 1, bytes2.length, singleCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		byte[] bytes3 = "a|c".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		parser.resetParserState();
+		assertEquals(-1, parser.nextStringEndPos(bytes3, 1, bytes3.length, singleCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		parser.resetParserState();
+		assertEquals(3, parser.nextStringEndPos(bytes3, 2, bytes3.length, singleCharDelim));
+		assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+		byte[] bytes4 = "a|c|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		parser.resetParserState();
+		assertEquals(3, parser.nextStringEndPos(bytes4, 2, bytes4.length, singleCharDelim));
+		assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+		// multi-char delimiter
+		byte[] multiCharDelim = "|#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		byte[] mBytes1 = "a|#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		parser.resetParserState();
+		assertEquals(1, parser.nextStringEndPos(mBytes1, 0, mBytes1.length, multiCharDelim));
+		assertEquals(-1, parser.nextStringEndPos(mBytes1, 1, mBytes1.length, multiCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		parser.resetParserState();
+		assertEquals(-1, parser.nextStringEndPos(mBytes1, 1, 1, multiCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		byte[] mBytes2 = "a|#||#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		parser.resetParserState();
+		assertEquals(-1, parser.nextStringEndPos(mBytes2, 1, mBytes2.length, multiCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		byte[] mBytes3 = "a|#|b".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		parser.resetParserState();
+		assertEquals(-1, parser.nextStringEndPos(mBytes3, 1, mBytes3.length, multiCharDelim));
+		assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
+		parser.resetParserState();
+		assertEquals(5, parser.nextStringEndPos(mBytes3, 2, mBytes3.length, multiCharDelim));
+		assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+		byte[] mBytes4 = "a|#|b|#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		parser.resetParserState();
+		assertEquals(5, parser.nextStringEndPos(mBytes4, 2, mBytes4.length, multiCharDelim));
+		assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+	}
+
+}
+
+/**
+ * A FieldParser just for nextStringEndPos test.
+ *
+ * @param <T> The type that is parsed.
+ */
+class TestFieldParser<T> extends FieldParser<T>{
+
+	@Override
+	protected int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse) {
+		return 0;
+	}
+
+	@Override
+	public T getLastResult() {
+		return null;
+	}
+
+	@Override
+	public T createValue() {
+		return null;
+	}
+
+	@Override
+	protected void resetParserState() {
+		super.resetParserState();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9dd3a859/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index 51ace12..c0e01b1 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -405,28 +405,31 @@ public abstract class ParserTestBase<T> extends TestLogger {
 	}
 
 	@Test
-	public void testEmptyFieldInIsolation() {
+	public void testTrailingEmptyField() {
 		try {
-			String [] emptyStrings = new String[] {"|"};
-
 			FieldParser<T> parser = getParser();
 
-			for (String emptyString : emptyStrings) {
-				byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
+			byte[] bytes = "||".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+			for (int i = 0; i < 2; i++) {
+
+				// test empty field with trailing delimiter when i = 0,
+				// test empty field without trailing delimiter when i = 1.
+				int numRead = parser.parseField(bytes, i, bytes.length, new byte[]{'|'}, parser.createValue());
 
 				assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-				if(this.allowsEmptyField()) {
+				if (this.allowsEmptyField()) {
 					assertTrue("Parser declared the empty string as invalid.", numRead != -1);
-					assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
-				}
-				else {
+					assertEquals("Invalid number of bytes read returned.", i + 1, numRead);
+				} else {
 					assertTrue("Parser accepted the empty string.", numRead == -1);
 				}
+
+				parser.resetParserState();
 			}
-		}
-		catch (Exception e) {
+
+		} catch (Exception e) {
 			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail("Test erroneous: " + e.getMessage());


Mime
View raw message