tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used. (Peter Slawski via hitesh)
Date Tue, 19 Jul 2016 21:37:18 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 62dd11ed0 -> 71ec8564f


TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used. (Peter Slawski
via hitesh)

(cherry picked from commit 8bfbdfefa4bf9a87acd90eb4582fe6a621fb2389)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/71ec8564
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/71ec8564
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/71ec8564

Branch: refs/heads/branch-0.8
Commit: 71ec8564f72de5277f11592d9dc3626ac95166fc
Parents: 62dd11e
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Jul 19 14:29:56 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Jul 19 14:34:20 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 167 +++++++++++++++++--
 .../test/GraceShuffleVertexManagerForTest.java  | 159 ++++++++++++++++++
 .../vertexmanager/ShuffleVertexManager.java     |   8 +-
 4 files changed, 319 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8684efa..1419aa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
   TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization.
   TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
   TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.

http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index d165272..06ae442 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -40,6 +40,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,6 +62,7 @@ import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.apache.tez.test.GraceShuffleVertexManagerForTest;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -2256,6 +2258,102 @@ public class TestVertexImpl {
     return dag;
   }
 
+  private DAGPlan createDAGPlanForGraceParallelism() throws IOException {
+    LOG.info("Setting up grace parallelism dag plan");
+    return DAGPlan.newBuilder()
+        .setName("GraceParallelismDAG")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("A")
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("")
+                        .build()
+                )
+                .addOutEdgeId("A_B")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("B")
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class"))
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("")
+                        .build()
+                )
+                .addInEdgeId("A_B")
+                .addOutEdgeId("B_C")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("C")
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(-1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("")
+                        .build()
+                )
+                .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+                    .setClassName(GraceShuffleVertexManagerForTest.class.getName())
+                    .setTezUserPayload(
+                        DAGProtos.TezUserPayloadProto.newBuilder()
+                            .setUserPayload(
+                                GraceShuffleVertexManagerForTest.newConfBuilder()
+                                    .setGrandparentVertex("A")
+                                    .setDesiredParallelism(1)
+                                    .toByteString()
+                            )
+                            .build()
+                    )
+                )
+                .addInEdgeId("B_C")
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_B"))
+                .setInputVertexName("A")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class"))
+                .setOutputVertexName("B")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("A_B")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C"))
+                .setInputVertexName("B")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+                .setOutputVertexName("C")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("B_C")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+  }
+
   private void setupVertices() {
     int vCnt = dagPlan.getVertexCount();
     LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
@@ -2538,6 +2636,17 @@ public class TestVertexImpl {
     }
   }
 
+  private void completeAllTasksSuccessfully(Vertex v) {
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Set<TezTaskID> tasks = v.getTasks().keySet();
+    Assert.assertFalse(tasks.isEmpty());
+    for (TezTaskID task : tasks) {
+      dispatcher.getEventHandler()
+          .handle(new VertexEventTaskCompleted(task, TaskState.SUCCEEDED));
+    }
+    dispatcher.await();
+  }
+
   @Test(timeout = 5000)
   public void testVertexInit() throws AMUserCodeException {
     initAllVertices(VertexState.INITED);
@@ -5883,16 +5992,9 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, vC.getState());
 
     //Send VertexManagerEvent
-    long[] sizes = new long[]{(100 * 1000l * 1000l)};
-    Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "B");
-
-    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
-        TezTaskID.getInstance(vC.getVertexId(), 1), 1);
-    EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "B", "C", taId);
-    TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo);
-    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(),
-        Lists.newArrayList(tezEvent)));
-    dispatcher.await();
+    long[] sizes = new long[]{(100_000_000L)};
+    Event vmEvent = getVertexManagerEvent(sizes, 1_060_000_000L, vB);
+    sendTaskGeneratedEvent(vmEvent, EventProducerConsumerType.INPUT, vC, vB);
     Assert.assertEquals(VertexState.INITED, vC.getState());
 
     //vB start
@@ -5902,7 +6004,48 @@ public class TestVertexImpl {
 
   }
 
