crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject git commit: CRUNCH-414: CSVLineReader-Threshold-Logic
Date Wed, 16 Jul 2014 01:56:11 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 6e52d7fc8 -> 191e11464


CRUNCH-414: CSVLineReader-Threshold-Logic

Signed-off-by: Micah Whitacre <mkwhit@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/191e1146
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/191e1146
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/191e1146

Branch: refs/heads/apache-crunch-0.8
Commit: 191e11464eb307198a7e507f49d4cd8d9099e608
Parents: 6e52d7f
Author: Mac Champion <mac.champion@cerner.com>
Authored: Fri Jun 27 16:00:48 2014 -0500
Committer: Micah Whitacre <mkwhit@apache.org>
Committed: Tue Jul 15 20:47:30 2014 -0500

----------------------------------------------------------------------
 .../crunch/io/text/csv/CSVFileSourceIT.java     |  5 +-
 .../io/text/csv/CSVFileReaderFactory.java       | 29 +++++----
 .../crunch/io/text/csv/CSVFileSource.java       | 67 ++++++++++++++------
 .../crunch/io/text/csv/CSVInputFormat.java      | 32 +++++++---
 .../crunch/io/text/csv/CSVLineReader.java       | 59 +++++++++++------
 .../crunch/io/text/csv/CSVReadableData.java     | 28 +++++---
 .../crunch/io/text/csv/CSVRecordIterator.java   | 21 +++---
 .../crunch/io/text/csv/CSVRecordReader.java     | 16 +++--
 .../crunch/io/text/csv/CSVLineReaderTest.java   | 28 ++++++--
 9 files changed, 197 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
index ba8e193..b1c247f 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
@@ -99,7 +99,7 @@ public class CSVFileSourceIT {
     final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
     final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines),
         CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*',
'*',
-        CSVLineReader.DEFAULT_ESCAPE_CHARACTER));
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE));
 
     final Collection<String> csvLinesList = csvLines.asCollection().getValue();
 
@@ -122,7 +122,8 @@ public class CSVFileSourceIT {
 
     final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
     final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines),
