flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [core] Add tests for DelimitedInputFormat's handling of records across split boundaries
Date Mon, 13 Jul 2015 13:15:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master 61b1c0a6c -> d94dfde57


[core] Add tests for DelimitedInputFormat's handling of records across split boundaries


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d94dfde5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d94dfde5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d94dfde5

Branch: refs/heads/master
Commit: d94dfde570632e6114dbca44c6464f204c198866
Parents: efa62df
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Jul 12 21:34:32 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Jul 13 15:14:41 2015 +0200

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  18 +-
 .../api/common/io/DelimitedInputFormatTest.java | 270 +++++++++++++++----
 2 files changed, 239 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 7fc42ab..a1b045f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -80,7 +80,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	
 	static { loadGloablConfigParams(); }
 	
-	protected static final void loadGloablConfigParams() {
+	protected static void loadGloablConfigParams() {
 		int maxSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
 				ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES);
 		int minSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
@@ -570,9 +570,19 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 				return true;
 			}
 		}
+		
 		// else ..
-		int toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
-		if (this.splitLength <= 0) {
+		int toRead;
+		if (this.splitLength > 0) {
+			// if we have more data, read that
+			toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
+		}
+		else {
+			// if we have exhausted our split, we need to complete the current record, or read one
+			// more across the next split.
+			// the reason is that the next split will skip over the beginning until it finds the first
+			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
+			// previous split.
 			toRead = this.readBuffer.length;
 			this.overLimit = true;
 		}
@@ -592,7 +602,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	}
 	
 	// ============================================================================================
