hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1407706 - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-clien...
Date: Sat, 10 Nov 2012 00:49:26 GMT
Author: szetszwo
Date: Sat Nov 10 00:49:15 2012
New Revision: 1407706

URL: http://svn.apache.org/viewvc?rev=1407706&view=rev
Log:
Merge r1406415 through r1407703 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/
      - copied from r1407703, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/
      - copied from r1407703, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm
      - copied unchanged from r1407703, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm
Removed:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/build-utils.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/build.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ivy/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ivy.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1406415-1407703

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Sat Nov 10 00:49:15 2012
@@ -194,6 +194,9 @@ Release 2.0.3-alpha - Unreleased 
 
     MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on default FS. (Gera Shegalov via tucu)
 
+    MAPREDUCE-4777. In TestIFile, testIFileReaderWithCodec relies on
+    testIFileWriterWithCodec. (Sandy Ryza via tomwhite)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES
@@ -584,6 +587,10 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4752. Reduce MR AM memory usage through String Interning (Robert
     Evans via tgraves)
 
+    MAPREDUCE-4266. remove Ant remnants from MR (tgraves via bobby)
+
+    MAPREDUCE-4666. JVM metrics for history server (jlowe via jeagles)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -634,6 +641,15 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4771. KeyFieldBasedPartitioner not partitioning properly when
     configured (jlowe via bobby)
+
+    MAPREDUCE-4772. Fetch failures can take way too long for a map to be 
+    restarted (bobby)
+
+    MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit 
+    (Mark Fuhs via bobby)
+
+    MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
+    state (jlowe via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1406415-1407703

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1406415-1407703

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Nov 10 00:49:15 2012
@@ -68,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -347,6 +348,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_COMPLETED,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from KILLED state
@@ -1409,16 +1413,22 @@ public class JobImpl implements org.apac
         fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
         job.fetchFailuresMapping.put(mapId, fetchFailures);
         
-        //get number of running reduces
-        int runningReduceTasks = 0;
+        //get number of shuffling reduces
+        int shufflingReduceTasks = 0;
         for (TaskId taskId : job.reduceTasks) {
-          if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
-            runningReduceTasks++;
+          Task task = job.tasks.get(taskId);
+          if (TaskState.RUNNING.equals(task.getState())) {
+            for(TaskAttempt attempt : task.getAttempts().values()) {
+              if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
+                shufflingReduceTasks++;
+                break;
+              }
+            }
           }
         }
         
-        float failureRate = runningReduceTasks == 0 ? 1.0f : 
-          (float) fetchFailures / runningReduceTasks;
+        float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
+          (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
         boolean isMapFaulty =
             (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
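
For illustration, a self-contained sketch of the revised check above: the failure rate is now computed against reduces still in the SHUFFLE phase, since only those can actually observe fetch failures. ReduceInfo below is a hypothetical stand-in for the job's task and attempt records, not a Hadoop type.

    interface ReduceInfo {
      boolean isRunning();                 // TaskState.RUNNING
      boolean hasAttemptInShufflePhase();  // some attempt reports Phase.SHUFFLE
    }

    static boolean isMapFaulty(int fetchFailures, java.util.List<ReduceInfo> reduces,
                               float maxAllowedFetchFailuresFraction) {
      int shufflingReduceTasks = 0;
      for (ReduceInfo r : reduces) {
        if (r.isRunning() && r.hasAttemptInShufflePhase()) {
          shufflingReduceTasks++;
        }
      }
      // With no shuffling reducers left, any failure counts as a 100% rate.
      float failureRate = shufflingReduceTasks == 0
          ? 1.0f : (float) fetchFailures / shufflingReduceTasks;
      return failureRate >= maxAllowedFetchFailuresFraction;
    }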

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Sat Nov 10 00:49:15 2012
@@ -18,14 +18,19 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Assert;
 import org.junit.Test;
@@ -254,6 +260,169 @@ public class TestFetchFailure {
     events = job.getTaskAttemptCompletionEvents(0, 100);
     Assert.assertEquals("Num completion events not correct", 2, events.length);
   }
+  
+  @Test
+  public void testFetchFailureMultipleReduces() throws Exception {
+    MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+       4, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    Task reduceTask2 = it.next();
+    Task reduceTask3 = it.next();
+    
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+    
+    TaskAttemptCompletionEvent[] events = 
+      job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+    
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    app.waitForState(reduceTask3, TaskState.RUNNING);
+    TaskAttempt reduceAttempt = 
+      reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+    
+    updateStatus(app, reduceAttempt, Phase.SHUFFLE);
+    
+    TaskAttempt reduceAttempt2 = 
+      reduceTask2.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt2, Phase.SHUFFLE);
+    
+    TaskAttempt reduceAttempt3 = 
+      reduceTask3.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
+    
+    //send 3 fetch failures from reduce to trigger map re-execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    
+    //We should not re-launch the map task yet
+    assertEquals(TaskState.SUCCEEDED, mapTask.getState());
+    updateStatus(app, reduceAttempt2, Phase.REDUCE);
+    updateStatus(app, reduceAttempt3, Phase.REDUCE);
+    
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    
+    //map attempt must have become FAILED
+    Assert.assertEquals("Map TaskAttempt state not correct",
+        TaskAttemptState.FAILED, mapAttempt1.getState());
+
+    Assert.assertEquals("Num attempts in Map Task not correct",
+        2, mapTask.getAttempts().size());
+    
+    Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+    atIt.next();
+    TaskAttempt mapAttempt2 = atIt.next();
+    
+    app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+   //send the done signal to the second map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+    
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt3.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    
+    //previous completion event now becomes obsolete
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        6, events.length);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[0].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[1].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt2.getID(), events[2].getAttemptId());
+    Assert.assertEquals("Event reduce attempt id not correct",
+        reduceAttempt.getID(), events[3].getAttemptId());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt2",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
+    Assert.assertEquals("Event status not correct for reduce attempt1",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+    TaskAttemptCompletionEvent mapEvents[] =
+        job.getMapAttemptCompletionEvents(0, 2);
+    Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+    Assert.assertArrayEquals("Unexpected map events",
+        Arrays.copyOfRange(events, 0, 2), mapEvents);
+    mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+    Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+  }
+  
+
+  private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
+    TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
+    status.counters = new Counters();
+    status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
+    status.id = attempt.getID();
+    status.mapFinishTime = 0;
+    status.outputSize = 0;
+    status.phase = phase;
+    status.progress = 0.5f;
+    status.shuffleFinishTime = 0;
+    status.sortFinishTime = 0;
+    status.stateString = "OK";
+    status.taskState = attempt.getState();
+    TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
+        status);
+    app.getContext().getEventHandler().handle(event);
+  }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, 
       TaskAttempt mapAttempt) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Sat Nov 10 00:49:15 2012
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -51,10 +53,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -340,7 +346,7 @@ public class TestJobImpl {
     return isUber;
   }
 
-  private InitTransition getInitTransition() {
+  private static InitTransition getInitTransition() {
     InitTransition initTransition = new InitTransition() {
       @Override
       protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
@@ -350,4 +356,63 @@ public class TestJobImpl {
     };
     return initTransition;
   }
+
+  @Test
+  public void testTransitionsAtFailed() throws IOException {
+    Configuration conf = new Configuration();
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    OutputCommitter committer = mock(OutputCommitter.class);
+    doThrow(new IOException("forcefail"))
+      .when(committer).setupJob(any(JobContext.class));
+    InlineDispatcher dispatcher = new InlineDispatcher();
+    JobImpl job = new StubbedJob(jobId, Records
+        .newRecord(ApplicationAttemptId.class), conf,
+        dispatcher.getEventHandler(), committer, true, null);
+
+    dispatcher.register(JobEventType.class, job);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+  }
+
+  private static class StubbedJob extends JobImpl {
+    //override the init transition
+    private final InitTransition initTransition = getInitTransition();
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
+
+    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
+        localStateMachine;
+
+    @Override
+    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
+      return localStateMachine;
+    }
+
+    public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        OutputCommitter committer, boolean newApiCommitter, String user) {
+      super(jobId, applicationAttemptId, conf, eventHandler,
+          null, new JobTokenSecretManager(), new Credentials(),
+          new SystemClock(), null, MRAppMetrics.create(), committer,
+          newApiCommitter, user, System.currentTimeMillis(), null, null);
+
+      // This "this leak" is okay because the retained pointer is in an
+      //  instance variable.
+      localStateMachine = localFactory.make(this);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sat Nov 10 00:49:15 2012
@@ -262,6 +262,9 @@ public interface MRJobConfig {
   public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+  
+  public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
+  public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Sat Nov 10 00:49:15 2012
@@ -107,25 +107,14 @@ public class NLineInputFormat extends Fi
         numLines++;
         length += num;
         if (numLines == numLinesPerSplit) {
-          // NLineInputFormat uses LineRecordReader, which always reads
-          // (and consumes) at least one character out of its upper split
-          // boundary. So to make sure that each mapper gets N lines, we
-          // move back the upper split limits of each split 
-          // by one character here.
-          if (begin == 0) {
-            splits.add(new FileSplit(fileName, begin, length - 1,
-              new String[] {}));
-          } else {
-            splits.add(new FileSplit(fileName, begin - 1, length,
-              new String[] {}));
-          }
+          splits.add(createFileSplit(fileName, begin, length));
           begin += length;
           length = 0;
           numLines = 0;
         }
       }
       if (numLines != 0) {
-        splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+        splits.add(createFileSplit(fileName, begin, length));
       }
     } finally {
       if (lr != null) {
@@ -134,6 +123,23 @@ public class NLineInputFormat extends Fi
     }
     return splits; 
   }
+
+  /**
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limits of each split 
+   * by one character here.
+   * @param fileName  Path of file
+   * @param begin  the position of the first byte in the file to process
+   * @param length  number of bytes in InputSplit
+   * @return  FileSplit
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0) 
+    ? new FileSplit(fileName, begin, length - 1, new String[] {})
+    : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
   
   /**
    * Set the number of lines per split
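
As a worked example of the boundary shift in createFileSplit (hypothetical values; the method is protected, so a real caller would sit in a subclass or the same package): suppose the first N lines of a file span 100 bytes and the next N lines span 50 bytes.

    // First chunk starts at byte 0: keep begin, trim one byte off the length.
    FileSplit first  = createFileSplit(new Path("input.txt"), 0, 100);  // bytes [0, 99)
    // Later chunks: move begin back one byte, keep the length.
    FileSplit second = createFileSplit(new Path("input.txt"), 100, 50); // bytes [99, 149)

Because LineRecordReader reads through the end of the line crossing its upper boundary and, for a non-zero start, skips ahead to the first newline, each mapper still receives exactly N whole lines.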

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Sat Nov 10 00:49:15 2012
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -283,6 +284,7 @@ class Fetcher<K,V> extends Thread {
       SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
      LOG.info("for url="+msgToEncode+" sent hash and received reply");
     } catch (IOException ie) {
+      boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
       LOG.warn("Failed to connect to " + host + " with " + remaining.size() + 
                " map outputs", ie);
@@ -291,14 +293,14 @@ class Fetcher<K,V> extends Thread {
       // indirectly penalizing the host
       if (!connectSucceeded) {
         for(TaskAttemptID left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded);
+          scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
         }
       } else {
         // If we got a read error at this stage, it implies there was a problem
         // with the first map, typically lost map. So, penalize only that map
         // and add the rest
         TaskAttemptID firstMap = maps.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded);
+        scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
       }
       
       // Add back all the remaining maps, WITHOUT marking them as failed
@@ -322,7 +324,7 @@ class Fetcher<K,V> extends Thread {
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
         for(TaskAttemptID left: failedTasks) {
-          scheduler.copyFailed(left, host, true);
+          scheduler.copyFailed(left, host, true, false);
         }
       }
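
The new fourth argument carries the cause of the failure from Fetcher to the scheduler: a java.net.ConnectException means the host's shuffle service is unreachable, which now triggers an immediate report to the AM instead of waiting out several retries. A minimal sketch of the classification (openShuffleConnection is a hypothetical placeholder for the fetcher's HTTP setup):

    try {
      openShuffleConnection(host);  // hypothetical: connect and verify the reply
    } catch (IOException ie) {
      boolean connectExcpt = ie instanceof java.net.ConnectException;
      for (TaskAttemptID left : remaining) {
        // readError=false: the connection never succeeded, so no read happened
        scheduler.copyFailed(left, host, false, connectExcpt);
      }
    }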
       

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Sat Nov 10 00:49:15 2012
@@ -89,6 +89,7 @@ class ShuffleScheduler<K,V> {
   private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
 
   private boolean reportReadErrorImmediately = true;
+  private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
   
   public ShuffleScheduler(JobConf job, TaskStatus status,
                           ExceptionReporter reporter,
@@ -115,6 +116,9 @@ class ShuffleScheduler<K,V> {
         MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
     this.reportReadErrorImmediately = job.getBoolean(
         MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
+    
+    this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, 
+        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
   }
 
   public synchronized void copySucceeded(TaskAttemptID mapId, 
@@ -159,7 +163,7 @@ class ShuffleScheduler<K,V> {
   }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError) {
+                                      boolean readError, boolean connectExcpt) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
@@ -184,12 +188,15 @@ class ShuffleScheduler<K,V> {
       }
     }
     
-    checkAndInformJobTracker(failures, mapId, readError);
+    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
 
     checkReducerHealth();
     
     long delay = (long) (INITIAL_PENALTY *
         Math.pow(PENALTY_GROWTH_RATE, failures));
+    if (delay > maxDelay) {
+      delay = maxDelay;
+    }
     
     penalties.add(new Penalty(host, delay));
     
@@ -200,8 +207,9 @@ class ShuffleScheduler<K,V> {
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
-      int failures, TaskAttemptID mapId, boolean readError) {
-    if ((reportReadErrorImmediately && readError)
+      int failures, TaskAttemptID mapId, boolean readError, 
+      boolean connectExcpt) {
+    if (connectExcpt || (reportReadErrorImmediately && readError)
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
       LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
       status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
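
For illustration, the capped backoff in isolation (the constants are assumed values for this sketch, not necessarily those in ShuffleScheduler):

    static long retryDelayMs(int failures, long maxDelayMs) {
      final long INITIAL_PENALTY = 10000L;     // assumed 10 s base penalty
      final float PENALTY_GROWTH_RATE = 1.3f;  // assumed growth factor
      long delay = (long) (INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures));
      return Math.min(delay, maxDelayMs);      // MAPREDUCE-4772: clamp the wait
    }

Under those assumed constants, 20 failures would otherwise produce a delay of roughly half an hour; the clamp keeps a reducer from stalling that long before it refetches or reports a lost map output.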

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sat Nov 10 00:49:15 2012
@@ -111,6 +111,14 @@
 </property>
 
 <property>
+  <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name>
+  <value>60000</value>
+  <description>The maximum number of ms the reducer will delay before retrying
+  to download map data.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.shuffle.parallelcopies</name>
   <value>5</value>
   <description>The default number of parallel transfers run by reduce
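
A job can tighten or relax the 60000 ms default through its own configuration. A sketch of a per-job override (assumed job-setup code, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SubmitWithTighterRetryCap {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Halve the maximum shuffle retry delay for this job.
        conf.setLong("mapreduce.reduce.shuffle.retry-delay.max.ms", 30000L);
        Job job = Job.getInstance(conf, "example");
        // ... set mapper, reducer, and input/output paths as usual ...
      }
    }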

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1406415-1407703

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Sat Nov 10 00:49:15 2012
@@ -118,8 +118,8 @@ public class TestFetcher {
           encHash);
     
     verify(allErrs).increment(1);
-    verify(ss).copyFailed(map1ID, host, true);
-    verify(ss).copyFailed(map2ID, host, true);
+    verify(ss).copyFailed(map1ID, host, true, false);
+    verify(ss).copyFailed(map2ID, host, true, false);
     
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
@@ -178,8 +178,8 @@ public class TestFetcher {
       .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
           encHash);
     verify(allErrs, never()).increment(1);
-    verify(ss, never()).copyFailed(map1ID, host, true);
-    verify(ss, never()).copyFailed(map2ID, host, true);
+    verify(ss, never()).copyFailed(map1ID, host, true, false);
+    verify(ss, never()).copyFailed(map2ID, host, true, false);
     
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Sat Nov 10 00:49:15 2012
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -106,6 +108,8 @@ public class JobHistoryServer extends Co
 
   @Override
   public void start() {
+    DefaultMetricsSystem.initialize("JobHistoryServer");
+    JvmMetrics.initSingleton("JobHistoryServer", null);
     try {
       jhsDTSecretManager.startThreads();
     } catch(IOException io) {
@@ -118,6 +122,7 @@ public class JobHistoryServer extends Co
   @Override
   public void stop() {
     jhsDTSecretManager.stopThreads();
+    DefaultMetricsSystem.shutdown();
     super.stop();
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java Sat Nov 10 00:49:15 2012
@@ -56,6 +56,10 @@ public class TestIFile {
     Path path = new Path(new Path("build/test.ifile"), "data");
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
+    IFile.Writer<Text, Text> writer =
+        new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+                                     codec, null);
+    writer.close();
     IFile.Reader<Text, Text> reader =
       new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
     reader.close();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1407706&r1=1407705&r2=1407706&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Sat Nov 10 00:49:15 2012
@@ -50,37 +50,40 @@ public class TestNLineInputFormat extend
     Job job = Job.getInstance(conf);
     Path file = new Path(workDir, "test.txt");
 
-    int seed = new Random().nextInt();
-    Random random = new Random(seed);
-
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
     int numLinesPerMap = 5;
     NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
-    // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
-         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+         length += 1) {
+ 
       // create a file with length entries
       Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
         for (int i = 0; i < length; i++) {
-          writer.write(Integer.toString(i));
+          writer.write(Integer.toString(i)+" some more text");
           writer.write("\n");
         }
       } finally {
         writer.close();
       }
-      checkFormat(job, numLinesPerMap);
+      int lastN = 0;
+      if (length != 0) {
+        lastN = length % 5;
+        if (lastN == 0) {
+          lastN = 5;
+        }
+      }
+      checkFormat(job, numLinesPerMap, lastN);
     }
   }
 
-  void checkFormat(Job job, int expectedN) 
+  void checkFormat(Job job, int expectedN, int lastN) 
       throws IOException, InterruptedException {
     NLineInputFormat format = new NLineInputFormat();
     List<InputSplit> splits = format.getSplits(job);
-    // check all splits except last one
     int count = 0;
-    for (int i = 0; i < splits.size() -1; i++) {
+    for (int i = 0; i < splits.size(); i++) {
       assertEquals("There are no split locations", 0,
                    splits.get(i).getLocations().length);
       TaskAttemptContext context = MapReduceTestUtil.
@@ -104,8 +107,13 @@ public class TestNLineInputFormat extend
       } finally {
         reader.close();
       }
-      assertEquals("number of lines in split is " + expectedN ,
-                   expectedN, count);
+      if ( i == splits.size() - 1) {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     lastN, count);
+      } else {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     expectedN, count);
+      }
     }
   }
   


