hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gkesa...@apache.org
Subject svn commit: r1369164 [2/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduc...
Date Fri, 03 Aug 2012 19:00:51 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Aug  3 19:00:15 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -131,6 +127,7 @@ public class RMContainerAllocator extend
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
   private int rackLocalAssigned = 0;
+  private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
   private int mapResourceReqt;//memory
@@ -214,11 +211,18 @@ public class RMContainerAllocator extend
       scheduledRequests.assign(allocatedContainers);
       LOG.info("After Assign: " + getStat());
     }
-    
+
+    int completedMaps = getJob().getCompletedMaps();
+    int completedTasks = completedMaps + getJob().getCompletedReduces();
+    if (lastCompletedTasks != completedTasks) {
+      lastCompletedTasks = completedTasks;
+      recalculateReduceSchedule = true;
+    }
+
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
       scheduleReduces(
-          getJob().getTotalMaps(), getJob().getCompletedMaps(),
+          getJob().getTotalMaps(), completedMaps,
           scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
           assignedRequests.maps.size(), assignedRequests.reduces.size(),
           mapResourceReqt, reduceResourceReqt,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java Fri Aug  3 19:00:15 2012
@@ -78,14 +78,29 @@ public class ConfBlock extends HtmlBlock
           tr().
             th(_TH, "key").
             th(_TH, "value").
+            th(_TH, "source chain").
           _().
         _().
       tbody();
       for (ConfEntryInfo entry : info.getProperties()) {
+        StringBuffer buffer = new StringBuffer();
+        String[] sources = entry.getSource();
+        //Skip the last entry, because it is always the same HDFS file, and
+        // output them in reverse order so most recent is output first
+        boolean first = true;
+        for(int i = (sources.length  - 2); i >= 0; i--) {
+          if(!first) {
+            // \u2B05 is an arrow <--
+            buffer.append(" \u2B05 ");
+          }
+          first = false;
+          buffer.append(sources[i]);
+        }
         tbody.
           tr().
             td(entry.getName()).
             td(entry.getValue()).
+            td(buffer.toString()).
           _();
       }
       tbody._().
@@ -93,6 +108,7 @@ public class ConfBlock extends HtmlBlock
         tr().
           th().input("search_init").$type(InputType.text).$name("key").$value("key")._()._().
           th().input("search_init").$type(InputType.text).$name("value").$value("value")._()._().
+          th().input("search_init").$type(InputType.text).$name("source chain").$value("source chain")._()._().
           _().
         _().
       _();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java Fri Aug  3 19:00:15 2012
@@ -64,7 +64,7 @@ public class AMAttemptInfo {
     if (containerId != null) {
       this.containerId = containerId.toString();
       this.logsLink = join("http://" + nodeHttpAddress,
-          ujoin("node", "containerlogs", this.containerId));
+          ujoin("node", "containerlogs", this.containerId, user));
     }
   }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java Fri Aug  3 19:00:15 2012
@@ -27,13 +27,19 @@ public class ConfEntryInfo {
 
   protected String name;
   protected String value;
+  protected String[] source;
 
   public ConfEntryInfo() {
   }
 
   public ConfEntryInfo(String key, String value) {
+    this(key, value, null);
+  }
+  
+  public ConfEntryInfo(String key, String value, String[] source) {
     this.name = key;
     this.value = value;
+    this.source = source;
   }
 
   public String getName() {
@@ -43,4 +49,8 @@ public class ConfEntryInfo {
   public String getValue() {
     return this.value;
   }
+  
+  public String[] getSource() {
+    return source;
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java Fri Aug  3 19:00:15 2012
@@ -46,7 +46,8 @@ public class ConfInfo {
     Configuration jobConf = job.loadConfFile();
     this.path = job.getConfFile().toString();
     for (Map.Entry<String, String> entry : jobConf) {
-      this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue()));
+      this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue(), 
+          jobConf.getPropertySources(entry.getKey())));
     }
 
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Aug  3 19:00:15 2012
@@ -603,7 +603,7 @@ public class MockJobs extends MockApps {
       public Configuration loadConfFile() throws IOException {
         FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
         Configuration jobConf = new Configuration(false);
-        jobConf.addResource(fc.open(configFile));
+        jobConf.addResource(fc.open(configFile), configFile.toString());
         return jobConf;
       }
     };

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Aug  3 19:00:15 2012
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -86,11 +87,13 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -352,7 +355,7 @@ public class TestRMContainerAllocator {
     }
     @Override
     protected ResourceScheduler createScheduler() {
-      return new MyFifoScheduler();
+      return new MyFifoScheduler(this.getRMContext());
     }
   }
 
