tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log (zjffdu)
Date Thu, 30 Apr 2015 04:24:24 GMT
Repository: tez
Updated Branches:
  refs/heads/master 5f63de8ee -> 3894c5ec6


TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3894c5ec
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3894c5ec
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3894c5ec

Branch: refs/heads/master
Commit: 3894c5ec6b707d7fff6381091fbbdf05c89f0f81
Parents: 5f63de8
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Apr 30 11:15:53 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Thu Apr 30 11:15:53 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 +++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 50 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3894c5ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3a1867e..cfdc679 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -325,6 +325,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log
   TEZ-2348. EOF exception during UnorderedKVReader.next().
   TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery.
   TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path

http://git-wip-us.apache.org/repos/asf/tez/blob/3894c5ec/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b63466a..c5de19b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1397,7 +1397,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (!pendingTaskEvents.isEmpty()) {
         LOG.info("Routing pending task events for vertex: " + logIdentifier);
         try {
-          handleRoutedTezEvents(this, pendingTaskEvents, false);
+          handleRoutedTezEvents(this, pendingTaskEvents, false, true);
         } catch (AMUserCodeException e) {
           String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier;
           LOG.error(msg, e);
@@ -3025,7 +3025,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.recoveredEvents.clear();
         if (!vertex.pendingRouteEvents.isEmpty()) {
           try {
-            handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false);
+            handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false, true);
             vertex.pendingRouteEvents.clear();
           } catch (AMUserCodeException e) {
             String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
@@ -3284,7 +3284,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       List<TezEvent> inputInfoEvents = iEvent.getEvents();
       try {
         if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
-          VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false);
+          VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false, false);
         }
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -3941,7 +3941,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       boolean recovered = rEvent.isRecovered();
       List<TezEvent> tezEvents = rEvent.getEvents();
       try {
-        VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered);
+        VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered, false);
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
         LOG.error(msg, e);
@@ -3960,9 +3960,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent> tezEvents, boolean recovered) throws AMUserCodeException {
+  private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
     if (vertex.getAppContext().isRecoveryEnabled()
         && !recovered
+        && !isPendingEvents
         && !tezEvents.isEmpty()) {
       List<TezEvent> recoveryEvents =
           Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/tez/blob/3894c5ec/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 3147093..99ec6cf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -145,7 +145,10 @@ import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -182,6 +185,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.mockito.internal.util.collections.Sets;
 
@@ -5528,6 +5534,50 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
   }
 
+  @Test (timeout = 5000)
+  public void testRouteEvent_RecoveredEvent() throws IOException {
+    doReturn(historyEventHandler).when(appContext).getHistoryHandler();
+    doReturn(true).when(appContext).isRecoveryEnabled();
+
+    initAllVertices(VertexState.INITED);
+    VertexImpl v1 = (VertexImpl)vertices.get("vertex1");
+    VertexImpl v2 = (VertexImpl)vertices.get("vertex2");
+    VertexImpl v3 = (VertexImpl)vertices.get("vertex3");
+    startVertex(v1);
+    startVertex(v2);
+    TezTaskID taskId = TezTaskID.getInstance(v1.getVertexId(), 0);
+    v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+    DataMovementEvent dmEvent = DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0]));
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(taskId, 0);
+    TezEvent tezEvent1 = new TezEvent(dmEvent, new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", taId));
+    v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent1)));
+    dispatcher.await();
+    assertTrue(v3.pendingTaskEvents.size() != 0);
+    ArgumentCaptor<DAGHistoryEvent> argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+    verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+    verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
+
+    v3.scheduleTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+    dispatcher.await();
+    assertTrue(v3.pendingTaskEvents.size() == 0);
+    // the recovery event must still have been logged only once after the pending events are routed
+    argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+    verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+    verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
+  }
+
+  private void verifyHistoryEvents(List<DAGHistoryEvent> events, HistoryEventType eventType, int expectedTimes) {
+    int actualTimes = 0;
+    LOG.info("");
+    for (DAGHistoryEvent event : events) {
+      LOG.info(event.getHistoryEvent().getEventType() + "");
+      if (event.getHistoryEvent().getEventType() == eventType) {
+        actualTimes ++;
+      }
+    }
+    Assert.assertEquals(actualTimes, expectedTimes);
+  }
+
   @InterfaceAudience.Private
   public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin {
 


Mime
View raw message