-	//  Parameterization via configuration
+	//  Parametrization via configuration
 	// ============================================================================================
 	
 	// ------------------------------------- Config Keys ------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 4af394f..599a640 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -34,6 +33,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -45,10 +47,6 @@ import org.junit.Test;
 
 public class DelimitedInputFormatTest {
 	
-	protected Configuration config;
-	
-	protected File tempFile;
-	
 	private final DelimitedInputFormat<String> format = new MyTextInputFormat();
 	
 	// --------------------------------------------------------------------------------------------
@@ -56,7 +54,6 @@ public class DelimitedInputFormatTest {
 	@Before
 	public void setup() {
 		this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read"));
-		this.config = new Configuration();
 	}
 	
 	@After
@@ -64,22 +61,20 @@ public class DelimitedInputFormatTest {
 		if (this.format != null) {
 			this.format.close();
 		}
-		if (this.tempFile != null) {
-			this.tempFile.delete();
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 	// --------------------------------------------------------------------------------------------
 	@Test
 	public void testConfigure() {
-		this.config.setString("delimited-format.delimiter", "\n");
+		Configuration cfg = new Configuration();
+		cfg.setString("delimited-format.delimiter", "\n");
 		
-		format.configure(this.config);
+		format.configure(cfg);
 		assertEquals("\n", new String(format.getDelimiter()));
 
-		this.config.setString("delimited-format.delimiter", "&-&");
-		format.configure(this.config);
+		cfg.setString("delimited-format.delimiter", "&-&");
+		format.configure(cfg);
 		assertEquals("&-&", new String(format.getDelimiter()));
 	}
 	
@@ -125,11 +120,58 @@ public class DelimitedInputFormatTest {
 		assertEquals(bufferSize, format.getBufferSize());
 	}
 
-	/**
-	 * Tests simple delimited parsing with a custom delimiter.
-	 */
 	@Test
-	public void testRead() {
+	public void testReadWithoutTrailingDelimiter() throws IOException {
+		// 2. test case
+		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+		final FileInputSplit split = createTempFile(myString);
+
+		final Configuration parameters = new Configuration();
+		// default delimiter = '\n'
+
+		format.configure(parameters);
+		format.open(split);
+
+		String first = format.nextRecord(null);
+		String second = format.nextRecord(null);
+
+		assertNotNull(first);
+		assertNotNull(second);
+
+		assertEquals("my key|my val$$$my key2", first);
+		assertEquals("$$ctd.$$|my value2", second);
+
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+	}
+	
+	@Test
+	public void testReadWithTrailingDelimiter() throws IOException {
+		// 2. test case
+		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n";
+		final FileInputSplit split = createTempFile(myString);
+
+		final Configuration parameters = new Configuration();
+		// default delimiter = '\n'
+
+		format.configure(parameters);
+		format.open(split);
+
+		String first = format.nextRecord(null);
+		String second = format.nextRecord(null);
+
+		assertNotNull(first);
+		assertNotNull(second);
+
+		assertEquals("my key|my val$$$my key2", first);
+		assertEquals("$$ctd.$$|my value2", second);
+
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+	}
+	
+	@Test
+	public void testReadCustomDelimiter() {
 		try {
 			final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
 			final FileInputSplit split = createTempFile(myString);
@@ -156,42 +198,180 @@ public class DelimitedInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Tests that the records are read correctly when the split boundary is in the middle of a record.
+	 */
 	@Test
-	public void testRead2() throws IOException {
-		// 2. test case
-		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n";
-		final FileInputSplit split = createTempFile(myString);
-		
-		final Configuration parameters = new Configuration();
-		// default delimiter = '\n'
-		
-		format.configure(parameters);
-		format.open(split);
+	public void testReadOverSplitBoundariesUnaligned() {
+		try {
+			final String myString = "value1\nvalue2\nvalue3";
+			final FileInputSplit split = createTempFile(myString);
+			
+			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
 
-		String first = format.nextRecord(null);
-		String second = format.nextRecord(null);
-		
-		assertNotNull(first);
-		assertNotNull(second);
-		
-		assertEquals("my key|my val$$$my key2", first);
-		assertEquals("$$ctd.$$|my value2", second);
-		
-		assertNull(format.nextRecord(null));
-		assertTrue(format.reachedEnd());
+			final Configuration parameters = new Configuration();
+			
+			format.configure(parameters);
+			format.open(split1);
+			
+			assertEquals("value1", format.nextRecord(null));
+			assertEquals("value2", format.nextRecord(null));
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			
+			format.close();
+			format.open(split2);
+
+			assertEquals("value3", format.nextRecord(null));
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			
+			format.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that the correct number of records is read when the split boundary is exact at the record boundary.
+	 */
+	@Test
+	public void testReadWithBufferSizeIsMultple() {
+		try {
+			final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+			final FileInputSplit split = createTempFile(myString);
+
+			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+
+			final Configuration parameters = new Configuration();
+
+			format.setBufferSize(2 * ((int) split1.getLength()));
+			format.configure(parameters);
+
+			String next;
+			int count = 0;
+
+			// read split 1
+			format.open(split1);
+			while ((next = format.nextRecord(null)) != null) {
+				assertEquals(7, next.length());
+				count++;
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			format.close();
+			
+			// this one must have read one too many, because the next split will skipp the trailing remainder
+			// which happens to be one full record
+			assertEquals(3, count);
+
+			// read split 2
+			format.open(split2);
+			while ((next = format.nextRecord(null)) != null) {
+				assertEquals(7, next.length());
+				count++;
+			}
+			format.close();
+
+			assertEquals(4, count);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testReadExactlyBufferSize() {
+		try {
+			final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+			
+			final FileInputSplit split = createTempFile(myString);
+			final Configuration parameters = new Configuration();
+			
+			format.setBufferSize((int) split.getLength());
+			format.configure(parameters);
+			format.open(split);
+
+			String next;
+			int count = 0;
+			while ((next = format.nextRecord(null)) != null) {
+				assertEquals(7, next.length());
+				count++;
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+
+			format.close();
+			
+			assertEquals(4, count);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testReadRecordsLargerThanBuffer() {
+		try {
+			final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
+									"bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
+									"ccccccccccccccccccc\n" +
+									"ddddddddddddddddddddddddddddddddddd\n";
+
+			final FileInputSplit split = createTempFile(myString);
+			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+			
+			final Configuration parameters = new Configuration();
+
+			format.setBufferSize(8);
+			format.configure(parameters);
+
+			String next;
+			List<String> result = new ArrayList<String>();
+			
+			
+			format.open(split1);
+			while ((next = format.nextRecord(null)) != null) {
+				result.add(next);
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			format.close();
+
+			format.open(split2);
+			while ((next = format.nextRecord(null)) != null) {
+				result.add(next);
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			format.close();
+			
+			assertEquals(4, result.size());
+			assertEquals(Arrays.asList(myString.split("\n")), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 	
-	
-	private FileInputSplit createTempFile(String contents) throws IOException {
-		this.tempFile = File.createTempFile("test_contents", "tmp");
-		this.tempFile.deleteOnExit();
+	private static FileInputSplit createTempFile(String contents) throws IOException {
+		File tempFile = File.createTempFile("test_contents", "tmp");
+		tempFile.deleteOnExit();
 		
-		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(this.tempFile));
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
 		wrt.write(contents);
 		wrt.close();
 		
-		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
 	}
 	
 	


Mime
View raw message