tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-1647. Issue with caching of events in VertexManager::onRootVertexInitialized. (Jeff Zhang via hitesh)
Date Mon, 13 Oct 2014 00:23:09 GMT
Repository: tez
Updated Branches:
  refs/heads/master 6d856f952 -> e0fe9aaea


TEZ-1647. Issue with caching of events in VertexManager::onRootVertexInitialized. (Jeff Zhang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e0fe9aae
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e0fe9aae
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e0fe9aae

Branch: refs/heads/master
Commit: e0fe9aaeacb43e8aa3fe1d0a5413b26598d076ed
Parents: 6d856f9
Author: Hitesh Shah <hitesh@apache.org>
Authored: Sun Oct 12 17:22:48 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Sun Oct 12 17:22:48 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/VertexManager.java     | 23 +++--
 .../tez/dag/app/dag/impl/TestVertexManager.java | 99 ++++++++++++++++++++
 3 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e0fe9aae/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c07511f..b1b184a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ ALL CHANGES:
   TEZ-1641. Add debug logs in VertexManager to help debugging custom VertexManagerPlugins
   TEZ-1645. Add support for specifying additional local resources via config.
   TEZ-1646. Add support for augmenting classpath via configs.
+  TEZ-1647. Issue with caching of events in VertexManager::onRootVertexInitialized.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e0fe9aae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 8579064..8488f85 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -20,11 +20,13 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.Nullable;
 
@@ -59,7 +61,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -70,7 +72,7 @@ public class VertexManager {
   VertexManagerPluginContextImpl pluginContext;
   UserPayload payload = null;
   AppContext appContext;
-  ConcurrentHashMap<String, List<TezEvent>> cachedRootInputEventMap;
+  BlockingQueue<TezEvent> rootInputInitEventQueue;
 
   private static final Log LOG = LogFactory.getLog(VertexManager.class);
 
@@ -137,7 +139,7 @@ public class VertexManager {
     public void addRootInputEvents(final String inputName,
         Collection<InputDataInformationEvent> events) {
       verifyIsRootInput(inputName);
-      Iterable<TezEvent> tezEvents = Iterables.transform(events,
+      Collection<TezEvent> tezEvents = Collections2.transform(events,
           new Function<InputDataInformationEvent, TezEvent>() {
             @Override
             public TezEvent apply(InputDataInformationEvent riEvent) {
@@ -151,7 +153,7 @@ public class VertexManager {
         LOG.debug("vertex:" + managedVertex.getName() + "; Added " + events.size() + " for input " +
                 "name " + inputName);
       }
-      cachedRootInputEventMap.put(inputName,Lists.newArrayList(tezEvents));
+      rootInputInitEventQueue.addAll(tezEvents);
       // Recovery handling is taken care of by the Vertex.
     }
 
@@ -217,7 +219,8 @@ public class VertexManager {
     this.pluginDesc = pluginDesc;
     this.managedVertex = managedVertex;
     this.appContext = appContext;
-    this.cachedRootInputEventMap = new ConcurrentHashMap<String, List<TezEvent>>();
+    // don't specify the size of rootInputInitEventQueue, otherwise it will fail when addAll
+    this.rootInputInitEventQueue = new LinkedBlockingQueue<TezEvent>();
   }
 
   public VertexManagerPlugin getPlugin() {
@@ -268,9 +271,11 @@ public class VertexManager {
       InputDescriptor inputDescriptor, List<Event> events) {
     plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("vertex:" + managedVertex.getName() + "; For input name "
-          + inputName + " task events size is " + cachedRootInputEventMap.get(inputName).size());
+      LOG.debug("vertex:" + managedVertex.getName() + "; after call of VertexManagerPlugin.onRootVertexInitialized"
+          + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size());
     }
-    return cachedRootInputEventMap.get(inputName);
+    List<TezEvent> resultEvents = new ArrayList<TezEvent>();
+    rootInputInitEventQueue.drainTo(resultEvents);
+    return resultEvents;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e0fe9aae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index b3e66bc..b1825a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -25,18 +25,28 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 public class TestVertexManager {
 
   @Test
@@ -73,4 +83,93 @@ public class TestVertexManager {
     assertEquals(tezEvents2.size(), 1);
     assertEquals(diEvent2, tezEvents2.get(0).getEvent());
   }
+
+  /**
+   * TEZ-1647: the custom vertex manager publishes events only after both input1 and
+   * input2 are initialized, so input1's events must be returned along with input2's.
+   */
+  @Test
+  public void testOnRootVertexInitialized2() {
+    Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
+    AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    doReturn("vertex1").when(mockVertex).getName();
+    when(
+        mockAppContext.getCurrentDAG().getVertex(any(String.class))
+            .getTotalTasks()).thenReturn(1);
+
+    VertexManager vm =
+        new VertexManager(
+            VertexManagerPluginDescriptor.create(CustomVertexManager.class
+                .getName()), mockVertex, mockAppContext);
+    vm.initialize();
+    InputDescriptor id1 = mock(InputDescriptor.class);
+    List<Event> events1 = new LinkedList<Event>();
+    InputDataInformationEvent diEvent1 =
+        InputDataInformationEvent.createWithSerializedPayload(0, null);
+    events1.add(diEvent1);
+
+    // plugin only caches input1's events and adds none yet, so nothing is returned
+    List<TezEvent> tezEventsAfterInput1 = vm.onRootVertexInitialized("input1", id1, events1);
+    assertEquals(0, tezEventsAfterInput1.size());
+
+    InputDescriptor id2 = mock(InputDescriptor.class);
+    List<Event> events2 = new LinkedList<Event>();
+    InputDataInformationEvent diEvent2 =
+        InputDataInformationEvent.createWithSerializedPayload(0, null);
+    events2.add(diEvent2);
+    // plugin now calls context.addRootInputEvents for both input1 and input2
+    List<TezEvent> tezEventsAfterInput2 =
+        vm.onRootVertexInitialized("input2", id2, events2);
+    assertEquals(2, tezEventsAfterInput2.size());
+
+    // verify the destination EventMetaData of the returned events covers both inputs
+    Set<String> edgeVertexSet = new HashSet<String>();
+    for (TezEvent tezEvent : tezEventsAfterInput2) {
+      edgeVertexSet.add(tezEvent.getDestinationInfo().getEdgeVertexName());
+    }
+    assertEquals(Sets.newHashSet("input1","input2"), edgeVertexSet);
+  }
+
+  public static class CustomVertexManager extends VertexManagerPlugin {
+
+    private final Map<String,List<Event>> cachedEventMap = new HashMap<String, List<Event>>();
+
+    public CustomVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+    }
+
+    /**
+     * Caches each input's events; on "input2", adds every cached input's own events.
+     */
+    @Override
+    public void onRootVertexInitialized(String inputName,
+        InputDescriptor inputDescriptor, List<Event> events) {
+      cachedEventMap.put(inputName, events);
+      if (inputName.equals("input2")) {
+        for (Map.Entry<String, List<Event>> entry : cachedEventMap.entrySet()) {
+          List<InputDataInformationEvent> riEvents = Lists.newLinkedList();
+          for (Event event : entry.getValue()) {
+            riEvents.add((InputDataInformationEvent)event);
+          }
+          getContext().addRootInputEvents(entry.getKey(), riEvents);
+        }
+      }
+    }
+  }
 }


Mime
View raw message