@@ -1091,6 +1094,19 @@ public class TestRMContainerAllocator {
   }
   
   private static class MyFifoScheduler extends FifoScheduler {
+
+    public MyFifoScheduler(RMContext rmContext) {
+      super();
+      try {
+        Configuration conf = new Configuration();
+        reinitialize(conf, new ContainerTokenSecretManager(conf),
+            rmContext);
+      } catch (IOException ie) {
+        LOG.info("add application failed with ", ie);
+        assert (false);
+      }
+    }
+
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
     @Override
@@ -1393,7 +1409,63 @@ public class TestRMContainerAllocator {
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
   }
+
+  private static class RecalculateContainerAllocator extends MyContainerAllocator {
+    public boolean recalculatedReduceSchedule = false;
+
+    public RecalculateContainerAllocator(MyResourceManager rm,
+        Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+      super(rm, conf, appAttemptId, job);
+    }
+
+    @Override
+    public void scheduleReduces(int totalMaps, int completedMaps,
+        int scheduledMaps, int scheduledReduces, int assignedMaps,
+        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+      recalculatedReduceSchedule = true;
+    }
+  }
   
+  @Test
+  public void testCompletedTasksRecalculateSchedule() throws Exception {
+    LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+    Configuration conf = new Configuration();
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job job = mock(Job.class);
+    when(job.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+    doReturn(10).when(job).getTotalMaps();
+    doReturn(10).when(job).getTotalReduces();
+    doReturn(0).when(job).getCompletedMaps();
+    RecalculateContainerAllocator allocator =
+        new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+    allocator.schedule();
+
+    allocator.recalculatedReduceSchedule = false;
+    allocator.schedule();
+    Assert.assertFalse("Unexpected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+
+    doReturn(1).when(job).getCompletedMaps();
+    allocator.schedule();
+    Assert.assertTrue("Expected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();
@@ -1402,6 +1474,7 @@ public class TestRMContainerAllocator {
     t.testReportedAppProgress();
     t.testReportedAppProgressWithOnlyMaps();
     t.testBlackListedNodes();
+    t.testCompletedTasksRecalculateSchedule();
   }
 
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Aug  3 19:00:15 2012
@@ -565,6 +565,73 @@ public class TestTaskAttempt{
     assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
         eventHandler.internalError);
   }
+  
+  @Test
+  public void testDoubleTooManyFetchFailure() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId =
+      BuilderUtils.newApplicationAttemptId(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+          splits, jobConf, taListener,
+          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_DONE));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+        eventHandler.internalError);
+  }
 
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Aug  3 19:00:15 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -127,8 +128,13 @@ public class TestTaskImpl {
     @Override
     protected int getMaxAttempts() {
       return 100;
-    }    
-    
+    }
+
+    @Override
+    protected void internalError(TaskEventType type) {
+      super.internalError(type);
+      fail("Internal error: " + type);
+    }
   }
   
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
@@ -462,5 +468,32 @@ public class TestTaskImpl {
 
     assertTaskSucceededState();
   }
+  
+  @Test
+  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt that succeeds
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    
+    // The task should now have succeeded
+    assertTaskSucceededState();
+    
+    // Now fail the first task attempt, after the second has succeeded
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), 
+        TaskEventType.T_ATTEMPT_FAILED));
+    
+    // The task should still be in the succeeded state
+    assertTaskSucceededState();
+    
+  }
 
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java Fri Aug  3 19:00:15 2012
@@ -972,7 +972,8 @@ public class TestAMWebServicesJobs exten
         WebServicesTestUtils.checkStringMatch("containerId", amInfo
             .getContainerId().toString(), containerId);
 
-        String localLogsLink = ujoin("node", "containerlogs", containerId);
+        String localLogsLink =ujoin("node", "containerlogs", containerId,
+            job.getUserName());
 
         assertTrue("logsLink", logsLink.contains(localLogsLink));
       }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Aug  3 19:00:15 2012
