hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r1618700 - in /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ ...
Date Mon, 18 Aug 2014 18:41:38 GMT
Author: wang
Date: Mon Aug 18 18:41:31 2014
New Revision: 1618700

URL: http://svn.apache.org/r1618700
Log:
Merge from trunk to branch.

Added:
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
      - copied unchanged from r1618693, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
Modified:
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1617528-1618693

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt Mon Aug 18 18:41:31 2014
@@ -165,6 +165,15 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
     compatible/incompatible changes (Junping Du via jlowe)
 
+    MAPREDUCE-883. harchive: Document how to unarchive (Akira AJISAKA and
+      Koji Noguchi via aw)
+
+    MAPREDUCE-4791. Javadoc for KeyValueTextInputFormat should include default
+      separator and how to change it (Akira AJISAKA via aw)
+
+    MAPREDUCE-5906. Inconsistent configuration in property
+      "mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -187,6 +196,43 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6021. MR AM should have working directory in LD_LIBRARY_PATH
     (jlowe)
 
+    MAPREDUCE-6010. HistoryServerFileSystemStateStore fails to update tokens
+    (jlowe)
+
+    MAPREDUCE-5878. some standard JDK APIs are not part of system classes
+    defaults (Sangjin Lee via jlowe)
+
+    MAPREDUCE-5944. Remove MRv1 commands from CommandsManual.apt.vm
+      (Akira AJISAKA via aw)
+
+    MAPREDUCE-5943. Separate mapred commands from CommandManual.apt.vm
+      (Akira AJISAKA via aw)
+
+    MAPREDUCE-5363. Fix doc and spelling for TaskCompletionEvent#getTaskStatus
+      and getStatus (Akira AJISAKA via aw)
+
+    MAPREDUCE-5595. Typo in MergeManagerImpl.java (Akira AJISAKA via aw)
+
+    MAPREDUCE-5597. Missing alternatives in javadocs for deprecated constructors
+     in mapreduce.Job (Akira AJISAKA via aw)
+
+    MAPREDUCE-5950. incorrect description in distcp2 document (Akira AJISAKA
+      via aw)
+
+    MAPREDUCE-5998. CompositeInputFormat javadoc is broken (Akira AJISAKA via
+      aw)
+
+    MAPREDUCE-5999. Fix dead link in InputFormat javadoc (Akira AJISAKA via aw)
+
+    MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
+    the current context's FS is different. (Benjamin Zhitomirsky via zjshen)
+
+    MAPREDUCE-6024. Shortened the time when Fetcher is stuck in retrying before
+    concluding the failure by configuration. (Yunjiong Zhao via zjshen)
+
+    MAPREDUCE-6036. TestJobEndNotifier fails intermittently in branch-2 (chang
+    li via jlowe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -268,6 +314,9 @@ Release 2.5.0 - UNRELEASED
 
   BUG FIXES 
 
+    MAPREDUCE-6033. Updated access check for displaying job information 
+    (Yu Gao via Eric Yang)
+
     MAPREDUCE-5759. Remove unnecessary conf load in Limits (Sandy Ryza)
 
     MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1617528-1618693

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml Mon Aug 18 18:41:31 2014
@@ -73,6 +73,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Mon Aug 18 18:41:31 2014
@@ -28,13 +28,13 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -74,7 +74,9 @@ public class JobHistoryEventHandler exte
 
   private int eventCounter;
 
-  //TODO Does the FS object need to be different ? 
+  // Those file systems may differ from the job configuration
+  // See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
+  // #ensurePathInDefaultFileSystem
   private FileSystem stagingDirFS; // log Dir FileSystem
   private FileSystem doneDirFS; // done Dir FileSystem
 
@@ -141,7 +143,7 @@ public class JobHistoryEventHandler exte
     //Check for the existence of the history staging dir. Maybe create it. 
     try {
       stagingDirPath =
-          FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
       stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
       mkdir(stagingDirFS, stagingDirPath, new FsPermission(
           JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
@@ -154,7 +156,7 @@ public class JobHistoryEventHandler exte
     //Check for the existence of intermediate done dir.
     Path doneDirPath = null;
     try {
-      doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+      doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
       doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
       // This directory will be in a common location, or this may be a cluster
       // meant for a single user. Creating based on the conf. Should ideally be
@@ -194,7 +196,7 @@ public class JobHistoryEventHandler exte
     //Check/create user directory under intermediate done dir.
     try {
       doneDirPrefixPath =
-          FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
       mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
           JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
     } catch (IOException e) {

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Aug 18 18:41:31 2014
@@ -148,10 +148,10 @@ public class JobImpl implements org.apac
   private static final Log LOG = LogFactory.getLog(JobImpl.class);
 
   //The maximum fraction of fetch failures allowed for a map
-  private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
-
-  // Maximum no. of fetch-failure notifications after which map task is failed
-  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+  private float maxAllowedFetchFailuresFraction;
+  
+  //Maximum no. of fetch-failure notifications after which map task is failed
+  private int maxFetchFailuresNotifications;
 
   public static final String JOB_KILLED_DIAG =
       "Job received Kill while in RUNNING state.";
@@ -704,6 +704,13 @@ public class JobImpl implements org.apac
     if(forcedDiagnostic != null) {
       this.diagnostics.add(forcedDiagnostic);
     }
+    
+    this.maxAllowedFetchFailuresFraction = conf.getFloat(
+        MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION,
+        MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+    this.maxFetchFailuresNotifications = conf.getInt(
+        MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
+        MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -730,7 +737,7 @@ public class JobImpl implements org.apac
     if (jobACL == null) {
       return true;
     }
-    return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
+    return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL);
   }
 
   @Override
@@ -1900,9 +1907,8 @@ public class JobImpl implements org.apac
         float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
           (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
-        boolean isMapFaulty =
-            (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
-        if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
+        if (fetchFailures >= job.getMaxFetchFailuresNotifications()
+            && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
           LOG.info("Too many fetch-failures for output of task attempt: " + 
               mapId + " ... raising fetch failure to map");
           job.eventHandler.handle(new TaskAttemptEvent(mapId, 
@@ -2185,4 +2191,12 @@ public class JobImpl implements org.apac
     jobConf.addResource(fc.open(confPath), confPath.toString());
     return jobConf;
   }
+
+  public float getMaxAllowedFetchFailuresFraction() {
+    return maxAllowedFetchFailuresFraction;
+  }
+
+  public int getMaxFetchFailuresNotifications() {
+    return maxFetchFailuresNotifications;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Mon Aug 18 18:41:31 2014
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.never;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.junit.Assert;
@@ -35,8 +36,13 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -52,6 +58,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertFalse;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler 
 
   private static final Log LOG = LogFactory
       .getLog(TestJobHistoryEventHandler.class);
+  private static MiniDFSCluster dfsCluster = null;
+  private static String coreSitePath;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    coreSitePath = "." + File.separator + "target" + File.separator +
+            "test-classes" + File.separator + "core-site.xml";
+    Configuration conf = new HdfsConfiguration();
+    dfsCluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @AfterClass
+  public static void cleanUpClass() throws Exception {
+    dfsCluster.shutdown();
+  }
+
+  @After
+  public void cleanTest() throws Exception {
+    new File(coreSitePath).delete();
+  }
 
   @Test (timeout=50000)
   public void testFirstFlushOnCompletionEvent() throws Exception {
@@ -325,6 +355,50 @@ public class TestJobHistoryEventHandler 
     }
   }
 
+  @Test (timeout=50000)
+  public void testDefaultFsIsUsedForHistory() throws Exception {
+    // Create default configuration pointing to the minicluster
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            dfsCluster.getURI().toString());
+    FileOutputStream os = new FileOutputStream(coreSitePath);
+    conf.writeXml(os);
+    os.close();
+
+    // simulate execution under a non-default namenode
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            "file:///");
+
+    TestParams t = new TestParams();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0, false);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+          new Counters(), new Counters())));
+
+      // If we got here then event handler worked but we don't know with which
+      // file system. Now we check that history stuff was written to minicluster
+      FileSystem dfsFileSystem = dfsCluster.getFileSystem();
+      assertTrue("Minicluster contains some history files",
+          dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
+      FileSystem localFileSystem = LocalFileSystem.get(conf);
+      assertFalse("No history directory on non-default file system",
+          localFileSystem.exists(new Path(t.dfsWorkDir)));
+    } finally {
+      jheh.stop();
+    }
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -372,6 +446,7 @@ public class TestJobHistoryEventHandler 
   private class TestParams {
     boolean isLastAMRetry;
     String workDir = setupTestWorkDir();
+    String dfsWorkDir = "/" + this.getClass().getCanonicalName();
     ApplicationId appId = ApplicationId.newInstance(200, 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);
@@ -451,10 +526,16 @@ public class TestJobHistoryEventHandler 
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
+  private boolean mockHistoryProcessing = true;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
   }
 
+  public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
+    super(context, startCount);
+    this.mockHistoryProcessing = mockHistoryProcessing;
+  }
+
   @Override
   protected void serviceStart() {
   }
@@ -462,7 +543,12 @@ class JHEvenHandlerForTest extends JobHi
   @Override
   protected EventWriter createEventWriter(Path historyFilePath)
       throws IOException {
-    this.eventWriter = mock(EventWriter.class);
+    if (mockHistoryProcessing) {
+      this.eventWriter = mock(EventWriter.class);
+    }
+    else {
+      this.eventWriter = super.createEventWriter(historyFilePath);
+    }
     return this.eventWriter;
   }
 
@@ -475,8 +561,13 @@ class JHEvenHandlerForTest extends JobHi
   }
 
   @Override
-  protected void processDoneFiles(JobId jobId){
-    // do nothing
+  protected void processDoneFiles(JobId jobId) throws IOException {
+    if (!mockHistoryProcessing) {
+      super.processDoneFiles(jobId);
+    }
+    else {
+      // do nothing
+    }
   }
 }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Mon Aug 18 18:41:31 2014
@@ -270,7 +270,8 @@ public class TestJobEndNotifier extends 
     app.waitForInternalState(job, JobStateInternal.REBOOT);
     // Now shutdown. User should see FAILED state.
     // Unregistration fails: isLastAMRetry is recalculated, this is
-    app.shutDownJob();
+    // Reboot stops the service internally, so we don't need to shut down twice
+    app.waitForServiceToStop(10000);
     Assert.assertFalse(app.isLastAMRetry());
     // Since it's not last retry, JobEndServlet didn't called
     Assert.assertEquals(0, JobEndServlet.calledTimes);

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Mon Aug 18 18:41:31 2014
@@ -536,7 +536,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -547,7 +547,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -558,7 +558,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -569,7 +569,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -580,7 +580,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Mon Aug 18 18:41:31 2014
@@ -22,20 +22,24 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Calendar;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -117,6 +121,7 @@ public class JobHistoryUtils {
   public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR +  "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
   public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
   private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+  private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
 
   private static final PathFilter CONF_FILTER = new PathFilter() {
     @Override
@@ -183,7 +188,7 @@ public class JobHistoryUtils {
     Path stagingPath = MRApps.getStagingAreaDir(conf, user);
     Path path = new Path(stagingPath, jobId);
     String logDir = path.toString();
-    return logDir;
+    return ensurePathInDefaultFileSystem(logDir, conf);
   }
   
   /**
@@ -200,7 +205,7 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done_intermediate";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
   }
   
   /**
@@ -216,7 +221,69 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
+  }
+
+  /**
+   * Get the cluster's default file context (used to keep history done/staging
+   * locations consistent across contexts). Returns null when FS_DEFAULT_NAME_KEY
+   * has no recorded source, comes solely from core-default.xml, or is unusable.
+   * @return default file context, or null in the cases described above
+   */
+  private static FileContext getDefaultFileContext() {
+    // If FS_DEFAULT_NAME_KEY was set solely by core-default.xml then we
+    // ignore it. This prevents defaulting history paths to a file system
+    // specified by core-default.xml, which would not make sense. For a test
+    // case to exploit this functionality it should create core-site.xml.
+    FileContext fc = null;
+    Configuration defaultConf = new Configuration();
+    String[] sources;
+    sources = defaultConf.getPropertySources(
+        CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
+    if (sources != null &&
+        (!Arrays.asList(sources).contains("core-default.xml") ||
+        sources.length > 1)) {
+      try {
+        fc = FileContext.getFileContext(defaultConf);
+        LOG.info("Default file system [" +
+                  fc.getDefaultFileSystem().getUri() + "]");
+      } catch (UnsupportedFileSystemException e) {
+        LOG.error("Unable to create default file context [" +
+            defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
+            "]",
+            e);
+      }
+    }
+    else {
+      LOG.info("Default file system is set solely " +
+          "by core-default.xml therefore -  ignoring");
+    }
+
+    return fc;
+  }
+
+  /**
+   * Ensure that path belongs to cluster's default file system unless
+   * 1. it is already fully qualified (has a scheme or an authority),
+   * 2. the job configuration already uses the default file system, or
+   * 3. there is no usable default file context (e.g. test without core-site.xml).
+   *
+   * @param sourcePath source path
+   * @param conf the job configuration
+   * @return fully qualified path (if necessary) in default file system
+   */
+  private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) {
+    Path path = new Path(sourcePath);
+    // Cheap check first: skips building a new Configuration/FileContext.
+    if (path.toUri().getScheme() != null || path.toUri().getAuthority() != null) {
+      return sourcePath;
+    }
+    FileContext fc = getDefaultFileContext();
+    if (fc == null || fc.getDefaultFileSystem().getUri().toString().equals(
+        conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, ""))) {
+      return sourcePath;
+    }
+    return fc.makeQualified(path).toString();
   }
 
   /**

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java Mon Aug 18 18:41:31 2014
@@ -50,7 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
  * bytes, of the input files. However, the {@link FileSystem} blocksize of  
  * the input files is treated as an upper bound for input splits. A lower bound 
  * on the split size can be set via 
- * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+ * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
  * mapreduce.input.fileinputformat.split.minsize</a>.</p>
  * 
  * <p>Clearly, logical splits based on input-size is insufficient for many 

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Mon Aug 18 18:41:31 2014
@@ -90,8 +90,8 @@ public class TaskCompletionEvent 
   }
   
   /**
-   * Returns enum Status.SUCESS or Status.FAILURE.
-   * @return task tracker status
+   * Returns {@link Status}
+   * @return task completion status
    */
   public Status getTaskStatus() {
     return Status.valueOf(super.getStatus().name());

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Mon Aug 18 18:41:31 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.Reporter
 /**
  * An InputFormat capable of performing joins over a set of data sources sorted
  * and partitioned the same way.
- * @see #setFormat
  *
  * A user may define new join types by setting the property
  * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
@@ -44,6 +43,7 @@ import org.apache.hadoop.mapred.Reporter
  * ComposableRecordReader.
  * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
  * in the join.
+ * @see #setFormat
  * @see JoinRecordReader
  * @see MultiFilterRecordReader
  */

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java Mon Aug 18 18:41:31 2014
@@ -52,7 +52,7 @@ import org.apache.hadoop.mapreduce.lib.i
  * bytes, of the input files. However, the {@link FileSystem} blocksize of  
  * the input files is treated as an upper bound for input splits. A lower bound 
  * on the split size can be set via 
- * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+ * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
  * mapreduce.input.fileinputformat.split.minsize</a>.</p>
  * 
  * <p>Clearly, logical splits based on input-size is insufficient for many 

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Mon Aug 18 18:41:31 2014
@@ -54,7 +54,7 @@ import org.apache.hadoop.util.StringUtil
  * <p>Here is an example on how to submit a job:</p>
  * <p><blockquote><pre>
  *     // Create a new Job
- *     Job job = new Job(new Configuration());
+ *     Job job = Job.getInstance();
  *     job.setJarByClass(MyJob.class);
  *     
  *     // Specify various job-specific parameters     
@@ -113,16 +113,25 @@ public class Job extends JobContextImpl 
   private long statustime;
   private Cluster cluster;
 
+  /**
+   * @deprecated Use {@link #getInstance()} instead.
+   */
   @Deprecated
   public Job() throws IOException {
     this(new Configuration());
   }
 
+  /**
+   * @deprecated Use {@link #getInstance(Configuration)} instead.
+   */
   @Deprecated
   public Job(Configuration conf) throws IOException {
     this(new JobConf(conf));
   }
 
+  /**
+   * @deprecated Use {@link #getInstance(Configuration, String)}
+   */
   @Deprecated
   public Job(Configuration conf, String jobName) throws IOException {
     this(conf);

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Aug 18 18:41:31 2014
@@ -265,6 +265,7 @@ public interface MRJobConfig {
   public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
 
   public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
+  public static final float DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT = 0.70f;
 
   public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
     = "mapreduce.reduce.shuffle.memory.limit.percent";
@@ -292,11 +293,19 @@ public interface MRJobConfig {
   public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
 
   public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
+  public static final String MAX_ALLOWED_FETCH_FAILURES_FRACTION = "mapreduce.reduce.shuffle.max-fetch-failures-fraction";
+  public static final float DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;
+  
+  public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = "mapreduce.reduce.shuffle.max-fetch-failures-notifications";
+  public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
   
   public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
   public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
+  
+  public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
+  public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
 

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java Mon Aug 18 18:41:31 2014
@@ -95,8 +95,8 @@ public class TaskCompletionEvent impleme
   }
   
   /**
-   * Returns enum Status.SUCESS or Status.FAILURE.
-   * @return task tracker status
+   * Returns {@link Status}
+   * @return task completion status
    */
   public Status getStatus() {
     return status;

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Mon Aug 18 18:41:31 2014
@@ -38,6 +38,9 @@ import org.apache.hadoop.mapreduce.TaskA
  * Either line feed or carriage-return are used to signal end of line. 
  * Each line is divided into key and value parts by a separator byte. If no
  * such a byte exists, the key will be the entire line and value will be empty.
+ * The separator byte can be specified in config file under the attribute name
+ * mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
+ * is the tab character ('\t').
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java Mon Aug 18 18:41:31 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.TaskA
 /**
  * An InputFormat capable of performing joins over a set of data sources sorted
  * and partitioned the same way.
- * @see #setFormat
  *
  * A user may define new join types by setting the property
  * <tt>mapreduce.join.define.&lt;ident&gt;</tt> to a classname. 
@@ -47,6 +46,7 @@ import org.apache.hadoop.mapreduce.TaskA
  * assumed to be a ComposableRecordReader.
  * <tt>mapreduce.join.keycomparator</tt> can be a classname used to compare 
  * keys in the join.
+ * @see #setFormat
  * @see JoinRecordReader
  * @see MultiFilterRecordReader
  */

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Mon Aug 18 18:41:31 2014
@@ -323,6 +323,7 @@ class Fetcher<K,V> extends Thread {
 
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
+      scheduler.hostFailed(host.getHostName());
       for(TaskAttemptID left: remaining) {
         scheduler.copyFailed(left, host, false, connectExcpt);
       }
@@ -347,6 +348,7 @@ class Fetcher<K,V> extends Thread {
       
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        scheduler.hostFailed(host.getHostName());
         for(TaskAttemptID left: failedTasks) {
           scheduler.copyFailed(left, host, true, false);
         }

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Aug 18 18:41:31 2014
@@ -158,7 +158,8 @@ public class MergeManagerImpl<K, V> impl
     this.rfs = ((LocalFileSystem)localFS).getRaw();
     
     final float maxInMemCopyUse =
-      jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
+      jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
+          MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
     if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
       throw new IllegalArgumentException("Invalid value for " +
           MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
@@ -199,7 +200,7 @@ public class MergeManagerImpl<K, V> impl
              "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
 
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
-      throw new RuntimeException("Invlaid configuration: "
+      throw new RuntimeException("Invalid configuration: "
           + "maxSingleShuffleLimit should be less than mergeThreshold"
           + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
           + "mergeThreshold: " + this.mergeThreshold);

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java Mon Aug 18 18:41:31 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
@@ -101,6 +100,7 @@ public class ShuffleSchedulerImpl<K,V> i
 
   private final boolean reportReadErrorImmediately;
   private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
+  private int maxHostFailures;
 
   public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                           TaskAttemptID reduceId,
@@ -132,6 +132,9 @@ public class ShuffleSchedulerImpl<K,V> i
 
     this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
         MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
+    this.maxHostFailures = job.getInt(
+        MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
+        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
   }
 
   @Override
@@ -213,9 +216,18 @@ public class ShuffleSchedulerImpl<K,V> i
     progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
+  /** Records one more fetch failure against {@code hostname}. */
+  public synchronized void hostFailed(String hostname) {
+    if (!hostFailures.containsKey(hostname)) {
+      hostFailures.put(hostname, new IntWritable(1));
+      return;
+    }
+    IntWritable count = hostFailures.get(hostname);
+    count.set(count.get() + 1);
+  }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError, boolean connectExcpt) {
+      boolean readError, boolean connectExcpt) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
@@ -226,12 +238,9 @@ public class ShuffleSchedulerImpl<K,V> i
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
-    if (hostFailures.containsKey(hostname)) {
-      IntWritable x = hostFailures.get(hostname);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostname, new IntWritable(1));
-    }
+    // Report if host already failed > maxHostFailures times (no entry => none recorded; avoids NPE)
+    boolean hostFail = hostFailures.containsKey(hostname) && hostFailures.get(hostname).get() > getMaxHostFailures();
+    
     if (failures >= abortFailureLimit) {
       try {
         throw new IOException(failures + " failures downloading " + mapId);
@@ -240,7 +249,7 @@ public class ShuffleSchedulerImpl<K,V> i
       }
     }
 
-    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
+    checkAndInformJobTracker(failures, mapId, readError, connectExcpt, hostFail);
 
     checkReducerHealth();
 
@@ -270,9 +279,9 @@ public class ShuffleSchedulerImpl<K,V> i
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
       int failures, TaskAttemptID mapId, boolean readError,
-      boolean connectExcpt) {
+      boolean connectExcpt, boolean hostFailed) {
     if (connectExcpt || (reportReadErrorImmediately && readError)
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+        || ((failures % maxFetchFailuresBeforeReporting) == 0) || hostFailed) {
       LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
       status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
     }
@@ -507,4 +516,7 @@ public class ShuffleSchedulerImpl<K,V> i
     referee.join();
   }
 
+  public int getMaxHostFailures() {
+    return this.maxHostFailures; // read from MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Aug 18 18:41:31 2014
@@ -1225,9 +1225,9 @@
 
 <property>
    <name>mapreduce.job.classloader.system.classes</name>
-   <value>java.,javax.,org.apache.commons.logging.,org.apache.log4j.,
-          org.apache.hadoop.,core-default.xml,hdfs-default.xml,
-          mapred-default.xml,yarn-default.xml</value>
+   <value>java.,javax.,org.w3c.dom.,org.xml.sax.,org.apache.commons.logging.,
+          org.apache.log4j.,org.apache.hadoop.,core-default.xml,
+          hdfs-default.xml,mapred-default.xml,yarn-default.xml</value>
   <description>A comma-separated list of classes that should be loaded from the
     system classpath, not the user-supplied JARs, when mapreduce.job.classloader
     is enabled. Names ending in '.' (period) are treated as package names,

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1617528-1618693

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm Mon Aug 18 18:41:31 2014
@@ -118,9 +118,9 @@ $H3 Basic Usage
 
 $H3 Update and Overwrite
 
-  `-update` is used to copy files from source that don't exist at the target,
-  or have different contents. `-overwrite` overwrites target-files even if they
-  exist at the source, or have the same contents.
+  `-update` is used to copy files from source that don't exist at the target
+  or differ from the target version. `-overwrite` overwrites target-files that
+  exist at the target.
 
   Update and Overwrite options warrant special attention, since their handling
   of source-paths varies from the defaults in a very subtle manner. Consider a
@@ -221,7 +221,7 @@ Flag              | Description         
 `-log <logdir>` | Write logs to \<logdir\> | DistCp keeps logs of each file it attempts to copy as map output. If a map fails, the log output will not be retained if it is re-executed.
 `-m <num_maps>` | Maximum number of simultaneous copies | Specify the number of maps to copy data. Note that more maps may not necessarily improve throughput.
 `-overwrite` | Overwrite destination | If a map fails and `-i` is not specified, all the files in the split, not only those that failed, will be recopied. As discussed in the Usage documentation, it also changes the semantics for generating destination paths, so users should use this carefully.
-`-update` | Overwrite if src size different from dst size | As noted in the preceding, this is not a "sync" operation. The only criterion examined is the source and destination file sizes; if they differ, the source file replaces the destination file. As discussed in the Usage documentation, it also changes the semantics for generating destination paths, so users should use this carefully.
+`-update` | Overwrite if source and destination differ in size, blocksize, or checksum | As noted in the preceding, this is not a "sync" operation. The criteria examined are the source and destination file sizes, blocksizes, and checksums; if they differ, the source file replaces the destination file. As discussed in the Usage documentation, it also changes the semantics for generating destination paths, so users should use this carefully.
 `-f <urilist_uri>` | Use list at \<urilist_uri\> as src list | This is equivalent to listing each source on the command line. The `urilist_uri` list should be a fully qualified URI.
 `-filelimit <n>` | Limit the total number of files to be <= n | **Deprecated!** Ignored in the new DistCp.
 `-sizelimit <n>` | Limit the total size to be <= n bytes | **Deprecated!** Ignored in the new DistCp.

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm Mon Aug 18 18:41:31 2014
@@ -20,6 +20,7 @@ Hadoop Archives Guide
  - [Overview](#Overview)
  - [How to Create an Archive](#How_to_Create_an_Archive)
  - [How to Look Up Files in Archives](#How_to_Look_Up_Files_in_Archives)
+ - [How to Unarchive an Archive](#How_to_Unarchive_an_Archive)
  - [Archives Examples](#Archives_Examples)
      - [Creating an Archive](#Creating_an_Archive)
      - [Looking Up Files](#Looking_Up_Files)
@@ -70,6 +71,20 @@ How to Look Up Files in Archives
 
   `har:///archivepath/fileinarchive`
 
+How to Unarchive an Archive
+---------------------------
+
+  Since all the fs shell commands work transparently on archives,
+  unarchiving is just a matter of copying.
+
+  To unarchive sequentially:
+
+  `hdfs dfs -cp har:///user/zoo/foo.har/dir1 hdfs:/user/zoo/newdir`
+
+  To unarchive in parallel, use DistCp:
+
+  `hadoop distcp har:///user/zoo/foo.har/dir1 hdfs:/user/zoo/newdir`
+
 Archives Examples
 -----------------
 

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java Mon Aug 18 18:41:31 2014
@@ -24,6 +24,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,6 +66,7 @@ public class HistoryServerFileSystemStat
   private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_";
   private static final String TOKEN_FILE_PREFIX = "token_";
   private static final String TMP_FILE_PREFIX = "tmp-";
+  private static final String UPDATE_TMP_FILE_PREFIX = "update-";
   private static final FsPermission DIR_PERMISSIONS =
       new FsPermission((short)0700);
   private static final FsPermission FILE_PERMISSIONS = Shell.WINDOWS
@@ -90,7 +93,7 @@ public class HistoryServerFileSystemStat
 
   @Override
   protected void startStorage() throws IOException {
-    fs = rootStatePath.getFileSystem(getConfig());
+    fs = createFileSystem();
     createDir(rootStatePath);
     tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME);
     createDir(tokenStatePath);
@@ -101,6 +104,10 @@ public class HistoryServerFileSystemStat
     }
   }
 
+  FileSystem createFileSystem() throws IOException {
+    return rootStatePath.getFileSystem(getConfig());
+  }
+
   @Override
   protected void closeStorage() throws IOException {
     // don't close the filesystem as it's part of the filesystem cache
@@ -127,7 +134,7 @@ public class HistoryServerFileSystemStat
       throw new IOException(tokenPath + " already exists");
     }
 
-    createFile(tokenPath, buildTokenData(tokenId, renewDate));
+    createNewFile(tokenPath, buildTokenData(tokenId, renewDate));
   }
 
   @Override
@@ -136,7 +143,25 @@ public class HistoryServerFileSystemStat
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating token " + tokenId.getSequenceNumber());
     }
-    createFile(getTokenPath(tokenId), buildTokenData(tokenId, renewDate));
+
+    // Files cannot be atomically replaced, therefore we write a temporary
+    // update file, remove the original token file, then rename the update
+    // file to the token file. During recovery either the token file will be
+    // used or if that is missing and an update file is present then the
+    // update file is used.
+    Path tokenPath = getTokenPath(tokenId);
+    Path tmp = new Path(tokenPath.getParent(),
+        UPDATE_TMP_FILE_PREFIX + tokenPath.getName());
+    writeFile(tmp, buildTokenData(tokenId, renewDate));
+    try {
+      deleteFile(tokenPath);
+    } catch (IOException e) {
+      fs.delete(tmp, false);
+      throw e;
+    }
+    if (!fs.rename(tmp, tokenPath)) {
+      throw new IOException("Could not rename " + tmp + " to " + tokenPath);
+    }
   }
 
   @Override
@@ -168,7 +193,7 @@ public class HistoryServerFileSystemStat
       IOUtils.cleanup(LOG, dataStream);
     }
 
-    createFile(keyPath, memStream.toByteArray());
+    createNewFile(keyPath, memStream.toByteArray());
   }
 
   @Override
@@ -213,23 +238,33 @@ public class HistoryServerFileSystemStat
     }
   }
 
-  private void createFile(Path file, byte[] data) throws IOException {
-    final int WRITE_BUFFER_SIZE = 4096;
+  private void createNewFile(Path file, byte[] data)
+      throws IOException {
     Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName());
-    FSDataOutputStream out = fs.create(tmp, FILE_PERMISSIONS, true,
-        WRITE_BUFFER_SIZE, fs.getDefaultReplication(tmp),
-        fs.getDefaultBlockSize(tmp), null);
+    writeFile(tmp, data);
+    try {
+      if (!fs.rename(tmp, file)) {
+        throw new IOException("Could not rename " + tmp + " to " + file);
+      }
+    } catch (IOException e) {
+      fs.delete(tmp, false);
+      throw e;
+    }
+  }
+
+  private void writeFile(Path file, byte[] data) throws IOException {
+    final int WRITE_BUFFER_SIZE = 4096;
+    FSDataOutputStream out = fs.create(file, FILE_PERMISSIONS, true,
+        WRITE_BUFFER_SIZE, fs.getDefaultReplication(file),
+        fs.getDefaultBlockSize(file), null);
     try {
       try {
         out.write(data);
       } finally {
         IOUtils.cleanup(LOG, out);
       }
-      if (!fs.rename(tmp, file)) {
-        throw new IOException("Could not rename " + tmp + " to " + file);
-      }
     } catch (IOException e) {
-      fs.delete(tmp, false);
+      fs.delete(file, false);
       throw e;
     }
   }
@@ -284,6 +319,19 @@ public class HistoryServerFileSystemStat
     state.tokenMasterKeyState.add(key);
   }
 
+  private void loadTokenFromBucket(int bucketId,
+      HistoryServerState state, Path tokenFile, long numTokenFileBytes)
+          throws IOException {
+    MRDelegationTokenIdentifier token =
+        loadToken(state, tokenFile, numTokenFileBytes);
+    int tokenBucketId = getBucketId(token);
+    if (tokenBucketId != bucketId) {
+      throw new IOException("Token " + tokenFile
+          + " should be in bucket " + tokenBucketId + ", found in bucket "
+          + bucketId);
+    }
+  }
+
   private MRDelegationTokenIdentifier loadToken(HistoryServerState state,
       Path tokenFile, long numTokenFileBytes) throws IOException {
     MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
@@ -308,18 +356,29 @@ public class HistoryServerFileSystemStat
     final int bucketId = Integer.parseInt(numStr);
     int numTokens = 0;
     FileStatus[] tokenStats = fs.listStatus(bucket);
+    Set<String> loadedTokens = new HashSet<String>(tokenStats.length);
     for (FileStatus stat : tokenStats) {
       String name = stat.getPath().getName();
       if (name.startsWith(TOKEN_FILE_PREFIX)) {
-        MRDelegationTokenIdentifier token =
-            loadToken(state, stat.getPath(), stat.getLen());
-        int tokenBucketId = getBucketId(token);
-        if (tokenBucketId != bucketId) {
-          throw new IOException("Token " + stat.getPath()
-              + " should be in bucket " + tokenBucketId + ", found in bucket "
-              + bucketId);
-        }
+        loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
+        loadedTokens.add(name);
         ++numTokens;
+      } else if (name.startsWith(UPDATE_TMP_FILE_PREFIX)) {
+        String tokenName = name.substring(UPDATE_TMP_FILE_PREFIX.length());
+        if (loadedTokens.contains(tokenName)) {
+          // already have the token, update may be partial so ignore it
+          fs.delete(stat.getPath(), false);
+        } else {
+          // token is missing, so try to parse the update temp file
+          loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
+          fs.rename(stat.getPath(),
+              new Path(stat.getPath().getParent(), tokenName));
+          loadedTokens.add(tokenName);
+          ++numTokens;
+        }
+      } else if (name.startsWith(TMP_FILE_PREFIX)) {
+        // cleanup incomplete temp files
+        fs.delete(stat.getPath(), false);
       } else {
         LOG.warn("Skipping unexpected file in history server token bucket: "
             + stat.getPath());

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java Mon Aug 18 18:41:31 2014
@@ -19,42 +19,74 @@
 package org.apache.hadoop.mapreduce.v2.hs;
 
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.UUID;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.test.CoreTestDriver;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
-
-import java.util.UUID;
+import org.junit.rules.TestName;
 
 public class TestHistoryFileManager {
   private static MiniDFSCluster dfsCluster = null;
+  private static MiniDFSCluster dfsCluster2 = null;
+  private static String coreSitePath;
+
+  @Rule
+  public TestName name = new TestName();
 
   @BeforeClass
   public static void setUpClass() throws Exception {
+    coreSitePath = "." + File.separator + "target" + File.separator +
+            "test-classes" + File.separator + "core-site.xml";
     Configuration conf = new HdfsConfiguration();
+    Configuration conf2 = new HdfsConfiguration();
     dfsCluster = new MiniDFSCluster.Builder(conf).build();
+    conf2.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+            conf.get(MiniDFSCluster.HDFS_MINIDFS_BASEDIR) + "_2");
+    dfsCluster2 = new MiniDFSCluster.Builder(conf2).build();
   }
 
   @AfterClass
   public static void cleanUpClass() throws Exception {
     dfsCluster.shutdown();
+    dfsCluster2.shutdown();
+  }
+
+  @After
+  public void cleanTest() throws Exception {
+    new File(coreSitePath).delete();
+  }
+
+  private String getDoneDirNameForTest() {
+    return "/" + name.getMethodName();
+  }
+
+  private String getIntermediateDoneDirNameForTest() {
+    return "/intermediate_" + name.getMethodName();
   }
 
   private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
       throws Exception {
-    conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
-    conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
+    conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, getDoneDirNameForTest());
+    conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, getIntermediateDoneDirNameForTest());
     HistoryFileManager hfm = new HistoryFileManager();
     hfm.conf = conf;
     Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
@@ -76,6 +108,36 @@ public class TestHistoryFileManager {
   }
 
   @Test
+  public void testCreateDirsWithAdditionalFileSystem() throws Exception {
+    dfsCluster.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    dfsCluster2.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
+    Assert.assertFalse(dfsCluster2.getFileSystem().isInSafeMode());
+
+    // Set default configuration to the first cluster
+    Configuration conf = new Configuration(false);
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            dfsCluster.getURI().toString());
+    FileOutputStream os = new FileOutputStream(coreSitePath);
+    conf.writeXml(os);
+    os.close();
+
+    testTryCreateHistoryDirs(dfsCluster2.getConfiguration(0), true);
+
+    // Directories should be created only in the default file system (dfsCluster)
+    Assert.assertTrue(dfsCluster.getFileSystem()
+            .exists(new Path(getDoneDirNameForTest())));
+    Assert.assertTrue(dfsCluster.getFileSystem()
+            .exists(new Path(getIntermediateDoneDirNameForTest())));
+    Assert.assertFalse(dfsCluster2.getFileSystem()
+            .exists(new Path(getDoneDirNameForTest())));
+    Assert.assertFalse(dfsCluster2.getFileSystem()
+            .exists(new Path(getIntermediateDoneDirNameForTest())));
+  }
+
+  @Test
   public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
     dfsCluster.getFileSystem().setSafeMode(
         HdfsConstants.SafeModeAction.SAFEMODE_ENTER);

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java?rev=1618700&r1=1618699&r2=1618700&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java Mon Aug 18 18:41:31 2014
@@ -21,12 +21,19 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
@@ -35,6 +42,7 @@ import org.apache.hadoop.security.token.
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatcher;
 
 public class TestHistoryServerFileSystemStateStoreService {
 
@@ -74,8 +82,8 @@ public class TestHistoryServerFileSystem
     return store;
   }
 
-  @Test
-  public void testTokenStore() throws IOException {
+  private void testTokenStore(String stateStoreUri) throws IOException {
+    conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, stateStoreUri);
     HistoryServerStateStoreService store = createAndStartStore();
 
     HistoryServerState state = store.loadState();
@@ -161,4 +169,77 @@ public class TestHistoryServerFileSystem
     assertTrue("missing master key 3",
         state.tokenMasterKeyState.contains(key3));
   }
+
+  @Test
+  public void testTokenStore() throws IOException {
+    testTokenStore(testDir.getAbsoluteFile().toURI().toString());
+  }
+
+  @Test
+  public void testTokenStoreHdfs() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    conf = cluster.getConfiguration(0);
+    try {
+      testTokenStore("/tmp/historystore");
+    } finally {
+        cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testUpdatedTokenRecovery() throws IOException {
+    IOException intentionalErr = new IOException("intentional error");
+    FileSystem fs = FileSystem.getLocal(conf);
+    final FileSystem spyfs = spy(fs);
+    // make the update token process fail halfway through where we're left
+    // with just the temporary update file and no token file
+    ArgumentMatcher<Path> updateTmpMatcher = new ArgumentMatcher<Path>() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof Path) {
+          return ((Path) argument).getName().startsWith("update");
+        }
+        return false;
+      }
+    };
+    doThrow(intentionalErr)
+        .when(spyfs).rename(argThat(updateTmpMatcher), isA(Path.class));
+
+    conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI,
+        testDir.getAbsoluteFile().toURI().toString());
+    HistoryServerStateStoreService store =
+        new HistoryServerFileSystemStateStoreService() {
+          @Override
+          FileSystem createFileSystem() throws IOException {
+            return spyfs;
+          }
+    };
+    store.init(conf);
+    store.start();
+
+    final MRDelegationTokenIdentifier token1 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
+            new Text("tokenRenewer1"), new Text("tokenUser1"));
+    token1.setSequenceNumber(1);
+    final Long tokenDate1 = 1L;
+    store.storeToken(token1, tokenDate1);
+    final Long newTokenDate1 = 975318642L;
+    try {
+      store.updateToken(token1, newTokenDate1);
+      fail("intentional error not thrown");
+    } catch (IOException e) {
+      assertEquals(intentionalErr, e);
+    }
+    store.close();
+
+    // verify the update file is seen and parsed upon recovery when
+    // original token file is missing
+    store = createAndStartStore();
+    HistoryServerState state = store.loadState();
+    assertEquals("incorrect loaded token count", 1, state.tokenState.size());
+    assertTrue("missing token 1", state.tokenState.containsKey(token1));
+    assertEquals("incorrect token 1 date", newTokenDate1,
+        state.tokenState.get(token1));
+    store.close();
+  }
 }



Mime
View raw message