-        CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“',
'”', '、'));
+        CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“',
'”', '、',
+        CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE));
     final Collection<String> csvLinesList = csvLines.asCollection().getValue();
     for (int i = 0; i < expectedChineseLines.length; i++) {
       assertTrue(csvLinesList.contains(expectedChineseLines[i]));

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
index c1c687e..8d28439 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
@@ -35,11 +35,12 @@ import com.google.common.collect.Iterators;
  */
 public class CSVFileReaderFactory implements FileReaderFactory<String> {
   private static final Log LOG = LogFactory.getLog(CSVFileReaderFactory.class);
-  private int bufferSize;
-  private String inputFileEncoding;
-  private char openQuoteChar;
-  private char closeQuoteChar;
-  private char escapeChar;
+  private final int bufferSize;
+  private final String inputFileEncoding;
+  private final char openQuoteChar;
+  private final char closeQuoteChar;
+  private final char escapeChar;
+  private final int maximumRecordSize;
 
   /**
    * Creates a new {@code CSVFileReaderFactory} instance with default
@@ -48,11 +49,11 @@ public class CSVFileReaderFactory implements FileReaderFactory<String> {
   CSVFileReaderFactory() {
     this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
         CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
-        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
   }
 
   /**
-   * Creates a new {@code CSVFileReaderFactory} instance with custon
+   * Creates a new {@code CSVFileReaderFactory} instance with custom
    * configuration
    * 
    * @param bufferSize
@@ -70,23 +71,29 @@ public class CSVFileReaderFactory implements FileReaderFactory<String> {
    * @param escapeChar
    *          The character representing the escape character to be used in the
    *          underlying {@code CSVLineReader}
+   * @param maximumRecordSize
+   *          The maximum acceptable size of one CSV record. Beyond this limit,
+   *          {@code CSVLineReader} will stop parsing and an exception will be
+   *          thrown.
    */
   CSVFileReaderFactory(final int bufferSize, final String inputFileEncoding, final char openQuoteChar,
-      final char closeQuoteChar, final char escapeChar) {
+      final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) {
     this.bufferSize = bufferSize;
     this.inputFileEncoding = inputFileEncoding;
     this.openQuoteChar = openQuoteChar;
     this.closeQuoteChar = closeQuoteChar;
     this.escapeChar = escapeChar;
+    this.maximumRecordSize = maximumRecordSize;
   }
 
   @Override
-  public Iterator<String> read(FileSystem fs, Path path) {
+  public Iterator<String> read(final FileSystem fs, final Path path) {
     FSDataInputStream is;
     try {
       is = fs.open(path);
-      return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
escapeChar);
-    } catch (IOException e) {
+      return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
escapeChar,
+          maximumRecordSize);
+    } catch (final IOException e) {
       LOG.info("Could not read path: " + path, e);
       return Iterators.emptyIterator();
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
index 9021f78..d0a7631 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
@@ -66,11 +66,27 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou
    */
   public static final String CSV_ESCAPE_CHAR = "csv.escapechar";
 
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s maximum record size. If this is not
+   * set, INPUT_SPLIT_SIZE will be checked first, and if that is not set, 64mb
+   * will be assumed.
+   */
+  public static final String MAXIMUM_RECORD_SIZE = "csv.maximumrecordsize";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s input split size. If it is not set,
+   * 64mb will be assumed.
+   */
+  public static final String INPUT_SPLIT_SIZE = "csv.inputsplitsize";
+
   private int bufferSize;
   private String inputFileEncoding;
   private char openQuoteChar;
   private char closeQuoteChar;
   private char escapeChar;
+  private int maximumRecordSize;
 
   /**
    * Create a new CSVFileSource instance
@@ -78,10 +94,10 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou
    * @param path
    *          The {@code Path} to the input data
    */
-  public CSVFileSource(List<Path> paths) {
+  public CSVFileSource(final List<Path> paths) {
     this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
         CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
-        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
   }
 
   /**
@@ -90,10 +106,10 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou
    * @param path
    *          The {@code Path} to the input data
    */
-  public CSVFileSource(Path path) {
+  public CSVFileSource(final Path path) {
     this(path, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
         CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
-        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
   }
 
   /**
@@ -116,12 +132,16 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou
    * @param escapeChar
    *          The character representing the escape character to be used in the
    *          underlying {@code CSVLineReader}
+   * @param maximumRecordSize
+   *          The maximum acceptable size of one CSV record. Beyond this limit,
+   *          {@code CSVLineReader} will stop parsing and an exception will be
+   *          thrown.
    */
-  public CSVFileSource(List<Path> paths, final int bufferSize, final String inputFileEncoding,
-      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+  public CSVFileSource(final List<Path> paths, final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int
maximumRecordSize) {
     super(paths, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar,
closeQuoteChar,
-        escapeChar));
-    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+        escapeChar, maximumRecordSize));
+    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar,
maximumRecordSize);
   }
 
   /**
@@ -144,23 +164,28 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou
    * @param escapeChar
    *          The character representing the escape character to be used in the
    *          underlying {@code CSVLineReader}
+   * @param maximumRecordSize
+   *          The maximum acceptable size of one CSV record. Beyond this limit,
+   *          {@code CSVLineReader} will stop parsing and an exception will be
+   *          thrown.
    */
-  public CSVFileSource(Path path, final int bufferSize, final String inputFileEncoding, final
char openQuoteChar,
-      final char closeQuoteChar, final char escapeChar) {
+  public CSVFileSource(final Path path, final int bufferSize, final String inputFileEncoding,
final char openQuoteChar,
+      final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) {
     super(path, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar,
closeQuoteChar,
-        escapeChar));
-    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+        escapeChar, maximumRecordSize));
+    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar,
maximumRecordSize);
   }
 
   @Override
