tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [10/50] [abbrv] tez git commit: TEZ-2599. Dont send obsoleted data movement events to tasks (bikas)
Date Wed, 15 Jul 2015 00:25:43 GMT
TEZ-2599. Dont send obsoleted data movement events to tasks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aca83090
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aca83090
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aca83090

Branch: refs/heads/TEZ-2003
Commit: aca83090ed2580d093a1fec59217d78f2e575a6c
Parents: 2073351
Author: Bikas Saha <bikas@apache.org>
Authored: Sat Jul 4 01:17:24 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Sat Jul 4 01:17:24 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  52 ++++++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 133 +++++++++++++++++++
 .../java/org/apache/tez/test/TestInput.java     |   4 +-
 4 files changed, 180 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/aca83090/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 85ff94a..5a62b45 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2599. Dont send obsoleted data movement events to tasks
   TEZ-2542. TezDAGID fromString array length check.
   TEZ-2296. Add option to print counters for tez-examples.
   TEZ-2570. Fix license header issue for eps image files.

http://git-wip-us.apache.org/repos/asf/tez/blob/aca83090/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a9bcdd8..8d8a2de 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -77,7 +77,6 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -793,6 +792,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     final TezEvent tezEvent;
     final Edge eventEdge;
     final int eventTaskIndex;
+    boolean isObsolete = false;
     EventInfo(TezEvent tezEvent, Edge eventEdge, int eventTaskIndex) {
       this.tezEvent = tezEvent;
       this.eventEdge = eventEdge;
@@ -4092,12 +4092,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
               + " vertex: " + getLogIdentifier());
           boolean isFirstEvent = true;
+          boolean firstEventObsoleted = false;
           for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
             boolean earlyExit = false;
             if (events.size() == maxEvents) {
               break;
             }
             EventInfo eventInfo = onDemandRouteEvents.get(nextFromEventId);
+            if (eventInfo.isObsolete) {
+              // ignore obsolete events
+              firstEventObsoleted = true;
+              continue;
+            }
             TezEvent tezEvent = eventInfo.tezEvent;
             switch(tezEvent.getEventType()) {
             case INPUT_FAILED_EVENT:
@@ -4108,11 +4114,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                 Edge srcEdge = eventInfo.eventEdge;
                 PendingEventRouteMetadata pendingRoute = null;
                 if (isFirstEvent) {
-                  // do this precondition check only for the first event
+                  // the first event is the one that can have pending routes because its expanded
+                  // events had not been completely sent in the last round.
                   isFirstEvent = false;
                   pendingRoute = srcEdge.removePendingEvents(attemptID);
                   if (pendingRoute != null) {
-                    Preconditions.checkState(tezEvent == pendingRoute.getTezEvent()); // same object
+                    // the first event must match the pending route event
+                    // the only reason it may not match is if in between rounds that event got
+                    // obsoleted
+                    if(tezEvent != pendingRoute.getTezEvent()) {
+                      Preconditions.checkState(firstEventObsoleted);
+                      // pending routes can be ignored for obsoleted events
+                      pendingRoute = null;
+                    }
                   }
                 }
                 if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex,
@@ -4237,12 +4251,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                     getLogIdentifier());
               }
               if (srcEdge.hasOnDemandRouting()) {
-                onDemandRouteEventsWriteLock.lock();
-                try {
-                  onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
-                } finally {
-                  onDemandRouteEventsWriteLock.unlock();
-                }
+                processOnDemandEvent(tezEvent, srcEdge, srcTaskIndex);
               } else {
                 // send to tasks            
                 srcEdge.sendTezEventToDestinationTasks(tezEvent);
@@ -4367,6 +4376,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
     }
   }
