hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r529432 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/record/
Date: Mon, 16 Apr 2007 22:56:01 GMT
Author: cutting
Date: Mon Apr 16 15:56:00 2007
New Revision: 529432

URL: http://svn.apache.org/viewvc?view=rev&rev=529432
Log:
HADOOP-1251.  Add a method to Reporter to get the map InputSplit.  Contributed by Owen.
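
For context, the new Reporter.getInputSplit() method lets a map see which
split it is reading from. A minimal usage sketch against the mapred API of
this era (the SplitAwareMapper class below is illustrative, not part of
this commit):

    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;

    // Illustrative only: a mapper that inspects its InputSplit via the
    // Reporter.getInputSplit() method introduced by this change.
    public class SplitAwareMapper extends MapReduceBase implements Mapper {
      public void map(WritableComparable key, Writable value,
                      OutputCollector output, Reporter reporter)
          throws IOException {
        InputSplit split = reporter.getInputSplit(); // valid only in a map task
        if (split instanceof FileSplit) {
          // File splits expose the path, so a map can log its input file.
          System.out.println("Reading from " + ((FileSplit) split).getPath());
        }
        output.collect(key, value);
      }
    }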

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/TestRecordWritable.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=529432&r1=529431&r2=529432
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Apr 16 15:56:00 2007
@@ -213,6 +213,9 @@
 64. HADOOP-1148.  Re-indent all Java source code to consistently use
     two spaces per indent level.  (cutting)
 
+65. HADOOP-1251.  Add a method to Reporter to get the map InputSplit.
+    (omalley via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=529432&r1=529431&r2=529432
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Apr 16 15:56:00 2007
@@ -59,6 +59,7 @@
   private String splitClass;
   private MapOutputFile mapOutputFile = new MapOutputFile();
   private JobConf conf;
+  private InputSplit instantiatedSplit = null;
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
 
@@ -84,6 +85,7 @@
     super.localizeConfiguration(conf);
     Path localSplit = new Path(new Path(getJobFile()).getParent(), 
                                "split.dta");
+    LOG.debug("Writing local split to " + localSplit);
     DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
     Text.writeString(out, splitClass);
     split.write(out);
@@ -107,6 +109,10 @@
     split.readFields(in);
   }
 
+  InputSplit getInputSplit() throws UnsupportedOperationException {
+    return instantiatedSplit;
+  }
+
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
@@ -115,9 +121,8 @@
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
     
     // reinstantiate the split
-    InputSplit split;
     try {
-      split = (InputSplit) 
+      instantiatedSplit = (InputSplit) 
         ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
     } catch (ClassNotFoundException exp) {
       IOException wrap = new IOException("Split class " + splitClass + 
@@ -126,18 +131,19 @@
       throw wrap;
     }
     DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(this.split.get(), 0, this.split.getSize());
-    split.readFields(splitBuffer);
+    splitBuffer.reset(split.get(), 0, split.getSize());
+    instantiatedSplit.readFields(splitBuffer);
     
     // if it is a file split, we can give more details
-    if (split instanceof FileSplit) {
-      job.set("map.input.file", ((FileSplit) split).getPath().toString());
-      job.setLong("map.input.start", ((FileSplit) split).getStart());
-      job.setLong("map.input.length", ((FileSplit) split).getLength());
+    if (instantiatedSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) instantiatedSplit;
+      job.set("map.input.file", fileSplit.getPath().toString());
+      job.setLong("map.input.start", fileSplit.getStart());
+      job.setLong("map.input.length", fileSplit.getLength());
     }
       
     final RecordReader rawIn =                  // open input
-      job.getInputFormat().getRecordReader(split, job, reporter);
+      job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
 
     RecordReader in = new RecordReader() {      // wrap in progress reporter
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?view=diff&rev=529432&r1=529431&r2=529432
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Mon Apr 16 15:56:00 2007
@@ -35,6 +35,9 @@
       }
       public void incrCounter(Enum key, long amount) {
       }
+      public InputSplit getInputSplit() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("NULL reporter has no input");
+      }
     };
 
   /**
@@ -53,4 +56,12 @@
    * be incremented
    */
   public abstract void incrCounter(Enum key, long amount);
+  
+  /**
+   * Get the InputSplit object for a map.
+   * @return the input split that the map is reading from
+   * @throws UnsupportedOperationException if called outside a mapper
+   */
+  public abstract InputSplit getInputSplit() 
+    throws UnsupportedOperationException;
 }
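
Because the method is only meaningful inside a map task (Reporter.NULL above
and the default Task implementation below both throw), code that might also
run in a reducer should guard the call. A hedged sketch:

    // Illustrative only: outside a map task, getInputSplit() throws an
    // unchecked UnsupportedOperationException, so defensive callers catch it.
    InputSplit split = null;
    try {
      split = reporter.getInputSplit();
    } catch (UnsupportedOperationException e) {
      // Not running in a map; no split is available.
    }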

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=529432&r1=529431&r2=529432
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Apr 16 15:56:00 2007
@@ -173,6 +173,10 @@
 
   public Progress getProgress() { return taskProgress; }
 
+  InputSplit getInputSplit() throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Input only available on map");
+  }
+
   protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) 
     throws IOException 
   {
@@ -191,6 +195,9 @@
           if (counters != null) {
             counters.incrCounter(key, amount);
           }
+        }
+        public InputSplit getInputSplit() throws UnsupportedOperationException {
+          return Task.this.getInputSplit();
         }
       };
   }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=529432&r1=529431&r2=529432
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Apr 16 15:56:00 2007
@@ -21,7 +21,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.Progressable;
 
 import java.io.DataInput;
@@ -215,6 +214,11 @@
                     OutputCollector out, Reporter reporter) throws IOException {
       System.out.println("map: " + key + ", " + value);
       out.collect((WritableComparable) value, key);
+      InputSplit split = reporter.getInputSplit();
+      if (split.getClass() != MyInputFormat.MySplit.class) {
+        throw new IOException("Got wrong split in MyMapper! " + 
+                              split.getClass().getName());
+      }
     }
   }
 
@@ -222,6 +226,12 @@
     public void reduce(WritableComparable key, Iterator values, 
                        OutputCollector output, Reporter reporter
                        ) throws IOException {
+      try {
+        InputSplit split = reporter.getInputSplit();
+        throw new IOException("Got an input split of " + split);
+      } catch (UnsupportedOperationException e) {
+        // expected result
+      }
       while (values.hasNext()) {
         Writable value = (Writable) values.next();
         System.out.println("reduce: " + key + ", " + value);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/record/TestRecordWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/record/TestRecordWritable.java?view=diff&rev=529432&r1=529431&r2=529432
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/record/TestRecordWritable.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/record/TestRecordWritable.java Mon Apr 16 15:56:00 2007
@@ -46,12 +46,6 @@
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path file = new Path(dir, "test.seq");
     
-    Reporter reporter = new Reporter() {
-        public void setStatus(String status) throws IOException {}
-        public void progress() throws IOException {}
-        public void incrCounter(Enum key, long amount) {}
-      };
-    
     int seed = new Random().nextInt();
     //LOG.info("seed = "+seed);
     Random random = new Random(seed);
@@ -95,7 +89,7 @@
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
-            format.getRecordReader(splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, Reporter.NULL);
           try {
             int count = 0;
             while (reader.next(key, value)) {


