tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-802. Determination of Task Placement for 1-1 Edges (bikas)
Date Wed, 30 Apr 2014 00:04:35 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 9fa541ba2 -> e9b222e9b


TEZ-802. Determination of Task Placement for 1-1 Edges (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e9b222e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e9b222e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e9b222e9

Branch: refs/heads/master
Commit: e9b222e9b41ef13af962a80dc32d6512a75ad017
Parents: 9fa541b
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Apr 29 17:04:32 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Apr 29 17:04:32 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 99 ++++++++++++++------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 98 ++++++++++++++++---
 2 files changed, 152 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e9b222e9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f20c1a0..f8fab0d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -121,6 +121,7 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -1726,37 +1727,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
     }
 
-    boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
-
-    if (hasUserVertexManager) {
-      VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
-          .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan
-              .getVertexManagerPlugin());
-      LOG.info("Setting user vertex manager plugin: "
-          + pluginDesc.getClassName() + " on vertex: " + getName());
-      vertexManager = new VertexManager(pluginDesc, this, appContext);
-    } else {
-      if (hasBipartite) {
-        // setup vertex manager
-        // TODO this needs to consider data size and perhaps API.
-        // Currently implicitly BIPARTITE is the only edge type
-        LOG.info("Setting vertexManager to ShuffleVertexManager for "
-            + logIdentifier);
-        vertexManager = new VertexManager(new ShuffleVertexManager(),
-            this, appContext);
-      } else if (inputsWithInitializers != null) {
-        LOG.info("Setting vertexManager to RootInputVertexManager for "
-            + logIdentifier);
-        vertexManager = new VertexManager(new RootInputVertexManager(),
-            this, appContext);
-      } else {
-        // schedule all tasks upon vertex start. Default behavior.
-        LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
-            + logIdentifier);
-        vertexManager = new VertexManager(
-            new ImmediateStartVertexManager(), this, appContext);
-      }
-    }
+    assignVertexManager();
 
     vertexManager.initialize();
 
@@ -1786,6 +1757,72 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     checkTaskLimits();
     return VertexState.INITED;
   }
+  
+  private void assignVertexManager() {
+    boolean hasBipartite = false; // set when any input edge is SCATTER_GATHER
+    boolean hasOneToOne = false;
+    boolean hasCustom = false;
+    if (sourceVertices != null) {
+      for (Edge edge : sourceVertices.values()) {
+        switch(edge.getEdgeProperty().getDataMovementType()) {
+        case SCATTER_GATHER:
+          hasBipartite = true;
+          break;
+        case ONE_TO_ONE:
+          hasOneToOne = true;
+          break;
+        case BROADCAST: // sets no flag, so it does not affect the choice below
+          break;
+        case CUSTOM:
+          hasCustom = true;
+          break;
+        default:
+          throw new TezUncheckedException("Unknown data movement type: " + 
+              edge.getEdgeProperty().getDataMovementType());
+        }
+      }
+    }
+    
+    boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
+
+    if (hasUserVertexManager) { // a plugin named in the DAG plan always wins
+      VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
+          .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan
+              .getVertexManagerPlugin());
+      LOG.info("Setting user vertex manager plugin: "
+          + pluginDesc.getClassName() + " on vertex: " + getName());
+      vertexManager = new VertexManager(pluginDesc, this, appContext);
+    } else {
+      // Order of picking a default vertex manager (first matching rule wins):
+      // If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703
+      // If there is a custom edge we fall back to default ImmediateStartVertexManager
+      // If there is a one to one edge then we use the InputReadyVertexManager
+      // If there is a scatter-gather edge then we use the ShuffleVertexManager
+      // Else we use the default ImmediateStartVertexManager
+      if (inputsWithInitializers != null) {
+        LOG.info("Setting vertexManager to RootInputVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(new RootInputVertexManager(),
+            this, appContext);
+      } else if (hasOneToOne && !hasCustom) {
+        LOG.info("Setting vertexManager to InputReadyVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(new InputReadyVertexManager(),
+            this, appContext);
+      } else if (hasBipartite && !hasCustom) {
+        LOG.info("Setting vertexManager to ShuffleVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(new ShuffleVertexManager(),
+            this, appContext);
+      } else {
+        // schedule all tasks upon vertex start. Default behavior.
+        LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(
+            new ImmediateStartVertexManager(), this, appContext);
+      }
+    }
+  }
 
   public static class StartRecoverTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e9b222e9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 518a3c5..a813ebf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -57,9 +57,14 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -106,6 +111,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -349,6 +355,50 @@ public class TestVertexImpl {
         .build();
     return dag;
   }
