tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth)
Date Thu, 01 Oct 2015 00:08:26 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 98a72e5aa -> a7b7ed7fe


TEZ-2855. Fix a potential NPE while routing VertexManager events.  (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a7b7ed7f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a7b7ed7f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a7b7ed7f

Branch: refs/heads/branch-0.7
Commit: a7b7ed7fe2f125d231458da7bcc1de6bed3efd61
Parents: 98a72e5
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Sep 30 17:08:15 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Sep 30 17:08:15 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  24 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 180 ++++++++++++++++++-
 3 files changed, 202 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a7b7ed7f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c55f4bf..614f67c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker.
@@ -283,6 +284,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2716. DefaultSorter.isRleNeeded not thread safe
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.

http://git-wip-us.apache.org/repos/asf/tez/blob/a7b7ed7f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4a8309e..2f81a50 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -277,6 +277,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
 
   @VisibleForTesting
   final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
+  @VisibleForTesting
+  final List<VertexManagerEvent> pendingVmEvents = new LinkedList<VertexManagerEvent>();
   
   LegacySpeculator speculator;
 
@@ -710,8 +713,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
   private final ProcessorDescriptor processorDescriptor;
   
   private boolean vertexToBeReconfiguredByManager = false;
-  AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
-  AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
+  final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
+  final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
 
   @VisibleForTesting
   Map<Vertex, Edge> sourceVertices;
@@ -2429,6 +2432,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
     try {
       vertexManager.initialize();
       vmIsInitialized.set(true);
+      if (!pendingVmEvents.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex:
" +
+              logIdentifier);
+        }
+        for (VertexManagerEvent vmEvent : pendingVmEvents) {
+          vertexManager.onVertexManagerEventReceived(vmEvent);
+        }
+        pendingVmEvents.clear();
+      }
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
       LOG.error(msg, e);
@@ -4356,7 +4369,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
         Preconditions.checkArgument(target != null,
             "Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
         if (target == this) {
-          vertexManager.onVertexManagerEventReceived(vmEvent);
+          if (!vmIsInitialized.get()) {
+            // The VM hasn't been setup yet, defer event consumption
+            pendingVmEvents.add(vmEvent);
+          } else {
+            vertexManager.onVertexManagerEventReceived(vmEvent);
+          }
         } else {
           checkEventSourceMetadata(this, sourceMeta);
           eventHandler.handle(new VertexEventRouteEvent(target

http://git-wip-us.apache.org/repos/asf/tez/blob/a7b7ed7f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index f65ecab..e6387b3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
 
 import java.nio.ByteBuffer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -41,6 +42,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -423,7 +425,92 @@ public class TestVertexImpl {
         .build();
     return dag;
   }
-  
+
+  // Simple dag with a CountingVM on v3 (which has v1, v2 as inputs)
+  // v1, v2 -> v3
+  private DAGPlan createDAGPlanWithCountingVM() {
+    LOG.info("Setting up dag plan with coutning VertexManager");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("dagWithCountingVM")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
+                )
+                .addOutEdgeId("e2")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex3")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x3.y3")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .addInEdgeId("e2")
+                .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+                    .setClassName(InvocationCountingVertexManager.class.getName()))
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+                .setInputVertexName("vertex2")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e2")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+    return dag;
+  }
+
   /**
    * v1 -> v2
    */
@@ -5607,6 +5694,63 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
+  public void testVMEventBeforeVertexInitialized() throws Exception {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithCountingVM();
+    setupPostDagCreation();
+
+    VertexImpl v1 = vertices.get("vertex1");
+    VertexImpl v2 = vertices.get("vertex2");
+    VertexImpl v3 = vertices.get("vertex3");
+    dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+    assertEquals(VertexState.INITED, v1.getState());
+    dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
+    dispatcher.await();
+    assertEquals(VertexState.RUNNING, v1.getState());
+
+    assertEquals(VertexState.NEW, v3.getState());
+    // Generate a VM event for v1, targeted at v3
+    VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new
byte[0]));
+    TezEvent tezVmEvent = new TezEvent(vmEvent,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+            TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(v1.getVertexId(), 1), 1)));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+    dispatcher.await();
+
+    assertEquals(1, v3.pendingVmEvents.size());
+    assertEquals(0, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+    // Initialize v2, which will trigger initialization of v3
+    dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+
+    assertEquals(VertexState.INITED, v3.getState());
+
+    // The VM event should have been processed.
+    assertEquals(0, v3.pendingVmEvents.size());
+    assertEquals(1, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+    // Send another VM event - make sure it's processed without additional events.
+    vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0]));
+    tezVmEvent = new TezEvent(vmEvent,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+            TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(v1.getVertexId(), 1), 2)));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+    dispatcher.await();
+
+    assertEquals(0, v3.pendingVmEvents.size());
+    assertEquals(2, InvocationCountingVertexManager.numVmEventsReceived.get());
+  }
+
+  @Test(timeout = 5000)
   public void testExceptionFromVM_Initialize() throws TezException {
     useCustomInitializer = true;
     setupPreDagCreation();
@@ -6075,6 +6219,40 @@ public class TestVertexImpl {
     }
   }
 
+  public static class InvocationCountingVertexManager extends VertexManagerPlugin {
+
+    static final AtomicInteger numVmEventsReceived = new AtomicInteger(0);
+    static final AtomicInteger numInitializedInputs = new AtomicInteger(0);
+
+    public InvocationCountingVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) throws
Exception {
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception
{
+
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception
{
+      numVmEventsReceived.incrementAndGet();
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                        List<Event> events) throws Exception {
+      numInitializedInputs.incrementAndGet();
+    }
+  }
+
   @InterfaceAudience.Private
   public static class VertexManagerWithException extends RootInputVertexManager{
 


Mime
View raw message