tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2221. VertexGroup name should be unqiue (zjffdu)
Date Thu, 30 Apr 2015 05:01:22 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 816ac188a -> 210359641


TEZ-2221. VertexGroup name should be unqiue (zjffdu)

(cherry picked from commit 2382f09ffaabe1be6ede8203025027198c3e6d2b)

Conflicts:
	tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
	tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
	tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/21035964
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/21035964
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/21035964

Branch: refs/heads/branch-0.5
Commit: 210359641ec79bf95d2193cc2976e8cd9e6ce6a1
Parents: 816ac18
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Apr 30 12:53:30 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Thu Apr 30 13:01:09 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../main/java/org/apache/tez/dag/api/DAG.java   | 25 ++++++++++++-
 .../org/apache/tez/dag/api/VertexGroup.java     | 30 ++++++++++++++--
 .../java/org/apache/tez/dag/api/TestDAG.java    | 38 ++++++++++++++++++++
 4 files changed, 91 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/21035964/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16b368e..bfac4b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2221. VertexGroup name should be unqiue
   TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log
   TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery.
   TEZ-2348. EOF exception during UnorderedKVReader.next().

http://git-wip-us.apache.org/repos/asf/tez/blob/21035964/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index edb10d6..ed06db1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -60,7 +60,9 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -83,6 +85,9 @@ public class DAG {
   final Collection<URI> urisForCredentials = new HashSet<URI>();
   Credentials credentials = new Credentials();
   Set<VertexGroup> vertexGroups = Sets.newHashSet();
+  // to verify the vertex Group memberSet should be unique
+  private Set<Set<String>> vertexGroupMemberSets = Sets.newHashSet();
+
   Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
   private DAGAccessControls dagAccessControls;
   Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
@@ -174,8 +179,26 @@ public class DAG {
    * @return {@link DAG}
    */
   public synchronized VertexGroup createVertexGroup(String name, Vertex... members) {
+    // vertex group member set should be unique
+    Collection<String> memberNames =
+        Collections2.transform(Lists.newArrayList(members), new Function<Vertex, String>()
{
+      @Override
+      public String apply(Vertex v) {
+        return v.getName();
+      }
+    });
+    if (!vertexGroupMemberSets.add(Sets.newHashSet(memberNames))){
+      throw new IllegalStateException(
+          "VertexGroup " + memberNames + " already defined as another group!");
+    }
+
+    // vertex group name should be unique.
     VertexGroup uv = new VertexGroup(name, members);
-    vertexGroups.add(uv);
+    if (!vertexGroups.add(uv)) {
+      throw new IllegalStateException(
+          "VertexGroup " + name + " already defined!");
+    }
+
     return uv;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/21035964/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
index f1887a1..06d0d51 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -108,7 +108,7 @@ public class VertexGroup {
   
   @Override
   public String toString() {
-    return "[ VertexGroup: " + groupInfo.getGroupName() + "]";
+    return "[ VertexGroup: " + groupInfo.getGroupName() + "]" + ", members=" + groupInfo.members;
   }
 
   GroupInfo getGroupInfo() {
@@ -122,5 +122,31 @@ public class VertexGroup {
   void addOutputVertex(Vertex outputVertex, GroupInputEdge edge) {
     this.groupInfo.edgeMergedInputs.put(outputVertex.getName(), edge.getMergedInput());
   }
-  
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((groupInfo.groupName == null) ? 0 : groupInfo.groupName.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    VertexGroup other = (VertexGroup) obj;
+    if (groupInfo.groupName == null) {
+      if (other.groupInfo.groupName != null)
+        return false;
+    } else if (!groupInfo.groupName.equals(other.groupInfo.groupName))
+      return false;
+
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/21035964/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index a694b8d..595246e 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -18,11 +18,49 @@
 
 package org.apache.tez.dag.api;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestDAG {
 
+  private final int dummyTaskCount = 2;
+  private final Resource dummyTaskResource = Resource.newInstance(1, 1);
+
+  @Test(timeout = 5000)
+  public void testDuplicatedVertexGroup() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
+        dummyTaskCount, dummyTaskResource);
+
+    DAG dag = DAG.create("testDAG");
+    dag.createVertexGroup("group_1", v1,v2);
+    try {
+      dag.createVertexGroup("group_1", v1,v2);
+      Assert.fail("should fail it due to duplicated VertexGroups");
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertEquals("VertexGroup [v1, v2] already defined as another group!", e.getMessage());
+    }
+    try {
+      dag.createVertexGroup("group_1", v2, v3);
+      Assert.fail("should fail it due to duplicated VertexGroups");
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertEquals("VertexGroup group_1 already defined!", e.getMessage());
+    }
+    try {
+      dag.createVertexGroup("group_2", v1, v2);
+      Assert.fail("should fail it due to duplicated VertexGroups");
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertEquals("VertexGroup [v1, v2] already defined as another group!", e.getMessage());
+    }
+  }
+
   @Test(timeout = 5000)
   public void testDuplicatedInput() {
     Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));


Mime
View raw message