tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2310. Deadlock caused by StateChangeNotifier sending notifications on thread holding locks (bikas)
Date Fri, 17 Apr 2015 08:59:50 GMT
Repository: tez
Updated Branches:
  refs/heads/master 3e6fc355c -> 8386cca03


TEZ-2310. Deadlock caused by StateChangeNotifier sending notifications on thread holding locks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8386cca0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8386cca0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8386cca0

Branch: refs/heads/master
Commit: 8386cca03ab89248765b2ac87dd78a50ca84ff3c
Parents: 3e6fc35
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Apr 17 01:59:41 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Apr 17 01:59:41 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/tez/dag/app/dag/DAG.java    |  4 +
 .../tez/dag/app/dag/StateChangeNotifier.java    | 99 ++++++++++++++++++--
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  6 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |  4 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  2 -
 .../dag/app/dag/TestStateChangeNotifier.java    | 57 +++++++++--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  8 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  8 +-
 9 files changed, 164 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a7609f..b0ce3cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2310. Deadlock caused by StateChangeNotifier sending notifications on
+  thread holding locks
   TEZ-1969. Stop the DAGAppMaster when a local mode client is stopped
   TEZ-714. OutputCommitters should not run in the main AM dispatcher thread
   TEZ-2323. Fix TestOrderedWordcount to use MR memory configs.

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 1b64754..4c3426a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
@@ -55,6 +56,9 @@ public interface DAG {
    * @return job-counters and aggregate task-counters
    */
   TezCounters getAllCounters();
+  
+  @SuppressWarnings("rawtypes")
+  EventHandler getEventHandler();
 
   /**
    * Get Vertex by vertex name

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index d2b298b..260cbf3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -22,19 +22,27 @@ package org.apache.tez.dag.app.dag;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tracks status updates from various components, and informs registered components about updates.
@@ -42,18 +50,88 @@ import org.apache.tez.dag.records.TezVertexID;
 @InterfaceAudience.Private
 public class StateChangeNotifier {
 
+  private static final Logger LOG = LoggerFactory.getLogger(StateChangeNotifier.class);
+  
   private final DAG dag;
   private final SetMultimap<TezVertexID, ListenerContainer> vertexListeners;
   private final ListMultimap<TezVertexID, VertexStateUpdate> lastKnowStatesMap;
   private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock();
-  private final ReentrantReadWriteLock.ReadLock readLock = listenersLock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = listenersLock.writeLock();
 
+  BlockingQueue<NotificationEvent> eventQueue = new LinkedBlockingQueue<NotificationEvent>();
+  private Thread eventHandlingThread;
+  private volatile boolean stopEventHandling = false;
+  
+  private static class NotificationEvent {
+    final VertexStateUpdate update;
+    final VertexStateUpdateListener listener;
+    
+    public NotificationEvent(VertexStateUpdate update, VertexStateUpdateListener listener) {
+      this.update = update;
+      this.listener = listener;
+    }
+    
+    void sentUpdate() {
+      listener.onStateUpdated(update);
+    }
+    
+    @Override
+    public String toString() {
+      return "[ VertexState:(" + update + ") Listener:" + listener + " ]";
+    }
+  }
+
   public StateChangeNotifier(DAG dag) {
     this.dag = dag;
     this.vertexListeners = Multimaps.synchronizedSetMultimap(
         HashMultimap.<TezVertexID, ListenerContainer>create());
     this.lastKnowStatesMap = LinkedListMultimap.create();
+    startThread();
+  }
+  
+  private void startThread() {
+    this.eventHandlingThread = new Thread("State Change Notifier DAG: " + dag.getID()) {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void run() {
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          NotificationEvent event;
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            if(!stopEventHandling) {
+              LOG.warn("Continuing after interrupt : ", e);
+            }
+            continue;
+          }
+          try {
+            event.sentUpdate();
+            processedEventFromQueue();
+          } catch (Exception e) {
+            // TODO send user code exception - TEZ-2332
+            LOG.error("Error in state update notification for " + event, e);
+            dag.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
+            return;
+          }
+        }
+      }
+    };
+    this.eventHandlingThread.setDaemon(true); // dont block exit on this
+    this.eventHandlingThread.start();
+  }
+  
+  @VisibleForTesting
+  protected void processedEventFromQueue() {
+  }
+  
+  @VisibleForTesting
+  protected void addedEventToQueue() {
+  }
+  
+  public void stop() {
+    this.stopEventHandling = true;
+    if (eventHandlingThread != null)
+      eventHandlingThread.interrupt();
   }
 
   // -------------- VERTEX STATE CHANGE SECTION ---------------
@@ -99,14 +177,14 @@ public class StateChangeNotifier {
   }
 
   public void stateChanged(TezVertexID vertexId, VertexStateUpdate vertexStateUpdate) {
-    readLock.lock();
+    writeLock.lock();
     try {
       lastKnowStatesMap.put(vertexId, vertexStateUpdate);
       if (vertexListeners.containsKey(vertexId)) {
         sendStateUpdate(vertexId, vertexStateUpdate);
       }
     } finally {
-      readLock.unlock();
+      writeLock.unlock();
     }
   }
 
@@ -115,11 +193,18 @@ public class StateChangeNotifier {
     for (ListenerContainer listenerContainer : vertexListeners.get(vertexId)) {
       listenerContainer.sendStateUpdate(event);
     }
-
   }
 
-
-  private static final class ListenerContainer {
+  private void enqueueNotification(NotificationEvent event) {
+    try {
+      eventQueue.put(event);
+      addedEventToQueue();
+    } catch (InterruptedException e) {
+      LOG.error("Failed to put event", e);
+    }
+  }
+  
+  private final class ListenerContainer {
     final VertexStateUpdateListener listener;
     final Set<org.apache.tez.dag.api.event.VertexState> states;
 
@@ -135,7 +220,7 @@ public class StateChangeNotifier {
 
     private void sendStateUpdate(VertexStateUpdate stateUpdate) {
       if (states.contains(stateUpdate.getVertexState())) {
-        listener.onStateUpdated(stateUpdate);
+        enqueueNotification(new NotificationEvent(stateUpdate, listener));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 1c93dc6..9e55088 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -180,7 +180,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final AppContext appContext;
   private final UserGroupInformation dagUGI;
   private final ACLManager aclManager;
-  private final StateChangeNotifier entityUpdateTracker;
+  @VisibleForTesting
+  StateChangeNotifier entityUpdateTracker;
 
   volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
   @VisibleForTesting
@@ -586,7 +587,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return jobPlan;
   }
 
-  EventHandler getEventHandler() {
+  @Override
+  public EventHandler getEventHandler() {
     return this.eventHandler;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 0be0aaa..bcea22c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -340,7 +340,9 @@ public class VertexManager {
     }
 
     @Override
-    public synchronized void onStateUpdated(VertexStateUpdate event) {
+    public void onStateUpdated(VertexStateUpdate event) {
+      // this is not called by the vertex manager plugin. 
+      // no need to synchronize this. similar to other external notification methods
       enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStateUpdate(event));
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 0985d58..250446d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -19,8 +19,6 @@
 package org.apache.tez.dag.app.rm;
 
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
index 6a505ef..e6d1c31 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
@@ -30,18 +30,60 @@ import static org.mockito.Mockito.verify;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.collect.Lists;
+
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 public class TestStateChangeNotifier {
+  
+  // uses the thread based notification code path but effectively blocks update
+  // events till listeners have been notified
+  public static class StateChangeNotifierForTest extends StateChangeNotifier {
+    AtomicInteger count = new AtomicInteger(0);
+    AtomicInteger totalCount = new AtomicInteger(0);
+    
+    public StateChangeNotifierForTest(DAG dag) {
+      super(dag);
+    }
+
+    public void reset() {
+      count.set(0);
+      totalCount.set(0);
+    }
+    
+    @Override
+    protected void processedEventFromQueue() {
+      synchronized (count) {
+        if (count.decrementAndGet() == 0) {
+          count.notifyAll();
+        }
+      }
+    }
+    
+    @Override
+    protected void addedEventToQueue() {
+      totalCount.incrementAndGet();
+      synchronized (count) {
+        // processing may finish by the time we get here
+        if (count.incrementAndGet() > 0) {
+          try {
+            count.wait();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    }
+  }
 
   @Test(timeout = 5000)
   public void testEventsOnRegistration() {
@@ -51,7 +93,7 @@ public class TestStateChangeNotifier {
     Vertex v3 = createMockVertex(dagId, 3);
     DAG dag = createMockDag(dagId, v1, v2, v3);
 
-    StateChangeNotifier tracker = new StateChangeNotifier(dag);
+    StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);
 
     // Vertex has sent one event
     notifyTracker(tracker, v1, VertexState.RUNNING);
@@ -72,7 +114,6 @@ public class TestStateChangeNotifier {
         VertexState.SUCCEEDED), mockListener14);
     ArgumentCaptor<VertexStateUpdate> argumentCaptor =
         ArgumentCaptor.forClass(VertexStateUpdate.class);
-
     verify(mockListener11, times(1)).onStateUpdated(argumentCaptor.capture());
     assertEquals(VertexState.RUNNING,
         argumentCaptor.getValue().getVertexState());
@@ -85,8 +126,10 @@ public class TestStateChangeNotifier {
     verify(mockListener14, never()).onStateUpdated(any(VertexStateUpdate.class));
 
     // Vertex has not notified of state
+    tracker.reset();
     VertexStateUpdateListener mockListener2 = mock(VertexStateUpdateListener.class);
     tracker.registerForVertexUpdates(v2.getName(), null, mockListener2);
+    Assert.assertEquals(0, tracker.totalCount.get()); // there should no be any event sent out
     verify(mockListener2, never()).onStateUpdated(any(VertexStateUpdate.class));
 
     // Vertex has notified about parallelism update only
@@ -104,7 +147,7 @@ public class TestStateChangeNotifier {
     Vertex v1 = createMockVertex(dagId, 1);
     DAG dag = createMockDag(dagId, v1);
 
-    StateChangeNotifier tracker = new StateChangeNotifier(dag);
+    StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);
 
     VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
     tracker.registerForVertexUpdates(v1.getName(), null, mockListener);
@@ -139,7 +182,7 @@ public class TestStateChangeNotifier {
     Vertex v1 = createMockVertex(dagId, 1);
     DAG dag = createMockDag(dagId, v1);
 
-    StateChangeNotifier tracker = new StateChangeNotifier(dag);
+    StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);
     VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
 
     tracker.registerForVertexUpdates(v1.getName(), null, mockListener);
@@ -157,7 +200,7 @@ public class TestStateChangeNotifier {
     Vertex v1 = createMockVertex(dagId, 1);
     DAG dag = createMockDag(dagId, v1);
 
-    StateChangeNotifier tracker = new StateChangeNotifier(dag);
+    StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);
 
     VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
     tracker.registerForVertexUpdates(v1.getName(), EnumSet.of(
@@ -199,7 +242,7 @@ public class TestStateChangeNotifier {
     Vertex v1 = createMockVertex(dagId, 1);
     DAG dag = createMockDag(dagId, v1);
 
-    StateChangeNotifier tracker = new StateChangeNotifier(dag);
+    StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);
 
     VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
     tracker.registerForVertexUpdates(v1.getName(), null, mockListener);

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 7e944ef..228d6b8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -33,9 +33,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -46,7 +44,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -97,6 +94,7 @@ import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
@@ -786,6 +784,7 @@ public class TestDAGImpl {
     dag = new DAGImpl(dagId, conf, dagPlan,
         dispatcher.getEventHandler(),  taskAttemptListener,
         fsTokens, clock, "user", thh, appContext);
+    dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
     doReturn(dag).when(appContext).getCurrentDAG();
     mrrAppContext = mock(AppContext.class);
     doReturn(aclManager).when(mrrAppContext).getAMACLManager();
@@ -796,6 +795,7 @@ public class TestDAGImpl {
         dispatcher.getEventHandler(),  taskAttemptListener,
         fsTokens, clock, "user", thh,
         mrrAppContext);
+    mrrDag.entityUpdateTracker = new StateChangeNotifierForTest(mrrDag);
     doReturn(conf).when(mrrAppContext).getAMConf();
     doReturn(mrrDag).when(mrrAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
@@ -810,6 +810,7 @@ public class TestDAGImpl {
         dispatcher.getEventHandler(),  taskAttemptListener,
         fsTokens, clock, "user", thh,
         groupAppContext);
+    groupDag.entityUpdateTracker = new StateChangeNotifierForTest(groupDag);
     doReturn(conf).when(groupAppContext).getAMConf();
     doReturn(groupDag).when(groupAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(groupAppContext).getApplicationAttemptId();
@@ -858,6 +859,7 @@ public class TestDAGImpl {
     dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge,
         dispatcher.getEventHandler(),  taskAttemptListener,
         fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);
+    dagWithCustomEdge.entityUpdateTracker = new StateChangeNotifierForTest(dagWithCustomEdge);
     doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf();
     doReturn(execService).when(dagWithCustomEdgeAppContext).getExecService();
     doReturn(dagWithCustomEdge).when(dagWithCustomEdgeAppContext).getCurrentDAG();

http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 891da23..c752965 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -114,6 +114,7 @@ import org.apache.tez.dag.app.dag.RootInputInitializerManager;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
@@ -221,7 +222,7 @@ public class TestVertexImpl {
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
   private HistoryEventHandler historyEventHandler;
-  private StateChangeNotifier updateTracker;
+  private StateChangeNotifierForTest updateTracker;
   private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
 
   public static class CountingOutputCommitter extends OutputCommitter {
@@ -2174,7 +2175,7 @@ public class TestVertexImpl {
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
       vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
     }
-    updateTracker = new StateChangeNotifier(appContext.getCurrentDAG());
+    updateTracker = new StateChangeNotifierForTest(appContext.getCurrentDAG());
     setupVertices();
     when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
       @Override
@@ -2243,6 +2244,7 @@ public class TestVertexImpl {
 
   @After
   public void teardown() {
+    updateTracker.stop();
     if (dispatcher.isInState(STATE.STARTED)) {
       dispatcher.await();
       dispatcher.stop();
@@ -3734,7 +3736,6 @@ public class TestVertexImpl {
       v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
     }
     dispatcher.await();
-
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
 
     // At this point, 3 events should have been received - since the dispatcher is complete.
@@ -4108,7 +4109,6 @@ public class TestVertexImpl {
     while (v3.getState()  != VertexState.RUNNING) {
       Thread.sleep(10);
     }
-
     Assert.assertEquals(VertexState.RUNNING, v3.getState());
     // Events should have been cleared from the vertex.
     Assert.assertEquals(0, v3.pendingInitializerEvents.size());


Mime
View raw message