-  public Iterable<String> read(Configuration conf) throws IOException {
-    return read(conf,
-        new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
escapeChar));
+  public Iterable<String> read(final Configuration conf) throws IOException {
+    return read(conf, new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar,
closeQuoteChar,
+        escapeChar, maximumRecordSize));
   }
 
   @Override
   public ReadableData<String> asReadable() {
-    return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
escapeChar);
+    return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
escapeChar,
+        maximumRecordSize);
   }
 
   @Override
@@ -173,19 +198,20 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou
    * by {@code CSVInputFormat}
    */
   private static FormatBundle<CSVInputFormat> getCSVBundle(final int bufferSize, final
String inputFileEncoding,
-      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
-    FormatBundle<CSVInputFormat> bundle = FormatBundle.forInput(CSVInputFormat.class);
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int
maximumRecordSize) {
+    final FormatBundle<CSVInputFormat> bundle = FormatBundle.forInput(CSVInputFormat.class);
     bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
     bundle.set(CSV_BUFFER_SIZE, String.valueOf(bufferSize));
     bundle.set(CSV_INPUT_FILE_ENCODING, String.valueOf(inputFileEncoding));
     bundle.set(CSV_OPEN_QUOTE_CHAR, String.valueOf(openQuoteChar));
     bundle.set(CSV_CLOSE_QUOTE_CHAR, String.valueOf(closeQuoteChar));
     bundle.set(CSV_ESCAPE_CHAR, String.valueOf(escapeChar));
+    bundle.set(MAXIMUM_RECORD_SIZE, String.valueOf(maximumRecordSize));
     return bundle;
   }
 
   private void setPrivateVariables(final int bufferSize, final String inputFileEncoding,
final char openQuoteChar,
-      final char closeQuoteChar, final char escapeChar) {
+      final char closeQuoteChar, final char escapeChar, final int maximumRecordSize) {
     if (isSameCharacter(openQuoteChar, escapeChar)) {
       throw new IllegalArgumentException("The open quote (" + openQuoteChar + ") and escape
(" + escapeChar
           + ") characters must be different!");
@@ -199,8 +225,9 @@ public class CSVFileSource extends FileSourceImpl<String> implements ReadableSou
     this.openQuoteChar = openQuoteChar;
     this.closeQuoteChar = closeQuoteChar;
     this.escapeChar = escapeChar;
+    this.maximumRecordSize = maximumRecordSize;
   }
-  
+
   private boolean isSameCharacter(final char c1, final char c2) {
     return c2 == c1;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
index 867b704..8403f29 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
@@ -50,6 +50,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
   private char openQuoteChar;
   private char closeQuoteChar;
   private char escapeChar;
+  private int maximumRecordSize;
   private Configuration configuration;
 
   /**
@@ -59,17 +60,20 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
    *          the {@link InputSplit} that will be assigned to the record reader
    * @param context
    *          the {@TaskAttemptContext} for the job
-   * @return an instance of {@link CSVRecordReader} created using configured separator, quote,
and escape characters.
+   * @return an instance of {@link CSVRecordReader} created using configured
+   *         separator, quote, escape, and maximum record size.
    */
   @Override
   public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split,
final TaskAttemptContext context) {
     return new CSVRecordReader(this.bufferSize, this.inputFileEncoding, this.openQuoteChar,
this.closeQuoteChar,
-        this.escapeChar);
+        this.escapeChar, this.maximumRecordSize);
   }
 
   /**
    * A method used by crunch to calculate the splits for each file. This will
-   * split each CSV file at the end of a valid CSV record.
+   * split each CSV file at the end of a valid CSV record. The default split
+   * size is 64mb, but this can be reconfigured by setting the
+   * "csv.inputsplitsize" option in the job configuration.
    * 
    * @param job
    *          the {@link JobContext} for the current job.
@@ -79,10 +83,10 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
    */
   @Override
   public List<InputSplit> getSplits(final JobContext job) throws IOException {
-    final long splitSize = job.getConfiguration().getLong("csv.input.split.size", 67108864);
+    final long splitSize = job.getConfiguration().getLong(CSVFileSource.INPUT_SPLIT_SIZE,
67108864);
     final List<InputSplit> splits = new ArrayList<InputSplit>();
     final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0]));
