hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From naganarasimha...@apache.org
Subject hadoop git commit: YARN-5911. DrainDispatcher does not drain all events on stop even if setDrainEventsOnStop is true. Contributed by Varun Saxena.
Date Wed, 23 Nov 2016 04:05:42 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 2bf9a15e8 -> 466756416


YARN-5911. DrainDispatcher does not drain all events on stop even if setDrainEventsOnStop
is true. Contributed by Varun Saxena.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/46675641
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/46675641
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/46675641

Branch: refs/heads/trunk
Commit: 466756416214a4bbc77af8a29da1a33e01106864
Parents: 2bf9a15
Author: Naganarasimha <naganarasimha_gr@apache.org>
Authored: Wed Nov 23 08:44:58 2016 +0530
Committer: Naganarasimha <naganarasimha_gr@apache.org>
Committed: Wed Nov 23 08:49:48 2016 +0530

----------------------------------------------------------------------
 .../hadoop/yarn/event/AsyncDispatcher.java      |  6 ++-
 .../hadoop/yarn/event/DrainDispatcher.java      |  9 +----
 .../hadoop/yarn/event/TestAsyncDispatcher.java  | 42 ++++++++++++++++++++
 3 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/46675641/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 42a6819..94bfab6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -151,7 +151,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
         while (!isDrained() && eventHandlingThread != null
             && eventHandlingThread.isAlive()
             && System.currentTimeMillis() < endTime) {
-          waitForDrained.wait(1000);
+          waitForDrained.wait(100);
           LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
               eventHandlingThread.getState());
         }
@@ -308,4 +308,8 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   protected boolean isDrained() {
     return drained;
   }
+
+  protected boolean isStopped() {
+    return stopped;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/46675641/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index 1369465..c5ba072 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 @SuppressWarnings("rawtypes")
 public class DrainDispatcher extends AsyncDispatcher {
   private volatile boolean drained = false;
-  private volatile boolean stopped = false;
   private final BlockingQueue<Event> queue;
   private final Object mutex;
 
@@ -69,7 +68,7 @@ public class DrainDispatcher extends AsyncDispatcher {
     return new Runnable() {
       @Override
       public void run() {
-        while (!stopped && !Thread.currentThread().isInterrupted()) {
+        while (!isStopped() && !Thread.currentThread().isInterrupted()) {
           synchronized (mutex) {
             // !drained if dispatch queued new events on this dispatcher
             drained = queue.isEmpty();
@@ -109,10 +108,4 @@ public class DrainDispatcher extends AsyncDispatcher {
       return drained;
     }
   }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    stopped = true;
-    super.serviceStop();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/46675641/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 018096b..2b9d745 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
 public class TestAsyncDispatcher {
@@ -77,5 +78,46 @@ public class TestAsyncDispatcher {
     disp.waitForEventThreadToWait();
     disp.close();
   }
+
+  @SuppressWarnings("rawtypes")
+  private static class DummyHandler implements EventHandler<Event> {
+    @Override
+    public void handle(Event event) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {}
+    }
+  }
+
+  private enum DummyType {
+    DUMMY
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void dispatchDummyEvents(Dispatcher disp, int count) {
+    for (int i = 0; i < count; i++) {
+      Event event = mock(Event.class);
+      when(event.getType()).thenReturn(DummyType.DUMMY);
+      disp.getEventHandler().handle(event);
+    }
+  }
+
+  // Test if drain dispatcher drains events on stop.
+  @SuppressWarnings({ "rawtypes" })
+  @Test(timeout=10000)
+  public void testDrainDispatcherDrainEventsOnStop() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000);
+    BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
+    DrainDispatcher disp = new DrainDispatcher(queue);
+    disp.init(conf);
+    disp.register(DummyType.class, new DummyHandler());
+    disp.setDrainEventsOnStop();
+    disp.start();
+    disp.waitForEventThreadToWait();
+    dispatchDummyEvents(disp, 2);
+    disp.close();
+    assertEquals(0, queue.size());
+  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message