Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E6FBA10AF0 for ; Tue, 30 Dec 2014 02:43:39 +0000 (UTC) Received: (qmail 44551 invoked by uid 500); 30 Dec 2014 02:43:40 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 44508 invoked by uid 500); 30 Dec 2014 02:43:40 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 44499 invoked by uid 99); 30 Dec 2014 02:43:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Dec 2014 02:43:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F30809D0EB5; Tue, 30 Dec 2014 02:43:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Message-Id: <5a41b15e3835466e82b83f0477ddf70c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-1867. Create new central dispatcher for Tez AM (bikas) Date: Tue, 30 Dec 2014 02:43:39 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 7d51844df -> dcd73b38a TEZ-1867. Create new central dispatcher for Tez AM (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dcd73b38 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dcd73b38 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dcd73b38 Branch: refs/heads/master Commit: dcd73b38a5e7ee75ad711d2122e4adf64e37174b Parents: 7d51844 Author: Bikas Saha Authored: Mon Dec 29 18:43:28 2014 -0800 Committer: Bikas Saha Committed: Mon Dec 29 18:43:28 2014 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/AsyncDispatcher.java | 320 +++++++++++++++++++ .../apache/tez/common/TestAsyncDispatcher.java | 124 +++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 39 +-- 4 files changed, 460 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0b90d74..2590b75 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1867. Create new central dispatcher for Tez AM TEZ-1844. Shouldn't invoke system.exit in local mode when AM is failed to start. TEZ-1889. Fix test-patch to provide correct findbugs report. TEZ-1313. Setup pre-commit build to test submitted patches. http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java new file mode 100644 index 0000000..c23d669 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * Dispatches {@link Event}s in a separate thread. Currently only single thread + * does that. Potentially there could be multiple channels for each event type + * class and a thread pool can be used to dispatch the events. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +@Public +@Evolving +public class AsyncDispatcher extends CompositeService implements Dispatcher { + + private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); + + private final String name; + private final BlockingQueue eventQueue; + private volatile boolean stopped = false; + + // Configuration flag for enabling/disabling draining dispatcher's events on + // stop functionality. + private volatile boolean drainEventsOnStop = false; + + // Indicates all the remaining dispatcher's events on stop have been drained + // and processed. + private volatile boolean drained = true; + private Object waitForDrained = new Object(); + + // For drainEventsOnStop enabled only, block newly coming events into the + // queue while stopping. + private volatile boolean blockNewEvents = false; + private EventHandler handlerInstance = new GenericEventHandler(); + + private Thread eventHandlingThread; + protected final Map, EventHandler> eventHandlers; + protected final Map, AsyncDispatcher> eventDispatchers; + private boolean exitOnDispatchException; + + public AsyncDispatcher(String name) { + this(name, new LinkedBlockingQueue()); + } + + public AsyncDispatcher(String name, BlockingQueue eventQueue) { + super("Dispatcher"); + this.name = name; + this.eventQueue = eventQueue; + this.eventHandlers = Maps.newHashMap(); + this.eventDispatchers = Maps.newHashMap(); + } + + Runnable createThread() { + return new Runnable() { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); + // blockNewEvents is only set when dispatcher is draining to stop, + // adding this check is to avoid the overhead of acquiring the lock + // and calling notify every time in the normal run of the loop. + if (blockNewEvents) { + synchronized (waitForDrained) { + if (drained) { + waitForDrained.notify(); + } + } + } + Event event; + try { + event = eventQueue.take(); + } catch(InterruptedException ie) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", ie); + } + return; + } + if (event != null) { + dispatch(event); + } + } + } + }; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.exitOnDispatchException = + conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, + Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + eventHandlingThread = new Thread(createThread()); + eventHandlingThread.setName("Dispatcher thread: " + name); + eventHandlingThread.start(); + + //start all the components + super.serviceStart(); + } + + public void setDrainEventsOnStop() { + drainEventsOnStop = true; + } + + @Override + protected void serviceStop() throws Exception { + if (drainEventsOnStop) { + blockNewEvents = true; + LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); + synchronized (waitForDrained) { + while (!drained && eventHandlingThread.isAlive()) { + waitForDrained.wait(1000); + LOG.info("Waiting for AsyncDispatcher to drain."); + } + } + + } + stopped = true; + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + try { + eventHandlingThread.join(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted Exception while stopping", ie); + } + } + + // stop all the components + super.serviceStop(); + } + + protected void dispatch(Event event) { + //all events go thru this loop + if (LOG.isDebugEnabled()) { + LOG.debug("Dispatching the event " + event.getClass().getName() + "." + + event.toString()); + } + + Class type = event.getType().getDeclaringClass(); + + try{ + EventHandler handler = eventHandlers.get(type); + if(handler != null) { + handler.handle(event); + } else { + throw new Exception("No handler for registered for " + type); + } + } catch (Throwable t) { + LOG.fatal("Error in dispatcher thread", t); + // If serviceStop is called, we should exit this thread gracefully. + if (exitOnDispatchException + && (ShutdownHookManager.get().isShutdownInProgress()) == false + && stopped == false) { + LOG.info("Exiting, bbye.."); + System.exit(-1); + } + } + } + + /** + * Add an EventHandler for events handled inline on this dispatcher + */ + @Override + public void register(Class eventType, + EventHandler handler) { + Preconditions.checkState(getServiceState() == STATE.NOTINITED); + /* check to see if we have a listener registered */ + EventHandler registeredHandler = (EventHandler) eventHandlers.get(eventType); + AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType); + Preconditions.checkState(registeredDispatcher == null, + "Cannot register same event on multiple dispatchers"); + LOG.info("Registering " + eventType + " for " + handler.getClass()); + if (registeredHandler == null) { + eventHandlers.put(eventType, handler); + } else if (!(registeredHandler instanceof MultiListenerHandler)){ + /* for multiple listeners of an event add the multiple listener handler */ + MultiListenerHandler multiHandler = new MultiListenerHandler(); + multiHandler.addHandler(registeredHandler); + multiHandler.addHandler(handler); + eventHandlers.put(eventType, multiHandler); + } else { + /* already a multilistener, just add to it */ + MultiListenerHandler multiHandler + = (MultiListenerHandler) registeredHandler; + multiHandler.addHandler(handler); + } + } + + /** + * Add an EventHandler for events handled in their own dispatchers with given name + */ + public void registerAndCreateDispatcher(Class eventType, + EventHandler handler, String dispatcherName) { + Preconditions.checkState(getServiceState() == STATE.NOTINITED); + AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName); + dispatcher.register(eventType, handler); + + /* check to see if we have a listener registered */ + AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType); + EventHandler registeredHandler = (EventHandler) eventHandlers.get(eventType); + Preconditions.checkState(registeredHandler == null, + "Cannot register same event on multiple dispatchers"); + LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass()); + Preconditions.checkState(registeredDispatcher == null, + "Multiple dispatchers cannot be registered for: " + eventType.getName()); + eventDispatchers.put(eventType, dispatcher); + addIfService(dispatcher); + } + + @Override + public EventHandler getEventHandler() { + return handlerInstance; + } + + class GenericEventHandler implements EventHandler { + public void handle(Event event) { + if (stopped) { + return; + } + if (blockNewEvents) { + return; + } + drained = false; + + // offload to specific dispatcher is one exists + Class type = event.getType().getDeclaringClass(); + AsyncDispatcher registeredDispatcher = eventDispatchers.get(type); + if (registeredDispatcher != null) { + registeredDispatcher.getEventHandler().handle(event); + return; + } + + // no registered dispatcher. use internal dispatcher. + + /* all this method does is enqueue all the events onto the queue */ + int qSize = eventQueue.size(); + if (qSize !=0 && qSize %1000 == 0) { + LOG.info("Size of event-queue is " + qSize); + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low remaining capacity in the event-queue: " + + remCapacity); + } + try { + eventQueue.put(event); + } catch (InterruptedException e) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", e); + } + throw new YarnRuntimeException(e); + } + }; + } + + /** + * Multiplexing an event. Sending it to different handlers that + * are interested in the event. + * @param the type of event these multiple handlers are interested in. + */ + static class MultiListenerHandler implements EventHandler { + List> listofHandlers; + + public MultiListenerHandler() { + listofHandlers = new ArrayList>(); + } + + @Override + public void handle(Event event) { + for (EventHandler handler: listofHandlers) { + handler.handle(event); + } + } + + void addHandler(EventHandler handler) { + listofHandlers.add(handler); + } + + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java new file mode 100644 index 0000000..ad7f5df --- /dev/null +++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.tez.common; + +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.event.EventHandler; +import org.junit.Assert; +import org.junit.Test; + +public class TestAsyncDispatcher { + + static class CountDownEventHandler { + static CountDownLatch latch; + public void handle() { + latch.countDown(); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public enum TestEventType1 { TYPE1 } + public class TestEvent1 extends AbstractEvent { + public TestEvent1(TestEventType1 type) { + super(type); + } + } + class TestEventHandler1 extends CountDownEventHandler implements EventHandler { + @Override + public void handle(TestEvent1 event) { + handle(); + } + } + public enum TestEventType2 { TYPE2 } + public class TestEvent2 extends AbstractEvent { + public TestEvent2(TestEventType2 type) { + super(type); + } + } + class TestEventHandler2 extends CountDownEventHandler implements EventHandler { + @Override + public void handle(TestEvent2 event) { + handle(); + } + } + public enum TestEventType3 { TYPE3 } + public class TestEvent3 extends AbstractEvent { + public TestEvent3(TestEventType3 type) { + super(type); + } + } + class TestEventHandler3 extends CountDownEventHandler implements EventHandler { + @Override + public void handle(TestEvent3 event) { + handle(); + } + } + + @SuppressWarnings("unchecked") + @Test (timeout=5000) + public void testBasic() throws Exception { + CountDownLatch latch = new CountDownLatch(4); + CountDownEventHandler.latch = latch; + + AsyncDispatcher central = new AsyncDispatcher("Type1"); + central.register(TestEventType1.class, new TestEventHandler1()); + central.registerAndCreateDispatcher(TestEventType2.class, new TestEventHandler2(), "Type2"); + central.registerAndCreateDispatcher(TestEventType3.class, new TestEventHandler3(), "Type3"); + + central.init(new Configuration()); + central.start(); + central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1)); + central.getEventHandler().handle(new TestEvent2(TestEventType2.TYPE2)); + central.getEventHandler().handle(new TestEvent3(TestEventType3.TYPE3)); + latch.countDown(); + latch.await(); + central.close(); + } + + @Test (timeout=5000) + public void testMultipleRegisterFail() throws Exception { + AsyncDispatcher central = new AsyncDispatcher("Type1"); + try { + central.register(TestEventType1.class, new TestEventHandler1()); + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2"); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers")); + } finally { + central.close(); + } + + central = new AsyncDispatcher("Type1"); + try { + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2"); + central.register(TestEventType1.class, new TestEventHandler1()); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers")); + } finally { + central.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/dcd73b38/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 0699529..1347c23 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -80,7 +80,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; @@ -88,6 +87,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.AsyncDispatcher; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezConverterUtils; import org.apache.tez.common.TezUtilsInternal; @@ -209,8 +209,7 @@ public class DAGAppMaster extends AbstractService { private AMNodeTracker nodes; private AppContext context; private Configuration amConf; - private Dispatcher dispatcher; - private Dispatcher speculatorDispatcher; + private AsyncDispatcher dispatcher; private ContainerLauncher containerLauncher; private ContainerHeartbeatHandler containerHeartbeatHandler; private TaskHeartbeatHandler taskHeartbeatHandler; @@ -406,9 +405,7 @@ public class DAGAppMaster extends AbstractService { dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); // register other delegating dispatchers - this.speculatorDispatcher = createSpeculatorEventDispatcher(); - addIfService(speculatorDispatcher, true); - dispatcher.register(SpeculatorEventType.class, speculatorDispatcher.getEventHandler()); + dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(), "Speculator"); this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher); @@ -491,8 +488,8 @@ public class DAGAppMaster extends AbstractService { } @VisibleForTesting - protected Dispatcher createDispatcher() { - return new AsyncDispatcher(); + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher("Central"); } /** @@ -1713,22 +1710,16 @@ public class DAGAppMaster extends AbstractService { } } - AsyncDispatcher createSpeculatorEventDispatcher() { - AsyncDispatcher dispatcher = new AsyncDispatcher(); - dispatcher.register(SpeculatorEventType.class, - new EventHandler() { - @Override - public void handle(SpeculatorEvent event) { - DAG dag = context.getCurrentDAG(); - TezVertexID vertexId = event.getVertexId(); - Vertex v = dag.getVertex(vertexId); - Preconditions.checkState(v != null, - "Unknown vertex: " + vertexId + " for DAG: " + dag.getID()); - v.handleSpeculatorEvent(event); - } - } - ); - return dispatcher; + private class SpeculatorEventHandler implements EventHandler { + @Override + public void handle(SpeculatorEvent event) { + DAG dag = context.getCurrentDAG(); + TezVertexID vertexId = event.getVertexId(); + Vertex v = dag.getVertex(vertexId); + Preconditions.checkState(v != null, + "Unknown vertex: " + vertexId + " for DAG: " + dag.getID()); + v.handleSpeculatorEvent(event); + } } private class TaskAttemptEventDispatcher