hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [18/47] hadoop git commit: MAPREDUCE-6433. launchTime may be negative. Contributed by Zhihai Xu
Date Wed, 05 Aug 2015 22:23:31 GMT
MAPREDUCE-6433. launchTime may be negative. Contributed by Zhihai Xu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/93d50b78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/93d50b78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/93d50b78

Branch: refs/heads/YARN-1197
Commit: 93d50b782494af7eef980c4d596a59ff4e11646e
Parents: ab80e27
Author: Zhihai Xu <zxu@apache.org>
Authored: Thu Jul 30 23:07:31 2015 -0700
Committer: Zhihai Xu <zxu@apache.org>
Committed: Thu Jul 30 23:07:31 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  2 +
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  2 +-
 .../v2/app/job/event/JobStartEvent.java         |  2 +-
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  2 +-
 .../mapreduce/v2/app/TestMRAppMaster.java       | 88 +++++++++++++++++++-
 .../mapreduce/jobhistory/EventWriter.java       | 19 ++++-
 6 files changed, 107 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 398ffc6..738dea5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -542,6 +542,8 @@ Release 2.8.0 - UNRELEASED
 
     MAPREDUCE-6427. Fix typo in JobHistoryEventHandler. (Ray Chiang via cdouglas)
 
+    MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index f199ecb..6dc830f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -233,7 +233,7 @@ public class MRAppMaster extends CompositeService {
   JobStateInternal forcedState = null;
   private final ScheduledExecutorService logSyncer;
 
-  private long recoveredJobStartTime = 0;
+  private long recoveredJobStartTime = -1L;
   private static boolean mainStarted = false;
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
index 39051da..a142c31 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
@@ -25,7 +25,7 @@ public class JobStartEvent extends JobEvent {
   long recoveredJobStartTime;
 
   public JobStartEvent(JobId jobID) {
-    this(jobID, 0);
+    this(jobID, -1L);
   }
 
   public JobStartEvent(JobId jobID, long recoveredJobStartTime) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4c3b3fe..9d141eb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1629,7 +1629,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     @Override
     public void transition(JobImpl job, JobEvent event) {
       JobStartEvent jse = (JobStartEvent) event;
-      if (jse.getRecoveredJobStartTime() != 0) {
+      if (jse.getRecoveredJobStartTime() != -1L) {
         job.startTime = jse.getRecoveredJobStartTime();
       } else {
         job.startTime = job.clock.getTime();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
index 63b201d..9e0dafc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
@@ -31,9 +31,11 @@ import static org.mockito.Mockito.times;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,16 +46,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.EventWriter;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@@ -61,6 +68,8 @@ import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.AccessControlException;
@@ -111,7 +120,7 @@ public class TestMRAppMaster {
     }
     dir.mkdirs();
   }
-  
+
   @Test
   public void testMRAppMasterForDifferentUser() throws IOException,
       InterruptedException {
@@ -170,7 +179,46 @@ public class TestMRAppMaster {
     // verify the final status is FAILED
     verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
   }
-  
+
+  @Test
+  public void testMRAppMasterJobLaunchTime() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    JobConf conf = new JobConf();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId = TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+
+    File dir = new File(MRApps.getStagingAreaDir(conf, userName).toString(),
+        jobId.toString());
+    dir.mkdirs();
+    File historyFile = new File(JobHistoryUtils.getStagingJobHistoryFile(
+        new Path(dir.toURI().toString()), jobId,
+        (applicationAttemptId.getAttemptId() - 1)).toUri().getRawPath());
+    historyFile.createNewFile();
+    FSDataOutputStream out = new FSDataOutputStream(
+        new FileOutputStream(historyFile), null);
+    EventWriter writer = new EventWriter(out, EventWriter.WriteMode.JSON);
+    writer.close();
+    FileSystem fs = FileSystem.get(conf);
+    JobSplitWriter.createSplitFiles(new Path(dir.getAbsolutePath()), conf,
+        fs, new org.apache.hadoop.mapred.InputSplit[0]);
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMasterTestLaunchTime appMaster =
+        new MRAppMasterTestLaunchTime(applicationAttemptId, containerId,
+            "host", -1, -1, System.currentTimeMillis());
+    MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    appMaster.stop();
+    assertTrue("Job launch time should not be negative.",
+            appMaster.jobLaunchTime.get() >= 0);
+  }
+
   @Test
   public void testMRAppMasterSuccessLock() throws IOException,
       InterruptedException {
@@ -585,3 +633,39 @@ class MRAppMasterTest extends MRAppMaster {
     return spyHistoryService;
   }
 }
+
+class MRAppMasterTestLaunchTime extends MRAppMasterTest {
+  final AtomicLong jobLaunchTime = new AtomicLong(0L);
+  public MRAppMasterTestLaunchTime(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String host, int port, int httpPort,
+      long submitTime) {
+    super(applicationAttemptId, containerId, host, port, httpPort,
+        submitTime, false, false);
+  }
+
+  @Override
+  protected EventHandler<CommitterEvent> createCommitterEventHandler(
+      AppContext context, OutputCommitter committer) {
+    return new CommitterEventHandler(context, committer,
+        getRMHeartbeatHandler()) {
+      @Override
+      public void handle(CommitterEvent event) {
+      }
+    };
+  }
+
+  @Override
+  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+      AppContext context) {
+    return new JobHistoryEventHandler(context, getStartCount()) {
+      @Override
+      public void handle(JobHistoryEvent event) {
+        if (event.getHistoryEvent().getEventType() == EventType.JOB_INITED) {
+          JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
+          jobLaunchTime.set(jie.getLaunchTime());
+        }
+        super.handle(event);
+      }
+    };
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
index 29489a5..b1cb6dc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
@@ -29,19 +29,25 @@ import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Event Writer is an utility class used to write events to the underlying
  * stream. Typically, one event writer (which translates to one stream) 
  * is created per job 
  * 
  */
-class EventWriter {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class EventWriter {
   static final String VERSION = "Avro-Json";
   static final String VERSION_BINARY = "Avro-Binary";
 
@@ -50,11 +56,17 @@ class EventWriter {
     new SpecificDatumWriter<Event>(Event.class);
   private Encoder encoder;
   private static final Log LOG = LogFactory.getLog(EventWriter.class);
+
+  /**
+   * avro encoding format supported by EventWriter.
+   */
   public enum WriteMode { JSON, BINARY }
   private final WriteMode writeMode;
   private final boolean jsonOutput;  // Cache value while we have 2 modes
 
-  EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException {
+  @VisibleForTesting
+  public EventWriter(FSDataOutputStream out, WriteMode mode)
+      throws IOException {
     this.out = out;
     this.writeMode = mode;
     if (this.writeMode==WriteMode.JSON) {
@@ -93,7 +105,8 @@ class EventWriter {
     out.hflush();
   }
 
-  void close() throws IOException {
+  @VisibleForTesting
+  public void close() throws IOException {
     try {
       encoder.flush();
       out.close();


Mime
View raw message