-  VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName)
+  @Test(timeout = 5000)
+  public void testVertexGraceParallelism() throws IOException, TezException {
+    setupPreDagCreation();
+    dagPlan = createDAGPlanForGraceParallelism();
+    setupPostDagCreation();
+
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    initVertex(vA);
+    Assert.assertEquals(VertexState.INITED, vA.getState());
+    Assert.assertEquals(VertexState.INITED, vB.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+    long[] sizes = new long[]{(100_000_000L)};
+    Event vmEvent = getVertexManagerEvent(sizes, 1_060_000_000L, vC);
+    sendTaskGeneratedEvent(vmEvent, EventProducerConsumerType.OUTPUT, vB, vC);
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+    startVertex(vA);
+    completeAllTasksSuccessfully(vA);
+    Assert.assertEquals(VertexState.SUCCEEDED, vA.getState());
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+  }
+
+  private void sendTaskGeneratedEvent(Event event, EventProducerConsumerType generator,
+                                      Vertex taskVertex, Vertex edgeVertex) {
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(taskVertex.getVertexId(), 1), 1);
+    EventMetaData sourceInfo = new EventMetaData(generator,
+        taskVertex.getName(), edgeVertex.getName(), taId);
+    sendVertexEventRouteEvent(taskVertex, new TezEvent(event, sourceInfo));
+  }
+
+  private void sendVertexEventRouteEvent(Vertex sourceVertex, TezEvent... tezEvents) {
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(sourceVertex.getVertexId(),
+        Arrays.asList(tezEvents)));
+    dispatcher.await();
+  }
+
+  private VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, Vertex vertex)
       throws IOException {
     ByteBuffer payload = null;
     if (sizes != null) {
@@ -5924,7 +6067,7 @@ public class TestVertexImpl {
               .build().toByteString()
               .asReadOnlyByteBuffer();
     }
-    VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload);
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(vertex.getName(), payload);
     return vmEvent;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java
new file mode 100644
index 0000000..40a6bd3
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+/**
+ * A shuffle vertex manager that will set the vertex's parallelism upon
+ * completion of its single grandparent simulating a very simplified
+ * version of PigGraceShuffleVertexManager for testing purposes.
+ *
+ * This manager plugin should only be used for vertices that have a single
+ * grandparent.
+ */
+public final class GraceShuffleVertexManagerForTest extends ShuffleVertexManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(GraceShuffleVertexManagerForTest.class);
+
+  private GraceConf graceConf;
+  private boolean isParallelismSet = false;
+
+  public GraceShuffleVertexManagerForTest(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    try {
+      Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+      graceConf = GraceConf.fromConfiguration(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    getContext().registerForVertexStateUpdates(graceConf.grandparentVertex,
+        EnumSet.of(VertexState.SUCCEEDED));
+    logger.info("Watching {}", graceConf.grandparentVertex);
+    super.initialize();
+  }
+
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    logger.info("Received onVertexStateUpdated");
+
+    String vertexName = stateUpdate.getVertexName();
+    VertexState vertexState = stateUpdate.getVertexState();
+
+    Preconditions.checkState(graceConf != null,
+        "Received state notification {} for vertex {} in vertex {} before manager was initialized",
+        vertexState, vertexName, getContext().getVertexName());
+
+    if (!shouldSetParallelism(stateUpdate)) {
+      return;
+    }
+    getContext().reconfigureVertex(graceConf.desiredParallelism, null, null);
+    isParallelismSet = true;
+
+    logger.info("Initialize parallelism for {} to {}",
+        getContext().getVertexName(), graceConf.desiredParallelism);
+  }
+
+  private boolean shouldSetParallelism(VertexStateUpdate update) {
+    return !isParallelismSet &&
+        update.getVertexState().equals(VertexState.SUCCEEDED) &&
+        update.getVertexName().equals(graceConf.grandparentVertex);
+  }
+
+  private static final class GraceConf {
+
+    static final String TEST_GRACE_GRANDPARENT_VERTEX = "test.grace.grandparent-vertex";
+    static final String TEST_GRACE_DESIRED_PARALLELISM = "test.grace.desired-parallelism";
+
+    final String grandparentVertex;
+    final int desiredParallelism;
+
+    GraceConf(GraceConfBuilder builder) {
+      grandparentVertex = builder.grandparentVertex;
+      desiredParallelism = builder.desiredParallelism;
+    }
+
+    static GraceConf fromConfiguration(Configuration conf) {
+      return newConfBuilder()
+          .setGrandparentVertex(conf.get(TEST_GRACE_GRANDPARENT_VERTEX))
+          .setDesiredParallelism(conf.getInt(TEST_GRACE_DESIRED_PARALLELISM, -1))
+          .build();
+    }
+
+    Configuration toConfiguration() {
+      Configuration conf = new Configuration();
+      conf.setStrings(TEST_GRACE_GRANDPARENT_VERTEX, grandparentVertex);
+      conf.setInt(TEST_GRACE_DESIRED_PARALLELISM, desiredParallelism);
+      return conf;
+    }
+  }
+
+  public static GraceConfBuilder newConfBuilder() {
+    return new GraceConfBuilder();
+  }
+
+  public static final class GraceConfBuilder {
+
+    private String grandparentVertex;
+    private int desiredParallelism;
+
+    private GraceConfBuilder() {
+    }
+
+    public GraceConfBuilder setGrandparentVertex(String grandparentVertex) {
+      this.grandparentVertex = grandparentVertex;
+      return this;
+    }
+
+    public GraceConfBuilder setDesiredParallelism(int desiredParallelism) {
+      this.desiredParallelism = desiredParallelism;
+      return this;
+    }
+
+    public ByteString toByteString() throws IOException {
+      return TezUtils.createByteStringFromConf(build().toConfiguration());
+    }
+
+    private GraceConf build() {
+      Preconditions.checkNotNull(grandparentVertex,
+          "Grandparent vertex is required");
+      Preconditions.checkArgument(desiredParallelism > 0,
+          "Desired parallelism must be greater than 0");
+      return new GraceConf(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 6104a1d..c4058c4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -508,14 +508,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     pendingStateUpdates.clear();
 
+    // track the tasks in this vertex
+    updatePendingTasks();
+
     for (VertexManagerEvent vmEvent : pendingVMEvents) {
       handleVertexManagerEvent(vmEvent);
     }
     pendingVMEvents.clear();
-    
-    // track the tasks in this vertex
-    updatePendingTasks();
-    
+
     LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
              " with " + totalNumBipartiteSourceTasks + " source tasks and " +
              totalTasksToSchedule + " pending tasks");


Mime
View raw message