Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D03F9200CBC for ; Tue, 20 Jun 2017 16:59:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CF094160BE1; Tue, 20 Jun 2017 14:59:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C7DB8160BD3 for ; Tue, 20 Jun 2017 16:59:21 +0200 (CEST) Received: (qmail 22636 invoked by uid 500); 20 Jun 2017 14:59:20 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 22627 invoked by uid 99); 20 Jun 2017 14:59:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Jun 2017 14:59:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 95ADBDFAEE; Tue, 20 Jun 2017 14:59:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-6652] [core] Fix handling of delimiters split by buffers in DelimitedInputFormat Date: Tue, 20 Jun 2017 14:59:17 +0000 (UTC) archived-at: Tue, 20 Jun 2017 14:59:23 -0000 Repository: flink Updated Branches: refs/heads/master f24a499b3 -> be662bf7e [FLINK-6652] [core] Fix handling of delimiters split by buffers in DelimitedInputFormat This closes #4088. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be662bf7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be662bf7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be662bf7 Branch: refs/heads/master Commit: be662bf7ebcefb289988a24392104c3385029568 Parents: f24a499 Author: Fabian Hueske Authored: Wed Jun 7 23:01:06 2017 +0200 Committer: Fabian Hueske Committed: Tue Jun 20 16:58:51 2017 +0200 ---------------------------------------------------------------------- .../api/common/io/DelimitedInputFormat.java | 99 +++++++++++++------- .../api/common/io/DelimitedInputFormatTest.java | 28 +++++- 2 files changed, 93 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/be662bf7/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index e20f646..4d715e7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -266,10 +266,10 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple } public void setBufferSize(int bufferSize) { - if (bufferSize < 1) { - throw new IllegalArgumentException("Buffer size must be at least 1."); + if (bufferSize < 2) { + throw new IllegalArgumentException("Buffer size must be at least 2."); } - + this.bufferSize = bufferSize; } @@ -487,13 +487,17 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple this.end = true; } } else { - fillBuffer(); + fillBuffer(0); } } private void initBuffers() { this.bufferSize = this.bufferSize <= 0 
? DEFAULT_READ_BUFFER_SIZE : this.bufferSize; + if (this.bufferSize <= this.delimiter.length) { + throw new IllegalArgumentException("Buffer size must be greater than length of delimiter."); + } + if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) { this.readBuffer = new byte[this.bufferSize]; } @@ -548,13 +552,30 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple int countInWrapBuffer = 0; - /* position of matching positions in the delimiter byte array */ - int i = 0; + // position of matching positions in the delimiter byte array + int delimPos = 0; while (true) { if (this.readPos >= this.limit) { - if (!fillBuffer()) { - if (countInWrapBuffer > 0) { + // readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes. + if (!fillBuffer(delimPos)) { + int countInReadBuffer = delimPos; + if (countInWrapBuffer + countInReadBuffer > 0) { + // we have bytes left to emit + if (countInReadBuffer > 0) { + // we have bytes left in the readBuffer. Move them into the wrapBuffer + if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) { + // reallocate + byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer]; + System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer); + this.wrapBuffer = tmp; + } + + // copy readBuffer bytes to wrapBuffer + System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer); + countInWrapBuffer += countInReadBuffer; + } + this.offset += countInWrapBuffer; setResult(this.wrapBuffer, 0, countInWrapBuffer); return true; @@ -564,30 +585,30 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple } } - int startPos = this.readPos; + int startPos = this.readPos - delimPos; int count; // Search for next occurence of delimiter in read buffer. 
- while (this.readPos < this.limit && i < this.delimiter.length) { - if ((this.readBuffer[this.readPos]) == this.delimiter[i]) { + while (this.readPos < this.limit && delimPos < this.delimiter.length) { + if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) { // Found the expected delimiter character. Continue looking for the next character of delimiter. - i++; + delimPos++; } else { // Delimiter does not match. // We have to reset the read position to the character after the first matching character // and search for the whole delimiter again. - readPos -= i; - i = 0; + readPos -= delimPos; + delimPos = 0; } readPos++; } // check why we dropped out - if (i == this.delimiter.length) { - // delimiter found - int totalBytesRead = this.readPos - startPos; - this.offset += countInWrapBuffer + totalBytesRead; - count = totalBytesRead - this.delimiter.length; + if (delimPos == this.delimiter.length) { + // we found a delimiter + int readBufferBytesRead = this.readPos - startPos; + this.offset += countInWrapBuffer + readBufferBytesRead; + count = readBufferBytesRead - this.delimiter.length; // copy to byte array if (countInWrapBuffer > 0) { @@ -607,6 +628,7 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple return true; } } else { + // we reached the end of the readBuffer count = this.limit - startPos; // check against the maximum record length @@ -615,16 +637,23 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple this.lineLengthLimit + ")."); } - // buffer exhausted - if (this.wrapBuffer.length - countInWrapBuffer < count) { + // Compute number of bytes to move to wrapBuffer + // Chars of partially read delimiter must remain in the readBuffer. We might need to go back. 
+ int bytesToMove = count - delimPos; + // ensure wrapBuffer is large enough + if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) { // reallocate - byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + count)]; + byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)]; System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer); this.wrapBuffer = tmp; } - System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count); - countInWrapBuffer += count; + // copy readBuffer to wrapBuffer (except delimiter chars) + System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove); + countInWrapBuffer += bytesToMove; + // move delimiter chars to the beginning of the readBuffer + System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos); + } } } @@ -635,16 +664,20 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple this.currLen = len; } - private boolean fillBuffer() throws IOException { + /** + * Fills the read buffer with bytes read from the file starting from an offset. + */ + private boolean fillBuffer(int offset) throws IOException { + int maxReadLength = this.readBuffer.length - offset; // special case for reading the whole split. if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) { - int read = this.stream.read(this.readBuffer, 0, readBuffer.length); + int read = this.stream.read(this.readBuffer, offset, maxReadLength); if (read == -1) { this.stream.close(); this.stream = null; return false; } else { - this.readPos = 0; + this.readPos = offset; this.limit = read; return true; } @@ -654,7 +687,7 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple int toRead; if (this.splitLength > 0) { // if we have more data, read that - toRead = this.splitLength > this.readBuffer.length ? 
this.readBuffer.length : (int) this.splitLength; + toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength; } else { // if we have exhausted our split, we need to complete the current record, or read one @@ -662,11 +695,11 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple // the reason is that the next split will skip over the beginning until it finds the first // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the // previous split. - toRead = this.readBuffer.length; + toRead = maxReadLength; this.overLimit = true; } - int read = this.stream.read(this.readBuffer, 0, toRead); + int read = this.stream.read(this.readBuffer, offset, toRead); if (read == -1) { this.stream.close(); @@ -674,8 +707,8 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple return false; } else { this.splitLength -= read; - this.readPos = 0; - this.limit = read; + this.readPos = offset; // position from where to start reading + this.limit = read + offset; // number of valid bytes in the read buffer return true; } } @@ -726,7 +759,7 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple this.stream.seek(this.offset); if (split.getLength() == -1) { // this is the case for unsplittable files - fillBuffer(); + fillBuffer(0); } else { this.splitLength = this.splitStart + split.getLength() - this.offset; if (splitLength <= 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/be662bf7/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 2ff5ee7..e2df391 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -45,7 +45,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class DelimitedInputFormatTest { @@ -402,6 +401,33 @@ public class DelimitedInputFormatTest { assertEquals(Arrays.asList(myString.split("\n")), result); } + @Test + public void testDelimiterOnBufferBoundary() throws IOException { + + String[] records = new String[]{"1234567890<DEL?NITER>1234567890", "1234567890<DEL?NITER>1234567890", "<DEL?NITER>"}; + String delimiter = "<DELIMITER>"; + String fileContent = StringUtils.join(records, delimiter); + + + final FileInputSplit split = createTempFile(fileContent); + final Configuration parameters = new Configuration(); + + format.setBufferSize(12); + format.setDelimiter(delimiter); + format.configure(parameters); + format.open(split); + + for (String record : records) { + String value = format.nextRecord(null); + assertEquals(record, value); + } + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + + format.close(); + } + static FileInputSplit createTempFile(String contents) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit();