tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth)
Date Thu, 01 Oct 2015 00:09:17 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 3875a0409 -> 6751a67f2


TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6751a67f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6751a67f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6751a67f

Branch: refs/heads/branch-0.6
Commit: 6751a67f2051f44932c8fb5b80f0729e37dae7b1
Parents: 3875a04
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Sep 30 17:09:01 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Sep 30 17:09:01 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  24 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 181 ++++++++++++++++++-
 3 files changed, 202 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6751a67f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 768db48..4fdcb66 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2716. DefaultSorter.isRleNeeded not thread safe.
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.

http://git-wip-us.apache.org/repos/asf/tez/blob/6751a67f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ca91bb9..737b508 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -256,6 +256,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @VisibleForTesting
   final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
+  @VisibleForTesting
+  final List<VertexManagerEvent> pendingVmEvents = new LinkedList<VertexManagerEvent>();
   
   LegacySpeculator speculator;
 
@@ -643,8 +646,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final ProcessorDescriptor processorDescriptor;
   
   private boolean vertexToBeReconfiguredByManager = false;
-  AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
-  AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
+  final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
+  final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
 
   @VisibleForTesting
   Map<Vertex, Edge> sourceVertices;
@@ -2114,6 +2117,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     try {
       vertexManager.initialize();
       vmIsInitialized.set(true);
+      if (!pendingVmEvents.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex:
" +
+              logIdentifier);
+        }
+        for (VertexManagerEvent vmEvent : pendingVmEvents) {
+          vertexManager.onVertexManagerEventReceived(vmEvent);
+        }
+        pendingVmEvents.clear();
+      }
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
       LOG.error(msg, e);
@@ -3820,7 +3833,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         Preconditions.checkArgument(target != null,
             "Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
         if (target == vertex) {
-          vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
+          if (!vertex.vmIsInitialized.get()) {
+            // The VM hasn't been setup yet, defer event consumption
+            vertex.pendingVmEvents.add(vmEvent);
+          } else {
+            vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
+          }
         } else {
           checkEventSourceMetadata(vertex, sourceMeta);
           vertex.eventHandler.handle(new VertexEventRouteEvent(target

http://git-wip-us.apache.org/repos/asf/tez/blob/6751a67f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 66a18b3..befcac2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
 
 import java.nio.ByteBuffer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -406,7 +408,92 @@ public class TestVertexImpl {
         .build();
     return dag;
   }
-  
+
+  // Simple dag with a CountingVM on v3 (which has v1, v2 as inputs)
+  // v1, v2 -> v3
+  private DAGPlan createDAGPlanWithCountingVM() {
+    LOG.info("Setting up dag plan with coutning VertexManager");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("dagWithCountingVM")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
+                )
+                .addOutEdgeId("e2")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex3")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x3.y3")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .addInEdgeId("e2")
+                .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+                    .setClassName(InvocationCountingVertexManager.class.getName()))
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+                .setInputVertexName("vertex2")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e2")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+    return dag;
+  }
+
   /**
    * v1 -> v2
    */
@@ -5218,6 +5305,64 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testVMEventBeforeVertexInitialized() throws Exception {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithCountingVM();
+    setupPostDagCreation();
+
+    VertexImpl v1 = vertices.get("vertex1");
+    VertexImpl v2 = vertices.get("vertex2");
+    VertexImpl v3 = vertices.get("vertex3");
+    dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+    assertEquals(VertexState.INITED, v1.getState());
+    dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
+    dispatcher.await();
+    assertEquals(VertexState.RUNNING, v1.getState());
+
+    assertEquals(VertexState.NEW, v3.getState());
+    // Generate a VM event for v1, targeted at v3
+    VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new
byte[0]));
+    TezEvent tezVmEvent = new TezEvent(vmEvent,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+            TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(v1.getVertexId(), 1), 1)));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+    dispatcher.await();
+
+    assertEquals(1, v3.pendingVmEvents.size());
+    assertEquals(0, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+    // Initialize v2, which will trigger initialization of v3
+    dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+
+    assertEquals(VertexState.INITED, v3.getState());
+
+    // The VM event should have been processed.
+    assertEquals(0, v3.pendingVmEvents.size());
+    assertEquals(1, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+    // Send another VM event - make sure it's processed without additional events.
+    vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0]));
+    tezVmEvent = new TezEvent(vmEvent,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+            TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(v1.getVertexId(), 1), 2)));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+    dispatcher.await();
+
+    assertEquals(0, v3.pendingVmEvents.size());
+    assertEquals(2, InvocationCountingVertexManager.numVmEventsReceived.get());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testExceptionFromVM_Initialize() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
@@ -5693,6 +5838,40 @@ public class TestVertexImpl {
     }
   }
 
+  public static class InvocationCountingVertexManager extends VertexManagerPlugin {
+
+    static final AtomicInteger numVmEventsReceived = new AtomicInteger(0);
+    static final AtomicInteger numInitializedInputs = new AtomicInteger(0);
+
+    public InvocationCountingVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) throws
Exception {
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception
{
+
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception
{
+      numVmEventsReceived.incrementAndGet();
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                        List<Event> events) throws Exception {
+      numInitializedInputs.incrementAndGet();
+    }
+  }
+
   @InterfaceAudience.Private
   public static class VertexManagerWithException extends RootInputVertexManager{
 


Mime
View raw message