hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject [2/2] hadoop git commit: YARN-6411. Clean up the overwrite of createDispatcher() in subclass of MockRM. Contributed by Yufei Gu
Date Fri, 31 Mar 2017 15:23:13 GMT
YARN-6411. Clean up the overwrite of createDispatcher() in subclass of MockRM. Contributed by Yufei Gu

(cherry picked from commit 4d1fac5df2508011adfc4c5d683beb00fd5ced45)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3ae529ba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3ae529ba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3ae529ba

Branch: refs/heads/branch-2
Commit: 3ae529ba54e5f9f1416042b9be2aa3cf7b2516de
Parents: 9e7f8cb
Author: Jason Lowe <jlowe@yahoo-inc.com>
Authored: Fri Mar 31 10:05:34 2017 -0500
Committer: Jason Lowe <jlowe@yahoo-inc.com>
Committed: Fri Mar 31 10:21:44 2017 -0500

----------------------------------------------------------------------
 .../v2/app/rm/TestRMContainerAllocator.java     | 453 +++++++++----------
 .../api/impl/TestAMRMClientOnRMRestart.java     |  59 +--
 .../server/resourcemanager/ACLsTestBase.java    |  10 -
 .../server/resourcemanager/RMHATestBase.java    |  20 +-
 .../ReservationACLsTestBase.java                |   5 +-
 .../resourcemanager/TestApplicationCleanup.java |  44 +-
 .../TestApplicationMasterLauncher.java          |  11 +-
 .../TestApplicationMasterService.java           |  19 +-
 .../TestNodeBlacklistingOnAMFailures.java       |  41 +-
 .../TestReservationSystemWithRMHA.java          |   5 +-
 .../TestAMRMRPCNodeUpdates.java                 |  18 +-
 .../resourcetracker/TestNMReconnect.java        |  14 +-
 .../rmcontainer/TestRMContainerImpl.java        |   1 -
 .../capacity/TestApplicationPriority.java       |  29 +-
 .../security/TestClientToAMTokens.java          |  23 +-
 15 files changed, 277 insertions(+), 475 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 78b389a..64c36bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -177,21 +177,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -205,7 +203,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -220,7 +218,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
 
@@ -232,7 +230,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
     
@@ -240,18 +238,18 @@ public class TestRMContainerAllocator {
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
     checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
         assigned, false);
     
     // check that the assigned container requests are cancelled
-    assigned = allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());    
+    allocator.schedule();
+    rm.drainEvents();
+    Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
   }
   
   @Test 
@@ -267,21 +265,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -295,7 +291,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps 
     rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
     MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container requests for maps
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -311,7 +307,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // update resources in scheduler
@@ -321,10 +317,10 @@ public class TestRMContainerAllocator {
     // Node heartbeat from node-local next. This allocates 2 node local 
     // containers for task1 and task2. These should be matched with those tasks.
     nodeManager1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
         assigned, false);
     // remove the rack-local assignment that should have happened for task3
@@ -348,21 +344,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -376,7 +370,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -391,17 +385,17 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     checkAssignments(new ContainerRequestEvent[] { event1, event2 },
         assigned, false);
   }
@@ -414,19 +408,17 @@ public class TestRMContainerAllocator {
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
     final MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
     final RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     final String host = "host1";
     final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048);
     nm.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     final JobId jobId = MRBuilderUtils
         .newJobId(appAttemptId.getApplicationId(), 0);
     final Job mockJob = mock(Job.class);
@@ -436,20 +428,20 @@ public class TestRMContainerAllocator {
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob, SystemClock.getInstance());
     // add resources to scheduler
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     final String[] locations = new String[] { host };
     allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
     for (int i = 0; i < 1;) {
-      dispatcher.await();
+      rm.drainEvents();
       i += allocator.schedule().size();
       nm.nodeHeartbeat(true);
     }
 
     allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
     while (allocator.getTaskAttemptKillEvents().size() == 0) {
-      dispatcher.await();
+      rm.drainEvents();
       allocator.schedule().size();
       nm.nodeHeartbeat(true);
     }
