hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r511039 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/
Date Fri, 23 Feb 2007 17:57:02 GMT
Author: cutting
Date: Fri Feb 23 09:57:01 2007
New Revision: 511039

URL: http://svn.apache.org/viewvc?view=rev&rev=511039
Log:
HADOOP-1029.  Fix streaming's input format to correctly seek to the start of splits.  Contributed
by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511039&r1=511038&r2=511039
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Feb 23 09:57:01 2007
@@ -119,6 +119,9 @@
 35. HADOOP-248.  Optimize location of map outputs to not use random
     probes.  (Devaraj Das via cutting)
 
+36. HADOOP-1029.  Fix streaming's input format to correctly seek to
+    the start of splits.  (Arun C Murthy via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=511039&r1=511038&r2=511039
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
(original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
Fri Feb 23 09:57:01 2007
@@ -72,16 +72,27 @@
                                       JobConf job,
                                       Reporter reporter) throws IOException {
     FileSplit split = (FileSplit) genericSplit;
-    FileSystem fs = split.getPath().getFileSystem(job);
     LOG.info("getRecordReader start.....split=" + split);
     reporter.setStatus(split.toString());
 
-    final long start = split.getStart();
-    final long end = start + split.getLength();
-
-    FSDataInputStream in = fs.open(split.getPath());
+    long start = split.getStart();
+    long length  = split.getLength();
     
-    // will open the file and seek to the start of the split
+    // Open the file and seek to the start of the split
+    FileSystem fs = split.getPath().getFileSystem(job);
+    FSDataInputStream in = fs.open(split.getPath());
+    if (isGzippedInput(job)) {
+      length = Long.MAX_VALUE;
+    } else if (start != 0) {
+      in.seek(start-1);
+      LineRecordReader.readLine(in, null);
+      long oldStart = start;
+      start = in.getPos();
+      length -= (start - oldStart); 
+    }
+    // Ugly hack! 
+    split = new FileSplit(split.getPath(), start, length, job);
+
     // Factory dispatch based on available params..
     Class readerClass;
     String c = job.get("stream.recordreader.class");

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=511039&r1=511038&r2=511039
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
(original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
Fri Feb 23 09:57:01 2007
@@ -55,7 +55,7 @@
       Reporter reporter,
       JobConf job, FileSystem fs) throws IOException {
     super(createStream(in, job), split.getStart(), 
-        split.getStart() + split.getLength());
+            (split.getStart() + split.getLength()));
     this.split = split ; 
     this.reporter = reporter ; 
   }
@@ -92,21 +92,23 @@
 
     Text tKey = (Text) key;
     Text tValue = (Text) value;
-    byte[] line = null ; 
+    byte[] line = null ;
+    int lineLen = -1;
     if( super.next(dummyKey, innerValue) ){
-      line = innerValue.getBytes(); 
+      line = innerValue.getBytes();
+      lineLen = innerValue.getLength();
     }else{
       return false;
     }
     if (line == null) return false;
-    int tab = UTF8ByteArrayUtils.findTab(line);
+    int tab = UTF8ByteArrayUtils.findTab(line, 0, lineLen);
     if (tab == -1) {
-      tKey.set(line);
+      tKey.set(line, 0, lineLen);
       tValue.set("");
     } else {
-      UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
+      UTF8ByteArrayUtils.splitKeyVal(line, 0, lineLen, tKey, tValue, tab);
     }
-    numRecStats(line, 0, line.length);
+    numRecStats(line, 0, lineLen);
     return true;
   }
   

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=diff&rev=511039&r1=511038&r2=511039
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
(original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
Fri Feb 23 09:57:01 2007
@@ -33,38 +33,67 @@
     /**
      * Find the first occured tab in a UTF-8 encoded string
      * @param utf a byte array containing a UTF-8 encoded string
+     * @param start starting offset
+     * @param length no. of bytes
      * @return position that first tab occures otherwise -1
      */
-    public static int findTab(byte [] utf) {
-        for(int i=0; i<utf.length; i++) {
+    public static int findTab(byte [] utf, int start, int length) {
+        for(int i=start; i<(start+length); i++) {
             if(utf[i]==(byte)'\t') {
                 return i;
             }
-          }
-          return -1;      
+        }
+        return -1;      
     }
     
     /**
+     * Find the first occured tab in a UTF-8 encoded string
+     * @param utf a byte array containing a UTF-8 encoded string
+     * @return position that first tab occures otherwise -1
+     */
+    public static int findTab(byte [] utf) {
+      return findTab(utf, 0, utf.length);
+    }
+
+    /**
      * split a UTF-8 byte array into key and value 
      * assuming that the delimilator is at splitpos. 
      * @param utf utf-8 encoded string
+     * @param start starting offset
+     * @param length no. of bytes
      * @param key contains key upon the method is returned
      * @param val contains value upon the method is returned
      * @param splitPos the split pos
      * @throws IOException
      */
-    public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
-    throws IOException {
-        if(splitPos<0 || splitPos >= utf.length)
-            throw new IllegalArgumentException(
-                    "splitPos must be in the range [0, "+splitPos+"]: " +splitPos);
-        byte [] keyBytes = new byte[splitPos];
-        System.arraycopy(utf, 0, keyBytes, 0, splitPos);
-        int valLen = utf.length-splitPos-1;
+    public static void splitKeyVal(byte[] utf, int start, int length, 
+            Text key, Text val, int splitPos) throws IOException {
+        if(splitPos<start || splitPos >= (start+length))
+            throw new IllegalArgumentException( "splitPos must be in the range " +
+                "[" + start + ", " + (start+length) + "]: " + splitPos);
+        int keyLen = (splitPos-start);
+        byte [] keyBytes = new byte[keyLen];
+        System.arraycopy(utf, start, keyBytes, 0, keyLen);
+        int valLen = (start+length)-splitPos-1;
         byte [] valBytes = new byte[valLen];
-        System.arraycopy(utf,splitPos+1, valBytes, 0, valLen );
+        System.arraycopy(utf, splitPos+1, valBytes, 0, valLen);
         key.set(keyBytes);
         val.set(valBytes);
+    }
+    
+
+    /**
+     * split a UTF-8 byte array into key and value 
+     * assuming that the delimilator is at splitpos. 
+     * @param utf utf-8 encoded string
+     * @param key contains key upon the method is returned
+     * @param val contains value upon the method is returned
+     * @param splitPos the split pos
+     * @throws IOException
+     */
+    public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
+    throws IOException {
+        splitKeyVal(utf, 0, utf.length, key, val, splitPos);
     }
     
     /**



Mime
View raw message