@@ -18,12 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
-import com.google.common.collect.Maps;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -34,6 +31,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -85,6 +84,8 @@ class LocalDistributedCacheManager {
    * @throws IOException
    */
   public void setup(JobConf conf) throws IOException {
+    File workDir = new File(System.getProperty("user.dir"));
+    
     // Generate YARN local resources objects corresponding to the distributed
     // cache configuration
     Map<String, LocalResource> localResources = 
@@ -132,7 +133,8 @@ class LocalDistributedCacheManager {
         Future<Path> future = exec.submit(download);
         resourcesToPaths.put(resource, future);
       }
-      for (LocalResource resource : localResources.values()) {
+      for (Entry<String, LocalResource> entry : localResources.entrySet()) {
+        LocalResource resource = entry.getValue();
         Path path;
         try {
           path = resourcesToPaths.get(resource).get();
@@ -142,6 +144,10 @@ class LocalDistributedCacheManager {
           throw new IOException(e);
         }
         String pathString = path.toUri().toString();
+        String link = entry.getKey();
+        String target = new File(path.toUri()).getPath();
+        symlink(workDir, target, link);
+        
         if (resource.getType() == LocalResourceType.ARCHIVE) {
           localArchives.add(pathString);
         } else if (resource.getType() == LocalResourceType.FILE) {
@@ -175,27 +181,6 @@ class LocalDistributedCacheManager {
           .arrayToString(localFiles.toArray(new String[localArchives
               .size()])));
     }
-    if (DistributedCache.getSymlink(conf)) {
-      File workDir = new File(System.getProperty("user.dir"));
-      URI[] archives = DistributedCache.getCacheArchives(conf);
-      URI[] files = DistributedCache.getCacheFiles(conf);
-      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-      Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
-      if (archives != null) {
-        for (int i = 0; i < archives.length; i++) {
-          String link = archives[i].getFragment();
-          String target = new File(localArchives[i].toUri()).getPath();
-          symlink(workDir, target, link);
-        }
-      }
-      if (files != null) {
-        for (int i = 0; i < files.length; i++) {
-          String link = files[i].getFragment();
-          String target = new File(localFiles[i].toUri()).getPath();
-          symlink(workDir, target, link);
-        }
-      }
-    }
     setupCalled = true;
   }
   

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Aug  3 19:00:15 2012
@@ -383,6 +383,7 @@ public class TypeConverter {
     switch (yarnApplicationState) {
     case NEW:
     case SUBMITTED:
+    case ACCEPTED:
       return State.PREP;
     case RUNNING:
       return State.RUNNING;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Fri Aug  3 19:00:15 2012
@@ -133,5 +133,16 @@ public class JHAdminConfig {
    * The HistoryStorage class to use to cache history data.
    */
   public static final String MR_HISTORY_STORAGE =
-    MR_HISTORY_PREFIX + ".store.class";
+    MR_HISTORY_PREFIX + "store.class";
+
+  /** Whether to use fixed ports with the minicluster. */
+  public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
+       + "minicluster.fixed.ports";
+
+  /**
+   * Default is false to be able to run tests concurrently without port
+   * conflicts.
+   */
+  public static boolean DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS = false;
+
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Aug  3 19:00:15 2012
@@ -171,8 +171,9 @@ public class MRApps extends Apps {
       }
 
       // Add standard Hadoop classes
-      for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
-          .split(",")) {
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
         Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
             .trim());
       }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Fri Aug  3 19:00:15 2012
@@ -117,7 +117,8 @@ public class TestMRWithDistributedCache 
       TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
           symlinkFile.length());
       
-      TestCase.assertFalse("second file should not be symlinked",
+      //This last one is a difference between MRv2 and MRv1
+      TestCase.assertTrue("second file should be symlinked too",
           expectedAbsentSymlinkFile.exists());
     }
   }
@@ -145,7 +146,6 @@ public class TestMRWithDistributedCache 
     job.addFileToClassPath(second);
     job.addArchiveToClassPath(third);
     job.addCacheArchive(fourth.toUri());
-    job.createSymlink();
     job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Fri Aug  3 19:00:15 2012
@@ -48,8 +48,12 @@ import org.apache.hadoop.mapreduce.Job;
  * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. 
  * Jars may be optionally added to the classpath of the tasks, a rudimentary 
  * software distribution mechanism.  Files have execution permissions.
- * Optionally users can also direct it to symlink the distributed cache file(s)
- * into the working directory of the task.</p>
+ * In older version of Hadoop Map/Reduce users could optionally ask for symlinks
+ * to be created in the working directory of the child task.  In the current 
+ * version symlinks are always created.  If the URL does not have a fragment 
+ * the name of the file or directory will be used. If multiple files or 
+ * directories map to the same link name, the last one added, will be used.  All
+ * others will not even be downloaded.</p>
  * 
  * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
  * files. Clearly the cache files should not be modified by the application 
@@ -91,8 +95,7 @@ import org.apache.hadoop.mapreduce.Job;
  *       
  *       public void configure(JobConf job) {
  *         // Get the cached archives/files
- *         localArchives = DistributedCache.getLocalCacheArchives(job);
- *         localFiles = DistributedCache.getLocalCacheFiles(job);
+ *         File f = new File("./map.zip/some/file/in/zip.txt");
  *       }
  *       
  *       public void map(K key, V value, 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Fri Aug  3 19:00:15 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -379,7 +380,8 @@ public class IFile {
     private int readData(byte[] buf, int off, int len) throws IOException {
       int bytesRead = 0;
       while (bytesRead < len) {
-        int n = in.read(buf, off+bytesRead, len-bytesRead);
+        int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+            len - bytesRead);
         if (n < 0) {
           return bytesRead;
         }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java Fri Aug  3 19:00:15 2012
@@ -67,13 +67,13 @@ class IndexCache {
     if (info == null) {
       info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
     } else {
-      while (isUnderConstruction(info)) {
-        try {
-          // In case the entry is ready after the above check but
-          // before the following wait, we do timed wait.
-          info.wait(200);
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted waiting for construction", e);
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
         }
       }
       LOG.debug("IndexCache HIT: MapId " + mapId + " found");
@@ -101,13 +101,13 @@ class IndexCache {
     IndexInformation info;
     IndexInformation newInd = new IndexInformation();
     if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
-      while (isUnderConstruction(info)) {
-        try {
-          // In case the entry is ready after the above check but
-          // before the following wait, we do timed wait.
-          info.wait(200);
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted waiting for construction", e);
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
         }
       }
       LOG.debug("IndexCache HIT: MapId " + mapId + " found");

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java Fri Aug  3 19:00:15 2012
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -184,7 +185,7 @@ class JobQueueClient extends Configured 
     printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
     if (showJobs && (jobQueueInfo.getChildren() == null ||
         jobQueueInfo.getChildren().size() == 0)) {
-      JobStatus[] jobs = jc.getJobsFromQueue(queue);
+      JobStatus[] jobs = jobQueueInfo.getJobStatuses();
       if (jobs == null)
         jobs = new JobStatus[0];
       jc.displayJobList(jobs);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Fri Aug  3 19:00:15 2012
@@ -238,7 +238,7 @@ public class JobStatus extends org.apach
       stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
       stat.getCleanupProgress(), stat.getState().getValue(), 
       JobPriority.valueOf(stat.getPriority().name()),
-      stat.getUsername(), stat.getJobName(), stat.getJobFile(),
+      stat.getUsername(), stat.getJobName(), stat.getQueue(), stat.getJobFile(),
       stat.getTrackingUrl(), stat.isUber());
     old.setStartTime(stat.getStartTime());
     old.setFinishTime(stat.getFinishTime());

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java Fri Aug  3 19:00:15 2012
@@ -144,7 +144,7 @@ public interface Mapper<K1, V1, K2, V2> 
    *
    * <p>Applications can use the {@link Reporter} provided to report progress 
    * or just indicate that they are alive. In scenarios where the application 
-   * takes an insignificant amount of time to process individual key/value 
+   * takes significant amount of time to process individual key/value
    * pairs, this is crucial since the framework might assume that the task has 
    * timed-out and kill that task. The other way of avoiding this is to set 
    * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java Fri Aug  3 19:00:15 2012
@@ -29,7 +29,7 @@ import org.apache.hadoop.util.Progressab
  * 
  * <p>{@link Mapper} and {@link Reducer} can use the <code>Reporter</code>
  * provided to report progress or just indicate that they are alive. In 
- * scenarios where the application takes an insignificant amount of time to 
+ * scenarios where the application takes significant amount of time to
  * process individual key/value pairs, this is crucial since the framework 
  * might assume that the task has timed-out and kill that task.
  *

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Fri Aug  3 19:00:15 2012
@@ -66,14 +66,6 @@ public class TaskLog {
   
   // localFS is set in (and used by) writeToIndexFile()
   static LocalFileSystem localFS = null;
-  static {
-    if (!LOG_DIR.exists()) {
-      boolean b = LOG_DIR.mkdirs();
-      if (!b) {
-        LOG.debug("mkdirs failed. Ignoring.");
-      }
-    }
-  }
   
   public static String getMRv2LogDir() {
     return System.getProperty(MRJobConfig.TASK_LOG_DIR);
@@ -638,6 +630,12 @@ public class TaskLog {
    * @return base log directory
    */
   static File getUserLogDir() {
+    if (!LOG_DIR.exists()) {
+      boolean b = LOG_DIR.mkdirs();
+      if (!b) {
+        LOG.debug("mkdirs failed. Ignoring.");
+      }
+    }
     return LOG_DIR;
   }
   

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java Fri Aug  3 19:00:15 2012
@@ -313,7 +313,6 @@ public class Submitter extends Configure
     // add default debug script only when executable is expressed as
     // <path>#<executable>
     if (exec.contains("#")) {
-      DistributedCache.createSymlink(conf);
       // set default gdb commands for map and reduce task 
       String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
       setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Fri Aug  3 19:00:15 2012
@@ -1049,9 +1049,10 @@ public class Job extends JobContextImpl 
   }
 
   /**
-   * This method allows you to create symlinks in the current working directory
-   * of the task to all the cache files/archives
+   * Originally intended to enable symlinks, but currently symlinks cannot be
+   * disabled.
    */
+  @Deprecated
   public void createSymlink() {
     ensureState(JobState.DEFINE);
     DistributedCache.createSymlink(conf);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Fri Aug  3 19:00:15 2012
@@ -221,10 +221,11 @@ public interface JobContext extends MRJo
   public String getUser();
   
   /**
-   * This method checks to see if symlinks are to be create for the 
-   * localized cache files in the current working directory 
-   * @return true if symlinks are to be created- else return false
+   * Originally intended to check if symlinks should be used, but currently
+   * symlinks cannot be disabled.
+   * @return true
    */
+  @Deprecated
   public boolean getSymlink();
   
   /**
@@ -251,14 +252,22 @@ public interface JobContext extends MRJo
    * Return the path array of the localized caches
    * @return A path array of localized caches
    * @throws IOException
+   * @deprecated the array returned only includes the items the were 
+   * downloaded. There is no way to map this to what is returned by
+   * {@link #getCacheArchives()}.
    */
+  @Deprecated
   public Path[] getLocalCacheArchives() throws IOException;
 
   /**
    * Return the path array of the localized files
    * @return A path array of localized files
    * @throws IOException
+   * @deprecated the array returned only includes the items the were 
+   * downloaded. There is no way to map this to what is returned by
+   * {@link #getCacheFiles()}.
    */
+  @Deprecated
   public Path[] getLocalCacheFiles() throws IOException;
 
   /**

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Aug  3 19:00:15 2012
@@ -135,8 +135,9 @@ class JobSubmitter {
       short replication) throws IOException {
     Configuration conf = job.getConfiguration();
     if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
-      LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
-               "Applications should implement Tool for the same.");
+      LOG.warn("Hadoop command-line option parsing not performed. " +
+               "Implement the Tool interface and execute your application " +
+               "with ToolRunner to remedy this.");
     }
 
     // get all the command line arguments passed in by the user conf
@@ -189,7 +190,6 @@ class JobSubmitter {
           //should not throw a uri exception 
           throw new IOException("Failed to create uri for " + tmpFile, ue);
         }
-        DistributedCache.createSymlink(conf);
       }
     }
       
@@ -224,7 +224,6 @@ class JobSubmitter {
           //should not throw an uri excpetion
           throw new IOException("Failed to create uri for " + tmpArchives, ue);
         }
-        DistributedCache.createSymlink(conf);
       }
     }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Aug  3 19:00:15 2012
@@ -79,4 +79,9 @@ public interface MRConfig {
   public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
   public static final String MAX_BLOCK_LOCATIONS_KEY =
     "mapreduce.job.max.split.locations";
+
+  public static final String SHUFFLE_SSL_ENABLED_KEY =
+    "mapreduce.shuffle.ssl.enabled";
+
+  public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Aug  3 19:00:15 2012
@@ -114,6 +114,10 @@ public interface MRJobConfig {
 
   public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
 
+  /**
+   * @deprecated Symlinks are always on and cannot be disabled.
+   */
+  @Deprecated
   public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
 
   public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Fri Aug  3 19:00:15 2012
@@ -55,8 +55,12 @@ import java.net.URI;
  * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. 
  * Jars may be optionally added to the classpath of the tasks, a rudimentary 
  * software distribution mechanism.  Files have execution permissions.
- * Optionally users can also direct it to symlink the distributed cache file(s)
- * into the working directory of the task.</p>
+ * In older version of Hadoop Map/Reduce users could optionally ask for symlinks
+ * to be created in the working directory of the child task.  In the current 
+ * version symlinks are always created.  If the URL does not have a fragment 
+ * the name of the file or directory will be used. If multiple files or 
+ * directories map to the same link name, the last one added, will be used.  All
+ * others will not even be downloaded.</p>
  * 
  * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
  * files. Clearly the cache files should not be modified by the application 
@@ -98,8 +102,7 @@ import java.net.URI;
  *       
  *       public void configure(JobConf job) {
  *         // Get the cached archives/files
- *         localArchives = DistributedCache.getLocalCacheArchives(job);
- *         localFiles = DistributedCache.getLocalCacheFiles(job);
+ *         File f = new File("./map.zip/some/file/in/zip.txt");
  *       }
  *       
  *       public void map(K key, V value, 
@@ -375,32 +378,26 @@ public class DistributedCache {
   }
 
   /**
-   * This method allows you to create symlinks in the current working directory
-   * of the task to all the cache files/archives.
-   * Intended to be used by user code.
+   * Originally intended to enable symlinks, but currently symlinks cannot be
+   * disabled. This is a NO-OP.
    * @param conf the jobconf
-   * @deprecated Use {@link Job#createSymlink()} instead  
+   * @deprecated This is a NO-OP. 
    */
   @Deprecated
   public static void createSymlink(Configuration conf){
-    conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
+    //NOOP
   }
   
   /**
-   * This method checks to see if symlinks are to be create for the 
-   * localized cache files in the current working directory 
-   * Used by internal DistributedCache code.
+   * Originally intended to check if symlinks should be used, but currently
+   * symlinks cannot be disabled.
    * @param conf the jobconf
-   * @return true if symlinks are to be created- else return false
-   * @deprecated Use {@link JobContext#getSymlink()} instead
+   * @return true
+   * @deprecated symlinks are always created.
    */
   @Deprecated
   public static boolean getSymlink(Configuration conf){
-    String result = conf.get(MRJobConfig.CACHE_SYMLINK);
-    if ("yes".equals(result)){
-      return true;
-    }
-    return false;
+    return true;
   }
 
   private static boolean[] parseBooleans(String[] strs) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Aug  3 19:00:15 2012
@@ -21,15 +21,18 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.HttpURLConnection;
 import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,14 +45,17 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
 class Fetcher<K,V> extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -92,6 +98,9 @@ class Fetcher<K,V> extends Thread {
 
   private volatile boolean stopped = false;
 
+  private static boolean sslShuffle;
+  private static SSLFactory sslFactory;
+
   public Fetcher(JobConf job, TaskAttemptID reduceId, 
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
@@ -135,6 +144,20 @@ class Fetcher<K,V> extends Thread {
     
     setName("fetcher#" + id);
     setDaemon(true);
+
+    synchronized (Fetcher.class) {
+      sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+                                  MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+      if (sslShuffle && sslFactory == null) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
+        try {
+          sslFactory.init();
+        } catch (Exception ex) {
+          sslFactory.destroy();
+          throw new RuntimeException(ex);
+        }
+      }
+    }
   }
   
   public void run() {
@@ -173,15 +196,34 @@ class Fetcher<K,V> extends Thread {
     } catch (InterruptedException ie) {
       LOG.warn("Got interrupt while joining " + getName(), ie);
     }
+    if (sslFactory != null) {
+      sslFactory.destroy();
+    }
   }
 
+  @VisibleForTesting
+  protected HttpURLConnection openConnection(URL url) throws IOException {
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    if (sslShuffle) {
+      HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+      try {
+        httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
+    }
+    return conn;
+  }
+  
   /**
    * The crux of the matter...
    * 
    * @param host {@link MapHost} from which we need to  
    *              shuffle available map-outputs.
    */
-  private void copyFromHost(MapHost host) throws IOException {
+  @VisibleForTesting
+  protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
     
@@ -191,9 +233,9 @@ class Fetcher<K,V> extends Thread {
       return;
     }
     
-    LOG.debug("Fetcher " + id + " going to fetch from " + host);
-    for (TaskAttemptID tmp: maps) {
-      LOG.debug(tmp);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+        + maps);
     }
     
     // List of maps to be fetched yet
@@ -205,7 +247,7 @@ class Fetcher<K,V> extends Thread {
     
     try {
       URL url = getMapOutputURL(host, maps);
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+      HttpURLConnection connection = openConnection(url);
       
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -266,17 +308,25 @@ class Fetcher<K,V> extends Thread {
     
     try {
       // Loop through available map-outputs and fetch them
-      // On any error, good becomes false and we exit after putting back
-      // the remaining maps to the yet_to_be_fetched list
-      boolean good = true;
-      while (!remaining.isEmpty() && good) {
-        good = copyMapOutput(host, input, remaining);
+      // On any error, faildTasks is not null and we exit
+      // after putting back the remaining maps to the 
+      // yet_to_be_fetched list and marking the failed tasks.
+      TaskAttemptID[] failedTasks = null;
+      while (!remaining.isEmpty() && failedTasks == null) {
+        failedTasks = copyMapOutput(host, input, remaining);
+      }
+      
+      if(failedTasks != null && failedTasks.length > 0) {
+        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        for(TaskAttemptID left: failedTasks) {
+          scheduler.copyFailed(left, host, true);
+        }
       }
       
       IOUtils.cleanup(LOG, input);
       
       // Sanity check
-      if (good && !remaining.isEmpty()) {
+      if (failedTasks == null && !remaining.isEmpty()) {
         throw new IOException("server didn't return all expected map outputs: "
             + remaining.size() + " left.");
       }
@@ -285,10 +335,11 @@ class Fetcher<K,V> extends Thread {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
-      
-   }
+  }
+  
+  private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
   
-  private boolean copyMapOutput(MapHost host,
+  private TaskAttemptID[] copyMapOutput(MapHost host,
                                 DataInputStream input,
                                 Set<TaskAttemptID> remaining) {
     MapOutput<K,V> mapOutput = null;
@@ -310,18 +361,21 @@ class Fetcher<K,V> extends Thread {
       } catch (IllegalArgumentException e) {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
-        return false;
+        //Don't know which one was bad, so consider all of them as bad
+        return remaining.toArray(new TaskAttemptID[remaining.size()]);
       }
 
  
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
-        return false;
+        return new TaskAttemptID[] {mapId};
       }
       
-      LOG.debug("header: " + mapId + ", len: " + compressedLength + 
-               ", decomp len: " + decompressedLength);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("header: " + mapId + ", len: " + compressedLength + 
+            ", decomp len: " + decompressedLength);
+      }
       
       // Get the location for the map output - either in-memory or on-disk
       mapOutput = merger.reserve(mapId, decompressedLength, id);
@@ -329,7 +383,8 @@ class Fetcher<K,V> extends Thread {
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        return false;
+        //Not an error but wait to process data.
+        return EMPTY_ATTEMPT_ID_ARRAY;
       } 
       
       // Go!
@@ -351,24 +406,27 @@ class Fetcher<K,V> extends Thread {
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();
-      return true;
+      return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
                  mapId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
-        return false;
+        if(mapId == null) {
+          return remaining.toArray(new TaskAttemptID[remaining.size()]);
+        } else {
+          return new TaskAttemptID[] {mapId};
+        }
       }
       
-      LOG.info("Failed to shuffle output of " + mapId + 
+      LOG.warn("Failed to shuffle output of " + mapId + 
                " from " + host.getHostName(), ioe); 
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
-      scheduler.copyFailed(mapId, host, true);
       metrics.failedFetch();
-      return false;
+      return new TaskAttemptID[] {mapId};
     }
 
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Aug  3 19:00:15 2012
@@ -246,8 +246,6 @@ public class ConfigUtil {
       new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS});
     Configuration.addDeprecation("mapred.cache.archives.timestamps", 
       new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS});
-    Configuration.addDeprecation("mapred.create.symlink", 
-      new String[] {MRJobConfig.CACHE_SYMLINK});
     Configuration.addDeprecation("mapred.working.dir", 
       new String[] {MRJobConfig.WORKING_DIR});
     Configuration.addDeprecation("user.name", 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Aug  3 19:00:15 2012
@@ -513,6 +513,21 @@
 </property>
 
 <property>
+  <name>mapreduce.shuffle.ssl.enabled</name>
+  <value>false</value>
+  <description>
+    Whether to use SSL for for the Shuffle HTTP endpoints.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.ssl.file.buffer.size</name>
+  <value>65536</value>
+  <description>Buffer size for reading spills from file when using SSL.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
   <description>The percentage of memory -relative to the maximum heap size- to
@@ -692,7 +707,7 @@
 </property>
 
 <property>
-  <name>mapreduce.output.fileoutputformat.compression.type</name>
+  <name>mapreduce.output.fileoutputformat.compress.type</name>
   <value>RECORD</value>
   <description>If the job outputs are to compressed as SequenceFiles, how should
                they be compressed? Should be one of NONE, RECORD or BLOCK.
@@ -700,7 +715,7 @@
 </property>
 
 <property>
-  <name>mapreduce.output.fileoutputformat.compression.codec</name>
+  <name>mapreduce.output.fileoutputformat.compress.codec</name>
   <value>org.apache.hadoop.io.compress.DefaultCodec</value>
   <description>If the job outputs are compressed, how should they be compressed?
   </description>
@@ -1285,7 +1300,7 @@
 <property>
   <name>mapreduce.jobhistory.address</name>
   <value>0.0.0.0:10020</value>
-  <description>MapReduce JobHistory Server host:port</description>
+  <description>MapReduce JobHistory Server IPC host:port</description>
 </property>
 
 <property>
@@ -1294,4 +1309,21 @@
   <description>MapReduce JobHistory Server Web UI host:port</description>
 </property>
 
+<property>
+  <name>mapreduce.jobhistory.keytab</name>
+  <description>
+    Location of the kerberos keytab file for the MapReduce
+    JobHistory Server.
+  </description>
+  <value>/etc/security/keytab/jhs.service.keytab</value>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.principal</name>
+  <description>
+    Kerberos principal name for the MapReduce JobHistory Server.
+  </description>
+  <value>jhs/_HOST@REALM.TLD</value>
+</property>
+
 </configuration>

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1358480-1369130

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java Fri Aug  3 19:00:15 2012
@@ -35,16 +35,23 @@ import org.apache.hadoop.mapreduce.serve
 import junit.framework.TestCase;
 
 public class TestIndexCache extends TestCase {
+  private JobConf conf;
+  private FileSystem fs;
+  private Path p;
+
+  @Override
+  public void setUp() throws IOException {
+    conf = new JobConf();
+    fs = FileSystem.getLocal(conf).getRaw();
+    p =  new Path(System.getProperty("test.build.data", "/tmp"),
+        "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
 
   public void testLRCPolicy() throws Exception {
     Random r = new Random();
     long seed = r.nextLong();
     r.setSeed(seed);
     System.out.println("seed: " + seed);
-    JobConf conf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf).getRaw();
-    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
-        "cache").makeQualified(fs);
     fs.delete(p, true);
     conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
     final int partsPerMap = 1000;
@@ -115,10 +122,6 @@ public class TestIndexCache extends Test
 
   public void testBadIndex() throws Exception {
     final int parts = 30;
-    JobConf conf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf).getRaw();
-    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
-        "cache").makeQualified(fs);
     fs.delete(p, true);
     conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
     IndexCache cache = new IndexCache(conf);
@@ -150,10 +153,6 @@ public class TestIndexCache extends Test
   }
 
   public void testInvalidReduceNumberOrLength() throws Exception {
-    JobConf conf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf).getRaw();
-    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
-                      "cache").makeQualified(fs);
     fs.delete(p, true);
     conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
     final int partsPerMap = 1000;
@@ -199,10 +198,6 @@ public class TestIndexCache extends Test
     // This test case may not repeatable. But on my macbook this test 
     // fails with probability of 100% on code before MAPREDUCE-2541,
     // so it is repeatable in practice.
-    JobConf conf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf).getRaw();
-    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
-                      "cache").makeQualified(fs);
     fs.delete(p, true);
     conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
     // Make a big file so removeMapThread almost surely runs faster than 
@@ -247,6 +242,66 @@ public class TestIndexCache extends Test
     }      
   }
   
+  public void testCreateRace() throws Exception {
+    fs.delete(p, true);
+    conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
+    final int partsPerMap = 1000;
+    final int bytesPerFile = partsPerMap * 24;
+    final IndexCache cache = new IndexCache(conf);
+    
+    final Path racy = new Path(p, "racyIndex");
+    final String user =  
+      UserGroupInformation.getCurrentUser().getShortUserName();
+    writeFile(fs, racy, bytesPerFile, partsPerMap);
+
+    // run multiple instances
+    Thread[] getInfoThreads = new Thread[50];
+    for (int i = 0; i < 50; i++) {
+      getInfoThreads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
+            cache.removeMap("racyIndex");
+          } catch (Exception e) {
+            // should not be here
+          }
+        }
+      };
+    }
+
+    for (int i = 0; i < 50; i++) {
+      getInfoThreads[i].start();
+    }
+
+    final Thread mainTestThread = Thread.currentThread();
+
+    Thread timeoutThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(15000);
+          mainTestThread.interrupt();
+        } catch (InterruptedException ie) {
+          // we are done;
+        }
+      }
+    };
+
+    for (int i = 0; i < 50; i++) {
+      try {
+        getInfoThreads[i].join();
+      } catch (InterruptedException ie) {
+        // we haven't finished in time. Potential deadlock/race.
+        fail("Unexpectedly long delay during concurrent cache entry creations");
+      }
+    }
+    // stop the timeoutThread. If we get interrupted before stopping, there
+    // must be something wrong, although it wasn't a deadlock. No need to
+    // catch and swallow.
+    timeoutThread.interrupt();
+  }
+
   private static void checkRecord(IndexRecord rec, long fill) {
     assertEquals(fill, rec.startOffset);
     assertEquals(fill, rec.rawLength);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Aug  3 19:00:15 2012
@@ -336,7 +336,7 @@ public class HistoryFileManager extends 
     public synchronized Configuration loadConfFile() throws IOException {
       FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
       Configuration jobConf = new Configuration(false);
-      jobConf.addResource(fc.open(confFile));
+      jobConf.addResource(fc.open(confFile), confFile.toString());
       return jobConf;
     }
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Fri Aug  3 19:00:15 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.service.CompositeService;
@@ -122,6 +123,7 @@ public class JobHistoryServer extends Co
   }
 
   public static void main(String[] args) {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
     try {
       JobHistoryServer jobHistoryServer = new JobHistoryServer();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Fri Aug  3 19:00:15 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h
 import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 
@@ -60,10 +61,10 @@ public class HsWebApp extends WebApp imp
     route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME),
         HsController.class, "singleTaskCounter");
     route("/about", HsController.class, "about");
-    route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
-        HsController.class, "logs");
-    route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
-        HsController.class, "nmlogs");
+    route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
+        CONTAINER_LOG_TYPE), HsController.class, "logs");
+    route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
+        CONTAINER_LOG_TYPE), HsController.class, "nmlogs");
   }
 }
 



Mime
View raw message