tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [2/2] git commit: TEZ-678. Support for union operations via VertexGroup abstraction (bikas)
Date Fri, 31 Jan 2014 22:26:38 GMT
TEZ-678. Support for union operations via VertexGroup abstraction (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2e231264
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2e231264
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2e231264

Branch: refs/heads/master
Commit: 2e2312646d25ac0fc40b9cca443f83d3e10b392b
Parents: f55dbfb
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Jan 31 14:26:24 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Jan 31 14:26:24 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   | 102 +++-
 .../apache/tez/dag/api/DagTypeConverters.java   |   2 +-
 .../main/java/org/apache/tez/dag/api/Edge.java  |   4 +-
 .../org/apache/tez/dag/api/GroupInputEdge.java  |  86 ++++
 .../apache/tez/dag/api/TezConfiguration.java    |   2 +-
 .../java/org/apache/tez/dag/api/Vertex.java     |  33 +-
 .../org/apache/tez/dag/api/VertexGroup.java     | 126 +++++
 .../tez/runtime/api/MergedLogicalInput.java     |  61 +++
 tez-api/src/main/proto/DAGApiRecords.proto      |  13 +
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 130 +++++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   5 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 265 ++++++++--
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   5 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  69 ++-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 106 +++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 143 +++++-
 .../tez/dag/app/rm/TestContainerReuse.java      |   2 +-
 .../tez/mapreduce/examples/ExampleDriver.java   |   2 +
 .../tez/mapreduce/examples/UnionExample.java    | 501 +++++++++++++++++++
 .../mapreduce/committer/MROutputCommitter.java  |   1 +
 .../tez/mapreduce/processor/MapUtils.java       |   2 +-
 .../processor/reduce/TestReduceProcessor.java   |   2 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  61 ++-
 .../tez/runtime/api/impl/GroupInputSpec.java    |  94 ++++
 .../apache/tez/runtime/api/impl/TaskSpec.java   |  38 +-
 .../input/ConcatenatedMergedKeyValuesInput.java |  72 +++
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  26 +
 27 files changed, 1849 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 1945a39..4e69e1e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -37,12 +37,15 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.VertexGroup.GroupInfo;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
@@ -53,18 +56,20 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import com.google.common.base.Preconditions;
 import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
 import org.apache.commons.collections4.BidiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class DAG { // FIXME rename to Topology
-  final BidiMap<String, Vertex> vertices;
-  final List<Edge> edges;
+  final BidiMap<String, Vertex> vertices = 
+      new DualLinkedHashBidiMap<String, Vertex>();
+  final Set<Edge> edges = Sets.newHashSet();
   final String name;
   final List<URI> urisForCredentials = new LinkedList<URI>();
   Credentials credentials;
-  
+  Set<VertexGroup> vertexGroups = Sets.newHashSet();
+  Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
 
   public DAG(String name) {
-    this.vertices = new DualLinkedHashBidiMap<String, Vertex>();
-    this.edges = new ArrayList<Edge>();
     this.name = name;
   }
 
@@ -99,6 +104,12 @@ public class DAG { // FIXME rename to Topology
     this.credentials = credentials;
     return this;
   }
+  
+  public synchronized VertexGroup createVertexGroup(String name, Vertex... members) {
+    VertexGroup uv = new VertexGroup(name, members);
+    vertexGroups.add(uv);
+    return uv;
+  }
 
   @Private
   public synchronized Credentials getCredentials() {
@@ -159,17 +170,66 @@ public class DAG { // FIXME rename to Topology
         "Edge " + edge + " already defined!");
     }
 
-    // Inform the vertices
-    edge.getInputVertex().addOutputVertex(edge.getOutputVertex(), edge.getId());
-    edge.getOutputVertex().addInputVertex(edge.getInputVertex(), edge.getId());
+    // inform the vertices
+    edge.getInputVertex().addOutputVertex(edge.getOutputVertex(), edge);
+    edge.getOutputVertex().addInputVertex(edge.getInputVertex(), edge);
 
     edges.add(edge);
     return this;
   }
+  
+  public synchronized DAG addEdge(GroupInputEdge edge) {
+    // Sanity checks
+    if (!vertexGroups.contains(edge.getInputVertexGroup())) {
+      throw new IllegalArgumentException(
+        "Input vertex " + edge.getInputVertexGroup() + " doesn't exist!");
+    }
+    if (!vertices.containsValue(edge.getOutputVertex())) {
+      throw new IllegalArgumentException(
+        "Output vertex " + edge.getOutputVertex() + " doesn't exist!");
+    }
+    if (groupInputEdges.contains(edge)) {
+      throw new IllegalArgumentException(
+        "Edge " + edge + " already defined!");
+    }
 
+    VertexGroup av = edge.getInputVertexGroup();
+    av.addOutputVertex(edge.getOutputVertex(), edge);
+    groupInputEdges.add(edge);
+    return this;
+  }
+  
   public String getName() {
     return this.name;
   }
+  
+  private void processEdgesAndGroups() throws IllegalStateException {
+    // process all VertexGroups by transferring outgoing connections to the members
+    
+    // add edges between VertexGroup members and destination vertices
+    List<Edge> newEdges = Lists.newLinkedList();
+    for (GroupInputEdge e : groupInputEdges) {
+      Vertex  dstVertex = e.getOutputVertex();
+      VertexGroup uv = e.getInputVertexGroup();
+      for (Vertex member : uv.getMembers()) {
+        newEdges.add(new Edge(member, dstVertex, e.getEdgeProperty()));
+      }
+      dstVertex.addGroupInput(uv.getGroupName(), uv.getGroupInfo());
+    }
+    
+    for (Edge e : newEdges) {
+      addEdge(e);
+    }
+    
+    // add outputs to VertexGroup members
+    for(VertexGroup av : vertexGroups) {
+      for (RootInputLeafOutput<OutputDescriptor> output : av.getOutputs()) {
+        for (Vertex member : av.getMembers()) {
+          member.addAdditionalOutput(output);
+        }
+      }
+    }
+  }
 
   // AnnotatedVertex is used by verify()
   private static class AnnotatedVertex {
@@ -179,13 +239,11 @@ public class DAG { // FIXME rename to Topology
     int lowlink; //for Tarjan's algorithm
     boolean onstack; //for Tarjan's algorithm
 
-    int outDegree;
 
     private AnnotatedVertex(Vertex v) {
       this.v = v;
       index = -1;
       lowlink = -1;
-      outDegree = 0;
     }
   }
 
@@ -217,6 +275,8 @@ public class DAG { // FIXME rename to Topology
       throw new IllegalStateException("Invalid dag containing 0 vertices");
     }
 
+    processEdgesAndGroups();
+    
     // check for valid vertices, duplicate vertex names,
     // and prepare for cycle detection
     Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
@@ -315,7 +375,6 @@ public class DAG { // FIXME rename to Topology
 
     if (restricted) {
       for (Edge e : edges) {
-        vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
         if (e.getEdgeProperty().getDataSourceType() !=
           DataSourceType.PERSISTED) {
           throw new IllegalStateException(
@@ -399,6 +458,25 @@ public class DAG { // FIXME rename to Topology
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
 
     dagBuilder.setName(this.name);
+    
+    if (!vertexGroups.isEmpty()) {
+      for (VertexGroup av : vertexGroups) {
+        GroupInfo groupInfo = av.getGroupInfo();
+        PlanVertexGroupInfo.Builder groupBuilder = PlanVertexGroupInfo.newBuilder();
+        groupBuilder.setGroupName(groupInfo.getGroupName());
+        for (Vertex v : groupInfo.getMembers()) {
+          groupBuilder.addGroupMembers(v.getVertexName());
+        }
+        groupBuilder.addAllOutputs(groupInfo.outputs);
+        for (Map.Entry<String, InputDescriptor> entry : 
+             groupInfo.edgeMergedInputs.entrySet()) {
+          groupBuilder.addEdgeMergedInputs(
+              PlanGroupInputEdgeInfo.newBuilder().setDestVertexName(entry.getKey()).
+              setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue())));
+        }
+        dagBuilder.addVertexGroups(groupBuilder); 
+      }
+    }
 
     for (Vertex vertex : vertices.values()) {
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
@@ -452,7 +530,7 @@ public class DAG { // FIXME rename to Topology
           taskConfigBuilder.addLocalResource(localResourcesBuilder);
         }
       }
-
+      
       if (vertex.getTaskEnvironment() != null) {
         for (String key : vertex.getTaskEnvironment().keySet()) {
           PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index ba6647f..0a4668f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -174,7 +174,7 @@ public class DagTypeConverters {
     }
     return new VertexLocationHint(outputList.size(), outputList);
   }
-
+  
   // notes re HDFS URL handling:
   //   Resource URLs in the protobuf message are strings of the form hdfs://host:port/path
   //   org.apache.hadoop.fs.Path.Path  is actually a URI type that allows any scheme

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
index 0e384b3..31a45f3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -33,7 +33,7 @@ public class Edge {
     this.outputVertex = outputVertex;
     this.edgeProperty = edgeProperty;
   }
-
+  
   // RENAME to source and destination
   public Vertex getInputVertex() {
     return inputVertex;
@@ -46,7 +46,7 @@ public class Edge {
   public EdgeProperty getEdgeProperty() {
     return edgeProperty;
   }
-
+  
   /*
    * Used to identify the edge in the configuration
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/dag/api/GroupInputEdge.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/GroupInputEdge.java b/tez-api/src/main/java/org/apache/tez/dag/api/GroupInputEdge.java
new file mode 100644
index 0000000..e7f25ea
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/GroupInputEdge.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+public class GroupInputEdge {
+
+  private final VertexGroup inputVertexGroup;
+  private final Vertex outputVertex;
+  private final EdgeProperty edgeProperty;
+  private final InputDescriptor mergedInput;
+
+  // InputVertex(EdgeInput) ----- Edge ----- OutputVertex(EdgeOutput)
+  /**
+   * An Edge that connects a VertexGroup to a destination Vertex. The framework
+   * takes care of connecting the VertexGroup members with the destination
+   * vertex. The tasks of the destination vertex see only 1 input named after
+   * the VertexGroup instead of individual inputs from group members. These
+   * individual inputs are merged using the mergedInput before presenting them
+   * to the destination task.
+   * 
+   * @param inputVertexGroup source VertexGroup
+   * @param outputVertex destination Vertex
+   * @param edgeProperty edge properties
+   * @param mergedInput descriptor of the MergedLogicalInput used to merge the group members' inputs
+   */
+  public GroupInputEdge(VertexGroup inputVertexGroup, 
+      Vertex outputVertex, 
+      EdgeProperty edgeProperty,
+      InputDescriptor mergedInput) {
+    this.inputVertexGroup = inputVertexGroup;
+    this.outputVertex = outputVertex;
+    this.edgeProperty = edgeProperty;
+    if (mergedInput == null) {
+      throw new TezUncheckedException(
+          "Merged input must be specified when using GroupInputEdge");
+    }
+    this.mergedInput = mergedInput;
+  }
+
+  public VertexGroup getInputVertexGroup() {
+    return inputVertexGroup;
+  }
+
+  public Vertex getOutputVertex() {
+    return outputVertex;
+  }
+
+  public EdgeProperty getEdgeProperty() {
+    return edgeProperty;
+  }
+  
+  InputDescriptor getMergedInput() {
+    return mergedInput;
+  }
+
+  /*
+   * Used to identify the edge in the configuration
+   */
+  @Private
+  public String getId() {
+    return String.valueOf(this.hashCode());
+  }
+ 
+  @Override
+  public String toString() {
+    return inputVertexGroup + " -> " + outputVertex + " (" + edgeProperty + ")";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index b5dac99..18fac77 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -50,7 +50,7 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX+"log.level";
   public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO";
 
-  public static final String TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE = 
+  public static final String TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE =
       TEZ_AM_PREFIX + "abort-all-outputs-on-dag-failure";
   public static final boolean TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE_DEFAULT = true;
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index a58fa3a..d2e8ac7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -25,12 +25,15 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.VertexGroup.GroupInfo;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
+import com.google.common.collect.Maps;
+
 public class Vertex {
 
   private final String vertexName;
@@ -51,9 +54,10 @@ public class Vertex {
   private final List<Vertex> outputVertices = new ArrayList<Vertex>();
   private final List<String> inputEdgeIds = new ArrayList<String>();
   private final List<String> outputEdgeIds = new ArrayList<String>();
+  private final Map<String, GroupInfo> groupInputs = Maps.newHashMap();
+  
   private String javaOpts = "";
 
-
   public Vertex(String vertexName,
       ProcessorDescriptor processorDescriptor,
       int parallelism,
@@ -188,6 +192,11 @@ public class Vertex {
         outputDescriptor, outputCommitterClazz));
     return this;
   }
+  
+  Vertex addAdditionalOutput(RootInputLeafOutput<OutputDescriptor> output) {
+    additionalOutputs.add(output);
+    return this;
+  }
 
   public Vertex addOutput(String outputName, OutputDescriptor outputDescriptor) {
     return addOutput(outputName, outputDescriptor, null);
@@ -220,16 +229,28 @@ public class Vertex {
     return vertexManagerPlugin;
   }
 
-  void addInputVertex(Vertex inputVertex, String edgeId) {
+  Map<String, GroupInfo> getGroupInputs() {
+    return groupInputs;
+  }
+  
+  void addGroupInput(String groupName, GroupInfo groupInputInfo) {
+    if (groupInputs.put(groupName, groupInputInfo) != null) {
+      throw new IllegalStateException(
+          "Vertex: " + getVertexName() + 
+          " already has group input with name:" + groupName);
+    }
+  }
+
+  void addInputVertex(Vertex inputVertex, Edge edge) {
     inputVertices.add(inputVertex);
-    inputEdgeIds.add(edgeId);
+    inputEdgeIds.add(edge.getId());
   }
 
-  void addOutputVertex(Vertex outputVertex, String edgeId) {
+  void addOutputVertex(Vertex outputVertex, Edge edge) {
     outputVertices.add(outputVertex);
-    outputEdgeIds.add(edgeId);
+    outputEdgeIds.add(edge.getId());
   }
-
+  
   public List<Vertex> getInputVertices() {
     return Collections.unmodifiableList(inputVertices);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
new file mode 100644
index 0000000..db54a39
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.tez.runtime.api.OutputCommitter;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Represents a virtual collection of vertices whose members can be treated as a single 
+ * named collection for graph operations. Only the following connections are valid.
+ * A VertexGroup can be connected as an input to a consumer Vertex. The tasks of 
+ * the destination vertex see a single input named after the VertexGroup instead 
+ * of multiple inputs from the members of the VertexGroup.
+ * An output can be added to a VertexGroup.
+ * All outgoing edges & outputs of a VertexGroup are automatically transferred to the 
+ * member vertices of the VertexGroup.
+ * A VertexGroup is not part of the final DAG.
+ */
+public class VertexGroup {
+
+  static class GroupInfo {
+    String groupName;
+    Set<Vertex> members = new HashSet<Vertex>();
+    Set<String> outputs = new HashSet<String>();
+    // destination vertex name to merged input map
+    Map<String, InputDescriptor> edgeMergedInputs = Maps.newHashMap();
+    
+    GroupInfo(String groupName, Vertex... vertices) {
+      this.groupName = groupName;
+      members = Sets.newHashSetWithExpectedSize(vertices.length);
+      for (Vertex v : vertices) {
+        members.add(v);
+      }
+    }
+    String getGroupName() {
+      return groupName;
+    }
+    Set<Vertex> getMembers() {
+      return members;
+    }
+    Set<String> getOutputs() {
+      return outputs;
+    }
+  }
+  
+  List<RootInputLeafOutput<OutputDescriptor>> outputs = Lists.newLinkedList();
+  
+  GroupInfo groupInfo;
+  
+  /**
+   * Create an object representing a group of vertices
+   * @param groupName name of the group
+   */
+  VertexGroup(String groupName, Vertex... members) {
+    if (members == null || members.length < 2) {
+      throw new IllegalArgumentException("VertexGroup must have at least 2 members");
+    }
+    this.groupInfo = new GroupInfo(groupName, members);
+  }
+
+  /**
+   * Get the name of the group
+   * @return name
+   */
+  public String getGroupName() {
+    return groupInfo.groupName;
+  }
+  
+  /**
+   * Add a common output to the group of vertices.
+   * Refer to {@link Vertex#addOutput(String, OutputDescriptor, Class)}
+   */
+  public VertexGroup addOutput(String outputName, OutputDescriptor outputDescriptor,
+      Class<? extends OutputCommitter> outputCommitterClazz) {
+    outputs.add(new RootInputLeafOutput<OutputDescriptor>(outputName,
+        outputDescriptor, outputCommitterClazz));
+    this.groupInfo.outputs.add(outputName);
+    return this;
+  }
+  
+  @Override
+  public String toString() {
+    return "[ VertexGroup: " + groupInfo.getGroupName() + "]";
+  }
+
+  List<RootInputLeafOutput<OutputDescriptor>> getOutputs() {
+    return outputs;
+  }
+  
+  GroupInfo getGroupInfo() {
+    return groupInfo;
+  }
+  
+  Set<Vertex> getMembers() {
+    return groupInfo.members;
+  }
+  
+  void addOutputVertex(Vertex outputVertex, GroupInputEdge edge) {
+    this.groupInfo.edgeMergedInputs.put(outputVertex.getVertexName(), edge.getMergedInput());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
new file mode 100644
index 0000000..b879b4c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+/**
+ * A LogicalInput that is used to merge the data from multiple inputs and provide a 
+ * single <code>Reader</code> to read that data.
+ * This Input is not initialized or closed. It is only expected to provide a 
+ * merged view of the real inputs. It cannot send or receive events.
+ */
+public abstract class MergedLogicalInput implements LogicalInput {
+
+  private List<Input> inputs;
+  
+  public void initialize(List<Input> inputs) {
+    this.inputs = inputs;
+  }
+  
+  protected List<Input> getInputs() {
+    return inputs;
+  }
+  
+  @Override
+  public final List<Event> initialize(TezInputContext inputContext) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void handleEvents(List<Event> inputEvents) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final List<Event> close() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void setNumPhysicalInputs(int numInputs) {
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index f83e6db..e658763 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -88,6 +88,18 @@ message PlanTaskLocationHint {
   repeated string host = 2;
 }
 
+message PlanGroupInputEdgeInfo {
+  required string dest_vertex_name = 1;
+  required TezEntityDescriptorProto merged_input = 2;
+}
+
+message PlanVertexGroupInfo {
+  optional string group_name = 1;
+  repeated string group_members = 2;
+  repeated string outputs = 3;
+  repeated PlanGroupInputEdgeInfo edge_merged_inputs = 4;
+}
+
 message PlanTaskConfiguration {
   required int32 numTasks = 1;
   required int32 memoryMb = 2;
@@ -144,6 +156,7 @@ message DAGPlan {
   repeated EdgePlan edge = 3;
   optional ConfigurationProto dagKeyValues = 4;
   optional bytes credentials_binary = 5;
+  repeated PlanVertexGroupInfo vertex_groups = 6;
 }
 
 // DAG monitoring messages

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index e60a7a5..7d55c39 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -414,6 +414,136 @@ public class TestDAGVerify {
     dag.addEdge(e2);
     dag.verify();
   }
+  
+  @Test
+  public void testVertexGroupWithMultipleOutputEdges() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    
+    DAG dag = new DAG("testDag");
+    VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
+    OutputDescriptor outDesc = new OutputDescriptor();
+    uv12.addOutput("uvOut", outDesc, null);
+    
+    GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")),
+            new InputDescriptor("dummy input class"));
+    
+    GroupInputEdge e2 = new GroupInputEdge(uv12, v4,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")),
+            new InputDescriptor("dummy input class"));
+
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.verify();
+    
+    Assert.assertEquals(2, v1.getOutputVertices().size());
+    Assert.assertEquals(2, v2.getOutputVertices().size());
+    Assert.assertTrue(v1.getOutputVertices().contains(v3));
+    Assert.assertTrue(v1.getOutputVertices().contains(v4));
+    Assert.assertTrue(v2.getOutputVertices().contains(v3));
+    Assert.assertTrue(v2.getOutputVertices().contains(v4));
+  }
+  
+  @Test
+  public void testVertexGroup() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v5 = new Vertex("v5",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    
+    DAG dag = new DAG("testDag");
+    String groupName1 = "uv12";
+    VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+    OutputDescriptor outDesc = new OutputDescriptor();
+    uv12.addOutput("uvOut", outDesc, null);
+    
+    String groupName2 = "uv23";
+    VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
+    
+    GroupInputEdge e1 = new GroupInputEdge(uv12, v4,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")),
+            new InputDescriptor("dummy input class"));
+    GroupInputEdge e2 = new GroupInputEdge(uv23, v5,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")),
+            new InputDescriptor("dummy input class"));
+    
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addVertex(v5);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.verify();
+
+    // for the first Group v1 and v2 should get connected to v4 and also have 1 output
+    // for the second Group v2 and v3 should get connected to v5
+    // the Group place holders should disappear
+    Assert.assertNull(dag.getVertex(uv12.getGroupName()));
+    Assert.assertNull(dag.getVertex(uv23.getGroupName()));
+    Assert.assertFalse(dag.edges.contains(e1));
+    Assert.assertFalse(dag.edges.contains(e2));
+    Assert.assertEquals(1, v1.getOutputs().size());
+    Assert.assertEquals(1, v2.getOutputs().size());
+    Assert.assertEquals(outDesc, v1.getOutputs().get(0).getDescriptor());
+    Assert.assertEquals(outDesc, v2.getOutputs().get(0).getDescriptor());
+    Assert.assertEquals(1, v1.getOutputVertices().size());
+    Assert.assertEquals(1, v3.getOutputVertices().size());
+    Assert.assertEquals(2, v2.getOutputVertices().size());
+    Assert.assertTrue(v1.getOutputVertices().contains(v4));
+    Assert.assertTrue(v3.getOutputVertices().contains(v5));
+    Assert.assertTrue(v2.getOutputVertices().contains(v4));
+    Assert.assertTrue(v2.getOutputVertices().contains(v5));
+    Assert.assertEquals(2, v4.getInputVertices().size());
+    Assert.assertTrue(v4.getInputVertices().contains(v1));
+    Assert.assertTrue(v4.getInputVertices().contains(v2));
+    Assert.assertEquals(2, v5.getInputVertices().size());
+    Assert.assertTrue(v5.getInputVertices().contains(v2));
+    Assert.assertTrue(v5.getInputVertices().contains(v3));
+    Assert.assertEquals(1, v4.getGroupInputs().size());
+    Assert.assertTrue(v4.getGroupInputs().containsKey(groupName1));
+    Assert.assertEquals(1, v5.getGroupInputs().size());
+    Assert.assertTrue(v5.getGroupInputs().containsKey(groupName2));
+    Assert.assertEquals(2, dag.vertexGroups.size());
+  }
 
   //   v1
   //  |  |

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 134e5fe..03fc0b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 
@@ -94,6 +95,10 @@ public interface Vertex extends Comparable<Vertex> {
 
   List<InputSpec> getInputSpecList(int taskIndex);
   List<OutputSpec> getOutputSpecList(int taskIndex);
+  
+  List<GroupInputSpec> getGroupInputSpecList(int taskIndex);
+  void addSharedOutputs(Set<String> outputs);
+  Set<String> getSharedOutputs();
 
   int getInputVerticesCount();
   int getOutputVerticesCount();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 16ae914..4027fc5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,6 +54,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -65,6 +67,8 @@ import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -98,6 +102,8 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.runtime.api.OutputCommitter;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /** Implementation of Job interface. Maintains the state machines of Job.
@@ -207,7 +213,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED),
               DAGEventType.DAG_VERTEX_COMPLETED,
               new VertexCompletedTransition())
-          .addTransition(DAGState.RUNNING, DAGState.RUNNING,
+          .addTransition(DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.TERMINATING),
               DAGEventType.DAG_VERTEX_RERUNNING,
               new VertexReRunningTransition())
           .addTransition(DAGState.RUNNING, DAGState.TERMINATING,
@@ -334,6 +340,30 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private long initTime;
   private long startTime;
   private long finishTime;
+  
+  Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
+  Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
+  
+  static class VertexGroupInfo {
+    String groupName;
+    Set<String> groupMembers;
+    Set<String> outputs;
+    Map<String, InputDescriptor> edgeMergedInputs;
+    int successfulMembers;
+    boolean committed;
+    VertexGroupInfo(PlanVertexGroupInfo groupInfo) {
+      groupName = groupInfo.getGroupName();
+      groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList());
+      edgeMergedInputs = Maps.newHashMapWithExpectedSize(groupInfo.getEdgeMergedInputsCount());
+      for (PlanGroupInputEdgeInfo edgInfo : groupInfo.getEdgeMergedInputsList()) {
+        edgeMergedInputs.put(edgInfo.getDestVertexName(), 
+            DagTypeConverters.convertInputDescriptorFromDAGPlan(edgInfo.getMergedInput()));
+      }
+      outputs = Sets.newHashSet(groupInfo.getOutputsList());
+      successfulMembers = 0;
+      committed = false;
+    }
+  }
 
   public DAGImpl(TezDAGID dagId,
       Configuration conf,
@@ -609,6 +639,23 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
   
+  private boolean commitOutput(String outputName, OutputCommitter outputCommitter) {
+    final OutputCommitter committer = outputCommitter;
+    try {
+      getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          committer.commitOutput();
+          return null;
+        }
+      });
+      return true;
+    } catch (Exception e) {
+      LOG.info("Exception in committing output: " + outputName, e);
+    }
+    return false;
+  }
+  
   private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
     if (this.committedOrAborted) {
       LOG.info("Ignoring multiple output commit/abort");
@@ -619,17 +666,53 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     
     boolean successfulOutputsAlreadyCommitted = !abortAllOutputsOnFailure;
     boolean failedWhileCommitting = false;
-    final Set<OutputCommitter> committedOutputs = Sets.newHashSet();
     if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
-      // commit all outputs
+      // commit all shared outputs
+      for (VertexGroupInfo groupInfo : vertexGroups.values()) {
+        if (failedWhileCommitting) {
+          break;
+        }
+        if (!groupInfo.outputs.isEmpty()) {
+          groupInfo.committed = true;
+          Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+          for (String outputName : groupInfo.outputs) {
+            OutputCommitter committer = v.getOutputCommitters().get(outputName);
+            LOG.info("Committing output: " + outputName + " for group: " + groupInfo.groupName);
+            if (!commitOutput(outputName, committer)) {
+              failedWhileCommitting = true;
+              break;
+            }
+          }
+        }
+      }
+      // commit all other outputs
       // we come here for successful dag completion and when outputs need to be
       // committed at the end for all or none visibility
       for (Vertex vertex : vertices.values()) {
-        Map<String, OutputCommitter> outputCommitters = vertex.getOutputCommitters();
-        if (outputCommitters == null || outputCommitters.isEmpty()) {
+        if (failedWhileCommitting) {
+          break;
+        }
+        if (vertex.getOutputCommitters() == null) {
           LOG.info("No output committers for vertex: " + vertex.getName());
           continue;
         }
+        Map<String, OutputCommitter> outputCommitters = 
+            new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
+        Set<String> sharedOutputs = vertex.getSharedOutputs();
+        // remove shared outputs
+        if (sharedOutputs != null) {
+          Iterator<Map.Entry<String, OutputCommitter>> iter = outputCommitters
+              .entrySet().iterator();
+          while (iter.hasNext()) {
+            if (sharedOutputs.contains(iter.next().getKey())) {
+              iter.remove();
+            }
+          }
+        }
+        if (outputCommitters.isEmpty()) {
+          LOG.info("No exclusive output committers for vertex: " + vertex.getName());
+          continue;
+        }
         for (Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) {
           LOG.info("Committing output: " + entry.getKey() + " for vertex: "
               + vertex.getVertexId());
@@ -637,28 +720,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             throw new TezUncheckedException("Vertex: " + vertex.getName() + 
                 " not in SUCCEEDED state. State= " + vertex.getState());
           }
-          final OutputCommitter committer = entry.getValue();
-          try {
-            getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                committer.commitOutput();
-                committedOutputs.add(committer);
-                return null;
-              }
-            });
-          } catch (Exception e) {
+          if (!commitOutput(entry.getKey(), entry.getValue())) {
             failedWhileCommitting = true;
-            LOG.info("Exception in committing output: " + entry.getKey()
-                + " for vertex: " + vertex.getVertexId(), e);
-          }
-          if (failedWhileCommitting) {
             break;
           }
         }
-        if (failedWhileCommitting) {
-          break;
-        }
       }
     }
     
@@ -799,13 +865,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
 
     if (dag.numCompletedVertices == dag.numVertices) {
+      dag.setFinishTime();
       //Only succeed if vertices complete successfully and no terminationCause is registered.
       if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
-        dag.setFinishTime();
         return dag.finished(DAGState.SUCCEEDED);
       }
-      else if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
-        dag.setFinishTime();
+      if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
         String diagnosticMsg = "DAG killed due to user-initiated kill." +
             " failedVertices:" + dag.numFailedVertices +
             " killedVertices:" + dag.numKilledVertices;
@@ -814,7 +879,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         return dag.finished(DAGState.KILLED);
       }
       if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
-        dag.setFinishTime();
         String diagnosticMsg = "DAG failed due to vertex failure." +
             " failedVertices:" + dag.numFailedVertices +
             " killedVertices:" + dag.numKilledVertices;
@@ -822,16 +886,26 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.addDiagnostic(diagnosticMsg);
         return dag.finished(DAGState.FAILED);
       }
-      else {
-        // should never get here.
-        throw new TezUncheckedException("All vertices complete, but cannot determine final state of DAG"
-            + ", numCompletedVertices=" + dag.numCompletedVertices
-            + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
-            + ", numFailedVertices=" + dag.numFailedVertices
-            + ", numKilledVertices=" + dag.numKilledVertices
-            + ", numVertices=" + dag.numVertices
-            + ", terminationCause=" + dag.terminationCause);
+      if(dag.terminationCause == DAGTerminationCause.COMMIT_FAILURE ){
+        String diagnosticMsg = "DAG failed due to commit failure." +
+            " failedVertices:" + dag.numFailedVertices +
+            " killedVertices:" + dag.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        dag.addDiagnostic(diagnosticMsg);
+        return dag.finished(DAGState.FAILED);
       }
+      
+      // catch all
+      String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG"
+          + ", numCompletedVertices=" + dag.numCompletedVertices
+          + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+          + ", numFailedVertices=" + dag.numFailedVertices
+          + ", numKilledVertices=" + dag.numKilledVertices
+          + ", numVertices=" + dag.numVertices
+          + ", terminationCause=" + dag.terminationCause;
+      LOG.error(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      return dag.finished(DAGState.ERROR);
     }
 
     //return the current state, Job not finished yet
@@ -982,7 +1056,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return amInfos;
   }
   */
-
+  
   private static class InitTransition
       implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
@@ -1011,6 +1085,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
         return dag.finished(DAGState.FAILED);
       }
+      
+      if (dag.jobPlan.getVertexGroupsCount() > 0) {
+        for (PlanVertexGroupInfo groupInfo : dag.jobPlan.getVertexGroupsList()) {
+          dag.vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
+        }
+        for (VertexGroupInfo groupInfo : dag.vertexGroups.values()) {
+          for (String vertexName : groupInfo.groupMembers) {
+            List<VertexGroupInfo> groupList = dag.vertexGroupInfo.get(vertexName);
+            if (groupList == null) {
+              groupList = Lists.newLinkedList();
+              dag.vertexGroupInfo.put(vertexName, groupList);
+            }
+            groupList.add(groupInfo);
+          }
+        }
+      }
 
       // create the vertices`
       for (int i=0; i < dag.numVertices; ++i) {
@@ -1033,6 +1123,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
 
       assignDAGScheduler(dag);
+      
+      for (Map.Entry<String, VertexGroupInfo> entry : dag.vertexGroups.entrySet()) {
+        String groupName = entry.getKey();
+        VertexGroupInfo groupInfo = entry.getValue();
+        if (!groupInfo.outputs.isEmpty()) {
+          // shared outputs
+          for (String vertexName : groupInfo.groupMembers) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Setting shared outputs for group: " + groupName + 
+                  " on vertex: " + vertexName);
+            }
+            Vertex v = dag.getVertex(vertexName);
+            v.addSharedOutputs(groupInfo.outputs);
+          }
+        }
+      }
 
       // TODO Metrics
       //dag.metrics.endPreparingJob(dag);
@@ -1080,7 +1186,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           vertexId, vertexPlan, vertexName, dag.conf,
           dag.eventHandler, dag.taskAttemptListener, 
           dag.clock, dag.taskHeartbeatHandler,
-          !dag.abortAllOutputsOnFailure, dag.appContext, vertexLocationHint);
+          !dag.abortAllOutputsOnFailure, dag.appContext, vertexLocationHint,
+          dag.vertexGroups);
       return v;
     }
 
@@ -1238,7 +1345,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           // vertex succeeded for the first time
           job.dagScheduler.vertexCompleted(vertex);
         }
-        job.vertexSucceeded(vertex);
+        forceTransitionToKillWait = !(job.vertexSucceeded(vertex));
       }
       else if (vertexEvent.getVertexState() == VertexState.FAILED) {
         job.enactKill(
@@ -1275,13 +1382,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   private static class VertexReRunningTransition implements
-      SingleArcTransition<DAGImpl, DAGEvent> {
+    MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
     @Override
-    public void transition(DAGImpl job, DAGEvent event) {
+    public DAGState transition(DAGImpl job, DAGEvent event) {
       DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
       job.numCompletedVertices--;
-      job.vertexReRunning(vertex);
+      boolean failed = job.vertexReRunning(vertex);
 
 
       LOG.info("Vertex " + vertex.getVertexId() + " re-running."
@@ -1290,23 +1398,84 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", numFailedVertices=" + job.numFailedVertices
           + ", numKilledVertices=" + job.numKilledVertices
           + ", numVertices=" + job.numVertices);
+      
+      if (failed) {
+        return DAGState.TERMINATING;
+      }
+      return DAGState.RUNNING;
     }
   }
 
-  private void vertexSucceeded(Vertex vertex) {
+  private boolean vertexSucceeded(Vertex vertex) {
     numSuccessfulVertices++;
-    // TODO: Metrics
-    //job.metrics.completedTask(task);
+    boolean failedCommit = false;
+    if (!abortAllOutputsOnFailure) {
+      // committing successful outputs immediately. check for shared outputs
+      List<VertexGroupInfo> groupsList = vertexGroupInfo.get(vertex.getName());
+      if (groupsList != null) {
+        List<VertexGroupInfo> commitList = Lists.newArrayListWithCapacity(groupsList
+            .size());
+        for (VertexGroupInfo groupInfo : groupsList) {
+          groupInfo.successfulMembers++;
+          if (groupInfo.groupMembers.size() == groupInfo.successfulMembers
+              && !groupInfo.outputs.isEmpty()) {
+            // group has outputs and all vertex members are done
+            LOG.info("All members of group: " + groupInfo.groupName
+                + " are succeeded. Commiting outputs");
+            commitList.add(groupInfo);
+          }
+        }
+        for (VertexGroupInfo groupInfo : commitList) {
+          groupInfo.committed = true;
+          Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+          for (String outputName : groupInfo.outputs) {
+            OutputCommitter committer = v.getOutputCommitters().get(outputName);
+            LOG.info("Committing output: " + outputName);
+            if (!commitOutput(outputName, committer)) {
+              // using same logic as vertex level commit. stop after first failure.
+              failedCommit = true;
+              break;
+            }
+          }
+          if (failedCommit) {
+            break;
+          }
+        }
+      }
+    }
+
+    if (failedCommit) {
+      LOG.info("Aborting job due to failure in commit.");
+      enactKill(DAGTerminationCause.COMMIT_FAILURE,
+          VertexTerminationCause.COMMIT_FAILURE);
+    }
+    
+    return !failedCommit;
   }
 
-  private void vertexReRunning(Vertex vertex) {
+  private boolean vertexReRunning(Vertex vertex) {
     reRunningVertices.add(vertex.getVertexId());
     numSuccessfulVertices--;
     addDiagnostic("Vertex re-running"
       + ", vertexName=" + vertex.getName()
       + ", vertexId=" + vertex.getVertexId());
-    // TODO: Metrics
-    //job.metrics.completedTask(task);
+    
+    if (!abortAllOutputsOnFailure) {
+      // partial output may already have been committed. fail if so
+      List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertex.getName());
+      if (groupList != null) {
+        for (VertexGroupInfo groupInfo : groupList) {
+          if (groupInfo.committed) {
+            LOG.info("Aborting job as committed vertex: "
+                + vertex.getVertexId() + " is re-running");
+            enactKill(DAGTerminationCause.COMMIT_FAILURE,
+                VertexTerminationCause.COMMIT_FAILURE);
+            return true;
+          }
+        }
+      }
+    }
+    return false;
   }
 
   private void vertexFailed(Vertex vertex) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index b02d51f..d4012f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -297,8 +297,9 @@ public class TaskAttemptImpl implements TaskAttempt,
     Vertex vertex = getVertex();
     ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
     int taskId = getTaskID().getId();
-    return new TaskSpec(getID(), vertex.getName(), procDesc, vertex.getInputSpecList(taskId),
-        vertex.getOutputSpecList(taskId));
+    return new TaskSpec(getID(), vertex.getName(), procDesc, 
+        vertex.getInputSpecList(taskId), vertex.getOutputSpecList(taskId), 
+        vertex.getGroupInputSpecList(taskId));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 87c54bf..0ed07a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -105,6 +105,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
+import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
@@ -122,6 +123,7 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -187,6 +189,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   List<InputSpec> inputSpecList;
   List<OutputSpec> outputSpecList;
+  List<GroupInputSpec> groupInputSpecList;
+  Set<String> sharedOutputs = Sets.newHashSet();
 
   private static final InternalErrorTransition
       INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
@@ -468,6 +472,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private AtomicBoolean aborted = new AtomicBoolean(false);
   private boolean commitVertexOutputs = false;
   
+  private Map<String, VertexGroupInfo> dagVertexGroups;
+  
   private VertexLocationHint vertexLocationHint;
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
@@ -481,7 +487,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       String vertexName, Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Clock clock,
       TaskHeartbeatHandler thh, boolean commitVertexOutputs, 
-      AppContext appContext, VertexLocationHint vertexLocationHint) {
+      AppContext appContext, VertexLocationHint vertexLocationHint, 
+      Map<String, VertexGroupInfo> dagVertexGroups) {
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
     this.vertexName = StringInterner.weakIntern(vertexName);
@@ -528,6 +535,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (vertexPlan.getOutputsCount() > 0) {
       setAdditionalOutputs(vertexPlan.getOutputsList());
     }
+    
+    this.dagVertexGroups = dagVertexGroups;
 
     logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
     // This "this leak" is okay because the retained pointer is in an
@@ -1056,21 +1065,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Vertex succeeded: " + vertex.logIdentifier);
         try {
           if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
-            // commit only once
+            // Commit only once. Don't commit shared outputs here.
             LOG.info("Invoking committer commit for vertex, vertexId="
                 + vertex.logIdentifier);
             if (vertex.outputCommitters != null) {
-              vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws Exception {
-                  for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
-                    LOG.info("Invoking committer commit for output=" + entry.getKey()
-                        + ", vertexId=" + vertex.logIdentifier);
-                    entry.getValue().commitOutput();
-                  }
-                  return null;
+              for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
+                final OutputCommitter committer = entry.getValue();
+                final String outputName = entry.getKey();
+                if (vertex.sharedOutputs.contains(outputName)) {
+                  // Don't commit shared committers; they will be committed by the DAG.
+                  continue;
                 }
-              });
+                vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws Exception {
+                      LOG.info("Invoking committer commit for output=" + outputName
+                          + ", vertexId=" + vertex.logIdentifier);
+                      committer.commitOutput();
+                    return null;
+                  }
+                });
+              }
             }
           }
         } catch (Exception e) {
@@ -1310,6 +1325,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       // Inputs - since events generated by them will be routed to the
       // VertexManager for handling.
 
+      if (vertex.dagVertexGroups != null && !vertex.dagVertexGroups.isEmpty()) {
+        List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
+        for (VertexGroupInfo groupInfo : vertex.dagVertexGroups.values()) {
+          if (groupInfo.edgeMergedInputs.containsKey(vertex.getName())) {
+            InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(vertex.getName());
+            groupSpecList.add(new GroupInputSpec(groupInfo.groupName, 
+                Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
+          }
+        }
+        if (!groupSpecList.isEmpty()) {
+          vertex.groupInputSpecList = groupSpecList;
+        }
+      }
+      
       // Check if any inputs need initializers
       if (vertex.additionalInputs != null) {
         LOG.info("Root Inputs exist for Vertex: " + vertex.getName() + " : "
@@ -2283,6 +2312,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     return outputSpecList;
   }
+  
+  //TODO Eventually remove synchronization.
+  @Override
+  public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
+    return groupInputSpecList;
+  }
+  
+  @Override
+  public synchronized void addSharedOutputs(Set<String> outputs) {
+    this.sharedOutputs.addAll(outputs);
+  }
+  
+  @Override
+  public synchronized Set<String> getSharedOutputs() {
+    return this.sharedOutputs;
+  }
 
   @VisibleForTesting
   VertexManager getVertexManager() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 4c94d74..1d8274b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -33,11 +33,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -112,14 +122,20 @@ public class TestDAGImpl {
   private DAGPlan mrrDagPlan;
   private DAGImpl mrrDag;
   private TezDAGID mrrDagId;
+  private AppContext groupAppContext;
+  private DAGPlan groupDagPlan;
+  private DAGImpl groupDag;
+  private TezDAGID groupDagId;
 
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
     @Override
     public void handle(DAGEvent event) {
       if (event.getDAGId().equals(dagId)) {
         dag.handle(event);
-      }  else if (event.getDAGId().equals(mrrDagId)) {
+      } else if (event.getDAGId().equals(mrrDagId)) {
         mrrDag.handle(event);
+      } else if (event.getDAGId().equals(groupDagId)) {
+        groupDag.handle(event);
       } else {
         throw new RuntimeException("Invalid event, unknown dag"
             + ", dagId=" + event.getDAGId());
@@ -143,8 +159,8 @@ public class TestDAGImpl {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskEvent event) {
-      DAGImpl handler = event.getTaskID().getVertexID().getDAGId().equals(dagId) ?
-          dag : mrrDag;
+      TezDAGID id = event.getTaskID().getVertexID().getDAGId();
+      DAGImpl handler = id.equals(dagId) ? dag : (id.equals(mrrDagId) ? mrrDag : groupDag);
       Vertex vertex = handler.getVertex(event.getTaskID().getVertexID());
       Task task = vertex.getTask(event.getTaskID());
       ((EventHandler<TaskEvent>)task).handle(event);
@@ -157,8 +173,8 @@ public class TestDAGImpl {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(VertexEvent event) {
-      DAGImpl handler = event.getVertexId().getDAGId().equals(dagId) ?
-          dag : mrrDag;
+      TezDAGID id = event.getVertexId().getDAGId();
+      DAGImpl handler = id.equals(dagId) ? dag : (id.equals(mrrDagId) ? mrrDag : groupDag);
       Vertex vertex = handler.getVertex(event.getVertexId());
       ((EventHandler<VertexEvent>) vertex).handle(event);
     }
@@ -298,6 +314,54 @@ public class TestDAGImpl {
 
     return dag;
   }
+  
+  public static class TotalCountingOutputCommitter extends CountingOutputCommitter { // committer that also keeps one tally across all of its instances
+    static int totalCommitCounter = 0; // class-wide count of commitOutput() calls; nothing in this class resets it
+    public TotalCountingOutputCommitter() {
+      super();
+    }
+    @Override
+    public void commitOutput() throws IOException {
+      ++totalCommitCounter; // record the commit before delegating to the parent committer
+      super.commitOutput();
+    }
+  }
+  
+  // Create a plan with 3 vertices: vertex1, vertex2, vertex3. Group "uv12" (vertex1, vertex2) -> vertex3
+  private DAGPlan createGroupDAGPlan() {
+    LOG.info("Setting up group dag plan");
+    int dummyTaskCount = 1;
+    Resource dummyTaskResource = Resource.newInstance(1, 1);
+    org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v3 = new org.apache.tez.dag.api.Vertex("vertex3",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    // Group vertex1 and vertex2, then give both the group and vertex3 an output named "uvOut" with the counting committer.
+    DAG dag = new DAG("testDag");
+    String groupName1 = "uv12";
+    org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+    OutputDescriptor outDesc = new OutputDescriptor("output.class");
+    uv12.addOutput("uvOut", outDesc, TotalCountingOutputCommitter.class);
+    v3.addOutput("uvOut", outDesc, TotalCountingOutputCommitter.class);
+    // Connect the group to vertex3 with one group input edge; "merge.class" is the merged input descriptor.
+    GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")), 
+            new InputDescriptor("merge.class"));
+    // Register the three vertices and the group edge, then build the DAGPlan from the test configuration.
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    return dag.createDag(conf);
+  }
 
   private DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
@@ -548,6 +612,16 @@ public class TestDAGImpl {
     doReturn(conf).when(mrrAppContext).getAMConf();
     doReturn(mrrDag).when(mrrAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
+    groupAppContext = mock(AppContext.class);
+    groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
+    groupDagPlan = createGroupDAGPlan();
+    groupDag = new DAGImpl(groupDagId, conf, groupDagPlan,
+        dispatcher.getEventHandler(),  taskAttemptListener,
+        fsTokens, clock, "user", thh,
+        groupAppContext);
+    doReturn(conf).when(groupAppContext).getAMConf();
+    doReturn(groupDag).when(groupAppContext).getCurrentDAG();
+    doReturn(appAttemptId).when(groupAppContext).getApplicationAttemptId();
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
     vertexEventDispatcher = new VertexEventDispatcher();
@@ -640,6 +714,28 @@ public class TestDAGImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testGroupDAGCompletionWithCommitSuccess() {
+    // should have only 2 commits. 1 vertex3 commit and 1 group commit.
+    TotalCountingOutputCommitter.totalCommitCounter = 0; // static tally: zero it so the expected count does not depend on earlier runs
+    initDAG(groupDag);
+    startDAG(groupDag);
+    dispatcher.await();
+    for (int i=0; i<3; ++i) {
+      Vertex v = groupDag.getVertex("vertex"+(i+1));
+      dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+          TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertEquals(i+1, groupDag.getSuccessfulVertices());
+    }
+    // Every vertex has succeeded, so the DAG must have succeeded and run exactly the two expected commits.
+    Assert.assertEquals(3, groupDag.getSuccessfulVertices());
+    Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
+    Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testDAGCompletionWithCommitSuccess() {
     // all vertices completed -> DAG completion and commit
     initDAG(mrrDag);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 187f9f9..6a668e3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -65,6 +65,8 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
@@ -98,6 +100,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
@@ -112,6 +115,7 @@ import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.test.EdgeManagerForTest;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -120,6 +124,8 @@ import org.junit.Test;
 import org.mockito.internal.util.collections.Sets;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -143,6 +149,7 @@ public class TestVertexImpl {
   private VertexLocationHint vertexLocationHint = null;
   private Configuration conf;
   private Map<String, Edge> edges;
+  private Map<String, VertexGroupInfo> vertexGroups;
   private byte[] edgePayload = "EP".getBytes();
 
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
@@ -899,6 +906,103 @@ public class TestVertexImpl {
 
     return dag;
   }
+  
+  // Create a plan with 3 vertices: A, B, C, edges A->C and B->C, and a vertex group "Group" of A and B whose merged input targets C
+  private DAGPlan createVertexGroupDAGPlan() {
+    LOG.info("Setting up group dag plan");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("TestGroupDAG")
+        .addVertex(
+          VertexPlan.newBuilder()
+            .setName("A")
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+            .setType(PlanVertexType.NORMAL)
+            .setTaskConfig(
+              PlanTaskConfiguration.newBuilder()
+                .setNumTasks(1)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("A.class")
+                .build()
+            )
+            .addOutEdgeId("A_C")
+            .build()
+        )
+        .addVertex(
+          VertexPlan.newBuilder()
+            .setName("B")
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class"))
+            .setType(PlanVertexType.NORMAL)
+            .setTaskConfig(
+              PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("")
+                .build()
+            )
+            .addOutEdgeId("B_C")
+            .build()
+        )
+        .addVertex(
+          VertexPlan.newBuilder()
+            .setName("C")
+            .setType(PlanVertexType.NORMAL)
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+            .setTaskConfig(
+              PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("foo")
+                .setTaskModule("x3.y3")
+                .build()
+            )
+            .addInEdgeId("A_C")
+            .addInEdgeId("B_C")
+            .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_C")) // NOTE(review): no ".class" suffix here, unlike the B_C edge below - confirm whether intentional
+                .setInputVertexName("A")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_C.class"))
+                .setOutputVertexName("C")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("A_C")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+                .setInputVertexName("B")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+                .setOutputVertexName("C")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("B_C")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+          )
+         .addVertexGroups( // group "Group" with members A and B; its merged input for destination C is "Group.class"
+              PlanVertexGroupInfo.newBuilder().
+                setGroupName("Group").
+                addGroupMembers("A").
+                addGroupMembers("B").
+                addEdgeMergedInputs(
+                    PlanGroupInputEdgeInfo.newBuilder().setDestVertexName("C").
+                    setMergedInput(
+                        TezEntityDescriptorProto.newBuilder().
+                          setClassName("Group.class")
+                          .build()).build()))
+        .build();
+
+    return dag;
+  }
 
   // Create a plan with 3 vertices: A, B, C
   // A -> C, B -> C
@@ -1024,7 +1128,7 @@ public class TestVertexImpl {
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskAttemptListener,
-            clock, thh, true, appContext, locationHint);
+            clock, thh, true, appContext, locationHint, vertexGroups);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -1104,6 +1208,11 @@ public class TestVertexImpl {
     doReturn(taskScheduler).when(appContext).getTaskScheduler();
     doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
 
+    vertexGroups = Maps.newHashMap();
+    for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
+      vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
+    }
+    
     setupVertices();
     when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
       @Override
@@ -2044,7 +2153,7 @@ public class TestVertexImpl {
       VertexPlan vPlan = invalidDagPlan.getVertex(0);
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener,
-          clock, thh, true, appContext, vertexLocationHint);
+          clock, thh, true, appContext, vertexLocationHint, null);
       vertexIdMap.put(vId, v);
       v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
       dispatcher.await();
@@ -2072,7 +2181,7 @@ public class TestVertexImpl {
         AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskAttemptListener, clock, thh, true,
-          appContext, vertexLocationHint);
+          appContext, vertexLocationHint, null);
       this.dispatcher = dispatcher;
     }
 
@@ -2150,6 +2259,34 @@ public class TestVertexImpl {
   }
 
   @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
+  public void testVertexGroupInput() {
+    setupPreDagCreation();
+    dagPlan = createVertexGroupDAGPlan();
+    setupPostDagCreation();
+    // Checks the group input specs each vertex reports for the plan with group "Group" = {A, B} feeding C.
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+    // The group members themselves report no group input specs.
+    Assert.assertNull(vA.getGroupInputSpecList(0));
+    Assert.assertNull(vB.getGroupInputSpecList(0));
+    // C must report exactly one group input spec, naming the group, both members and the merged input class.
+    List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(0);
+    Assert.assertEquals(1, groupInSpec.size());
+    Assert.assertEquals("Group", groupInSpec.get(0).getGroupName());
+    Assert.assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));
+    Assert.assertTrue(groupInSpec.get(0).getGroupVertices().contains("B"));
+    Assert.assertEquals("Group.class", groupInSpec.get(0).getMergedInputDescriptor().getClassName());
+  }
+  
+  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testInitStartRace() {
     // Race when a source vertex manages to start before the target vertex has

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 4b039b0..61d60ae 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -900,7 +900,7 @@ public class TestContainerReuse {
       Collections.singletonList(new InputSpec("vertexName",
         new InputDescriptor("inputClassName"), 1)),
       Collections.singletonList(new OutputSpec("vertexName",
-        new OutputDescriptor("outputClassName"), 1))), ta, hosts, racks,
+        new OutputDescriptor("outputClassName"), 1)), null), ta, hosts, racks,
       priority, containerContext);
     return lr;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index cb34b2a..d07db3c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -82,6 +82,8 @@ public class ExampleDriver {
           "MRR Sleep Job");
       pgd.addClass("orderedwordcount", OrderedWordCount.class,
           "Word Count with words sorted on frequency");
+      pgd.addClass("unionexample", UnionExample.class,
+          "Union example");
       pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
           "Filters lines by the specified word using broadcast edge");
       pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class,


Mime
View raw message