hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r644741 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Fri, 04 Apr 2008 15:50:55 GMT
Author: ddas
Date: Fri Apr  4 08:50:52 2008
New Revision: 644741

URL: http://svn.apache.org/viewvc?rev=644741&view=rev
Log:
HADOOP-2826. Deprecated FileSplit.getFile(), LineRecordReader.readLine(). Contributed by Amareshwari Sriramadasu.
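
For code that still calls the deprecated methods, the replacement usage introduced by this change looks roughly as follows. This is an illustrative sketch, not part of the patch: the split, in, conf and process() names are placeholders.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.LineRecordReader.LineReader;

    // FileSplit: was  File f = split.getFile();
    Path p = split.getPath();

    // Line reading: was  LineRecordReader.readLine(in, out)  /  UTF8ByteArrayUtils.readLine(in)
    LineReader reader = new LineReader(in, conf);   // constructor made public in this change
    Text line = new Text();
    while (reader.readLine(line) > 0) {             // returns bytes read, 0 at end of stream
      process(line.getBytes(), line.getLength());   // only the first getLength() bytes are valid
      line.clear();                                 // the Text is reused across lines
    }
    reader.close();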

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr  4 08:50:52 2008
@@ -83,6 +83,9 @@
     availability zone as the cluster.  Ganglia monitoring and large
     instance sizes have also been added.  (Chris K Wensel via tomwhite)
 
+    HADOOP-2826. Deprecated FileSplit.getFile(), LineRecordReader.readLine().
+    (Amareshwari Sriramadasu via ddas)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Apr  4 08:50:52 2008
@@ -36,11 +36,11 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
 
 import org.apache.hadoop.fs.FileSystem;
 
@@ -354,14 +354,16 @@
    * @param val: value of a record
    * @throws IOException
    */
-  void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
-    int pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
+  void splitKeyVal(byte[] line, int length, Text key, Text val)
+  throws IOException {
+    int pos = UTF8ByteArrayUtils.findNthByte(line, 0, length,
+                (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
     try {
       if (pos == -1) {
-        key.set(line);
+        key.set(line, 0, length);
         val.set("");
       } else {
-        UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
+        UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos);
       }
     } catch (CharacterCodingException e) {
       LOG.warn(StringUtils.stringifyException(e));
@@ -377,15 +379,18 @@
     }
 
     public void run() {
+      LineReader lineReader = null;
       try {
         Text key = new Text();
         Text val = new Text();
+        Text line = new Text();
+        lineReader = new LineReader((InputStream)clientIn_, job_);
         // 3/4 Tool to Hadoop
-        while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) {
-          
-          splitKeyVal(answer, key, val);
+        while (lineReader.readLine(line) > 0) {
+          answer = line.getBytes();
+          splitKeyVal(answer, line.getLength(), key, val);
           output.collect(key, val);
-          
+          line.clear();
           numRecWritten_++;
           long now = System.currentTimeMillis();
           if (now-lastStdoutReport > reporterOutDelay_) {
@@ -396,6 +401,9 @@
             logflush();
           }
         }
+        if (lineReader != null) {
+          lineReader.close();
+        }
         if (clientIn_ != null) {
           clientIn_.close();
           clientIn_ = null;
@@ -405,6 +413,9 @@
         outerrThreadsThrowable = th;
         LOG.warn(StringUtils.stringifyException(th));
         try {
+          if (lineReader != null) {
+            lineReader.close();
+          }
           if (clientIn_ != null) {
             clientIn_.close();
             clientIn_ = null;
@@ -433,18 +444,23 @@
     }
       
     public void run() {
-      byte[] line;
+      Text line = new Text();
+      LineReader lineReader = null;
       try {
-        while ((line = UTF8ByteArrayUtils.readLine((InputStream) clientErr_)) != null) {
-          String lineStr = new String(line, "UTF-8");
-          System.err.println(lineStr);
+        lineReader = new LineReader((InputStream)clientErr_, job_);
+        while (lineReader.readLine(line) > 0) {
+          System.err.println(line.toString());
           long now = System.currentTimeMillis(); 
           if (reporter != null && now-lastStderrReport > reporterErrDelay_) {
             lastStderrReport = now;
             reporter.progress();
           }
+          line.clear();
         }
-	if (clientErr_ != null) {
+        if (lineReader != null) {
+          lineReader.close();
+        }
+        if (clientErr_ != null) {
           clientErr_.close();
           clientErr_ = null;
           LOG.info("MRErrorThread done");
@@ -453,6 +469,9 @@
         outerrThreadsThrowable = th;
         LOG.warn(StringUtils.stringifyException(th));
         try {
+          if (lineReader != null) {
+            lineReader.close();
+          }
           if (clientErr_ != null) {
             clientErr_.close();
             clientErr_ = null;
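
Note on the new splitKeyVal signature: Text.getBytes() returns the reused backing array, which can be longer than the current line, so only the first getLength() bytes are valid. That is why the length is passed alongside the byte array rather than relying on answer.length. A minimal sketch of the calling pattern, with names as in the patch above:

    Text line = new Text();
    if (lineReader.readLine(line) > 0) {
      byte[] buf = line.getBytes();                  // backing array, possibly longer than the line
      splitKeyVal(buf, line.getLength(), key, val);  // pass the valid length, not buf.length
      line.clear();
    }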

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Fri Apr  4 08:50:52 2008
@@ -52,7 +52,7 @@
     start_ = split_.getStart();
     length_ = split_.getLength();
     end_ = start_ + length_;
-    splitName_ = split_.getFile().getName();
+    splitName_ = split_.getPath().getName();
     reporter_ = reporter;
     job_ = job;
     fs_ = fs;
@@ -128,8 +128,8 @@
     } else {
       recStr = record.toString();
     }
-    String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+"
-      + split_.getLength();
+    String unqualSplit = split_.getPath().getName() + ":" +
+                         split_.getStart() + "+" + split_.getLength();
     String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " " + unqualSplit
       + " Processing record=" + recStr;
     status += " " + splitName_;

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Fri Apr  4 08:50:52 2008
@@ -18,12 +18,10 @@
 
 package org.apache.hadoop.streaming;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 
 /**
  * General utils for byte array containing UTF-8 encoded strings
@@ -67,12 +65,12 @@
    * Find the nth occurrence of the given byte b in a UTF-8 encoded string
    * @param utf a byte array containing a UTF-8 encoded string
    * @param start starting offset
-   * @param end ending position
+   * @param length the length of byte array
    * @param b the byte to find
    * @param n the desired occurrence of the given byte
    * @return position that nth occurrence of the given byte if exists; otherwise -1
    */
-  private static int findNthByte(byte [] utf, int start, int length, byte b, int n) {
+  public static int findNthByte(byte [] utf, int start, int length, byte b, int n) {
     int pos = -1;
     int nextStart = start;
     for (int i = 0; i < n; i++) {
@@ -148,16 +146,14 @@
     
   /**
    * Read a utf8 encoded line from a data input stream. 
-   * @param in data input stream
-   * @return a byte array containing the line 
+   * @param lineReader LineReader to read the line from.
+   * @param out Text to read into
+   * @return number of bytes read 
    * @throws IOException
    */
-  public static byte [] readLine(InputStream in) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    long bytes = LineRecordReader.readLine(in, baos);
-    baos.close();
-    if (bytes <= 0)
-      return null;
-    return baos.toByteArray();
+  public static int readLine(LineReader lineReader, Text out) 
+  throws IOException {
+    out.clear();
+    return lineReader.readLine(out);
   }
 }
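
findNthByte is made public here, with an explicit (start, length) window, so that callers such as PipeMapRed can search only the valid portion of a reused buffer. An illustrative call, assuming a tab-separated record sitting in buf with len valid bytes (buf, len and the separator are placeholders):

    byte[] buf = line.getBytes();
    int len = line.getLength();
    // position of the 2nd tab within the first len bytes, or -1 if there is none
    int pos = UTF8ByteArrayUtils.findNthByte(buf, 0, len, (byte) '\t', 2);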

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java Fri Apr  4 08:50:52 2008
@@ -35,6 +35,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -190,14 +192,14 @@
     public void run() {
       try {
         in_ = connectInputStream();
-        while (true) {
-          byte[] b = UTF8ByteArrayUtils.readLine(in_);
-          if (b == null) {
-            break;
-          }
-          buf_.append(new String(b, "UTF-8"));
+        LineReader lineReader = new LineReader((InputStream)in_, conf_);
+        Text line = new Text();
+        while (lineReader.readLine(line) > 0) {
+          buf_.append(line.toString());
           buf_.append('\n');
+          line.clear();
         }
+        lineReader.close();
         in_.close();
       } catch (IOException io) {
         throw new RuntimeException(io);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Fri Apr  4 08:50:52 2008
@@ -63,9 +63,6 @@
     this.hosts = hosts;
   }
 
-  /** @deprecated Call {@link #getPath()} instead. */
-  public File getFile() { return new File(file.toString()); }
-  
   /** The file containing this split's data. */
   public Path getPath() { return file; }
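
The removed getFile() wrapped the split's path in a java.io.File, which is only meaningful for local paths; getPath() returns the Hadoop Path, which can be resolved against any FileSystem. A rough sketch for callers migrating off getFile() (conf is a placeholder Configuration):

    Path path = split.getPath();                 // replaces the removed getFile()
    String name = path.getName();                // file name only, e.g. for status messages
    FileSystem fs = path.getFileSystem(conf);    // owning FileSystem: local, HDFS, ...
    FSDataInputStream in = fs.open(path);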
   

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Fri Apr  4 08:50:52 2008
@@ -74,7 +74,7 @@
      * @param conf configuration
      * @throws IOException
      */
-    LineReader(InputStream in, Configuration conf) throws IOException {
+    public LineReader(InputStream in, Configuration conf) throws IOException {
       this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
     }
 
@@ -220,43 +220,6 @@
     return false;
   }
 
-  /**
-   * @deprecated
-   */
-  public static long readLine(InputStream in, 
-                              OutputStream out) throws IOException {
-    long bytes = 0;
-    while (true) {
-      
-      int b = in.read();
-      if (b == -1) {
-        break;
-      }
-      bytes += 1;
-      
-      byte c = (byte)b;
-      if (c == '\n') {
-        break;
-      }
-      
-      if (c == '\r') {
-        in.mark(1);
-        byte nextC = (byte)in.read();
-        if (nextC != '\n') {
-          in.reset();
-        } else {
-          bytes += 1;
-        }
-        break;
-      }
-      
-      if (out != null) {
-        out.write(c);
-      }
-    }
-    return bytes;
-  }
-  
   /**
    * Get the progress within the split
    */
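
With the constructor public, code outside org.apache.hadoop.mapred (streaming, tests) can build a LineReader directly; the buffer size is taken from io.file.buffer.size, falling back to DEFAULT_BUFFER_SIZE. A rough replacement sketch for callers of the removed byte-at-a-time readLine(InputStream, OutputStream), assuming the caller still owns in and out:

    Configuration conf = new Configuration();
    conf.setInt("io.file.buffer.size", 64 * 1024);     // optional: picked up by the constructor above
    LineRecordReader.LineReader reader = new LineRecordReader.LineReader(in, conf);

    Text line = new Text();
    int bytes = reader.readLine(line);                 // bytes read from the stream, 0 at end
    if (bytes > 0 && out != null) {
      out.write(line.getBytes(), 0, line.getLength()); // the line content the removed readLine wrote to out
    }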

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?rev=644741&r1=644740&r2=644741&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Fri Apr  4 08:50:52 2008
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class TestKeyValueTextInputFormat extends TestCase {
@@ -128,48 +129,39 @@
 
     }
   }
-
-  private InputStream makeStream(String str) throws IOException {
-    Text text = new Text(str);
-    return new ByteArrayInputStream(text.getBytes(), 0, text.getLength());
+  private LineReader makeStream(String str) throws IOException {
+    return new LineRecordReader.LineReader(new ByteArrayInputStream
+                                           (str.getBytes("UTF-8")), 
+                                           defaultConf);
   }
   
   public void testUTF8() throws Exception {
-    InputStream in = makeStream("abcd\u20acbdcd\u20ac");
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    LineRecordReader.readLine(in, out);
+    LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
-    line.set(out.toByteArray());
+    in.readLine(line);
     assertEquals("readLine changed utf8 characters", 
                  "abcd\u20acbdcd\u20ac", line.toString());
     in = makeStream("abc\u200axyz");
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    line.set(out.toByteArray());
+    in.readLine(line);
     assertEquals("split on fake newline", "abc\u200axyz", line.toString());
   }
 
   public void testNewLines() throws Exception {
-    InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line1 length", 1, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line2 length", 2, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line3 length", 0, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line4 length", 3, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line5 length", 4, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line5 length", 5, out.size());
-    assertEquals("end of file", 0, LineRecordReader.readLine(in, out));
+    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    Text out = new Text();
+    in.readLine(out);
+    assertEquals("line1 length", 1, out.getLength());
+    in.readLine(out);
+    assertEquals("line2 length", 2, out.getLength());
+    in.readLine(out);
+    assertEquals("line3 length", 0, out.getLength());
+    in.readLine(out);
+    assertEquals("line4 length", 3, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 4, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 5, out.getLength());
+    assertEquals("end of file", 0, in.readLine(out));
   }
   
   private static void writeFile(FileSystem fs, Path name, 


