hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1230354 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src...
Date Thu, 12 Jan 2012 01:43:34 GMT
Author: vinodkv
Date: Thu Jan 12 01:43:33 2012
New Revision: 1230354

URL: http://svn.apache.org/viewvc?rev=1230354&view=rev
Log:
MAPREDUCE-3512. Batching JobHistory flushing to DFS so that we don't flush for every event
slowing down AM. Contributed by Siddarth Seth.
svn merge --ignore-ancestry -c 1230353 ../../trunk/

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/
      - copied from r1230353, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
      - copied unchanged from r1230353, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1230354&r1=1230353&r2=1230354&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Jan 12 01:43:33 2012
@@ -135,6 +135,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all
     task-updates. (Siddarth Seth via vinodkv)
 
+    MAPREDUCE-3512. Batching JobHistory flushing to DFS so that we don't flush
+    for every event slowing down AM. (Siddarth Seth via vinodkv)
+
   BUG FIXES
     MAPREDUCE-3462. Fix Gridmix JUnit testcase failures. 
                     (Ravi Prakash and Ravi Gummadi via amarrk)

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1230354&r1=1230353&r2=1230354&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Thu Jan 12 01:43:33 2012
@@ -20,9 +20,12 @@ package org.apache.hadoop.mapreduce.jobh
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -70,13 +73,20 @@ public class JobHistoryEventHandler exte
   private FileSystem stagingDirFS; // log Dir FileSystem
   private FileSystem doneDirFS; // done Dir FileSystem
 
-  private Configuration conf;
 
   private Path stagingDirPath = null;
   private Path doneDirPrefixPath = null; // folder for completed jobs
 
+  private int maxUnflushedCompletionEvents;
+  private int postJobCompletionMultiplier;
+  private long flushTimeout;
+  private int minQueueSizeForBatchingFlushes; // TODO: Rename
 
-  private BlockingQueue<JobHistoryEvent> eventQueue =
+  private int numUnflushedCompletionEvents = 0;
+  private boolean isTimerActive;
+
+
+  protected BlockingQueue<JobHistoryEvent> eventQueue =
     new LinkedBlockingQueue<JobHistoryEvent>();
   protected Thread eventHandlingThread;
   private volatile boolean stopped;
@@ -103,8 +113,6 @@ public class JobHistoryEventHandler exte
   @Override
   public void init(Configuration conf) {
 
-    this.conf = conf;
-
     String stagingDirStr = null;
     String doneDirStr = null;
     String userDoneDirStr = null;
@@ -184,6 +192,27 @@ public class JobHistoryEventHandler exte
       throw new YarnException(e);
     }
 
+    // Maximum number of unflushed completion-events that can stay in the queue
+    // before flush kicks in.
+    maxUnflushedCompletionEvents =
+        conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS);
+    // We want to cut down flushes after job completes so as to write quicker,
+    // so we increase maxUnflushedEvents post Job completion by using the
+    // following multiplier.
+    postJobCompletionMultiplier =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER);
+    // Max time until which flush doesn't take place.
+    flushTimeout =
+        conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS);
+    minQueueSizeForBatchingFlushes =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
+    
     super.init(conf);
   }
 
@@ -256,14 +285,28 @@ public class JobHistoryEventHandler exte
     stopped = true;
     //do not interrupt while event handling is in progress
     synchronized(lock) {
-      eventHandlingThread.interrupt();
+      if (eventHandlingThread != null)
+        eventHandlingThread.interrupt();
     }
 
     try {
-      eventHandlingThread.join();
+      if (eventHandlingThread != null)
+        eventHandlingThread.join();
     } catch (InterruptedException ie) {
       LOG.info("Interruped Exception while stopping", ie);
     }
+
+    // Cancel all timers - so that they aren't invoked during or after
+    // the metaInfo object is wrapped up.
+    for (MetaInfo mi : fileMap.values()) {
+      try {
+        mi.shutDownTimer();
+      } catch (IOException e) {
+        LOG.info("Exception while cancelling delayed flush timer. "
+            + "Likely caused by a failed flush " + e.getMessage());
+      }
+    }
+
     //write all the events remaining in queue
     Iterator<JobHistoryEvent> it = eventQueue.iterator();
     while(it.hasNext()) {
@@ -284,6 +327,12 @@ public class JobHistoryEventHandler exte
     super.stop();
   }
 