-    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    final FileSystem fileSystem = FileSystem.get(job.getConfiguration());
     FSDataInputStream inputStream = null;
     try {
       for (final Path path : paths) {
@@ -91,7 +95,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
       }
       return splits;
     } finally {
-      if(inputStream != null) {
+      if (inputStream != null) {
         inputStream.close();
       }
     }
@@ -135,13 +139,13 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
       // we need to create a new CSVLineReader around the stream.
       inputStream.seek(currentPosition);
       final CSVLineReader csvLineReader = new CSVLineReader(inputStream, this.bufferSize,
this.inputFileEncoding,
-          this.openQuoteChar, this.closeQuoteChar, this.escapeChar);
+          this.openQuoteChar, this.closeQuoteChar, this.escapeChar, this.maximumRecordSize);
 
       // This line is potentially garbage because we most likely just sought to
       // the middle of a line. Read the rest of the line and leave it for the
       // previous split. Then reset the multi-line CSV record boolean, because
       // the partial line will have a very high chance of falsely triggering the
-      // class wide multi-line logic.
+      // class-wide multi-line logic.
       currentPosition += csvLineReader.readFileLine(new Text());
       csvLineReader.resetMultiLine();
 
@@ -182,6 +186,12 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
    * {@link CSVFileSource}'s private getBundle() method
    */
   public void configure() {
+
+    bufferSize = this.configuration.getInt(CSVFileSource.CSV_BUFFER_SIZE, -1);
+    if (bufferSize < 0) {
+      bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE;
+    }
+
     final String bufferValue = this.configuration.get(CSVFileSource.CSV_BUFFER_SIZE);
     if ("".equals(bufferValue)) {
       bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE;
@@ -216,5 +226,11 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
     } else {
       escapeChar = escapeCharValue.charAt(0);
     }
+
+    maximumRecordSize = this.configuration.getInt(CSVFileSource.MAXIMUM_RECORD_SIZE, -1);
+    if (maximumRecordSize < 0) {
+      maximumRecordSize = this.configuration.getInt(CSVFileSource.INPUT_SPLIT_SIZE,
+          CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
index 83e2abe..79af67d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
@@ -29,6 +29,8 @@ import java.nio.charset.CharsetEncoder;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -38,6 +40,7 @@ import com.google.common.base.Preconditions;
  */
 @ParametersAreNonnullByDefault
 public class CSVLineReader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CSVLineReader.class);
 
   // InputStream related variables
   /**
@@ -74,6 +77,12 @@ public class CSVLineReader {
    * The default input file encoding to read with, UTF-8
    */
   public static final String DEFAULT_INPUT_FILE_ENCODING = "UTF-8";
+  /**
+   * The default input maximum record size
+   */
+  public static final int DEFAULT_MAXIMUM_RECORD_SIZE = 67108864;
+
+  private final int maximumRecordSize;
   private final char openQuoteChar;
   private final char closeQuoteChar;
   private final char escape;
@@ -96,7 +105,7 @@ public class CSVLineReader {
    */
   public CSVLineReader(final InputStream inputStream) throws UnsupportedEncodingException
{
     this(inputStream, DEFAULT_BUFFER_SIZE, DEFAULT_INPUT_FILE_ENCODING, DEFAULT_QUOTE_CHARACTER,
-        DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER);
+        DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER, DEFAULT_MAXIMUM_RECORD_SIZE);
   }
 
   /**
@@ -118,10 +127,13 @@ public class CSVLineReader {
    *          Used to specify a custom close quote character
    * @param escape
    *          Used to specify a custom escape character
+   * @param maximumRecordSize
+   *          The maximum acceptable size of one CSV record. Beyond this limit,
+   *          parsing will stop and an exception will be thrown.
    * @throws UnsupportedEncodingException
    */
   public CSVLineReader(final InputStream inputStream, final int bufferSize, final String
inputFileEncoding,
-      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int
maximumRecordSize) {
     Preconditions.checkNotNull(inputStream, "inputStream may not be null");
     Preconditions.checkNotNull(inputFileEncoding, "inputFileEncoding may not be null");
     if (bufferSize <= 0) {
@@ -151,6 +163,7 @@ public class CSVLineReader {
     this.escape = escapeChar;
     this.inputFileEncoding = inputFileEncoding;
     this.charsetEncoder = Charset.forName(inputFileEncoding).newEncoder();
+    this.maximumRecordSize = maximumRecordSize;
   }
 
   /**
@@ -177,23 +190,30 @@ public class CSVLineReader {
       throw new RuntimeException("Cannot begin reading a CSV record while inside of a multi-line
CSV record.");
     }
 
-    inputText.clear();
+    final StringBuilder stringBuilder = new StringBuilder();
     do {
+      // Read a line from the file and add it to the builder
+      inputText.clear();
       totalBytesConsumed += readFileLine(inputText);
-      // a line has been read. We need to see if we're still in quotes and tack
-      // on a newline if so
+      stringBuilder.append(inputText.toString());
+
       if (currentlyInQuotes && !endOfFile) {
-        // Add one LF to mark the line return, otherwise any multi-line CSV
-        // record will all be on one line.
-        inputText.set(new StringBuilder().append(inputText.toString()).append('\n').toString());
+        // If we end up in a multi-line record, we need append a newline
+        stringBuilder.append('\n');
+
+        // Do a check on the total bytes consumed to see if something has gone
+        // wrong.
+        if (totalBytesConsumed > maximumRecordSize || totalBytesConsumed > Integer.MAX_VALUE)
{
+          final String record = stringBuilder.toString();
+          LOGGER.error("Possibly malformed file encountered. First line of record: "
+              + record.substring(0, record.indexOf('\n')));
+          throw new IOException("Possibly malformed file encountered. Check log statements
for more information");
+        }
       }
-    } while (currentlyInQuotes);
+    } while (currentlyInQuotes && !endOfFile);
 
-    if (totalBytesConsumed > Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes consumed before newline: " + Integer.MAX_VALUE);
-    }
-
-    input.set(inputText);
+    // Set the input to the multi-line record
+    input.set(stringBuilder.toString());
     return (int) totalBytesConsumed;
   }
 
@@ -222,8 +242,8 @@ public class CSVLineReader {
     }
 
     // This integer keeps track of the number of newline characters used to
-    // terminate the line being read. This
-    // could be 1, in the case of LF or CR, or 2, in the case of CRLF.
+    // terminate the line being read. This could be 1, in the case of LF or CR,
+    // or 2, in the case of CRLF.
     int newlineLength = 0;
     int inputTextLength = 0;
     long bytesConsumed = 0;
@@ -245,8 +265,7 @@ public class CSVLineReader {
 
       newlineLength = 0;
       // Iterate through the buffer looking for newline characters while keeping
-      // track of if we're in a field
-      // and/or in quotes.
+      // track of if we're in a field and/or in quotes.
       for (; bufferPosition < bufferLength; ++bufferPosition) {
         bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]);
         if (buffer[bufferPosition] == this.escape) {
@@ -271,8 +290,8 @@ public class CSVLineReader {
           if (lastCharWasCR && buffer[bufferPosition] == LF) {
             lastCharWasCR = false;
             // Check for LF (in case of CRLF line endings) and increment the
-            // counter, skip it by moving the
-            // buffer position, then record the length of the LF.
+            // counter, skip it by moving the buffer position, then record the
+            // length of the LF.
             ++newlineLength;
             ++bufferPosition;
             bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]);

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
index 63e74d9..b266a08 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
@@ -25,11 +25,12 @@ import org.apache.hadoop.fs.Path;
 
 public class CSVReadableData extends ReadableDataImpl<String> {
 
-  private int bufferSize;
-  private String inputFileEncoding;
-  private char openQuoteChar;
-  private char closeQuoteChar;
-  private char escapeChar;
+  private final int bufferSize;
+  private final String inputFileEncoding;
+  private final char openQuoteChar;
+  private final char closeQuoteChar;
+  private final char escapeChar;
+  private final int maximumRecordSize;
 
   /**
    * Creates an instance of {@code CSVReadableData} with default configuration
@@ -37,14 +38,15 @@ public class CSVReadableData extends ReadableDataImpl<String> {
    * @param paths
    *          The paths of the files to be read
    */
-  protected CSVReadableData(List<Path> paths) {
+  protected CSVReadableData(final List<Path> paths) {
     this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
         CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
-        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
   }
 
   /**
    * Creates an instance of {@code CSVReadableData} with specified configuration
+   * 
    * @param paths
    *          a list of input file paths
    * @param bufferSize
@@ -57,19 +59,25 @@ public class CSVReadableData extends ReadableDataImpl<String> {
    *          the character to use to close quote blocks
    * @param escape
    *          the character to use for escaping control characters and quotes
+   * @param maximumRecordSize
+   *          The maximum acceptable size of one CSV record. Beyond this limit,
+   *          {@code CSVLineReader} will stop parsing and an exception will be
+   *          thrown.
    */
-  protected CSVReadableData(List<Path> paths, final int bufferSize, final String inputFileEncoding,
-      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+  protected CSVReadableData(final List<Path> paths, final int bufferSize, final String
inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int
maximumRecordSize) {
     super(paths);
     this.bufferSize = bufferSize;
     this.inputFileEncoding = inputFileEncoding;
     this.openQuoteChar = openQuoteChar;
     this.closeQuoteChar = closeQuoteChar;
     this.escapeChar = escapeChar;
+    this.maximumRecordSize = maximumRecordSize;
   }
 
   @Override
   protected FileReaderFactory<String> getFileReaderFactory() {
-    return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
escapeChar);
+    return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
escapeChar,
+        maximumRecordSize);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
index df5e39c..447a6d0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
@@ -31,7 +31,7 @@ import com.google.common.io.Closeables;
  * An {@code Iterator} for an internally created {@code CSVLineReader}
  */
 public class CSVRecordIterator implements Iterator<String>, Closeable {
-  private CSVLineReader csvLineReader;
+  private final CSVLineReader csvLineReader;
   private InputStream inputStream;
   private String currentLine;
 
@@ -45,7 +45,7 @@ public class CSVRecordIterator implements Iterator<String>, Closeable {
   public CSVRecordIterator(final InputStream inputStream) throws UnsupportedEncodingException
{
     this(inputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
         CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
-        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
   }
 
   /**
@@ -63,12 +63,17 @@ public class CSVRecordIterator implements Iterator<String>, Closeable {
    *          the character to use to close quote blocks
    * @param escape
    *          the character to use for escaping control characters and quotes
+   * @param maximumRecordSize
+   *          The maximum acceptable size of one CSV record. Beyond this limit,
+   *          {@code CSVLineReader} will stop parsing and an exception will be
+   *          thrown.
    * @throws UnsupportedEncodingException
    */
   public CSVRecordIterator(final InputStream inputStream, final int bufferSize, final String inputFileEncoding,
-      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) throws UnsupportedEncodingException {
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar, final int maximumRecordSize)
+      throws UnsupportedEncodingException {
     csvLineReader = new CSVLineReader(inputStream, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
-        escapeChar);
+        escapeChar, maximumRecordSize);
     this.inputStream = inputStream;
     incrementValue();
   }
@@ -84,7 +89,7 @@ public class CSVRecordIterator implements Iterator<String>, Closeable {
 
   @Override
   public String next() {
-    String result = currentLine;
+    final String result = currentLine;
     incrementValue();
     return result;
   }
@@ -95,13 +100,13 @@ public class CSVRecordIterator implements Iterator<String>, Closeable {
   }
 
   private void incrementValue() {
-    Text tempText = new Text();
+    final Text tempText = new Text();
     try {
       csvLineReader.readCSVLine(tempText);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new RuntimeException("A problem occurred accessing the underlying CSV file stream.", e);
     }
-    String tempTextAsString = tempText.toString();
+    final String tempTextAsString = tempText.toString();
     if ("".equals(tempTextAsString)) {
       currentLine = null;
     } else {

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
index d04da98..192a018 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
@@ -51,6 +51,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
   private final char escape;
   private final String inputFileEncoding;
   private final int fileStreamBufferSize;
+  private final int maximumRecordSize;
 
   private int totalRecordsRead = 0;
 
@@ -60,7 +61,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
   public CSVRecordReader() {
     this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
         CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
-        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER, CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
   }
 
   /**
@@ -77,9 +78,13 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
    *          the character to use to close quote blocks
    * @param escape
    *          the character to use for escaping control characters and quotes
+   * @param maximumRecordSize
+   *          The maximum acceptable size of one CSV record. Beyond this limit,
+   *          {@code CSVLineReader} will stop parsing and an exception will be
+   *          thrown.
    */
   public CSVRecordReader(final int bufferSize, final String inputFileEncoding, final char openQuote,
-      final char closeQuote, final char escape) {
+      final char closeQuote, final char escape, final int maximumRecordSize) {
     Preconditions.checkNotNull(openQuote, "quote cannot be null.");
     Preconditions.checkNotNull(closeQuote, "quote cannot be null.");
     Preconditions.checkNotNull(escape, "escape cannot be null.");
@@ -89,6 +94,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
     this.openQuote = openQuote;
     this.closeQuote = closeQuote;
     this.escape = escape;
+    this.maximumRecordSize = maximumRecordSize;
   }
 
   /**
@@ -116,12 +122,12 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
     LOGGER.info("Split starts at: " + start);
     LOGGER.info("Split will end at: " + end);
 
-    // Open the file, seek to the start of the split, then wrap it in a
-    // CSVLineReader
+    // Open the file, seek to the start of the split
+    // then wrap it in a CSVLineReader
     fileIn = file.getFileSystem(job).open(file);
     fileIn.seek(start);
     csvLineReader = new CSVLineReader(fileIn, this.fileStreamBufferSize, inputFileEncoding, this.openQuote,
-        this.closeQuote, this.escape);
+        this.closeQuote, this.escape, this.maximumRecordSize);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/191e1146/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
index 55f2f1f..2c51c41 100644
--- a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.crunch.io.text.csv;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -36,7 +36,7 @@ public class CSVLineReaderTest {
   @Test
   public void testVariousUTF8Characters() throws IOException {
     final String variousCharacters = "€Abبώиב¥£€¢₡₢₣₤₥₦§₧₨₩₪₫₭₮漢Ä©óíßä";
-    String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv");
+    final String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv");
     FileInputStream fileInputStream = null;
     try {
 
@@ -60,12 +60,32 @@ public class CSVLineReaderTest {
   public void testBrokenLineParsingInChinese() throws IOException {
     final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
         "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
-    String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
+    final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
     FileInputStream fileInputStream = null;
     try {
       fileInputStream = new FileInputStream(new File(chineseLines));
       final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE,
-          CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、');
+          CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、', CSVLineReader.DEFAULT_MAXIMUM_RECORD_SIZE);
+      for (int i = 0; i < expectedChineseLines.length; i++) {
+        final Text readText = new Text();
+        csvLineReader.readCSVLine(readText);
+        assertEquals(expectedChineseLines[i], readText.toString());
+      }
+    } finally {
+      fileInputStream.close();
+    }
+  }
+
+  @Test(expected = IOException.class)
+  public void testMalformedBrokenLineParsingInChinese() throws IOException {
+    final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
+        "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
+    final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
+    FileInputStream fileInputStream = null;
+    try {
+      fileInputStream = new FileInputStream(new File(chineseLines));
+      final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE,
+          CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、', 5);
       for (int i = 0; i < expectedChineseLines.length; i++) {
         final Text readText = new Text();
         csvLineReader.readCSVLine(readText);


Mime
View raw message