+  
+  private DAGPlan createDAGPlanWithMixedEdges() { // six single-task vertices joined by mixed edge types
+    LOG.info("Setting up mixed edge dag plan");
+    org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG("MixedEdges");
+    org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
+        new ProcessorDescriptor("v1.class"), 1, Resource.newInstance(0, 0));
+    org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
+        new ProcessorDescriptor("v2.class"), 1, Resource.newInstance(0, 0));
+    org.apache.tez.dag.api.Vertex v3 = new org.apache.tez.dag.api.Vertex("vertex3",
+        new ProcessorDescriptor("v3.class"), 1, Resource.newInstance(0, 0));
+    org.apache.tez.dag.api.Vertex v4 = new org.apache.tez.dag.api.Vertex("vertex4",
+        new ProcessorDescriptor("v4.class"), 1, Resource.newInstance(0, 0));
+    org.apache.tez.dag.api.Vertex v5 = new org.apache.tez.dag.api.Vertex("vertex5",
+        new ProcessorDescriptor("v5.class"), 1, Resource.newInstance(0, 0));
+    org.apache.tez.dag.api.Vertex v6 = new org.apache.tez.dag.api.Vertex("vertex6",
+        new ProcessorDescriptor("v6.class"), 1, Resource.newInstance(0, 0));
+    dag.addVertex(v1).addVertex(v2).addVertex(v3).addVertex(v4).addVertex(v5).addVertex(v6);
+    dag.addEdge(new org.apache.tez.dag.api.Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.BROADCAST, DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, new OutputDescriptor("out.class"), 
+            new InputDescriptor("out.class")))); // NOTE(review): inputs reuse "out.class"; presumably a placeholder — confirm
+    dag.addEdge(new org.apache.tez.dag.api.Edge(v1, v3, 
+        new EdgeProperty(DataMovementType.BROADCAST, DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, new OutputDescriptor("out.class"), 
+            new InputDescriptor("out.class"))));
+    dag.addEdge(new org.apache.tez.dag.api.Edge(v4, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, new OutputDescriptor("out.class"), 
+            new InputDescriptor("out.class"))));
+    dag.addEdge(new org.apache.tez.dag.api.Edge(v5, v3, 
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, new OutputDescriptor("out.class"), 
+            new InputDescriptor("out.class"))));
+    dag.addEdge(new org.apache.tez.dag.api.Edge(v4, v6, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, new OutputDescriptor("out.class"), 
+            new InputDescriptor("out.class"))));
+    dag.addEdge(new org.apache.tez.dag.api.Edge(v5, v6, 
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, new OutputDescriptor("out.class"), 
+            new InputDescriptor("out.class"))));
+    // Inputs: v2 <- BCAST(v1)+SG(v4); v3 <- BCAST(v1)+1:1(v5); v6 <- SG(v4)+1:1(v5)
+    return dag.createDag(conf);
+  }
 
   private DAGPlan createDAGPlanWithInputInitializer(String initializerClassName) {
     LOG.info("Setting up invalid dag plan with input initializer");
@@ -714,12 +764,7 @@ public class TestVertexImpl {
                     .setInputVertexName("vertex1")
                     .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
                     .setOutputVertexName("vertex3")
-                    .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
-                    .setEdgeManager(
-                        TezEntityDescriptorProto.newBuilder()
-                        .setClassName(EdgeManagerForTest.class.getName())
-                        .setUserPayload(ByteString.copyFrom(edgePayload))
-                        .build())
+                    .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
                     .setId("e1")
                     .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                     .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -755,7 +800,12 @@ public class TestVertexImpl {
                     .setInputVertexName("vertex3")
                     .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v5"))
                     .setOutputVertexName("vertex5")
-                    .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                    .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+                    .setEdgeManager(
+                        TezEntityDescriptorProto.newBuilder()
+                        .setClassName(EdgeManagerForTest.class.getName())
+                        .setUserPayload(ByteString.copyFrom(edgePayload))
+                        .build())
                     .setId("e4")
                     .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                     .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -1466,7 +1516,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   public void testSetCustomEdgeManager() throws UnsupportedEncodingException {
     initAllVertices(VertexState.INITED);
-    Edge edge = edges.get("e1");
+    Edge edge = edges.get("e4");
     EdgeManager em = edge.getEdgeManager();
     EdgeManagerForTest originalEm = (EdgeManagerForTest) em;
     Assert.assertTrue(Arrays.equals(edgePayload, originalEm.getEdgeManagerContext()
@@ -1477,18 +1527,18 @@ public class TestVertexImpl {
         new EdgeManagerDescriptor(EdgeManagerForTest.class.getName());
     edgeManagerDescriptor.setUserPayload(userPayload);
 
-    Vertex v1 = vertices.get("vertex1");
-    Vertex v3 = vertices.get("vertex3"); // Vertex3 linked to v1 (v1 src, v3
+    Vertex v3 = vertices.get("vertex3");
+    Vertex v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5
                                          // dest)
 
     Map<String, EdgeManagerDescriptor> edgeManagerDescriptors =
-        Collections.singletonMap(v1.getName(), edgeManagerDescriptor);
-    Assert.assertTrue(v3.setParallelism(v3.getTotalTasks() - 1, null,
+        Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
+    Assert.assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null,
         edgeManagerDescriptors)); // Must decrease.
 
-    VertexImpl v3Impl = (VertexImpl) v3;
+    VertexImpl v5Impl = (VertexImpl) v5;
 
-    EdgeManager modifiedEdgeManager = v3Impl.sourceVertices.get(v1)
+    EdgeManager modifiedEdgeManager = v5Impl.sourceVertices.get(v3)
         .getEdgeManager();
     Assert.assertNotNull(modifiedEdgeManager);
     Assert.assertTrue(modifiedEdgeManager instanceof EdgeManagerForTest);
@@ -2086,6 +2136,26 @@ public class TestVertexImpl {
     Assert.assertEquals(1, committer.setupCounter);
   }
 
+  @Test(timeout = 5000)
+  public void testVertexManagerHeuristic() { // default VertexManager chosen from each vertex's input edge mix
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithMixedEdges();
+    setupPostDagCreation();
+    initAllVertices(VertexState.INITED);
+    Assert.assertEquals(ImmediateStartVertexManager.class, // vertex1: no incoming edges
+        vertices.get("vertex1").getVertexManager().getPlugin().getClass());
+    Assert.assertEquals(ShuffleVertexManager.class, // vertex2: BROADCAST + SCATTER_GATHER inputs
+        vertices.get("vertex2").getVertexManager().getPlugin().getClass());
+    Assert.assertEquals(InputReadyVertexManager.class, // vertex3: BROADCAST + ONE_TO_ONE inputs
+        vertices.get("vertex3").getVertexManager().getPlugin().getClass());
+    Assert.assertEquals(ImmediateStartVertexManager.class, // vertex4: no incoming edges
+        vertices.get("vertex4").getVertexManager().getPlugin().getClass());
+    Assert.assertEquals(ImmediateStartVertexManager.class, // vertex5: no incoming edges
+        vertices.get("vertex5").getVertexManager().getPlugin().getClass());
+    Assert.assertEquals(InputReadyVertexManager.class, // vertex6: ONE_TO_ONE is checked before SCATTER_GATHER
+        vertices.get("vertex6").getVertexManager().getPlugin().getClass());
+  }
+
 
   @Test(timeout = 5000)
   public void testVertexWithOneToOneSplit() {


Mime
View raw message