+  protected EventWriter createEventWriter(Path historyFilePath)
+      throws IOException {
+    FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
+    return new EventWriter(out);
+  }
+  
   /**
    * Create an event writer for the Job represented by the jobID.
    * Writes out the job configuration to the log directory.
@@ -319,8 +368,7 @@ public class JobHistoryEventHandler exte
         JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
     if (writer == null) {
       try {
-        FSDataOutputStream out = stagingDirFS.create(historyFile, true);
-        writer = new EventWriter(out);
+        writer = createEventWriter(historyFile);
         LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
             + historyFile);
       } catch (IOException ioe) {
@@ -371,12 +419,26 @@ public class JobHistoryEventHandler exte
   @Override
   public void handle(JobHistoryEvent event) {
     try {
+      if (isJobCompletionEvent(event.getHistoryEvent())) {
+        // When the job is complete, flush slower but write faster.
+        maxUnflushedCompletionEvents =
+            maxUnflushedCompletionEvents * postJobCompletionMultiplier;
+      }
+
       eventQueue.put(event);
     } catch (InterruptedException e) {
       throw new YarnException(e);
     }
   }
 
+  private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
+    if (EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED,
+        EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+      return true;
+    }
+    return false;
+  }
+
   protected void handleEvent(JobHistoryEvent event) {
     synchronized (lock) {
 
@@ -615,50 +677,159 @@ public class JobHistoryEventHandler exte
     }
   }
 
+  private class FlushTimerTask extends TimerTask {
+    private MetaInfo metaInfo;
+    private IOException ioe = null;
+    private volatile boolean shouldRun = true;
+
+    FlushTimerTask(MetaInfo metaInfo) {
+      this.metaInfo = metaInfo;
+    }
+
+    @Override
+    public void run() {
+      synchronized (lock) {
+        try {
+          if (!metaInfo.isTimerShutDown() && shouldRun)
+            metaInfo.flush();
+        } catch (IOException e) {
+          ioe = e;
+        }
+      }
+    }
+
+    public IOException getException() {
+      return ioe;
+    }
+
+    public void stop() {
+      shouldRun = false;
+      this.cancel();
+    }
+  }
+
   private class MetaInfo {
     private Path historyFile;
     private Path confFile;
     private EventWriter writer;
     JobIndexInfo jobIndexInfo;
     JobSummary jobSummary;
+    Timer flushTimer; 
+    FlushTimerTask flushTimerTask;
+    private boolean isTimerShutDown = false;
 
-    MetaInfo(Path historyFile, Path conf, EventWriter writer, 
-             String user, String jobName, JobId jobId) {
+    MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
+        String jobName, JobId jobId) {
       this.historyFile = historyFile;
       this.confFile = conf;
       this.writer = writer;
-      this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1,
-          null);
+      this.jobIndexInfo =
+          new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
       this.jobSummary = new JobSummary();
+      this.flushTimer = new Timer("FlushTimer", true);
     }
 
-    Path getHistoryFile() { return historyFile; }
+    Path getHistoryFile() {
+      return historyFile;
+    }
 
-    Path getConfFile() {return confFile; } 
+    Path getConfFile() {
+      return confFile;
+    }
 
-    JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
+    JobIndexInfo getJobIndexInfo() {
+      return jobIndexInfo;
+    }
 
-    JobSummary getJobSummary() { return jobSummary; }
+    JobSummary getJobSummary() {
+      return jobSummary;
+    }
 
-    boolean isWriterActive() {return writer != null ; }
+    boolean isWriterActive() {
+      return writer != null;
+    }
+    
+    boolean isTimerShutDown() {
+      return isTimerShutDown;
+    }
 
     void closeWriter() throws IOException {
       synchronized (lock) {
-      if (writer != null) {
-        writer.close();
+        if (writer != null) {
+          writer.close();
+        }
+        writer = null;
       }
-      writer = null;
-    }
     }
 
     void writeEvent(HistoryEvent event) throws IOException {
       synchronized (lock) {
-      if (writer != null) {
-        writer.write(event);
-        writer.flush();
+        if (writer != null) {
+          writer.write(event);
+          processEventForFlush(event);
+          maybeFlush(event);
+        }
+      }
+    }
+
+    void processEventForFlush(HistoryEvent historyEvent) throws IOException {
+      if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED,
+          EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED,
+          EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED,
+          EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED,
+          EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED,
+          EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+        numUnflushedCompletionEvents++;
+        if (!isTimerActive) {
+          resetFlushTimer();
+          if (!isTimerShutDown) {
+            flushTimerTask = new FlushTimerTask(this);
+            flushTimer.schedule(flushTimerTask, flushTimeout);
+          }
+        }
+      }
+    }
+
+    void resetFlushTimer() throws IOException {
+      if (flushTimerTask != null) {
+        IOException exception = flushTimerTask.getException();
+        flushTimerTask.stop();
+        if (exception != null) {
+          throw exception;
+        }
+        flushTimerTask = null;
+      }
+      isTimerActive = false;
+    }
+
+    void maybeFlush(HistoryEvent historyEvent) throws IOException {
+      if ((eventQueue.size() < minQueueSizeForBatchingFlushes 
+          && numUnflushedCompletionEvents > 0)
+          || numUnflushedCompletionEvents >= maxUnflushedCompletionEvents 
+          || isJobCompletionEvent(historyEvent)) {
+        this.flush();
+      }
+    }
+
+    void flush() throws IOException {
+      synchronized (lock) {
+        if (numUnflushedCompletionEvents != 0) { // skipped timer cancel.
+          writer.flush();
+          numUnflushedCompletionEvents = 0;
+          resetFlushTimer();
+        }
+      }
+    }
+
+    void shutDownTimer() throws IOException {
+      synchronized (lock) {
+        isTimerShutDown = true;
+        flushTimer.cancel();
+        if (flushTimerTask != null && flushTimerTask.getException() != null) {
+          throw flushTimerTask.getException();
+        }
       }
     }
-  }
   }
 
   private void moveTmpToDone(Path tmpPath) throws IOException {
@@ -682,7 +853,7 @@ public class JobHistoryEventHandler exte
         doneDirFS.delete(toPath, true);
       }
       boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
-          false, conf);
+          false, getConfig());
 
       if (copied)
         LOG.info("Copied to done location: " + toPath);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1230354&r1=1230353&r2=1230354&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Jan 12 01:43:33 2012
@@ -436,6 +436,26 @@ public interface MRJobConfig {
   public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR = 
     MR_AM_PREFIX + "create-intermediate-jh-base-dir";
   
+  public static final String MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      MR_AM_PREFIX + "history.max-unflushed-events";
+  public static final int DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      200;
+
+  public static final String MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      MR_AM_PREFIX + "history.job-complete-unflushed-multiplier";
+  public static final int DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      30;
+
+  public static final String MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      MR_AM_PREFIX + "history.complete-event-flush-timeout";
+  public static final long DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      30 * 1000l;
+
+  public static final String MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
+  public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      50;
+  
   public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
       "mapreduce.admin.map.child.java.opts";
 



Mime
View raw message