Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1E139200C69 for ; Tue, 28 Mar 2017 21:47:49 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1CFC2160B89; Tue, 28 Mar 2017 19:47:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 33EEE160BC5 for ; Tue, 28 Mar 2017 21:47:45 +0200 (CEST) Received: (qmail 73170 invoked by uid 500); 28 Mar 2017 19:47:44 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 71598 invoked by uid 99); 28 Mar 2017 19:47:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Mar 2017 19:47:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7B9E5E001D; Tue, 28 Mar 2017 19:47:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhiyuany@apache.org To: commits@tez.apache.org Date: Tue, 28 Mar 2017 19:48:31 -0000 Message-Id: <993d5b04aa59489bb3d5437562643098@git.apache.org> In-Reply-To: <3c41745652fc4012b06b29e1b1cbf344@git.apache.org> References: <3c41745652fc4012b06b29e1b1cbf344@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] tez git commit: TEZ-3511. Allow user to create named edge archived-at: Tue, 28 Mar 2017 19:47:49 -0000 TEZ-3511. Allow user to create named edge Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d1b08e37 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d1b08e37 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d1b08e37 Branch: refs/heads/TEZ-1190 Commit: d1b08e37e3e5bc16c6eea130dfefb52aca84b3a5 Parents: 906c2a8 Author: Zhiyuan Yang Authored: Mon Dec 5 15:34:55 2016 -0800 Committer: Zhiyuan Yang Committed: Tue Mar 28 11:20:23 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/tez/dag/api/DAG.java | 114 ++++++--- .../main/java/org/apache/tez/dag/api/Edge.java | 81 +++--- .../org/apache/tez/dag/api/EdgeProperty.java | 4 +- .../java/org/apache/tez/dag/api/TestDAG.java | 26 ++ .../org/apache/tez/dag/api/TestDAGVerify.java | 249 +++++++++++++++++++ .../java/org/apache/tez/dag/api/TestEdge.java | 75 ++++++ 6 files changed, 480 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d1b08e37/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index c136811..9151d48 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -273,6 +273,13 @@ public class DAG { /** * Add an {@link Edge} connecting vertices in the DAG + * + * All edges within a DAG must be either named (created via + * {@link org.apache.tez.dag.api.Edge#create(Vertex, Vertex, EdgeProperty, String)}) or unnamed + * (created via {@link org.apache.tez.dag.api.Edge#create(Vertex, Vertex, EdgeProperty)}). + * If edges are named, all inbound edges to a vertex should have unique names. Likewise for outbound edges. + * A vertex can have an inbound edge that uses the same name as that used by an outbound edge. + * * @param edge The edge to be added * @return {@link DAG} */ @@ -571,8 +578,6 @@ public class DAG { // check for valid vertices, duplicate vertex names, // and prepare for cycle detection Map vertexMap = new HashMap(); - Map> inboundVertexMap = new HashMap>(); - Map> outboundVertexMap = new HashMap>(); for (Vertex v : vertices.values()) { if (vertexMap.containsKey(v.getName())) { throw new IllegalStateException("DAG contains multiple vertices" @@ -581,33 +586,45 @@ public class DAG { vertexMap.put(v.getName(), new AnnotatedVertex(v)); } - Map> edgeMap = new HashMap>(); + // named edge cannot be mixed with unnamed edge or group edge + Edge namedEdge = null, unnamedEdge = null; + for (Edge e : edges) { + if (e.getName() == null) { + unnamedEdge = e; + } else { + namedEdge = e; + } + + if (namedEdge != null && !groupInputEdges.isEmpty()) { + throw new IllegalStateException("DAG shouldn't contains both named edge " + namedEdge + + " and group edge " + groupInputEdges.iterator().next()); + } + if (namedEdge != null && unnamedEdge != null) { + throw new IllegalStateException("DAG shouldn't contains both named edge " + namedEdge + + " and unnamed edge " + unnamedEdge); + } + } + + Map> inEdgeMap = new HashMap<>(); + Map> outEdgeMap = new HashMap<>(); for (Edge e : edges) { // Construct structure for cycle detection Vertex inputVertex = e.getInputVertex(); Vertex outputVertex = e.getOutputVertex(); - List edgeList = edgeMap.get(inputVertex); - if (edgeList == null) { - edgeList = new ArrayList(); - edgeMap.put(inputVertex, edgeList); - } - edgeList.add(e); - - // Construct map for Input name verification - Set inboundSet = inboundVertexMap.get(outputVertex); - if (inboundSet == null) { - inboundSet = new HashSet(); - inboundVertexMap.put(outputVertex, inboundSet); + + List outEdgeList = outEdgeMap.get(inputVertex); + if (outEdgeList == null) { + outEdgeList = new ArrayList(); + outEdgeMap.put(inputVertex, outEdgeList); } - inboundSet.add(inputVertex.getName()); - - // Construct map for Output name verification - Set outboundSet = outboundVertexMap.get(inputVertex); - if (outboundSet == null) { - outboundSet = new HashSet(); - outboundVertexMap.put(inputVertex, outboundSet); + outEdgeList.add(e); + + List inEdgeList = inEdgeMap.get(outputVertex); + if (inEdgeList == null) { + inEdgeList = new ArrayList(); + inEdgeMap.put(outputVertex, inEdgeList); } - outboundSet.add(outputVertex.getName()); + inEdgeList.add(e); } // check input and output names don't collide with vertex names @@ -633,29 +650,54 @@ public class DAG { } // Check for valid InputNames - for (Entry> entry : inboundVertexMap.entrySet()) { + for (Entry> entry : inEdgeMap.entrySet()) { Vertex vertex = entry.getKey(); + Set inputs = new HashSet<>(); + + for (Edge edge : entry.getValue()) { + String name = edge.getName(); + if (name == null) { + name = edge.getInputVertex().getName(); + } + if (inputs.contains(name)) { + throw new IllegalStateException("Vertex: " + vertex.getName() + " contains multiple " + + "incoming edges with name " + name); + } + inputs.add(name); + } for (RootInputLeafOutput input : vertex.getInputs()) { - if (entry.getValue().contains(input.getName())) { + if (inputs.contains(input.getName())) { throw new IllegalStateException("Vertex: " + vertex.getName() - + " contains an incoming vertex and Input with the same name: " - + input.getName()); + + " contains an incoming " + (namedEdge != null ? "edge" : "vertex") + + " and Input with the same name: " + input.getName()); } } } - // Check for valid OutputNames - for (Entry> entry : outboundVertexMap.entrySet()) { + for (Entry> entry : outEdgeMap.entrySet()) { Vertex vertex = entry.getKey(); - for (RootInputLeafOutput - output : vertex.getOutputs()) { - if (entry.getValue().contains(output.getName())) { + Set outputs = new HashSet<>(); + + for (Edge edge : entry.getValue()) { + String name = edge.getName(); + if (name == null) { + name = edge.getOutputVertex().getName(); + } + if (outputs.contains(name)) { + throw new IllegalStateException("Vertex: " + vertex.getName() + " contains multiple " + + "outgoing edges with name " + name); + } + outputs.add(name); + } + for (RootInputLeafOutput + output : vertex.getOutputs()) { + if (outputs.contains(output.getName())) { throw new IllegalStateException("Vertex: " - + vertex.getName() - + " contains an outgoing vertex and Output with the same name: " - + output.getName()); + + vertex.getName() + + " contains an outgoing " + (namedEdge != null ? "edge" : "vertex") + + " and Output with the same name: " + output.getName()); } } } @@ -666,7 +708,7 @@ public class DAG { // When additional inputs are supported, this can be chceked easily (and early) // within the addInput / addOutput call itself. - Deque topologicalVertexStack = detectCycles(edgeMap, vertexMap); + Deque topologicalVertexStack = detectCycles(outEdgeMap, vertexMap); checkAndInferOneToOneParallelism(); http://git-wip-us.apache.org/repos/asf/tez/blob/d1b08e37/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java index db509f7..794d88d 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java @@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; * Edge defines the connection between a producer and consumer vertex in the DAG. * @link {@link EdgeProperty} defines the relationship between them. The producer * vertex provides input to the edge and the consumer vertex reads output from the - * edge. + * edge. Edge could be either named or not. * */ @Public @@ -33,13 +33,16 @@ public class Edge { private final Vertex inputVertex; private final Vertex outputVertex; private final EdgeProperty edgeProperty; + private final String name; private Edge(Vertex inputVertex, Vertex outputVertex, - EdgeProperty edgeProperty) { + EdgeProperty edgeProperty, + String name) { this.inputVertex = inputVertex; this.outputVertex = outputVertex; this.edgeProperty = edgeProperty; + this.name = name; } @@ -57,7 +60,25 @@ public class Edge { public static Edge create(Vertex inputVertex, Vertex outputVertex, EdgeProperty edgeProperty) { - return new Edge(inputVertex, outputVertex, edgeProperty); + return new Edge(inputVertex, outputVertex, edgeProperty, null); + } + + /** + * Creates an edge with specified name between the specified vertices. + * + * InputVertex(EdgeInput) ----- Edge ----- OutputVertex(EdgeOutput)] + * + * @param inputVertex the vertex which generates data to the edge. + * @param outputVertex the vertex which consumes data from the edge + * @param edgeProperty {@link org.apache.tez.dag.api.EdgeProperty} associated with this edge + * @param name name of edge + * @return the {@link org.apache.tez.dag.api.Edge} + */ + public static Edge create(Vertex inputVertex, + Vertex outputVertex, + EdgeProperty edgeProperty, + String name) { + return new Edge(inputVertex, outputVertex, edgeProperty, name); } /** @@ -83,6 +104,14 @@ public class Edge { public EdgeProperty getEdgeProperty() { return edgeProperty; } + + /** + * The name of this edge (or null if edge has no name) + * @return edge name or null + */ + public String getName() { + return name; + } /* * Used to identify the edge in the configuration @@ -95,39 +124,29 @@ public class Edge { @Override public String toString() { - return inputVertex + " -> " + outputVertex + " (" + edgeProperty + ")"; + return "{" + name + " : " + inputVertex + " -> " + outputVertex + " " + edgeProperty + "}"; } @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result - + ((inputVertex == null) ? 0 : inputVertex.hashCode()); - result = prime * result - + ((outputVertex == null) ? 0 : outputVertex.hashCode()); - return result; - } + public boolean equals(Object other) { + if (this == other) return true; + if (other == null || getClass() != other.getClass()) return false; - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Edge other = (Edge) obj; - if (inputVertex == null) { - if (other.inputVertex != null) - return false; - } else if (!inputVertex.equals(other.inputVertex)) + Edge edge = (Edge) other; + + if (inputVertex != null ? !inputVertex.equals(edge.inputVertex) : edge.inputVertex != null) return false; - if (outputVertex == null) { - if (other.outputVertex != null) - return false; - } else if (!outputVertex.equals(other.outputVertex)) + if (outputVertex != null ? !outputVertex.equals(edge.outputVertex) : edge.outputVertex != null) return false; - return true; + return name != null ? name.equals(edge.name) : edge.name == null; + + } + + @Override + public int hashCode() { + int result = inputVertex != null ? inputVertex.hashCode() : 0; + result = 31 * result + (outputVertex != null ? outputVertex.hashCode() : 0); + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; } } http://git-wip-us.apache.org/repos/asf/tez/blob/d1b08e37/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java index 07fb2c1..3723433 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java @@ -243,10 +243,10 @@ public class EdgeProperty { @Override public String toString() { - return "{ " + dataMovementType + " : " + inputDescriptor.getClassName() + return "(" + dataMovementType + " : " + inputDescriptor.getClassName() + " >> " + dataSourceType + " >> " + outputDescriptor.getClassName() + " >> " + (edgeManagerDescriptor == null ? "NullEdgeManager" : edgeManagerDescriptor.getClassName()) - + " }"; + + ")"; } } http://git-wip-us.apache.org/repos/asf/tez/blob/d1b08e37/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index 05c4e30..691b4c1 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -88,6 +88,32 @@ public class TestDAG { } @Test(timeout = 5000) + public void testAddDuplicatedNamedEdge() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + Edge edge1 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + Edge edge2 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + + DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1); + + try { + dag.addEdge(edge2); + Assert.fail("should fail it due to duplicate named edges"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains(edge1 + " already defined")); + } + } + + @Test(timeout = 5000) public void testDuplicatedVertexGroup() { Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); http://git-wip-us.apache.org/repos/asf/tez/blob/d1b08e37/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index 5706542..720820d 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -1191,4 +1191,253 @@ public class TestDAGVerify { Assert.assertTrue(e.getMessage().contains("There is conflicting local resource")); } } + + @Test(timeout = 5000) + public void testNamedEdge() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + + Edge edge1 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + + DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1); + dag.verify(); + + Edge edge2 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge2"); + + dag.addEdge(edge2).verify(); + } + + @Test(timeout = 5000) + public void testNamedEdgeMixedWithUnnamedEdge() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + + Edge edge1 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + Edge edge2 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input"))); + + DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1).addEdge(edge2); + + try { + dag.verify(); + Assert.fail("should fail it because DAG has both named and unnamed edge"); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains( + "DAG shouldn't contains both named edge " + edge1 + " and unnamed edge " + edge2)); + } + } + + @Test(timeout = 5000) + public void testNamedEdgeWithGroupEdge() { + Vertex v1 = Vertex.create("v1", + ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", + ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v3 = Vertex.create("v3", + ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + DAG dag = DAG.create("testDag"); + dag.addVertex(v1); + dag.addVertex(v2); + dag.addVertex(v3); + String groupName1 = "uv12"; + VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); + + GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, + EdgeProperty.create(DataMovementType.SCATTER_GATHER, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("dummy output class"), + InputDescriptor.create("dummy input class")), + InputDescriptor.create("dummy input class")); + + Edge e2 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.CONCURRENT, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "e2"); + + dag.addEdge(e1).addEdge(e2); + try { + dag.verify(); + Assert.fail("should fail it because DAG has both named edge and group edge"); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains("DAG shouldn't contains both named edge " + e2 + " and group edge " + + e1)); + } + } + + @Test(timeout = 5000) + public void testInNamedEdgeCollide() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + Edge edge1 = Edge.create(v1, v3, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + Edge edge2 = Edge.create(v2, v3, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + + DAG dag = + DAG.create("testDAG").addVertex(v1).addVertex(v2).addVertex(v3).addEdge(edge1).addEdge(edge2); + + try { + dag.verify(); + Assert.fail("should fail it because v3 gets multiple incoming edges with same name"); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains("v3 contains multiple incoming edges with name Edge1")); + } + } + + @Test(timeout = 5000) + public void testOutNamedEdgeCollide() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + Edge edge1 = Edge.create(v1, v3, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + Edge edge2 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + + DAG dag = + DAG.create("testDAG").addVertex(v1).addVertex(v2).addVertex(v3).addEdge(edge1).addEdge(edge2); + + try { + dag.verify(); + Assert.fail("should fail it because v3 gets multiple outgoing edges with same name"); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains("v1 contains multiple outgoing edges with name Edge1")); + } + } + + @Test(timeout = 5000) + public void testInEdgeOutEdgeWithSameName() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + Edge edge1 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + Edge edge2 = Edge.create(v2, v3, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "Edge1"); + + DAG.create("testDAG") + .addVertex(v1).addVertex(v2).addVertex(v3) + .addEdge(edge1).addEdge(edge2) + .verify(); + } + + @Test(timeout = 5000) + public void testNamedEdgeCollideWithRootInput() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + v2.addDataSource("input", + DataSourceDescriptor.create(InputDescriptor.create("input"), null, null)); + + Edge edge1 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "input"); + + DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1); + + try { + dag.verify(); + Assert.fail("should fail it because v2 get incoming edge and input with same name"); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains("v2 contains an incoming edge and Input with the same name: input")); + } + } + + @Test(timeout = 5000) + public void testNamedEdgeCollideWithLeafOutput() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + v1.addDataSink("output", + DataSinkDescriptor.create(OutputDescriptor.create("output"), null, null)); + + Edge edge1 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "output"); + + DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1); + + try { + dag.verify(); + Assert.fail("should fail it because v2 get outgoing edge and output with same name"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains( + "v1 contains an outgoing edge and Output with the same name: output")); + } + } + + @Test(timeout = 5000) + public void testNamedEdgeUsingVertexName() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"), + dummyTaskCount, dummyTaskResource); + + Edge edge1 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "v1"); + Edge edge2 = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"), + InputDescriptor.create("input")), "v2"); + + DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1).addEdge(edge2); + dag.verify(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/d1b08e37/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java new file mode 100644 index 0000000..61f4fbc --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.api; + +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class TestEdge { + Vertex v1, v2; + EdgeProperty edgeProperty; + Set set; + + @Before + public void setup() { + v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor")); + v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor")); + edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, + EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.CONCURRENT, + OutputDescriptor.create("output"), InputDescriptor.create("input")); + set = new HashSet<>(); + } + + @Test(timeout = 5000) + public void testHashAndEqualsUnnamed() { + // edges without name but everything else same are equal and have same hash + Edge e1 = Edge.create(v1, v2, edgeProperty); + Edge e2 = Edge.create(v1, v2, edgeProperty); + assertEquals(e1, e2); + set.add(e1); + assertTrue(set.contains(e2)); + } + + @Test(timeout = 5000) + public void testHashAndEqualsNamed() { + // edges with everything same including name are equal and have same hash + Edge e1 = Edge.create(v1, v2, edgeProperty, "e1"); + Edge e2 = Edge.create(v1, v2, edgeProperty, "e1"); + assertEquals(e1, e2); + set.add(e1); + assertTrue(set.contains(e2)); + } + + @Test(timeout = 5000) + public void testHashAndEqualsDifferentName() { + // edges with different name but everything else same are not equal and have different hash + Edge e1 = Edge.create(v1, v2, edgeProperty, "e1"); + Edge e2 = Edge.create(v1, v2, edgeProperty, "e2"); + assertNotEquals(e1, e2); + set.add(e1); + assertFalse(set.contains(e2)); + } +}