hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1068729 - in /hadoop/common/trunk: CHANGES.txt src/java/org/apache/hadoop/util/LineReader.java
Date Wed, 09 Feb 2011 01:41:08 GMT
Author: todd
Date: Wed Feb  9 01:41:08 2011
New Revision: 1068729

URL: http://svn.apache.org/viewvc?rev=1068729&view=rev
Log:
HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat. Contributed by
Ahmed Radwan.

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/util/LineReader.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1068729&r1=1068728&r2=1068729&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Wed Feb  9 01:41:08 2011
@@ -13,6 +13,9 @@ Trunk (unreleased changes)
     HADOOP-7023. Add listCorruptFileBlocks to Filesysem. (Patrick Kling
     via hairong)
 
+    HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat
+    (Ahmed Radwan via todd)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/util/LineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/util/LineReader.java?rev=1068729&r1=1068728&r2=1068729&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/util/LineReader.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/util/LineReader.java Wed Feb  9 01:41:08 2011
@@ -28,6 +28,14 @@ import org.apache.hadoop.io.Text;
 
 /**
  * A class that provides a line reader from an input stream.
+ * Depending on the constructor used, lines will either be terminated by:
+ * <ul>
+ * <li>one of the following: '\n' (LF), '\r' (CR),
+ * or '\r\n' (CR+LF).</li>
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated
+ * line.
  */
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
@@ -44,6 +52,9 @@ public class LineReader {
   private static final byte CR = '\r';
   private static final byte LF = '\n';
 
+  // The line delimiter
+  private final byte[] recordDelimiterBytes;
+
   /**
    * Create a line reader that reads from the given stream using the
    * default buffer-size (64k).
@@ -65,6 +76,7 @@ public class LineReader {
     this.in = in;
     this.bufferSize = bufferSize;
     this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = null;
   }
 
   /**
@@ -80,6 +92,56 @@ public class LineReader {
   }
 
   /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size and a custom record delimiter given as a
+   * byte array.
+   * @param in The input stream
+   * @param recordDelimiterBytes The delimiter
+   */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = DEFAULT_BUFFER_SIZE;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size and a custom record delimiter given as a byte
+   * array.
+   * @param in The input stream
+   * @param bufferSize Size of the read buffer
+   * @param recordDelimiterBytes The byte sequence that terminates each
+   *        record
+   */
+  public LineReader(InputStream in, int bufferSize,
+      byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>, and a custom record delimiter given as
+   * a byte array.
+   * @param in input stream
+   * @param conf configuration
+   * @param recordDelimiterBytes The delimiter
+   * @throws IOException
+   */
+  public LineReader(InputStream in, Configuration conf,
+      byte[] recordDelimiterBytes) throws IOException {
+    this.in = in;
+    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+
+  /**
    * Close the underlying stream.
    * @throws IOException
    */
@@ -88,10 +150,7 @@ public class LineReader {
   }
   
   /**
-   * Read one line from the InputStream into the given Text.  A line
-   * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
-   * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
-   * line.
+   * Read one line from the InputStream into the given Text.
    *
    * @param str the object to store the given line (without newline)
    * @param maxLineLength the maximum number of bytes to store into str;
@@ -108,6 +167,18 @@ public class LineReader {
    */
   public int readLine(Text str, int maxLineLength,
                       int maxBytesToConsume) throws IOException {
+    if (this.recordDelimiterBytes != null) {
+      return readCustomLine(str, maxLineLength, maxBytesToConsume);
+    } else {
+      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
+    }
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+  throws IOException {
     /* We're reading data from in, but the head of the stream may be
      * already buffered in buffer, so we have several cases:
      * 1. No newline characters are in the buffer, so we need to copy
@@ -171,6 +242,52 @@ public class LineReader {
   }
 
   /**
+   * Read a line terminated by a custom delimiter.
+   */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    str.clear();
+    int txtLength = 0; // tracks str.getLength(), as an optimization
+    long bytesConsumed = 0;
+    int delPosn = 0;
+    do {
+      int startPosn = bufferPosn; // starting from where we left off the last
+      // time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0)
+          break; // EOF
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
+          delPosn++;
+          if (delPosn >= recordDelimiterBytes.length) {
+            bufferPosn++;
+            break;
+          }
+        } else {
+          delPosn = 0;
+        }
+      }
+      int readLength = bufferPosn - startPosn;
+      bytesConsumed += readLength;
+      int appendLength = readLength - delPosn;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (delPosn < recordDelimiterBytes.length
+        && bytesConsumed < maxBytesToConsume);
+    if (bytesConsumed > (long) Integer.MAX_VALUE)
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    return (int) bytesConsumed;
+  }
+
+  /**
    * Read from the InputStream into the given Text.
    * @param str the object to store the given line
    * @param maxLineLength the maximum number of bytes to store into str.



Mime
View raw message