tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2330. Create reconfigureVertex() API for input based initialization (bikas)
Date Wed, 22 Apr 2015 02:24:00 GMT
Repository: tez
Updated Branches:
  refs/heads/master c6e400e2d -> ec45c510c


TEZ-2330. Create reconfigureVertex() API for input based initialization (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ec45c510
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ec45c510
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ec45c510

Branch: refs/heads/master
Commit: ec45c510c04eead59799813521f5ce0c6868960f
Parents: c6e400e
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Apr 21 19:24:00 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Apr 21 19:24:00 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/api/VertexManagerPluginContext.java | 26 ++++++++++++++++++++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  4 +++
 .../app/dag/impl/RootInputVertexManager.java    |  9 +++++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++++++---
 .../tez/dag/app/dag/impl/VertexManager.java     | 12 +++++++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  2 +-
 7 files changed, 59 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c83c08..35cf312 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2330. Create reconfigureVertex() API for input based initialization
   TEZ-2292. Add e2e test for error reporting when vertex manager invokes
   plugin APIs
   TEZ-2308. Add set/get of record counts in task/vertex statistics

http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 8b0e89e..345ea43 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -147,6 +147,7 @@ public interface VertexManagerPluginContext {
    * @param rootInputSpecUpdate Updated Root Input specifications, if any.
    *        If none specified, a default of 1 physical input is used
    */
+  @Deprecated
   public void setVertexParallelism(int parallelism,
       @Nullable VertexLocationHint locationHint,
       @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
@@ -178,6 +179,31 @@ public interface VertexManagerPluginContext {
       @Nullable Map<String, EdgeProperty> sourceEdgeProperties);
 
   /**
+   * API to reconfigure a {@link Vertex} that is reading root inputs based on
+   * the data read from the root inputs. Root inputs are external data sources
+   * that provide the initial data for the DAG and are added to the
+   * {@link Vertex} using the
+   * {@link Vertex#addDataSource(String, DataSourceDescriptor)} API. Typically,
+   * the parallelism of such vertices is determined at runtime by gathering
+   * information about the data source. This API may be used to set the
+   * parallelism of the vertex at runtime based on the data sources, as well as
+   * changing the specification for those inputs.
+   * @param rootInputSpecUpdate
+   *          The key of the map is the name of the data source and the value is
+   *          the updated {@link InputSpecUpdate} for that data source. If none
+   *          specified, a default value is used. See {@link InputSpecUpdate}
+   *          for details.
+   * @param locationHint
+   *          the placement policy for tasks specified at
+   *          {@link VertexLocationHint}s
+   * @param parallelism
+   *          New number of tasks in the vertex
+   */
+  public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate,
+      @Nullable VertexLocationHint locationHint,
+      int parallelism);
+
+  /**
    * Allows a VertexManagerPlugin to assign Events for Root Inputs
    * 
    * For regular Event Routing changes - the EdgeManager should be configured

http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 6c85b85..77ef6e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -101,6 +101,10 @@ public interface Vertex extends Comparable<Vertex> {
   public void reconfigureVertex(int parallelism,
       @Nullable VertexLocationHint locationHint,
       @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException;
+  
+  public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate,
+      int parallelism,
+      @Nullable VertexLocationHint locationHint) throws AMUserCodeException;
 
   void setVertexLocationHint(VertexLocationHint vertexLocationHint);
   void vertexReconfigurationPlanned();

http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index e850286..c1e96f3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -30,12 +30,17 @@ import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class RootInputVertexManager extends ImmediateStartVertexManager {
 
+  private static final Logger LOG = 
+      LoggerFactory.getLogger(RootInputVertexManager.class);
+  
   private String configuredInputName;
 
   public RootInputVertexManager(VertexManagerPluginContext context) {
@@ -66,8 +71,8 @@ public class RootInputVertexManager extends ImmediateStartVertexManager {
             inputName,
             cEvent.getInputSpecUpdate() == null ? InputSpecUpdate
                 .getDefaultSinglePhysicalInputSpecUpdate() : cEvent.getInputSpecUpdate());
-        getContext().setVertexParallelism(cEvent.getNumTasks(),
-            cEvent.getLocationHint(), null, rootInputSpecUpdate);
+        getContext().reconfigureVertex(rootInputSpecUpdate, cEvent.getLocationHint(),
+              cEvent.getNumTasks());
       }
       if (event instanceof InputUpdatePayloadEvent) {
         // No tasks should have been started yet. Checked by initial state check.

http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5dfcb8e..e22343b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -82,7 +81,6 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
-import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
@@ -96,7 +94,6 @@ import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.RootInputInitializerManager;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
@@ -1433,7 +1430,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException {
     setParallelism(parallelism, locationHint, sourceEdgeProperties, null, false, true);
   }
-
+  
+  @Override
+  public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate,
+      int parallelism,
+      @Nullable VertexLocationHint locationHint) throws AMUserCodeException {
+    setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, false, true);
+  }
+  
   @Override
   public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,

http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 2ac1acf..1ed42fc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -185,6 +185,18 @@ public class VertexManager {
         throw new TezUncheckedException(e);
       }
     }
+    
+    @Override
+    public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate,
+        @Nullable VertexLocationHint locationHint,
+        int parallelism) {
+      checkAndThrowIfDone();
+      try {
+        managedVertex.reconfigureVertex(rootInputSpecUpdate, parallelism, locationHint);
+      } catch (AMUserCodeException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
 
     @Override
     public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {

http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 2403599..3147093 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -5566,7 +5566,7 @@ public class TestVertexImpl {
         }
         map.put("input4", InputSpecUpdate.createPerTaskInputSpecUpdate(pInputList));
       }
-      getContext().setVertexParallelism(NUM_TASKS, null, null, map);
+      getContext().reconfigureVertex(map, null, NUM_TASKS);
     }
   }
 


Mime
View raw message