hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153447 - /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
Date Wed, 03 Aug 2011 11:48:32 GMT
Author: vinodkv
Date: Wed Aug  3 11:48:31 2011
New Revision: 1153447

URL: http://svn.apache.org/viewvc?rev=1153447&view=rev
Log:
Completing the scheduler-dispatch cycle.

Modified:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1153447&r1=1153446&r2=1153447&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
Wed Aug  3 11:48:31 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -32,7 +34,6 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -43,8 +44,8 @@ import org.apache.hadoop.yarn.security.c
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -53,17 +54,16 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.webapp.WebApp;
@@ -97,6 +97,7 @@ public class ResourceManager extends Com
   private ContainerAllocationExpirer containerAllocationExpirer;
   protected NMLivelinessMonitor nmLivelinessMonitor;
   protected NodesListManager nodesListManager;
+  private SchedulerEventDispatcher schedulerDispatcher;
 
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
   private WebApp webApp;
@@ -136,7 +137,10 @@ public class ResourceManager extends Com
     this.conf = new YarnConfiguration(conf);
     // Initialize the scheduler
     this.scheduler = createScheduler();
-    this.rmDispatcher.register(SchedulerEventType.class, scheduler);
+    this.schedulerDispatcher = new SchedulerEventDispatcher(this.scheduler);
+    addService(this.schedulerDispatcher);
+    this.rmDispatcher.register(SchedulerEventType.class,
+        this.schedulerDispatcher);
 
     // Register event handler for RmAppEvents
     this.rmDispatcher.register(RMAppEventType.class,
@@ -212,6 +216,109 @@ public class ResourceManager extends Com
   }
 
+  /**
+   * Hands scheduler events to the {@link ResourceScheduler} on a dedicated
+   * thread.
+   *
+   * {@link #handle(SchedulerEvent)} only appends the event to an in-memory
+   * queue; a single processor thread takes events off that queue and passes
+   * each one to the scheduler, so callers of handle() never run scheduler
+   * code themselves.
+   */
   @Private
+  public static final class SchedulerEventDispatcher extends AbstractService
+      implements EventHandler<SchedulerEvent> {
+
+    private final ResourceScheduler scheduler;
+    private final BlockingQueue<SchedulerEvent> eventQueue =
+      new LinkedBlockingQueue<SchedulerEvent>();
+    private final Thread eventProcessor;
+
+    // Set by stop(). Checked alongside the thread's interrupt flag because
+    // scheduler.handle() could clear an interrupt delivered while it runs,
+    // which would otherwise leave the loop blocked in take() forever.
+    private volatile boolean stopped = false;
+
+    /**
+     * @param scheduler the scheduler that every queued event is passed to
+     */
+    public SchedulerEventDispatcher(ResourceScheduler scheduler) {
+      super(SchedulerEventDispatcher.class.getName());
+      this.scheduler = scheduler;
+      this.eventProcessor = new Thread(new EventProcessor());
+      this.eventProcessor.setName("ResourceManager Event Processor");
+    }
+
+    /** Starts the processor thread, then the service itself. */
+    @Override
+    public synchronized void start() {
+      this.eventProcessor.start();
+      super.start();
+    }
+
+    /** Body of the processor thread: take one event, pass it on, repeat. */
+    private final class EventProcessor implements Runnable {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          SchedulerEvent event;
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return; // TODO: Kill RM.
+          }
+
+          try {
+            scheduler.handle(event);
+          } catch (Throwable t) {
+            // Any failure in the scheduler ends the processor thread.
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the scheduler", t);
+            return; // TODO: Kill RM.
+          }
+        }
+      }
+    }
+
+    /**
+     * Asks the processor thread to exit, waits for it to finish, then stops
+     * the service. Events still in the queue are not delivered.
+     *
+     * @throws YarnException if the caller is interrupted while waiting; the
+     *         caller's interrupt status is restored before throwing
+     */
+    @Override
+    public synchronized void stop() {
+      this.stopped = true;
+      this.eventProcessor.interrupt();
+      try {
+        this.eventProcessor.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(e);
+      }
+      super.stop();
+    }
+
+    /**
+     * Queues the event for the processor thread and returns; the scheduler
+     * is not invoked on the calling thread.
+     *
+     * @throws YarnException if the caller is interrupted while queuing; the
+     *         caller's interrupt status is restored before throwing
+     */
+    @Override
+    public void handle(SchedulerEvent event) {
+      try {
+        this.eventQueue.put(event);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(e);
+      }
+    }
+  }
+
+  @Private
   public static final class ApplicationEventDispatcher implements
       EventHandler<RMAppEvent> {
 



Mime
View raw message