@@ -466,21 +458,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -519,21 +509,19 @@ public class TestRMContainerAllocator {
 
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -582,21 +570,19 @@ public class TestRMContainerAllocator {
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -637,18 +623,16 @@ public class TestRMContainerAllocator {
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
     final MyResourceManager2 rm = new MyResourceManager2(conf);
     rm.start();
-    final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext()
-            .getDispatcher();
     final RMApp app = rm.submitApp(2048);
-    dispatcher.await();
+    rm.drainEvents();
     final String host = "host1";
     final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096);
     nm.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
           .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     final JobId jobId = MRBuilderUtils
                  .newJobId(appAttemptId.getApplicationId(), 0);
     final Job mockJob = mock(Job.class);
@@ -664,14 +648,14 @@ public class TestRMContainerAllocator {
     allocator.scheduleAllReduces();
     allocator.makeRemoteRequest();
     nm.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
 
     int assignedContainer;
     for (assignedContainer = 0; assignedContainer < 1;) {
       assignedContainer += allocator.schedule().size();
       nm.nodeHeartbeat(true);
-      dispatcher.await();
+      rm.drainEvents();
     }
     // only 1 allocated container should be assigned
     Assert.assertEquals(assignedContainer, 1);
@@ -771,21 +755,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -799,7 +781,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     // send MAP request
@@ -820,17 +802,17 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     checkAssignments(new ContainerRequestEvent[] { event1, event3 },
         assigned, false);
 
@@ -862,11 +844,6 @@ public class TestRMContainerAllocator {
     }
 
     @Override
-    protected Dispatcher createDispatcher() {
-      return new DrainDispatcher();
-    }
-
-    @Override
     protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
       // Dispatch inline for test sanity
       return new EventHandler<SchedulerEvent>() {
@@ -910,16 +887,16 @@ public class TestRMContainerAllocator {
 
     // Submit the application
     RMApp rmApp = rm.submitApp(1024);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
       appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
@@ -957,11 +934,11 @@ public class TestRMContainerAllocator {
     amDispatcher.await();
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
 
     // Wait for all map-tasks to be running
     for (Task t : job.getTasks().values()) {
@@ -971,7 +948,7 @@ public class TestRMContainerAllocator {
     }
 
     allocator.schedule(); // Send heartbeat
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
 
@@ -979,14 +956,14 @@ public class TestRMContainerAllocator {
     Iterator<Task> it = job.getTasks().values().iterator();
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
 
     // Finish off 7 more so that map-progress is 80%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
 
@@ -994,11 +971,11 @@ public class TestRMContainerAllocator {
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
 
     // Wait for all reduce-tasks to be running
     for (Task t : job.getTasks().values()) {
@@ -1011,14 +988,14 @@ public class TestRMContainerAllocator {
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off the remaining 8 reduces.
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     // Remaining is JobCleanup
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
@@ -1062,16 +1039,16 @@ public class TestRMContainerAllocator {
 
     // Submit the application
     RMApp rmApp = rm.submitApp(1024);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
       appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
@@ -1107,11 +1084,11 @@ public class TestRMContainerAllocator {
     amDispatcher.await();
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
 
     // Wait for all map-tasks to be running
     for (Task t : job.getTasks().values()) {
@@ -1119,7 +1096,7 @@ public class TestRMContainerAllocator {
     }
 
     allocator.schedule(); // Send heartbeat
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
 
@@ -1128,21 +1105,21 @@ public class TestRMContainerAllocator {
     // Finish off 1 map so that map-progress is 10%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
 
     // Finish off 5 more map so that map-progress is 60%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off remaining map so that map-progress is 100%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
@@ -1152,21 +1129,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
-    
+    rm.drainEvents();
+
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
@@ -1175,7 +1150,7 @@ public class TestRMContainerAllocator {
     // add resources to scheduler
     MockNM nm1 = rm.registerNode("h1:1234", 10240);
     MockNM nm2 = rm.registerNode("h2:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the map container request
     ContainerRequestEvent event = createReq(jobId, 1, 1024,
@@ -1191,16 +1166,16 @@ public class TestRMContainerAllocator {
 
     // this tells the scheduler about the requests
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
     Assert.assertEquals(3, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
     allocator.getJobUpdatedNodeEvents().clear();
     // get the assignment
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(1, assigned.size());
     Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId());
     // no updated nodes reported
@@ -1210,11 +1185,11 @@ public class TestRMContainerAllocator {
     // mark nodes bad
     nm1.nodeHeartbeat(false);
     nm2.nodeHeartbeat(false);
-    dispatcher.await();
-    
+    rm.drainEvents();
+
     // schedule response returns updated nodes
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0, assigned.size());
     // updated nodes are reported
     Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
@@ -1225,7 +1200,7 @@ public class TestRMContainerAllocator {
     allocator.getTaskAttemptKillEvents().clear();
     
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0, assigned.size());
     // no updated nodes reported
     Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
@@ -1245,21 +1220,19 @@ public class TestRMContainerAllocator {
     
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -1273,7 +1246,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -1293,7 +1266,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Send events to blacklist nodes h1 and h2
@@ -1305,28 +1278,28 @@ public class TestRMContainerAllocator {
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     assertBlacklistAdditionsAndRemovals(2, 0, rm);
 
     // mark h1/h2 as bad nodes
     nodeManager1.nodeHeartbeat(false);
     nodeManager2.nodeHeartbeat(false);
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
 
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
         
     Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
@@ -1350,24 +1323,22 @@ public class TestRMContainerAllocator {
 
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM[] nodeManagers = new MockNM[10];
     int nmNum = 0;
     List<TaskAttemptContainerAssignedEvent> assigned = null;
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     nodeManagers[0].nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -1380,7 +1351,7 @@ public class TestRMContainerAllocator {
     // Known=1, blacklisted=0, ignore should be false - assign first container
     assigned =
         getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
@@ -1395,47 +1366,47 @@ public class TestRMContainerAllocator {
     // The current call will send blacklisted node "h1" to RM
     assigned =
         getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm);
+            nodeManagers[0], allocator, 1, 0, 0, 1, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=1, blacklisted=1, ignore should be true - assign 1
     assigned =
         getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
         getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
-            nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
         getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Known=3, blacklisted=1, ignore should be true - assign 1
     assigned =
         getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
     assigned =
         getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
-            nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm);
+            nodeManagers[3], allocator, 0, 0, 1, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklisting re-enabled.
     // Known=4, blacklisted=1, ignore should be false - no assignment on h1
     assigned =
         getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     // RMContainerRequestor would have created a replacement request.
 
@@ -1448,61 +1419,61 @@ public class TestRMContainerAllocator {
     // container for the same reason above.
     assigned =
         getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm);
+            nodeManagers[0], allocator, 1, 0, 0, 2, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true. Should assign 2
     // containers.
     assigned =
         getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true.
     assigned =
         getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
-            nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklist while ignore blacklisting enabled
     ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
     allocator.sendFailure(f3);
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=5, blacklisted=3, ignore should be true.
     assigned =
         getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     
     // Assign on 5 more nodes - to re-enable blacklisting
     for (int i = 0; i < 5; i++) {
-      nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+      nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
       assigned =
           getContainerOnHost(jobId, 11 + i, 1024,
               new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
-              dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
+              allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
       Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     }
 
     // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
     assigned =
         getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
   }
 
-  private MockNM registerNodeManager(int i, MyResourceManager rm,
-      DrainDispatcher dispatcher) throws Exception {
+  private MockNM registerNodeManager(int i, MyResourceManager rm)
+      throws Exception {
     MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
     return nm;
   }
 
   private
       List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
           int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
-          DrainDispatcher dispatcher, MyContainerAllocator allocator,
+          MyContainerAllocator allocator,
           int expectedAdditions1, int expectedRemovals1,
           int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
           throws Exception {
@@ -1512,17 +1483,17 @@ public class TestRMContainerAllocator {
 
     // Send the request to the RM
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(
         expectedAdditions1, expectedRemovals1, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Heartbeat from the required nodeManager
     mockNM.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(
         expectedAdditions2, expectedRemovals2, rm);
     return assigned;
@@ -1540,21 +1511,19 @@ public class TestRMContainerAllocator {
     
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -1567,7 +1536,7 @@ public class TestRMContainerAllocator {
     // add resources to scheduler
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     LOG.info("Requesting 1 Containers _1 on H1");
     // create the container request
@@ -1579,17 +1548,17 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     LOG.info("h1 Heartbeat (To actually schedule the containers)");
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());    
     
@@ -1606,7 +1575,7 @@ public class TestRMContainerAllocator {
 
     //Update the Scheduler with the new requests.
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(1, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
@@ -1621,11 +1590,11 @@ public class TestRMContainerAllocator {
     LOG.info("h1 Heartbeat (To actually schedule the containers)");
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
     
@@ -1634,19 +1603,19 @@ public class TestRMContainerAllocator {
     //Send a release for the p:5 container + another request.
     LOG.info("RM Heartbeat (To process the re-scheduled containers)");
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     
     //Hearbeat from H3 to schedule on this host.
     LOG.info("h3 Heartbeat (To re-schedule the containers)");
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     
     LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
     assigned = allocator.schedule();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    dispatcher.await();
+    rm.drainEvents();
      
     // For debugging
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
@@ -2225,22 +2194,20 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     final MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Make a node to register so as to launch the AM.
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job job = mock(Job.class);
@@ -2375,21 +2342,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     final MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher rmDispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp rmApp = rm.submitApp(1024);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264);
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId =
         rmApp.getCurrentAppAttempt().getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MRApp mrApp =
         new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10,
@@ -2448,22 +2413,20 @@ public class TestRMContainerAllocator {
 
     MyResourceManager rm1 = new MyResourceManager(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -2492,7 +2455,7 @@ public class TestRMContainerAllocator {
     // send allocate request and 1 blacklisted nodes
     List<TaskAttemptContainerAssignedEvent> assignedContainers =
         allocator.schedule();
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0,
         assignedContainers.size());
     // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
@@ -2500,11 +2463,11 @@ public class TestRMContainerAllocator {
     assertBlacklistAdditionsAndRemovals(1, 0, rm1);
 
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     // Step-2 : 2 containers are allocated by RM.
     assignedContainers = allocator.schedule();
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 2", 2,
         assignedContainers.size());
     assertAsksAndReleases(0, 0, rm1);
@@ -2539,7 +2502,6 @@ public class TestRMContainerAllocator {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     allocator.updateSchedulerProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -2549,7 +2511,7 @@ public class TestRMContainerAllocator {
     nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     // Step-4 : On RM restart, AM(does not know RM is restarted) sends
     // additional containerRequest(event4) and blacklisted nodes.
@@ -2570,7 +2532,7 @@ public class TestRMContainerAllocator {
 
     // send allocate request to 2nd RM and get resync command
     allocator.schedule();
-    dispatcher.await();
+    rm2.drainEvents();
 
     // Step-5 : On Resync,AM sends all outstanding
     // asks,release,blacklistAaddition
@@ -2581,16 +2543,16 @@ public class TestRMContainerAllocator {
 
     // send all outstanding request again.
     assignedContainers = allocator.schedule();
-    dispatcher.await();
+    rm2.drainEvents();
     assertAsksAndReleases(3, 2, rm2);
     assertBlacklistAdditionsAndRemovals(2, 0, rm2);
 
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
     assignedContainers = allocator.schedule();
-    dispatcher.await();
+    rm2.drainEvents();
 
     Assert.assertEquals("Number of container should be 3", 3,
         assignedContainers.size());
@@ -2693,20 +2655,19 @@ public class TestRMContainerAllocator {
       MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
     MyResourceManager rm1 = new MyResourceManager(conf);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -2722,7 +2683,7 @@ public class TestRMContainerAllocator {
     } catch (RMContainerAllocationException e) {
       Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
     }
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("Should Have 1 Job Event", 1,
         allocator.jobEvents.size());
     JobEvent event = allocator.jobEvents.get(0); 
@@ -2744,22 +2705,20 @@ public class TestRMContainerAllocator {
     rm.start();
     AMRMTokenSecretManager secretMgr =
         rm.getRMContext().getAMRMTokenSecretManager();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     final ApplicationId appId = app.getApplicationId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     final Job mockJob = mock(Job.class);
@@ -2947,21 +2906,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -2983,21 +2940,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -3012,7 +2967,7 @@ public class TestRMContainerAllocator {
 
     // Register nodes to RM.
     MockNM nodeManager = rm.registerNode("h1:1234", 1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
@@ -3028,7 +2983,7 @@ public class TestRMContainerAllocator {
     // This will tell the scheduler about the requests but there will be no
     // allocations as nodes are not added.
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Advance clock so that maps can be considered as hanging.
     clock.setTime(System.currentTimeMillis() + 500000L);
@@ -3039,15 +2994,15 @@ public class TestRMContainerAllocator {
     allocator.sendRequest(event4);
 
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Update resources in scheduler through node heartbeat from h1.
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // One map is assigned.
     Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
@@ -3081,7 +3036,7 @@ public class TestRMContainerAllocator {
     // On next allocate request to scheduler, headroom reported will be 0.
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // After allocate response from scheduler, all scheduled reduces are ramped
     // down and move to pending. 3 asks are also updated with 0 containers to
     // indicate ramping down of reduces to scheduler.
@@ -3148,21 +3103,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -3177,7 +3130,7 @@ public class TestRMContainerAllocator {
 
     // Register nodes to RM.
     MockNM nodeManager = rm.registerNode("h1:1234", 1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
@@ -3193,7 +3146,7 @@ public class TestRMContainerAllocator {
     // This will tell the scheduler about the requests but there will be no
     // allocations as nodes are not added.
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Advance clock so that maps can be considered as hanging.
     clock.setTime(System.currentTimeMillis() + 500000L);
@@ -3204,15 +3157,15 @@ public class TestRMContainerAllocator {
     allocator.sendRequest(event4);
 
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Update resources in scheduler through node heartbeat from h1.
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // One map is assigned.
     Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
@@ -3249,7 +3202,7 @@ public class TestRMContainerAllocator {
     // On next allocate request to scheduler, headroom reported will be 2048.
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // After allocate response from scheduler, all scheduled reduces are ramped
     // down and move to pending. 3 asks are also updated with 0 containers to
     // indicate ramping down of reduces to scheduler.
@@ -3278,21 +3231,19 @@ public class TestRMContainerAllocator {
     conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1);
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -3308,10 +3259,10 @@ public class TestRMContainerAllocator {
         appAttemptId, mockJob);
 
     MockNM nodeManager = rm.registerNode("h1:1234", 4096);
-    dispatcher.await();
+    rm.drainEvents();
     // Register nodes to RM.
     MockNM nodeManager2 = rm.registerNode("h2:1234", 1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
@@ -3327,7 +3278,7 @@ public class TestRMContainerAllocator {
     // This will tell the scheduler about the requests but there will be no
     // allocations as nodes are not added.
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
@@ -3335,15 +3286,15 @@ public class TestRMContainerAllocator {
     allocator.sendRequest(event4);
 
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Update resources in scheduler through node heartbeat from h1.
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Two maps are assigned.
     Assert.assertEquals(2, allocator.getAssignedRequests().maps.size());
@@ -3356,15 +3307,15 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
 
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // h2 heartbeats.
     nodeManager2.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Send request for one more mapper.
     ContainerRequestEvent event5 =
@@ -3373,7 +3324,7 @@ public class TestRMContainerAllocator {
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // One reducer is assigned and one map is scheduled
     Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
     Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size());
@@ -3381,7 +3332,7 @@ public class TestRMContainerAllocator {
     // enough if scheduled reducers resources are deducted.
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // After allocate response, the one assigned reducer is preempted and killed
     Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size());
     Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index ac77446..073b931 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -125,22 +123,20 @@ public class TestAMRMClientOnRMRestart {
     // Phase-1 Start 1st RM
     MyResourceManager rm1 = new MyResourceManager(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
         rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
@@ -175,7 +171,7 @@ public class TestAMRMClientOnRMRestart {
     blacklistAdditions.remove("h2");// remove from local list
 
     AllocateResponse allocateResponse = amClient.allocate(0.1f);
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
 
@@ -188,10 +184,10 @@ public class TestAMRMClientOnRMRestart {
     // Step-2 : NM heart beat is sent.
     // On 2nd AM allocate request, RM allocates 3 containers to AM
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     allocateResponse = amClient.allocate(0.2f);
-    dispatcher.await();
+    rm1.drainEvents();
     // 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3.
     Assert.assertEquals("No of assignments must be 0", 3, allocateResponse
         .getAllocatedContainers().size());
@@ -206,7 +202,7 @@ public class TestAMRMClientOnRMRestart {
     amClient.removeContainerRequest(cRequest3);
 
     allocateResponse = amClient.allocate(0.2f);
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
     assertAsksAndReleases(4, 0, rm1);
@@ -232,13 +228,13 @@ public class TestAMRMClientOnRMRestart {
     // request
     nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
         containerId.getContainerId(), ContainerState.RUNNING);
-    dispatcher.await();
+    rm1.drainEvents();
     amClient.requestContainerResourceChange(
         container, Resource.newInstance(2048, 1));
     it.remove();
 
     allocateResponse = amClient.allocate(0.3f);
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
     assertAsksAndReleases(3, pendingRelease, rm1);
@@ -254,7 +250,6 @@ public class TestAMRMClientOnRMRestart {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -270,7 +265,7 @@ public class TestAMRMClientOnRMRestart {
         Collections.singletonList(
             containerId.getApplicationAttemptId().getApplicationId()));
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     blacklistAdditions.add("h3");
     amClient.updateBlacklist(blacklistAdditions, null);
@@ -292,7 +287,7 @@ public class TestAMRMClientOnRMRestart {
     // containerRequest and blacklisted nodes.
     // Intern RM send resync command,AMRMClient resend allocate request
     allocateResponse = amClient.allocate(0.3f);
-    dispatcher.await();
+    rm2.drainEvents();
 
     completedContainer =
         allocateResponse.getCompletedContainersStatuses().size();
@@ -309,7 +304,7 @@ public class TestAMRMClientOnRMRestart {
 
     // Step-5 : Allocater after resync command
     allocateResponse = amClient.allocate(0.5f);
-    dispatcher.await();
+    rm2.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
 
@@ -322,10 +317,10 @@ public class TestAMRMClientOnRMRestart {
     int count = 5;
     while (count-- > 0) {
       nm1.nodeHeartbeat(true);
-      dispatcher.await();
+      rm2.drainEvents();
 
       allocateResponse = amClient.allocate(0.5f);
-      dispatcher.await();
+      rm2.drainEvents();
       noAssignedContainer += allocateResponse.getAllocatedContainers().size();
       if (noAssignedContainer == 3) {
         break;
@@ -354,22 +349,20 @@ public class TestAMRMClientOnRMRestart {
     // Phase-1 Start 1st RM
     MyResourceManager rm1 = new MyResourceManager(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
         rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
@@ -389,7 +382,6 @@ public class TestAMRMClientOnRMRestart {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -405,7 +397,7 @@ public class TestAMRMClientOnRMRestart {
             Priority.newInstance(0), 0);
     nm1.registerNode(Arrays.asList(containerReport), null);
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
         null, null);
@@ -417,7 +409,6 @@ public class TestAMRMClientOnRMRestart {
     amClient.stop();
     rm1.stop();
     rm2.stop();
-
   }
 
 
@@ -435,22 +426,20 @@ public class TestAMRMClientOnRMRestart {
     // start first RM
     MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
     Long startTime = System.currentTimeMillis();
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     AMRMTokenSecretManager amrmTokenSecretManagerForRM1 =
         rm1.getRMContext().getAMRMTokenSecretManager();
@@ -509,7 +498,6 @@ public class TestAMRMClientOnRMRestart {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     AMRMTokenSecretManager amrmTokenSecretManagerForRM2 =
         rm2.getRMContext().getAMRMTokenSecretManager();
@@ -612,11 +600,6 @@ public class TestAMRMClientOnRMRestart {
     }
 
     @Override
-    protected Dispatcher createDispatcher() {
-      return new DrainDispatcher();
-    }
-
-    @Override
     protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
       // Dispatch inline for test sanity
       return new EventHandler<SchedulerEvent>() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
index fbd5ac3..100eb7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
@@ -30,14 +30,9 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.junit.Before;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public abstract class ACLsTestBase {
 
   protected static final String COMMON_USER = "common_user";
@@ -81,11 +76,6 @@ public abstract class ACLsTestBase {
       }
 
       @Override
-      protected Dispatcher createDispatcher() {
-        return new DrainDispatcher();
-      }
-
-      @Override
       protected void doSecureLogin() throws IOException {
       }
     };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index c9ce7d7..c95bcdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -26,22 +26,17 @@ import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -108,20 +103,9 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
   }
 
   protected void startRMs() throws IOException {
-    rm1 = new MockRM(confForRM1, null, false, false){
-      @Override
-      protected Dispatcher createDispatcher() {
-        return new DrainDispatcher();
-      }
-    };
-    rm2 = new MockRM(confForRM2, null, false, false){
-      @Override
-      protected Dispatcher createDispatcher() {
-        return new DrainDispatcher();
-      }
-    };
+    rm1 = new MockRM(confForRM1, null, false, false);
+    rm2 = new MockRM(confForRM2, null, false, false);
     startRMs(rm1, confForRM1, rm2, confForRM2);
-
   }
 
   protected void startRMsWithCustomizedRMAppManager() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
index 03bc889..c8ee00e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -463,9 +462,7 @@ public class ReservationACLsTestBase extends ACLsTestBase {
       int attempts = 10;
       Collection<Plan> plans;
       do {
-        DrainDispatcher dispatcher =
-                (DrainDispatcher) resourceManager.getRMContext().getDispatcher();
-        dispatcher.await();
+        resourceManager.drainEvents();
         LOG.info("Waiting for node capacity to be added to plan");
         plans = resourceManager.getRMContext().getReservationSystem()
                 .getAllPlans().values();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index c4197a1..422b7eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -161,13 +159,7 @@ public class TestApplicationCleanup {
 
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
-    final DrainDispatcher dispatcher = new DrainDispatcher();
-    MockRM rm = new MockRM() {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
-    };
+    MockRM rm = new MockRM();
     rm.start();
 
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
@@ -185,8 +177,8 @@ public class TestApplicationCleanup {
     int request = 2;
     am.allocate("127.0.0.1" , 1000, request, 
         new ArrayList<ContainerId>());
-    dispatcher.await();
-    
+    rm.drainEvents();
+
     //kick the scheduler
     nm1.nodeHeartbeat(true);
     List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
@@ -199,7 +191,7 @@ public class TestApplicationCleanup {
       Thread.sleep(100);
       conts = am.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers();
-      dispatcher.await();
+      rm.drainEvents();
       contReceived += conts.size();
       nm1.nodeHeartbeat(true);
     }
@@ -209,7 +201,7 @@ public class TestApplicationCleanup {
     ArrayList<ContainerId> release = new ArrayList<ContainerId>();
     release.add(conts.get(0).getId());
     am.allocate(new ArrayList<ResourceRequest>(), release);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Send one more heartbeat with a fake running container. This is to
     // simulate the situation that can happen if the NM reports that container
@@ -224,7 +216,7 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
-    waitForContainerCleanup(dispatcher, nm1, resp);
+    waitForContainerCleanup(rm, nm1, resp);
 
     // Now to test the case when RM already gave cleanup, and NM suddenly
     // realizes that the container is running.
@@ -240,17 +232,17 @@ public class TestApplicationCleanup {
     resp = nm1.nodeHeartbeat(containerStatuses, true);
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
-    waitForContainerCleanup(dispatcher, nm1, resp);
+    waitForContainerCleanup(rm, nm1, resp);
 
     rm.stop();
   }
 
-  protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+  protected void waitForContainerCleanup(MockRM rm, MockNM nm,
       NodeHeartbeatResponse resp) throws Exception {
     int waitCount = 0, cleanedConts = 0;
     List<ContainerId> contsToClean;
     do {
-      dispatcher.await();
+      rm.drainEvents();
       contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
       if (cleanedConts >= 1) {
@@ -400,13 +392,7 @@ public class TestApplicationCleanup {
     memStore.init(conf);
 
     // start RM
-    final DrainDispatcher dispatcher = new DrainDispatcher();
-    MockRM rm1 = new MockRM(conf, memStore) {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
-    };
+    MockRM rm1 = new MockRM(conf, memStore);
     rm1.start();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@@ -419,13 +405,7 @@ public class TestApplicationCleanup {
     rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
 
     // start new RM
-    final DrainDispatcher dispatcher2 = new DrainDispatcher();
-    MockRM rm2 = new MockRM(conf, memStore) {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher2;
-      }
-    };
+    MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
 
     // nm1 register to rm2, and do a heartbeat
@@ -437,7 +417,7 @@ public class TestApplicationCleanup {
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
         .getApplicationAttemptId(), 2, ContainerState.RUNNING);
 
-    waitForContainerCleanup(dispatcher2, nm1, response);
+    waitForContainerCleanup(rm2, nm1, response);
 
     rm1.stop();
     rm2.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index ad24708..92d6d2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -59,8 +59,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -261,7 +259,6 @@ public class TestApplicationMasterLauncher {
     Configuration conf = new Configuration();
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
-    final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
       @Override
       protected ApplicationMasterLauncher createAMLauncher() {
@@ -285,12 +282,8 @@ public class TestApplicationMasterLauncher {
           }
         };
       }
-
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
     };
+
     rm.start();
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
 
@@ -298,7 +291,7 @@ public class TestApplicationMasterLauncher {
 
     // kick the scheduling
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockRM.waitForState(app.getCurrentAppAttempt(),
       RMAppAttemptState.LAUNCHED, 500);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ae529ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 23bed22..18c49bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -327,10 +325,8 @@ public class TestApplicationMasterService {
 
   @Test(timeout=1200000)
   public void  testAllocateAfterUnregister() throws Exception {
-    MyResourceManager rm = new MyResourceManager(conf);
+    MockRM rm = new MockRM(conf);
     rm.start();
-    DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
-            .getDispatcher();
     // Register node1
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
 
@@ -351,7 +347,7 @@ public class TestApplicationMasterService {
     AllocateResponse alloc1Response = am1.schedule();
 
     nm1.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     alloc1Response = am1.schedule();
     Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size());
   }
@@ -474,17 +470,6 @@ public class TestApplicationMasterService {
     rm.stop();
   }
 
-  private static class MyResourceManager extends MockRM {
-
-    public MyResourceManager(YarnConfiguration conf) {
-      super(conf);
-    }
-    @Override
-    protected Dispatcher createDispatcher() {
-      return new DrainDispatcher();
-    }
-  }
-  
   private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     RMContainer rmContainer = cs.getRMContainer(containerId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message