hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r549237 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobClient.java src/java/org/apache/hadoop/mapred/JobInProgress.java src/test/org/apache/hadoop/mapred/TestMapOutputOrder.java
Date: Wed, 20 Jun 2007 21:13:52 GMT
Author: cutting
Date: Wed Jun 20 14:13:51 2007
New Revision: 549237

URL: http://svn.apache.org/viewvc?view=rev&rev=549237
Log:
HADOOP-1440.  When reduce is disabled, use the order of splits returned by
the InputFormat when numbering outputs.  Contributed by Senthil.
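
The heart of the change, as a minimal standalone sketch (the Split class
and all names below are illustrative stand-ins, not the actual Hadoop
API): record each split's position as returned by InputFormat#getSplits
before the splits are sorted by size, so map outputs can be numbered in
input order even though the scheduling order differs.

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;

    public class SplitOrderSketch {
      // Stand-in for an input split; only its size matters here.
      static class Split {
        final long length;
        Split(long length) { this.length = length; }
      }

      public static void main(String[] args) {
        Split[] splits = { new Split(10), new Split(30), new Split(20) };

        // Remember the original (input) order before any re-sorting.
        Map<Split, Integer> positions = new HashMap<Split, Integer>();
        for (int i = 0; i < splits.length; i++) {
          positions.put(splits[i], i);
        }

        // Sort biggest-first for scheduling, as JobClient does.
        Arrays.sort(splits, new Comparator<Split>() {
          public int compare(Split a, Split b) {
            return a.length < b.length ? 1
                 : (a.length == b.length ? 0 : -1);
          }
        });

        // Number each task by its recorded position, not its sorted index.
        for (Split s : splits) {
          System.out.println("length=" + s.length
                             + " position=" + positions.get(s));
        }
      }
    }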

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputOrder.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=549237&r1=549236&r2=549237
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 20 14:13:51 2007
@@ -209,6 +209,10 @@
      thread.  Reporting during sorting and more is also more
      consistent.  (Vivek Ratan via cutting)
 
+ 64. HADOOP-1440.  When reduce is disabled, use the order of splits
+     returned by InputFormat#getSplits when numbering outputs.
+     (Senthil Subramanian via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=549237&r1=549236&r2=549237
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Jun 20 14:13:51 2007
@@ -339,6 +339,10 @@
     LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
     InputSplit[] splits = 
       job.getInputFormat().getSplits(job, job.getNumMapTasks());
+    Hashtable<InputSplit, Integer> splitPositions = new Hashtable<InputSplit, Integer>();
+    for (int i = 0; i < splits.length; ++i) {
+      splitPositions.put(splits[i], i);
+    }
     // sort the splits into order based on size, so that the biggest
     // go first
     Arrays.sort(splits, new Comparator<InputSplit>() {
@@ -362,7 +366,7 @@
     // write the splits to a file for the job tracker
     FSDataOutputStream out = fs.create(submitSplitFile);
     try {
-      writeSplitsFile(splits, out);
+      writeSplitsFile(splits, splitPositions, out);
     } finally {
       out.close();
     }
@@ -391,6 +395,7 @@
   static class RawSplit implements Writable {
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
+    private int position;
     private String[] locations;
       
     public void setBytes(byte[] data, int offset, int length) {
@@ -408,11 +413,19 @@
     public BytesWritable getBytes() {
       return bytes;
     }
+
+    public void setPosition(int position) {
+      this.position = position;
+    }
       
     public void setLocations(String[] locations) {
       this.locations = locations;
     }
       
+    public int getPosition() {
+      return position;
+    }
+      
     public String[] getLocations() {
       return locations;
     }
@@ -420,6 +433,7 @@
     public void readFields(DataInput in) throws IOException {
       splitClass = Text.readString(in);
       bytes.readFields(in);
+      position = WritableUtils.readVInt(in);
       int len = WritableUtils.readVInt(in);
       locations = new String[len];
       for(int i=0; i < len; ++i) {
@@ -430,6 +444,7 @@
     public void write(DataOutput out) throws IOException {
       Text.writeString(out, splitClass);
       bytes.write(out);
+      WritableUtils.writeVInt(out, position);
       WritableUtils.writeVInt(out, locations.length);
       for(int i = 0; i < locations.length; i++) {
         Text.writeString(out, locations[i]);
@@ -449,7 +464,8 @@
    * @param splits the input splits to write out
    * @param out the stream to write to
    */
-  private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
+  private void writeSplitsFile(InputSplit[] splits, Hashtable splitPositions,
+                              FSDataOutputStream out) throws IOException {
     out.write(SPLIT_FILE_HEADER);
     WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
     WritableUtils.writeVInt(out, splits.length);
@@ -460,6 +476,7 @@
       buffer.reset();
       split.write(buffer);
       rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+      rawSplit.setPosition(((Integer) splitPositions.get(split)).intValue());
       rawSplit.setLocations(split.getLocations());
       rawSplit.write(out);
     }
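
The RawSplit changes above persist the new position field with the same
variable-length integer encoding the rest of the splits file uses. A
standalone round-trip of that encoding, sketched with Hadoop's Writable
buffers (this mirrors RawSplit.write/readFields but is not the committed
code):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class VIntPositionRoundTrip {
      public static void main(String[] args) throws IOException {
        int position = 7;  // a split's original index

        // Encode, as RawSplit.write does.
        DataOutputBuffer out = new DataOutputBuffer();
        WritableUtils.writeVInt(out, position);

        // Decode, as RawSplit.readFields does.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        int decoded = WritableUtils.readVInt(in);

        System.out.println(decoded == position);  // prints true
      }
    }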

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=549237&r1=549236&r2=549237
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jun 20 14:13:51 2007
@@ -221,7 +221,7 @@
       maps[i] = new TaskInProgress(uniqueString, jobFile, 
                                    splits[i].getClassName(),
                                    splits[i].getBytes(), 
-                                   jobtracker, conf, this, i);
+                                   jobtracker, conf, this, splits[i].getPosition());
       for(String host: splits[i].getLocations()) {
         List<TaskInProgress> hostMaps = hostToMaps.get(host);
         if (hostMaps == null) {
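
With the one-line change above, a map-only task is numbered by its
split's original position rather than by its post-sort index, and that
task number is what ends up in the part-NNNNN output file name. A small
sketch of that naming convention (five zero-padded digits is how Hadoop
formats output partition numbers; the class itself is illustrative):

    import java.text.NumberFormat;

    public class OutputNameSketch {
      public static void main(String[] args) {
        // part-NNNNN: five digits, zero-padded, no grouping separators.
        NumberFormat fmt = NumberFormat.getInstance();
        fmt.setMinimumIntegerDigits(5);
        fmt.setGroupingUsed(false);
        for (int position = 0; position < 3; position++) {
          System.out.println("part-" + fmt.format(position));
          // prints part-00000, part-00001, part-00002
        }
      }
    }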

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputOrder.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputOrder.java?view=auto&rev=549237
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputOrder.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputOrder.java Wed Jun 20 14:13:51 2007
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.ReflectionUtils;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/**
+ * TestMapOutputOrder checks that there is a one-to-one correspondence
+ * between the order of the map input splits (as returned by
+ * InputFormat.getSplits()) and the map output files.
+ */
+public class TestMapOutputOrder extends TestCase 
+{
+  private static final Log LOG =
+    LogFactory.getLog(TestMapOutputOrder.class.getName());
+
+  JobConf jobConf = new JobConf(TestMapOutputOrder.class);
+  JobClient jc;
+
+  private static class TestMapper extends MapReduceBase implements Mapper {
+    public void map(WritableComparable key, Writable val,
+                   OutputCollector output, Reporter reporter)
+      throws IOException {
+      output.collect(null, val);
+    }
+  }
+
+  private static void writeFile(FileSystem fs, Path name,
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes()); 
+    stm.close();
+  } 
+
+  private static String readFile(FileSystem fs, Path name,
+                                CompressionCodec codec) throws IOException {
+    InputStream stm;
+    if (codec == null) {
+      stm = fs.open(name);
+    } else {
+      stm = codec.createInputStream(fs.open(name));
+    }
+
+    String contents = "";
+    int b = stm.read();
+    while (b != -1) {
+       contents += (char) b;
+       b = stm.read();
+    }
+    stm.close();
+    return contents;
+  }
+
+  public void testMapOutputOrder() throws Exception {
+    String nameNode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      final int taskTrackers = 3;
+      final int jobTrackerPort = 60070;
+
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      fileSys = dfs.getFileSystem();
+      nameNode = fileSys.getName();
+      mr = new MiniMRCluster(taskTrackers, nameNode, 3);
+      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+
+      Path testdir = new Path("/testing/mapoutputorder/");
+      Path inDir = new Path(testdir, "input");
+      Path outDir = new Path(testdir, "output");
+      FileSystem fs = FileSystem.getNamed(nameNode, conf);
+      fs.delete(testdir);
+      jobConf.set("fs.default.name", nameNode);
+      jobConf.set("mapred.job.tracker", jobTrackerName);
+      jobConf.setInputFormat(TextInputFormat.class);
+      jobConf.setInputPath(inDir);
+      jobConf.setOutputPath(outDir);
+      jobConf.setMapperClass(TestMapper.class);
+      jobConf.setNumMapTasks(3);
+      jobConf.setMapOutputKeyClass(LongWritable.class);
+      jobConf.setMapOutputValueClass(Text.class); 
+      jobConf.setNumReduceTasks(0);
+      jobConf.setJar("build/test/testjar/testjob.jar");
+
+      if (!fs.mkdirs(testdir)) {
+        throw new IOException("Mkdirs failed to create " + testdir.toString());
+      }
+      if (!fs.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      // create input files
+      CompressionCodec gzip = new GzipCodec();
+      ReflectionUtils.setConf(gzip, jobConf);
+      String[] inpStrings = new String[3];
+      inpStrings[0] = "part1_line1\npart1_line2\n";
+      inpStrings[1] = "part2_line1\npart2_line2\npart2_line3\n";
+      inpStrings[2] = "part3_line1\n";
+      writeFile(fs, new Path(inDir, "part1.txt.gz"), gzip, inpStrings[0]);
+      writeFile(fs, new Path(inDir, "part2.txt.gz"), gzip, inpStrings[1]);
+      writeFile(fs, new Path(inDir, "part3.txt.gz"), gzip, inpStrings[2]);
+
+      // run job
+      jc = new JobClient(jobConf);
+
+      RunningJob rj = jc.runJob(jobConf);
+      assertTrue("job was complete", rj.isComplete());
+      assertTrue("job was successful", rj.isSuccessful());
+
+      // check map output files
+      Path[] outputPaths = fs.listPaths(outDir);
+      String contents;
+      for (int i = 0; i < outputPaths.length; i++) {
+        LOG.debug("Output Path (#" + (i+1) +"): " + outputPaths[i].getName());
+        contents = readFile(fs, outputPaths[i], null);
+        LOG.debug("Contents: " + contents);
+        assertTrue("Input File #" + (i+1) + " == Map Output File #" + (i+1), inpStrings[i].equals(contents));
+      }
+    }
+    finally {
+      // clean-up
+      if (fileSys != null) { fileSys.close(); }
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+}