+  
+  private void processOnDemandEvent(TezEvent tezEvent, Edge srcEdge, int srcTaskIndex) {
+    onDemandRouteEventsWriteLock.lock();
+    try {
+      onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
+      if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
+        for (EventInfo eventInfo : onDemandRouteEvents) {
+          if (eventInfo.eventEdge == srcEdge 
+              && eventInfo.tezEvent.getSourceInfo().getTaskAttemptID().equals(
+                 tezEvent.getSourceInfo().getTaskAttemptID())
+              && (eventInfo.tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT
+                  || eventInfo.tezEvent
+                      .getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
+            // any earlier data movement events from the same source
+            // edge+task
+            // can be obsoleted by an input failed event from the
+            // same source edge+task
+            eventInfo.isObsolete = true;
+          }
+        }
+      }
+    } finally {
+      onDemandRouteEventsWriteLock.unlock();
+    }
+  }
 
   private static class InternalErrorTransition implements
       SingleArcTransition<VertexImpl, VertexEvent> {

http://git-wip-us.apache.org/repos/asf/tez/blob/aca83090/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 91465b5..98ef973 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -171,6 +171,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
@@ -179,6 +180,7 @@ import org.apache.tez.test.EdgeManagerForTest;
 import org.apache.tez.test.VertexManagerPluginForTest;
 import org.apache.tez.test.VertexManagerPluginForTest.VertexManagerPluginForTestConfig;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -2546,6 +2548,13 @@ public class TestVertexImpl {
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
     
+    // ask for events with sufficient buffer. get all events in a single shot.
+    fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(11, fromEventId);
+    Assert.assertEquals(11, eventInfo.getEvents().size());
+    
     // change max events to larger value. max events does not evenly divide total events
     fromEventId = 0;
     for (int i=1; i<=2; ++i) {
@@ -2575,6 +2584,130 @@ public class TestVertexImpl {
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(2, eventInfo.getEvents().size()); // remainder events
   }
+  
+  @Test (timeout = 5000)
+  public void testVertexGetTAAttemptsObsoletion() throws Exception {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(v1);
+    VertexImpl v2 = vertices.get("vertex2");
+    startVertex(v2);
+    VertexImpl v3 = vertices.get("vertex3");
+    VertexImpl v4 = vertices.get("vertex4");
+    
+    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    // scheduling start to trigger edge routing to begin
+    for (int i=0; i<v4.getTotalTasks(); ++i) {
+      taskList.add(new TaskWithLocationHint(i, null));
+    }
+    v4.scheduleTasks(taskList);
+    Assert.assertEquals(VertexState.RUNNING, v4.getState());
+    Assert.assertEquals(1, v4.sourceVertices.size());
+    Edge e = v4.sourceVertices.get(v3);
+    TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+    TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+    
+    for (int i=0; i<11; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null), 
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // verify all events have been are in taskEvents
+    Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+    
+    TaskAttemptEventInfo eventInfo;
+    EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+    EventRouteMetadata mockRoute = EventRouteMetadata.create(1, new int[]{0});
+    e.edgeManager = mockPlugin;
+    when(
+        mockPlugin.routeInputSourceTaskFailedEventToDestination(anyInt(),
+            anyInt())).thenReturn(mockRoute);
+    when(
+        mockPlugin.routeDataMovementEventToDestination(anyInt(),
+            anyInt(), anyInt())).thenReturn(mockRoute);
+    
+    // send an input failed event
+    v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+        new TezEvent(InputFailedEvent.create(0, 0), 
+            new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+
+    // ask for events with sufficient buffer. get only input failed event. all DM events obsoleted
+    int fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(12, fromEventId);
+    Assert.assertEquals(1, eventInfo.getEvents().size());
+    Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+  }
+  
+  @Test (timeout = 5000)
+  public void testVertexGetTAAttemptsObsoletionWithPendingRoutes() throws Exception {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(v1);
+    VertexImpl v2 = vertices.get("vertex2");
+    startVertex(v2);
+    VertexImpl v3 = vertices.get("vertex3");
+    VertexImpl v4 = vertices.get("vertex4");
+    
+    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    // scheduling start to trigger edge routing to begin
+    for (int i=0; i<v4.getTotalTasks(); ++i) {
+      taskList.add(new TaskWithLocationHint(i, null));
+    }
+    v4.scheduleTasks(taskList);
+    Assert.assertEquals(VertexState.RUNNING, v4.getState());
+    Assert.assertEquals(1, v4.sourceVertices.size());
+    Edge e = v4.sourceVertices.get(v3);
+    TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+    TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+    
+    for (int i=0; i<11; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null), 
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // verify all events have been are in taskEvents
+    Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+    
+    TaskAttemptEventInfo eventInfo;
+    EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+    EventRouteMetadata mockFailedRoute = EventRouteMetadata.create(1, new int[]{0});
+    e.edgeManager = mockPlugin;
+    when(
+        mockPlugin.routeInputSourceTaskFailedEventToDestination(anyInt(),
+            anyInt())).thenReturn(mockFailedRoute);
+
+    // return more events that dont evenly fit in max size
+    EventRouteMetadata mockRoute = EventRouteMetadata.create(2, new int[]{0, 0});    
+    when(
+        mockPlugin.routeDataMovementEventToDestination(anyInt(),
+            anyInt(), anyInt())).thenReturn(mockRoute);
+    
+    int fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(2, fromEventId); // 0-1 events expanded and fit, 2nd event has pending routes
+    Assert.assertEquals(5, eventInfo.getEvents().size());
+    
+    // send an input failed event
+    v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+        new TezEvent(InputFailedEvent.create(0, 0), 
+            new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+
+    // get only input failed event. all DM events obsoleted
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(12, fromEventId);
+    Assert.assertEquals(1, eventInfo.getEvents().size());
+    Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+  }
 
   @Test(timeout = 5000)
   public void testVertexReconfigurePlannedAfterInit() throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/aca83090/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index eeb565c..87ca93d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -343,7 +343,9 @@ public class TestInput extends AbstractLogicalInput {
             dmEvent.getUserPayload().getInt();
       } else if (event instanceof InputFailedEvent) {
         InputFailedEvent ifEvent = (InputFailedEvent) event;
-        numCompletedInputs--;
+        if (this.completedInputVersion[ifEvent.getTargetIndex()] == ifEvent.getVersion()) {
+          numCompletedInputs--;
+        }
         LOG.info("Received InputFailed event targetId: " + ifEvent.getTargetIndex() +
             " version: " + ifEvent.getVersion() +
             " numInputs: " + getNumPhysicalInputs() +


Mime
View raw message