Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F3DAA1148D for ; Wed, 16 Jul 2014 01:46:52 +0000 (UTC) Received: (qmail 98946 invoked by uid 500); 16 Jul 2014 01:46:52 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 98914 invoked by uid 500); 16 Jul 2014 01:46:52 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 98905 invoked by uid 99); 16 Jul 2014 01:46:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Jul 2014 01:46:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9465498AFB4; Wed, 16 Jul 2014 01:46:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: mkwhit@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-414: CSVLineReader-Threshold-Logic Date: Wed, 16 Jul 2014 01:46:52 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/master dac78384a -> 5201256c2 CRUNCH-414: CSVLineReader-Threshold-Logic Signed-off-by: Micah Whitacre Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5201256c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5201256c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5201256c Branch: refs/heads/master Commit: 5201256c20365de678d197de623a1034b3ae3f2e Parents: dac7838 Author: Mac Champion Authored: Fri Jun 27 16:00:48 2014 -0500 Committer: Micah Whitacre Committed: Tue Jul 15 20:30:45 2014 -0500 ---------------------------------------------------------------------- .../crunch/io/text/csv/CSVFileSourceIT.java | 5 +- .../io/text/csv/CSVFileReaderFactory.java | 29 +++++---- .../crunch/io/text/csv/CSVFileSource.java | 67 ++++++++++++++------ .../crunch/io/text/csv/CSVInputFormat.java | 32 +++++++--- .../crunch/io/text/csv/CSVLineReader.java | 59 +++++++++++------ .../crunch/io/text/csv/CSVReadableData.java | 28 +++++--- .../crunch/io/text/csv/CSVRecordIterator.java | 21 +++--- .../crunch/io/text/csv/CSVRecordReader.java | 16 +++-- .../crunch/io/text/csv/CSVLineReaderTest.java | 28 ++++++-- 9 files changed, 197 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java index ba8e193..b1c247f 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java @@ -99,7 +99,7 @@ public class CSVFileSourceIT { final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); final PCollection csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines), CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*', '*', - CSVLineReader.DEFAULT_ESCAPE_CHARACTER)); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE)); final Collection csvLinesList = csvLines.asCollection().getValue(); @@ -122,7 +122,8 @@ public class CSVFileSourceIT { final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); final PCollection csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines), - CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、')); + CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、', + CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE)); final Collection csvLinesList = csvLines.asCollection().getValue(); for (int i = 0; i < expectedChineseLines.length; i++) { assertTrue(csvLinesList.contains(expectedChineseLines[i])); http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java index c1c687e..8d28439 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java @@ -35,11 +35,12 @@ import com.google.common.collect.Iterators; */ public class CSVFileReaderFactory implements FileReaderFactory { private static final Log LOG = LogFactory.getLog(CSVFileReaderFactory.class); - private int bufferSize; - private String inputFileEncoding; - private char openQuoteChar; - private char closeQuoteChar; - private char escapeChar; + private final int bufferSize; + private final String inputFileEncoding; + private final char openQuoteChar; + private final char closeQuoteChar; + private final char escapeChar; + private final int maximumRecordSize; /** * Creates a new {@code CSVFileReaderFactory} instance with default @@ -48,11 +49,11 @@ public class CSVFileReaderFactory implements FileReaderFactory { CSVFileReaderFactory() { this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** - * Creates a new {@code CSVFileReaderFactory} instance with custon + * Creates a new {@code CSVFileReaderFactory} instance with custom * configuration * * @param bufferSize @@ -70,23 +71,29 @@ public class CSVFileReaderFactory implements FileReaderFactory { * @param escapeChar * The character representing the escape character to be used in the * underlying {@code CSVLineReader} + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. */ CSVFileReaderFactory(final int bufferSize, final String inputFileEncoding, final char openQuoteChar, - final char closeQuoteChar, final char escapeChar) { + final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { this.bufferSize = bufferSize; this.inputFileEncoding = inputFileEncoding; this.openQuoteChar = openQuoteChar; this.closeQuoteChar = closeQuoteChar; this.escapeChar = escapeChar; + this.maximumRecordSize = maximumRecordSize; } @Override - public Iterator read(FileSystem fs, Path path) { + public Iterator read(final FileSystem fs, final Path path) { FSDataInputStream is; try { is = fs.open(path); - return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); - } catch (IOException e) { + return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, + maximumRecordSize); + } catch (final IOException e) { LOG.info("Could not read path: " + path, e); return Iterators.emptyIterator(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java index 9021f78..d0a7631 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java @@ -66,11 +66,27 @@ public class CSVFileSource extends FileSourceImpl implements ReadableSou */ public static final String CSV_ESCAPE_CHAR = "csv.escapechar"; + /** + * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set + * the underlying {@code CSVLineReader}'s maximum record size. If this is not + * set, INPUT_SPLIT_SIZE will be checked first, and if that is not set, 64mb + * will be assumed. + */ + public static final String MAXIMUM_RECORD_SIZE = "csv.maximumrecordsize"; + + /** + * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set + * the underlying {@code CSVLineReader}'s input split size. If it is not set, + * 64mb will be assumed. + */ + public static final String INPUT_SPLIT_SIZE = "csv.inputsplitsize"; + private int bufferSize; private String inputFileEncoding; private char openQuoteChar; private char closeQuoteChar; private char escapeChar; + private int maximumRecordSize; /** * Create a new CSVFileSource instance @@ -78,10 +94,10 @@ public class CSVFileSource extends FileSourceImpl implements ReadableSou * @param path * The {@code Path} to the input data */ - public CSVFileSource(List paths) { + public CSVFileSource(final List paths) { this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -90,10 +106,10 @@ public class CSVFileSource extends FileSourceImpl implements ReadableSou * @param path * The {@code Path} to the input data */ - public CSVFileSource(Path path) { + public CSVFileSource(final Path path) { this(path, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -116,12 +132,16 @@ public class CSVFileSource extends FileSourceImpl implements ReadableSou * @param escapeChar * The character representing the escape character to be used in the * underlying {@code CSVLineReader} + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. */ - public CSVFileSource(List paths, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { + public CSVFileSource(final List paths, final int bufferSize, final String inputFileEncoding, + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { super(paths, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, - escapeChar)); - setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + escapeChar, maximumRecordSize)); + setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, maximumRecordSize); } /** @@ -144,23 +164,28 @@ public class CSVFileSource extends FileSourceImpl implements ReadableSou * @param escapeChar * The character representing the escape character to be used in the * underlying {@code CSVLineReader} + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. */ - public CSVFileSource(Path path, final int bufferSize, final String inputFileEncoding, final char openQuoteChar, - final char closeQuoteChar, final char escapeChar) { + public CSVFileSource(final Path path, final int bufferSize, final String inputFileEncoding, final char openQuoteChar, + final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { super(path, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, - escapeChar)); - setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + escapeChar, maximumRecordSize)); + setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, maximumRecordSize); } @Override - public Iterable read(Configuration conf) throws IOException { - return read(conf, - new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar)); + public Iterable read(final Configuration conf) throws IOException { + return read(conf, new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, + escapeChar, maximumRecordSize)); } @Override public ReadableData asReadable() { - return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, + maximumRecordSize); } @Override @@ -173,19 +198,20 @@ public class CSVFileSource extends FileSourceImpl implements ReadableSou * by {@code CSVInputFormat} */ private static FormatBundle getCSVBundle(final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { - FormatBundle bundle = FormatBundle.forInput(CSVInputFormat.class); + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { + final FormatBundle bundle = FormatBundle.forInput(CSVInputFormat.class); bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); bundle.set(CSV_BUFFER_SIZE, String.valueOf(bufferSize)); bundle.set(CSV_INPUT_FILE_ENCODING, String.valueOf(inputFileEncoding)); bundle.set(CSV_OPEN_QUOTE_CHAR, String.valueOf(openQuoteChar)); bundle.set(CSV_CLOSE_QUOTE_CHAR, String.valueOf(closeQuoteChar)); bundle.set(CSV_ESCAPE_CHAR, String.valueOf(escapeChar)); + bundle.set(MAXIMUM_RECORD_SIZE, String.valueOf(maximumRecordSize)); return bundle; } private void setPrivateVariables(final int bufferSize, final String inputFileEncoding, final char openQuoteChar, - final char closeQuoteChar, final char escapeChar) { + final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { if (isSameCharacter(openQuoteChar, escapeChar)) { throw new IllegalArgumentException("The open quote (" + openQuoteChar + ") and escape (" + escapeChar + ") characters must be different!"); @@ -199,8 +225,9 @@ public class CSVFileSource extends FileSourceImpl implements ReadableSou this.openQuoteChar = openQuoteChar; this.closeQuoteChar = closeQuoteChar; this.escapeChar = escapeChar; + this.maximumRecordSize = maximumRecordSize; } - + private boolean isSameCharacter(final char c1, final char c2) { return c2 == c1; } http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java index 867b704..8403f29 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java @@ -50,6 +50,7 @@ public class CSVInputFormat extends FileInputFormat implemen private char openQuoteChar; private char closeQuoteChar; private char escapeChar; + private int maximumRecordSize; private Configuration configuration; /** @@ -59,17 +60,20 @@ public class CSVInputFormat extends FileInputFormat implemen * the {@link InputSplit} that will be assigned to the record reader * @param context * the {@TaskAttemptContext} for the job - * @return an instance of {@link CSVRecordReader} created using configured separator, quote, and escape characters. + * @return an instance of {@link CSVRecordReader} created using configured + * separator, quote, escape, and maximum record size. */ @Override public RecordReader createRecordReader(final InputSplit split, final TaskAttemptContext context) { return new CSVRecordReader(this.bufferSize, this.inputFileEncoding, this.openQuoteChar, this.closeQuoteChar, - this.escapeChar); + this.escapeChar, this.maximumRecordSize); } /** * A method used by crunch to calculate the splits for each file. This will - * split each CSV file at the end of a valid CSV record. + * split each CSV file at the end of a valid CSV record. The default split + * size is 64mb, but this can be reconfigured by setting the + * "csv.inputsplitsize" option in the job configuration. * * @param job * the {@link JobContext} for the current job. @@ -79,10 +83,10 @@ public class CSVInputFormat extends FileInputFormat implemen */ @Override public List getSplits(final JobContext job) throws IOException { - final long splitSize = job.getConfiguration().getLong("csv.input.split.size", 67108864); + final long splitSize = job.getConfiguration().getLong(CSVFileSource.INPUT_SPLIT_SIZE, 67108864); final List splits = new ArrayList(); final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0])); - FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + final FileSystem fileSystem = FileSystem.get(job.getConfiguration()); FSDataInputStream inputStream = null; try { for (final Path path : paths) { @@ -91,7 +95,7 @@ public class CSVInputFormat extends FileInputFormat implemen } return splits; } finally { - if(inputStream != null) { + if (inputStream != null) { inputStream.close(); } } @@ -135,13 +139,13 @@ public class CSVInputFormat extends FileInputFormat implemen // we need to create a new CSVLineReader around the stream. inputStream.seek(currentPosition); final CSVLineReader csvLineReader = new CSVLineReader(inputStream, this.bufferSize, this.inputFileEncoding, - this.openQuoteChar, this.closeQuoteChar, this.escapeChar); + this.openQuoteChar, this.closeQuoteChar, this.escapeChar, this.maximumRecordSize); // This line is potentially garbage because we most likely just sought to // the middle of a line. Read the rest of the line and leave it for the // previous split. Then reset the multi-line CSV record boolean, because // the partial line will have a very high chance of falsely triggering the - // class wide multi-line logic. + // class-wide multi-line logic. currentPosition += csvLineReader.readFileLine(new Text()); csvLineReader.resetMultiLine(); @@ -182,6 +186,12 @@ public class CSVInputFormat extends FileInputFormat implemen * {@link CSVFileSource}'s private getBundle() method */ public void configure() { + + bufferSize = this.configuration.getInt(CSVFileSource.CSV_BUFFER_SIZE, -1); + if (bufferSize < 0) { + bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE; + } + final String bufferValue = this.configuration.get(CSVFileSource.CSV_BUFFER_SIZE); if ("".equals(bufferValue)) { bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE; @@ -216,5 +226,11 @@ public class CSVInputFormat extends FileInputFormat implemen } else { escapeChar = escapeCharValue.charAt(0); } + + maximumRecordSize = this.configuration.getInt(CSVFileSource.MAXIMUM_RECORD_SIZE, -1); + if (maximumRecordSize < 0) { + maximumRecordSize = this.configuration.getInt(CSVFileSource.INPUT_SPLIT_SIZE, + CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java index 83e2abe..79af67d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java @@ -29,6 +29,8 @@ import java.nio.charset.CharsetEncoder; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -38,6 +40,7 @@ import com.google.common.base.Preconditions; */ @ParametersAreNonnullByDefault public class CSVLineReader { + private static final Logger LOGGER = LoggerFactory.getLogger(CSVLineReader.class); // InputStream related variables /** @@ -74,6 +77,12 @@ public class CSVLineReader { * The default input file encoding to read with, UTF-8 */ public static final String DEFAULT_INPUT_FILE_ENCODING = "UTF-8"; + /** + * The default input maximum record size + */ + public static final int DEFAULT_MAXIMUM_RECORD_SIZE = 67108864; + + private final int maximumRecordSize; private final char openQuoteChar; private final char closeQuoteChar; private final char escape; @@ -96,7 +105,7 @@ public class CSVLineReader { */ public CSVLineReader(final InputStream inputStream) throws UnsupportedEncodingException { this(inputStream, DEFAULT_BUFFER_SIZE, DEFAULT_INPUT_FILE_ENCODING, DEFAULT_QUOTE_CHARACTER, - DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER); + DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER, DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -118,10 +127,13 @@ public class CSVLineReader { * Used to specify a custom close quote character * @param escape * Used to specify a custom escape character + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * parsing will stop and an exception will be thrown. * @throws UnsupportedEncodingException */ public CSVLineReader(final InputStream inputStream, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { Preconditions.checkNotNull(inputStream, "inputStream may not be null"); Preconditions.checkNotNull(inputFileEncoding, "inputFileEncoding may not be null"); if (bufferSize <= 0) { @@ -151,6 +163,7 @@ public class CSVLineReader { this.escape = escapeChar; this.inputFileEncoding = inputFileEncoding; this.charsetEncoder = Charset.forName(inputFileEncoding).newEncoder(); + this.maximumRecordSize = maximumRecordSize; } /** @@ -177,23 +190,30 @@ public class CSVLineReader { throw new RuntimeException("Cannot begin reading a CSV record while inside of a multi-line CSV record."); } - inputText.clear(); + final StringBuilder stringBuilder = new StringBuilder(); do { + // Read a line from the file and add it to the builder + inputText.clear(); totalBytesConsumed += readFileLine(inputText); - // a line has been read. We need to see if we're still in quotes and tack - // on a newline if so + stringBuilder.append(inputText.toString()); + if (currentlyInQuotes && !endOfFile) { - // Add one LF to mark the line return, otherwise any multi-line CSV - // record will all be on one line. - inputText.set(new StringBuilder().append(inputText.toString()).append('\n').toString()); + // If we end up in a multi-line record, we need append a newline + stringBuilder.append('\n'); + + // Do a check on the total bytes consumed to see if something has gone + // wrong. + if (totalBytesConsumed > maximumRecordSize || totalBytesConsumed > Integer.MAX_VALUE) { + final String record = stringBuilder.toString(); + LOGGER.error("Possibly malformed file encountered. First line of record: " + + record.substring(0, record.indexOf('\n'))); + throw new IOException("Possibly malformed file encountered. Check log statements for more information"); + } } - } while (currentlyInQuotes); + } while (currentlyInQuotes && !endOfFile); - if (totalBytesConsumed > Integer.MAX_VALUE) { - throw new IOException("Too many bytes consumed before newline: " + Integer.MAX_VALUE); - } - - input.set(inputText); + // Set the input to the multi-line record + input.set(stringBuilder.toString()); return (int) totalBytesConsumed; } @@ -222,8 +242,8 @@ public class CSVLineReader { } // This integer keeps track of the number of newline characters used to - // terminate the line being read. This - // could be 1, in the case of LF or CR, or 2, in the case of CRLF. + // terminate the line being read. This could be 1, in the case of LF or CR, + // or 2, in the case of CRLF. int newlineLength = 0; int inputTextLength = 0; long bytesConsumed = 0; @@ -245,8 +265,7 @@ public class CSVLineReader { newlineLength = 0; // Iterate through the buffer looking for newline characters while keeping - // track of if we're in a field - // and/or in quotes. + // track of if we're in a field and/or in quotes. for (; bufferPosition < bufferLength; ++bufferPosition) { bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]); if (buffer[bufferPosition] == this.escape) { @@ -271,8 +290,8 @@ public class CSVLineReader { if (lastCharWasCR && buffer[bufferPosition] == LF) { lastCharWasCR = false; // Check for LF (in case of CRLF line endings) and increment the - // counter, skip it by moving the - // buffer position, then record the length of the LF. + // counter, skip it by moving the buffer position, then record the + // length of the LF. ++newlineLength; ++bufferPosition; bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]); http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java index 63e74d9..b266a08 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java @@ -25,11 +25,12 @@ import org.apache.hadoop.fs.Path; public class CSVReadableData extends ReadableDataImpl { - private int bufferSize; - private String inputFileEncoding; - private char openQuoteChar; - private char closeQuoteChar; - private char escapeChar; + private final int bufferSize; + private final String inputFileEncoding; + private final char openQuoteChar; + private final char closeQuoteChar; + private final char escapeChar; + private final int maximumRecordSize; /** * Creates an instance of {@code CSVReadableData} with default configuration @@ -37,14 +38,15 @@ public class CSVReadableData extends ReadableDataImpl { * @param paths * The paths of the files to be read */ - protected CSVReadableData(List paths) { + protected CSVReadableData(final List paths) { this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** * Creates an instance of {@code CSVReadableData} with specified configuration + * * @param paths * a list of input file paths * @param bufferSize @@ -57,19 +59,25 @@ public class CSVReadableData extends ReadableDataImpl { * the character to use to close quote blocks * @param escape * the character to use for escaping control characters and quotes + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. */ - protected CSVReadableData(List paths, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) { + protected CSVReadableData(final List paths, final int bufferSize, final String inputFileEncoding, + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) { super(paths); this.bufferSize = bufferSize; this.inputFileEncoding = inputFileEncoding; this.openQuoteChar = openQuoteChar; this.closeQuoteChar = closeQuoteChar; this.escapeChar = escapeChar; + this.maximumRecordSize = maximumRecordSize; } @Override protected FileReaderFactory getFileReaderFactory() { - return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar); + return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, + maximumRecordSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java index df5e39c..447a6d0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java @@ -31,7 +31,7 @@ import com.google.common.io.Closeables; * An {@code Iterator} for an internally created {@code CSVLineReader} */ public class CSVRecordIterator implements Iterator, Closeable { - private CSVLineReader csvLineReader; + private final CSVLineReader csvLineReader; private InputStream inputStream; private String currentLine; @@ -45,7 +45,7 @@ public class CSVRecordIterator implements Iterator, Closeable { public CSVRecordIterator(final InputStream inputStream) throws UnsupportedEncodingException { this(inputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -63,12 +63,17 @@ public class CSVRecordIterator implements Iterator, Closeable { * the character to use to close quote blocks * @param escape * the character to use for escaping control characters and quotes + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. * @throws UnsupportedEncodingException */ public CSVRecordIterator(final InputStream inputStream, final int bufferSize, final String inputFileEncoding, - final char openQuoteChar, final char closeQuoteChar, final char escapeChar) throws UnsupportedEncodingException { + final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) + throws UnsupportedEncodingException { csvLineReader = new CSVLineReader(inputStream, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, - escapeChar); + escapeChar, maximumRecordSize); this.inputStream = inputStream; incrementValue(); } @@ -84,7 +89,7 @@ public class CSVRecordIterator implements Iterator, Closeable { @Override public String next() { - String result = currentLine; + final String result = currentLine; incrementValue(); return result; } @@ -95,13 +100,13 @@ public class CSVRecordIterator implements Iterator, Closeable { } private void incrementValue() { - Text tempText = new Text(); + final Text tempText = new Text(); try { csvLineReader.readCSVLine(tempText); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException("A problem occurred accessing the underlying CSV file stream.", e); } - String tempTextAsString = tempText.toString(); + final String tempTextAsString = tempText.toString(); if ("".equals(tempTextAsString)) { currentLine = null; } else { http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java index d04da98..192a018 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java @@ -51,6 +51,7 @@ public class CSVRecordReader extends RecordReader { private final char escape; private final String inputFileEncoding; private final int fileStreamBufferSize; + private final int maximumRecordSize; private int totalRecordsRead = 0; @@ -60,7 +61,7 @@ public class CSVRecordReader extends RecordReader { public CSVRecordReader() { this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER, - CSVLineReader.DEFAULT_ESCAPE_CHARACTER); + CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); } /** @@ -77,9 +78,13 @@ public class CSVRecordReader extends RecordReader { * the character to use to close quote blocks * @param escape * the character to use for escaping control characters and quotes + * @param maximumRecordSize + * The maximum acceptable size of one CSV record. Beyond this limit, + * {@code CSVLineReader} will stop parsing and an exception will be + * thrown. */ public CSVRecordReader(final int bufferSize, final String inputFileEncoding, final char openQuote, - final char closeQuote, final char escape) { + final char closeQuote, final char escape, final int maximumRecordSize) { Preconditions.checkNotNull(openQuote, "quote cannot be null."); Preconditions.checkNotNull(closeQuote, "quote cannot be null."); Preconditions.checkNotNull(escape, "escape cannot be null."); @@ -89,6 +94,7 @@ public class CSVRecordReader extends RecordReader { this.openQuote = openQuote; this.closeQuote = closeQuote; this.escape = escape; + this.maximumRecordSize = maximumRecordSize; } /** @@ -116,12 +122,12 @@ public class CSVRecordReader extends RecordReader { LOGGER.info("Split starts at: " + start); LOGGER.info("Split will end at: " + end); - // Open the file, seek to the start of the split, then wrap it in a - // CSVLineReader + // Open the file, seek to the start of the split + // then wrap it in a CSVLineReader fileIn = file.getFileSystem(job).open(file); fileIn.seek(start); csvLineReader = new CSVLineReader(fileIn, this.fileStreamBufferSize, inputFileEncoding, this.openQuote, - this.closeQuote, this.escape); + this.closeQuote, this.escape, this.maximumRecordSize); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/5201256c/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java index 55f2f1f..2c51c41 100644 --- a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java @@ -17,7 +17,7 @@ */ package org.apache.crunch.io.text.csv; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.io.File; import java.io.FileInputStream; @@ -36,7 +36,7 @@ public class CSVLineReaderTest { @Test public void testVariousUTF8Characters() throws IOException { final String variousCharacters = "€Abبώиב¥£€¢₡₢₣₤₥₦§₧₨₩₪₫₭₮漢Ä©óíßä"; - String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv"); + final String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv"); FileInputStream fileInputStream = null; try { @@ -60,12 +60,32 @@ public class CSVLineReaderTest { public void testBrokenLineParsingInChinese() throws IOException { final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮", "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" }; - String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); + final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); FileInputStream fileInputStream = null; try { fileInputStream = new FileInputStream(new File(chineseLines)); final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, - CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、'); + CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、', CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE); + for (int i = 0; i < expectedChineseLines.length; i++) { + final Text readText = new Text(); + csvLineReader.readCSVLine(readText); + assertEquals(expectedChineseLines[i], readText.toString()); + } + } finally { + fileInputStream.close(); + } + } + + @Test(expected = IOException.class) + public void testMalformedBrokenLineParsingInChinese() throws IOException { + final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮", + "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" }; + final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv"); + FileInputStream fileInputStream = null; + try { + fileInputStream = new FileInputStream(new File(chineseLines)); + final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, + CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、', 5); for (int i = 0; i < expectedChineseLines.length; i++) { final Text readText = new Text(); csvLineReader.readCSVLine(readText);