tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2036. OneToOneEdgeManager should enforce that source and destination tasks have same number (bikas) (cherry picked from commit 5f3a9a3edb51f029bb3754b67e3b91748c993474)
Date Fri, 06 Feb 2015 02:58:23 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 c0b4b8a82 -> fe2163e0d


TEZ-2036. OneToOneEdgeManager should enforce that source and destination tasks have same number
(bikas)
(cherry picked from commit 5f3a9a3edb51f029bb3754b67e3b91748c993474)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe2163e0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe2163e0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe2163e0

Branch: refs/heads/branch-0.5
Commit: fe2163e0dc9544d0d5b053a7ec5bd5d417454e86
Parents: c0b4b8a
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Feb 5 18:50:42 2015 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Feb 5 18:57:06 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  8 ++++
 .../apache/tez/dag/app/dag/impl/TestEdge.java   | 39 +++++++++++++++++++-
 3 files changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fe2163e0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc7e09e..3aed18a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2036. OneToOneEdgeManager should enforce that source and destination
+  tasks have same number
   TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
   TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly
   TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria

http://git-wip-us.apache.org/repos/asf/tez/blob/fe2163e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index ced481d..11a6483 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -27,6 +27,8 @@ import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
+import com.google.common.base.Preconditions;
+
 public class OneToOneEdgeManager extends EdgeManagerPlugin {
 
   List<Integer> destinationInputIndices = 
@@ -55,6 +57,12 @@ public class OneToOneEdgeManager extends EdgeManagerPlugin {
   public void routeDataMovementEventToDestination(DataMovementEvent event,
       int sourceTaskIndex, int sourceOutputIndex, 
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    // by the time routing is initiated all task counts must be determined and stable
+    Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
+        .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
+        + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
+        + getContext().getDestinationVertexNumTasks() + " Source: "
+        + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
     destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/fe2163e0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index 31c8064..a607f1b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -26,14 +26,18 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -51,13 +55,45 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.google.common.collect.Maps;
+
 public class TestEdge {
 
+  @Test (timeout = 5000)
+  public void testOneToOneEdgeManager() {
+    EdgeManagerPluginContext mockContext = mock(EdgeManagerPluginContext.class);
+    when(mockContext.getSourceVertexName()).thenReturn("Source");
+    when(mockContext.getDestinationVertexName()).thenReturn("Destination");
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+    OneToOneEdgeManager manager = new OneToOneEdgeManager(mockContext);
+    manager.initialize();
+    Map<Integer, List<Integer>> destinationTaskAndInputIndices = Maps.newHashMap();
+    DataMovementEvent event = DataMovementEvent.create(1, null);
+
+    // fail when source and destination are inconsistent
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(4);
+    try {
+      manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("1-1 source and destination task counts must match"));
+    }
+    
+    // now make it consistent
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
+    manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+    Assert.assertEquals(1, destinationTaskAndInputIndices.size());
+    Assert.assertEquals(1, destinationTaskAndInputIndices.entrySet().iterator().next().getKey()
+        .intValue());
+    Assert.assertEquals(0, destinationTaskAndInputIndices.entrySet().iterator().next().getValue()
+        .get(0).intValue());
+  }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "rawtypes" })
   @Test (timeout = 5000)
   public void testCompositeEventHandling() throws AMUserCodeException {
     EventHandler eventHandler = mock(EventHandler.class);
@@ -107,7 +143,6 @@ public class TestEdge {
     verifyEvents(srcTAID, destTasks);
   }
 
-  @SuppressWarnings("rawtypes")
   private void verifyEvents(TezTaskAttemptID srcTAID, LinkedHashMap<TezTaskID, Task> destTasks) {
     int count = 0;
 


Mime
View raw message