hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1227426 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/a...
Date Thu, 05 Jan 2012 01:37:14 GMT
Author: acmurthy
Date: Thu Jan  5 01:37:13 2012
New Revision: 1227426

URL: http://svn.apache.org/viewvc?rev=1227426&view=rev
Log:
MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for performance reasons. Contributed
by Vinod K V. 

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1227426&r1=1227425&r2=1227426&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jan  5 01:37:13 2012
@@ -406,6 +406,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks.
     (vinodkv via acmurthy) 
 
+    MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for
+    performance reasons. (vinodkv via acmurthy) 
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1227426&r1=1227425&r2=1227426&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Jan  5 01:37:13 2012
@@ -104,10 +104,9 @@ public abstract class RMCommunicator ext
   @Override
   public void start() {
     scheduler= createSchedulerProxy();
-    //LOG.info("Scheduler is " + scheduler);
     register();
     startAllocatorThread();
-    JobID id = TypeConverter.fromYarn(context.getApplicationID());
+    JobID id = TypeConverter.fromYarn(this.applicationId);
     JobId jobId = TypeConverter.toYarn(id);
     job = context.getJob(jobId);
     super.start();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1227426&r1=1227425&r2=1227426&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jan  5 01:37:13 2012
@@ -30,18 +30,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -69,7 +68,7 @@ import org.apache.hadoop.yarn.util.RackR
 public class RMContainerAllocator extends RMContainerRequestor
     implements ContainerAllocator {
 
-  private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
+  static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   
   public static final 
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
@@ -77,7 +76,10 @@ public class RMContainerAllocator extend
   private static final Priority PRIORITY_FAST_FAIL_MAP;
   private static final Priority PRIORITY_REDUCE;
   private static final Priority PRIORITY_MAP;
-  
+
+  private Thread eventHandlingThread;
+  private volatile boolean stopEventHandling;
+
   static {
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
     PRIORITY_FAST_FAIL_MAP.setPriority(5);
@@ -130,7 +132,10 @@ public class RMContainerAllocator extend
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
-  
+
+  BlockingQueue<ContainerAllocatorEvent> eventQueue
+    = new LinkedBlockingQueue<ContainerAllocatorEvent>();
+
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
   }
@@ -156,6 +161,40 @@ public class RMContainerAllocator extend
   }
 
   @Override
+  public void start() {
+    this.eventHandlingThread = new Thread() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void run() {
+
+        ContainerAllocatorEvent event;
+
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = RMContainerAllocator.this.eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+
+          try {
+            handleEvent(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the ContainreAllocator", t);
+            // Kill the AM
+            eventHandler.handle(new JobEvent(getJob().getID(),
+              JobEventType.INTERNAL_ERROR));
+            return;
+          }
+        }
+      }
+    };
+    this.eventHandlingThread.start();
+    super.start();
+  }
+
+  @Override
   protected synchronized void heartbeat() throws Exception {
     LOG.info("Before Scheduling: " + getStat());
     List<Container> allocatedContainers = getResources();
@@ -181,6 +220,8 @@ public class RMContainerAllocator extend
 
   @Override
   public void stop() {
+    this.stopEventHandling = true;
+    eventHandlingThread.interrupt();
     super.stop();
     LOG.info("Final Stats: " + getStat());
   }
@@ -192,10 +233,27 @@ public class RMContainerAllocator extend
   public void setIsReduceStarted(boolean reduceStarted) {
     this.reduceStarted = reduceStarted; 
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Override
-  public synchronized void handle(ContainerAllocatorEvent event) {
+  public void handle(ContainerAllocatorEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+    }
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of RMContainerAllocator: " + remCapacity);
+    }
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     LOG.info("Processing the event " + event.toString());
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
@@ -206,9 +264,7 @@ public class RMContainerAllocator extend
           int minSlotMemSize = getMinContainerCapability().getMemory();
           mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
               * minSlotMemSize;
-          JobID id = TypeConverter.fromYarn(applicationId);
-          JobId jobId = TypeConverter.toYarn(id);
-          eventHandler.handle(new JobHistoryEvent(jobId, 
+          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
               mapResourceReqt)));
           LOG.info("mapResourceReqt:"+mapResourceReqt);
@@ -232,9 +288,7 @@ public class RMContainerAllocator extend
           //round off on slotsize
           reduceResourceReqt = (int) Math.ceil((float) 
               reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
-          JobID id = TypeConverter.fromYarn(applicationId);
-          JobId jobId = TypeConverter.toYarn(id);
-          eventHandler.handle(new JobHistoryEvent(jobId, 
+          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
               reduceResourceReqt)));

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1227426&r1=1227425&r2=1227426&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Jan  5 01:37:13 2012
@@ -1186,12 +1186,12 @@ public class TestRMContainerAllocator {
 
     public void sendRequests(List<ContainerRequestEvent> reqs) {
       for (ContainerRequestEvent req : reqs) {
-        super.handle(req);
+        super.handleEvent(req);
       }
     }
 
     public void sendFailure(ContainerFailedEvent f) {
-      super.handle(f);
+      super.handleEvent(f);
     }
     
     // API to be used by tests



Mime
View raw message