tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-1867. Create new central dispatcher for Tez AM (bikas)
Date Tue, 30 Dec 2014 02:43:39 GMT
Repository: tez
Updated Branches:
  refs/heads/master 7d51844df -> dcd73b38a


TEZ-1867. Create new central dispatcher for Tez AM (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dcd73b38
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dcd73b38
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dcd73b38

Branch: refs/heads/master
Commit: dcd73b38a5e7ee75ad711d2122e4adf64e37174b
Parents: 7d51844
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Dec 29 18:43:28 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Dec 29 18:43:28 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/AsyncDispatcher.java  | 320 +++++++++++++++++++
 .../apache/tez/common/TestAsyncDispatcher.java  | 124 +++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  39 +--
 4 files changed, 460 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b90d74..2590b75 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1867. Create new central dispatcher for Tez AM
   TEZ-1844. Shouldn't invoke system.exit in local mode when AM is failed to start.
   TEZ-1889. Fix test-patch to provide correct findbugs report.
   TEZ-1313. Setup pre-commit build to test submitted patches.

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
new file mode 100644
index 0000000..c23d669
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Dispatches {@link Event}s in a separate thread. Currently only single thread
+ * does that. Potentially there could be multiple channels for each event type
+ * class and a thread pool can be used to dispatch the events.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@Public
+@Evolving
+public class AsyncDispatcher extends CompositeService implements Dispatcher {
+
+  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
+
+  private final String name;
+  private final BlockingQueue<Event> eventQueue;
+  private volatile boolean stopped = false;
+
+  // Configuration flag for enabling/disabling draining dispatcher's events on
+  // stop functionality.
+  private volatile boolean drainEventsOnStop = false;
+
+  // Indicates all the remaining dispatcher's events on stop have been drained
+  // and processed.
+  private volatile boolean drained = true;
+  private Object waitForDrained = new Object();
+
+  // For drainEventsOnStop enabled only, block newly coming events into the
+  // queue while stopping.
+  private volatile boolean blockNewEvents = false;
+  private EventHandler handlerInstance = new GenericEventHandler();
+
+  private Thread eventHandlingThread;
+  protected final Map<Class<? extends Enum>, EventHandler> eventHandlers;
+  protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers;
+  private boolean exitOnDispatchException;
+
+  public AsyncDispatcher(String name) {
+    this(name, new LinkedBlockingQueue<Event>());
+  }
+
+  public AsyncDispatcher(String name, BlockingQueue<Event> eventQueue) {
+    super("Dispatcher");
+    this.name = name;
+    this.eventQueue = eventQueue;
+    this.eventHandlers = Maps.newHashMap();
+    this.eventDispatchers = Maps.newHashMap();
+  }
+
+  Runnable createThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          drained = eventQueue.isEmpty();
+          // blockNewEvents is only set when dispatcher is draining to stop,
+          // adding this check is to avoid the overhead of acquiring the lock
+          // and calling notify every time in the normal run of the loop.
+          if (blockNewEvents) {
+            synchronized (waitForDrained) {
+              if (drained) {
+                waitForDrained.notify();
+              }
+            }
+          }
+          Event event;
+          try {
+            event = eventQueue.take();
+          } catch(InterruptedException ie) {
+            if (!stopped) {
+              LOG.warn("AsyncDispatcher thread interrupted", ie);
+            }
+            return;
+          }
+          if (event != null) {
+            dispatch(event);
+          }
+        }
+      }
+    };
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.exitOnDispatchException =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    eventHandlingThread = new Thread(createThread());
+    eventHandlingThread.setName("Dispatcher thread: " + name);
+    eventHandlingThread.start();
+    
+    //start all the components
+    super.serviceStart();
+  }
+
+  public void setDrainEventsOnStop() {
+    drainEventsOnStop = true;
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (drainEventsOnStop) {
+      blockNewEvents = true;
+      LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
+      synchronized (waitForDrained) {
+        while (!drained && eventHandlingThread.isAlive()) {
+          waitForDrained.wait(1000);
+          LOG.info("Waiting for AsyncDispatcher to drain.");
+        }
+      }
+      
+    }
+    stopped = true;
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+      try {
+        eventHandlingThread.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted Exception while stopping", ie);
+      }
+    }
+
+    // stop all the components
+    super.serviceStop();
+  }
+
+  protected void dispatch(Event event) {
+    //all events go thru this loop
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+          + event.toString());
+    }
+
+    Class<? extends Enum> type = event.getType().getDeclaringClass();
+
+    try{
+      EventHandler handler = eventHandlers.get(type);
+      if(handler != null) {
+        handler.handle(event);
+      } else {
+        throw new Exception("No handler for registered for " + type);
+      }
+    } catch (Throwable t) {
+      LOG.fatal("Error in dispatcher thread", t);
+      // If serviceStop is called, we should exit this thread gracefully.
+      if (exitOnDispatchException
+          && (ShutdownHookManager.get().isShutdownInProgress()) == false
+          && stopped == false) {
+        LOG.info("Exiting, bbye..");
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * Add an EventHandler for events handled inline on this dispatcher
+   */
+  @Override
+  public void register(Class<? extends Enum> eventType,
+      EventHandler handler) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+    /* check to see if we have a listener registered */
+    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+    AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
+    Preconditions.checkState(registeredDispatcher == null,
+        "Cannot register same event on multiple dispatchers");
+    LOG.info("Registering " + eventType + " for " + handler.getClass());
+    if (registeredHandler == null) {
+      eventHandlers.put(eventType, handler);
+    } else if (!(registeredHandler instanceof MultiListenerHandler)){
+      /* for multiple listeners of an event add the multiple listener handler */
+      MultiListenerHandler multiHandler = new MultiListenerHandler();
+      multiHandler.addHandler(registeredHandler);
+      multiHandler.addHandler(handler);
+      eventHandlers.put(eventType, multiHandler);
+    } else {
+      /* already a multilistener, just add to it */
+      MultiListenerHandler multiHandler
+      = (MultiListenerHandler) registeredHandler;
+      multiHandler.addHandler(handler);
+    }
+  }
+  
+  /**
+   * Add an EventHandler for events handled in their own dispatchers with given name
+   */
+  public void registerAndCreateDispatcher(Class<? extends Enum> eventType,
+      EventHandler handler, String dispatcherName) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+    AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
+    dispatcher.register(eventType, handler);
+    
+    /* check to see if we have a listener registered */
+    AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
+    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+    Preconditions.checkState(registeredHandler == null, 
+        "Cannot register same event on multiple dispatchers");
+    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    Preconditions.checkState(registeredDispatcher == null, 
+        "Multiple dispatchers cannot be registered for: " + eventType.getName());
+    eventDispatchers.put(eventType, dispatcher);
+    addIfService(dispatcher);
+  }
+
+  @Override
+  public EventHandler getEventHandler() {
+    return handlerInstance;
+  }
+
+  class GenericEventHandler implements EventHandler<Event> {
+    public void handle(Event event) {
+      if (stopped) {
+        return;
+      }
+      if (blockNewEvents) {
+        return;
+      }
+      drained = false;
+
+      // offload to specific dispatcher is one exists
+      Class<? extends Enum> type = event.getType().getDeclaringClass();
+      AsyncDispatcher registeredDispatcher = eventDispatchers.get(type);
+      if (registeredDispatcher != null) {
+        registeredDispatcher.getEventHandler().handle(event);
+        return;
+      }
+      
+      // no registered dispatcher. use internal dispatcher.
+      
+      /* all this method does is enqueue all the events onto the queue */
+      int qSize = eventQueue.size();
+      if (qSize !=0 && qSize %1000 == 0) {
+        LOG.info("Size of event-queue is " + qSize);
+      }
+      int remCapacity = eventQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue: "
+            + remCapacity);
+      }
+      try {
+        eventQueue.put(event);
+      } catch (InterruptedException e) {
+        if (!stopped) {
+          LOG.warn("AsyncDispatcher thread interrupted", e);
+        }
+        throw new YarnRuntimeException(e);
+      }
+    };
+  }
+
+  /**
+   * Multiplexing an event. Sending it to different handlers that
+   * are interested in the event.
+   * @param <T> the type of event these multiple handlers are interested in.
+   */
+  static class MultiListenerHandler implements EventHandler<Event> {
+    List<EventHandler<Event>> listofHandlers;
+
+    public MultiListenerHandler() {
+      listofHandlers = new ArrayList<EventHandler<Event>>();
+    }
+
+    @Override
+    public void handle(Event event) {
+      for (EventHandler<Event> handler: listofHandlers) {
+        handler.handle(event);
+      }
+    }
+
+    void addHandler(EventHandler<Event> handler) {
+      listofHandlers.add(handler);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
new file mode 100644
index 0000000..ad7f5df
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAsyncDispatcher {
+
+  static class CountDownEventHandler {
+    static CountDownLatch latch;
+    public void handle() {
+      latch.countDown();
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public enum TestEventType1 { TYPE1 }
+  public class TestEvent1 extends AbstractEvent<TestEventType1> {
+    public TestEvent1(TestEventType1 type) {
+      super(type);
+    }
+  }
+  class TestEventHandler1 extends CountDownEventHandler implements EventHandler<TestEvent1>
{
+    @Override
+    public void handle(TestEvent1 event) {
+      handle();
+    }
+  }
+  public enum TestEventType2 { TYPE2 }
+  public class TestEvent2 extends AbstractEvent<TestEventType2> {
+    public TestEvent2(TestEventType2 type) {
+      super(type);
+    }
+  }
+  class TestEventHandler2 extends CountDownEventHandler implements EventHandler<TestEvent2>
{
+    @Override
+    public void handle(TestEvent2 event) {
+      handle();
+    }
+  }
+  public enum TestEventType3 { TYPE3 }
+  public class TestEvent3 extends AbstractEvent<TestEventType3> {
+    public TestEvent3(TestEventType3 type) {
+      super(type);
+    }
+  }
+  class TestEventHandler3 extends CountDownEventHandler implements EventHandler<TestEvent3>
{
+    @Override
+    public void handle(TestEvent3 event) {
+      handle();
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test (timeout=5000)
+  public void testBasic() throws Exception {
+    CountDownLatch latch = new CountDownLatch(4);
+    CountDownEventHandler.latch = latch;
+    
+    AsyncDispatcher central = new AsyncDispatcher("Type1");
+    central.register(TestEventType1.class, new TestEventHandler1());
+    central.registerAndCreateDispatcher(TestEventType2.class, new TestEventHandler2(), "Type2");
+    central.registerAndCreateDispatcher(TestEventType3.class, new TestEventHandler3(), "Type3");
+    
+    central.init(new Configuration());
+    central.start();
+    central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1));
+    central.getEventHandler().handle(new TestEvent2(TestEventType2.TYPE2));
+    central.getEventHandler().handle(new TestEvent3(TestEventType3.TYPE3));
+    latch.countDown();
+    latch.await();
+    central.close();
+  }
+  
+  @Test (timeout=5000)
+  public void testMultipleRegisterFail() throws Exception {
+    AsyncDispatcher central = new AsyncDispatcher("Type1");
+    try {
+      central.register(TestEventType1.class, new TestEventHandler1());
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(),
"Type2");
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers"));
+    } finally {
+      central.close();
+    }
+    
+    central = new AsyncDispatcher("Type1");
+    try {
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(),
"Type2");
+      central.register(TestEventType1.class, new TestEventHandler1());
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers"));
+    } finally {
+      central.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 0699529..1347c23 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -80,7 +80,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -88,6 +87,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.AsyncDispatcher;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtilsInternal;
@@ -209,8 +209,7 @@ public class DAGAppMaster extends AbstractService {
   private AMNodeTracker nodes;
   private AppContext context;
   private Configuration amConf;
-  private Dispatcher dispatcher;
-  private Dispatcher speculatorDispatcher;
+  private AsyncDispatcher dispatcher;
   private ContainerLauncher containerLauncher;
   private ContainerHeartbeatHandler containerHeartbeatHandler;
   private TaskHeartbeatHandler taskHeartbeatHandler;
@@ -406,9 +405,7 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
     
     // register other delegating dispatchers
-    this.speculatorDispatcher = createSpeculatorEventDispatcher();
-    addIfService(speculatorDispatcher, true);
-    dispatcher.register(SpeculatorEventType.class, speculatorDispatcher.getEventHandler());
+    dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
"Speculator");
 
     this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
@@ -491,8 +488,8 @@ public class DAGAppMaster extends AbstractService {
   }
   
   @VisibleForTesting
-  protected Dispatcher createDispatcher() {
-    return new AsyncDispatcher();
+  protected AsyncDispatcher createDispatcher() {
+    return new AsyncDispatcher("Central");
   }
 
   /**
@@ -1713,22 +1710,16 @@ public class DAGAppMaster extends AbstractService {
     }
   }
   
-  AsyncDispatcher createSpeculatorEventDispatcher() {
-    AsyncDispatcher dispatcher = new AsyncDispatcher();
-    dispatcher.register(SpeculatorEventType.class, 
-        new EventHandler<SpeculatorEvent>() {
-          @Override
-          public void handle(SpeculatorEvent event) {
-            DAG dag = context.getCurrentDAG();
-            TezVertexID vertexId = event.getVertexId();
-            Vertex v = dag.getVertex(vertexId);
-            Preconditions.checkState(v != null,
-                "Unknown vertex: " + vertexId + " for DAG: " + dag.getID());
-            v.handleSpeculatorEvent(event);
-          }
-        }
-      );
-    return dispatcher;
+  private class SpeculatorEventHandler implements EventHandler<SpeculatorEvent> {
+    @Override
+    public void handle(SpeculatorEvent event) {
+      DAG dag = context.getCurrentDAG();
+      TezVertexID vertexId = event.getVertexId();
+      Vertex v = dag.getVertex(vertexId);
+      Preconditions.checkState(v != null,
+          "Unknown vertex: " + vertexId + " for DAG: " + dag.getID());
+      v.handleSpeculatorEvent(event);
+    }
   }
 
   private class TaskAttemptEventDispatcher


Mime
View raw message