hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject svn commit: r1611196 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/...
Date Wed, 16 Jul 2014 21:24:51 GMT
Author: kasha
Date: Wed Jul 16 21:24:51 2014
New Revision: 1611196

URL: http://svn.apache.org/r1611196
Log:
MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly assumes a single
dir for mapOutIndex. (Gera Shegalov via kasha)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1611196&r1=1611195&r2=1611196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Jul 16 21:24:51 2014
@@ -307,6 +307,9 @@ Release 2.5.0 - UNRELEASED
     resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
     vinodkv)
 
+    MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly 
+    assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
+
 Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1611196&r1=1611195&r2=1611196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
Wed Jul 16 21:24:51 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
@@ -438,43 +439,6 @@ public class LocalContainerLauncher exte
     }
 
     /**
-     * Within the _local_ filesystem (not HDFS), all activity takes place within
-     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
-     * and all sub-MapTasks create the same filename ("file.out").  Rename that
-     * to something unique (e.g., "map_0.out") to avoid collisions.
-     *
-     * Longer-term, we'll modify [something] to use TaskAttemptID-based
-     * filenames instead of "file.out". (All of this is entirely internal,
-     * so there are no particular compatibility issues.)
-     */
-    private MapOutputFile renameMapOutputForReduce(JobConf conf,
-        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      // move map output to reduce input
-      Path mapOut = subMapOutputFile.getOutputFile();
-      FileStatus mStatus = localFs.getFileStatus(mapOut);      
-      Path reduceIn = subMapOutputFile.getInputFileForWrite(
-          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
-      Path mapOutIndex = new Path(mapOut.toString() + ".index");
-      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Renaming map output file for task attempt "
-            + mapId.toString() + " from original location " + mapOut.toString()
-            + " to destination " + reduceIn.toString());
-      }
-      if (!localFs.mkdirs(reduceIn.getParent())) {
-        throw new IOException("Mkdirs failed to create "
-            + reduceIn.getParent().toString());
-      }
-      if (!localFs.rename(mapOut, reduceIn))
-        throw new IOException("Couldn't rename " + mapOut);
-      if (!localFs.rename(mapOutIndex, reduceInIndex))
-        throw new IOException("Couldn't rename " + mapOutIndex);
-      
-      return new RenamedMapOutputFile(reduceIn);
-    }
-
-    /**
      * Also within the local filesystem, we need to restore the initial state
      * of the directory as much as possible.  Compare current contents against
      * the saved original state and nuke everything that doesn't belong, with
@@ -506,7 +470,46 @@ public class LocalContainerLauncher exte
     }
 
   } // end EventHandler
-  
+
+  /**
+   * Within the _local_ filesystem (not HDFS), all activity takes place within
+   * a subdir inside one of the LOCAL_DIRS
+   * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+   * and all sub-MapTasks create the same filename ("file.out").  Rename that
+   * to something unique (e.g., "map_0.out") to avoid possible collisions.
+   *
+   * Longer-term, we'll modify [something] to use TaskAttemptID-based
+   * filenames instead of "file.out". (All of this is entirely internal,
+   * so there are no particular compatibility issues.)
+   */
+  @VisibleForTesting
+  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    FileStatus mStatus = localFs.getFileStatus(mapOut);
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming map output file for task attempt "
+          + mapId.toString() + " from original location " + mapOut.toString()
+          + " to destination " + reduceIn.toString());
+    }
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn))
+      throw new IOException("Couldn't rename " + mapOut);
+    if (!localFs.rename(mapOutIndex, reduceInIndex))
+      throw new IOException("Couldn't rename " + mapOutIndex);
+
+    return new RenamedMapOutputFile(reduceIn);
+  }
+
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
     

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java?rev=1611196&r1=1611195&r2=1611196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
Wed Jul 16 21:24:51 2014
@@ -18,17 +18,26 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
 public class TestLocalContainerLauncher {
   private static final Log LOG =
       LogFactory.getLog(TestLocalContainerLauncher.class);
+  private static File testWorkDir;
+  private static final String[] localDirs = new String[2];
+
+  private static void delete(File dir) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
+    fs.delete(p, true);
+  }
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testWorkDir = new File("target",
+        TestLocalContainerLauncher.class.getCanonicalName());
+    testWorkDir.delete();
+    testWorkDir.mkdirs();
+    testWorkDir = testWorkDir.getAbsoluteFile();
+    for (int i = 0; i < localDirs.length; i++) {
+      final File dir = new File(testWorkDir, "local-" + i);
+      dir.mkdirs();
+      localDirs[i] = dir.toString();
+    }
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testWorkDir != null) {
+      delete(testWorkDir);
+    }
+  }
 
   @SuppressWarnings("rawtypes")
   @Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher 
     when(container.getNodeId()).thenReturn(nodeId);
     return container;
   }
+
+
+  @Test
+  public void testRenameMapOutputForReduce() throws Exception {
+    final JobConf conf = new JobConf();
+
+    final MROutputFiles mrOutputFiles = new MROutputFiles();
+    mrOutputFiles.setConf(conf);
+
+    // make sure both dirs are distinct
+    //
+    conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
+    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
+    conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
+    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
+    Assert.assertNotEquals("Paths must be different!",
+        mapOut.getParent(), mapOutIdx.getParent());
+
+    // make both dirs part of LOCAL_DIR
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+
+    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
+    lfc.create(mapOut, EnumSet.of(CREATE)).close();
+    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
+
+    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
+    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
+
+    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
+  }
 }



Mime
View raw message