tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2219. Should verify the input_name/output_name to be unique per vertex (zjffdu)
Date Mon, 23 Mar 2015 06:56:00 GMT
Repository: tez
Updated Branches:
  refs/heads/master be982afd1 -> aa784be6e


TEZ-2219. Should verify the input_name/output_name to be unique per vertex (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aa784be6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aa784be6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aa784be6

Branch: refs/heads/master
Commit: aa784be6e0a7d4b059212f047f3100cfa085f6e1
Parents: be982af
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Mon Mar 23 14:55:46 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Mon Mar 23 14:55:46 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/dag/api/Vertex.java     |  35 ++++--
 .../java/org/apache/tez/dag/api/TestDAG.java    | 117 ++++++++++++++++++-
 3 files changed, 141 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/aa784be6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7a554d4..3c9d114 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -249,6 +249,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2219. Should verify the input_name/output_name to be unique per vertex
   TEZ-2186. OOM with a simple scatter gather job with re-use
   TEZ-2220. TestTezJobs compile failure in branch 0.5.
   TEZ-2199. updateLocalResourcesForInputSplits assumes wrongly that split data is on same FS as the default FS.

http://git-wip-us.apache.org/repos/asf/tez/blob/aa784be6/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index c8d3df7..0ed4bd8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -56,10 +57,10 @@ public class Vertex {
   private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
   private Map<String, String> taskEnvironment = new HashMap<String, String>();
   private Map<String, String> vertexConf = new HashMap<String, String>();
-  private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs 
-                      = new ArrayList<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
-  private final List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs 
-                      = new ArrayList<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>();
+  private final Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs
+                      = new HashMap<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
+  private final Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs
+                      = new HashMap<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>();
   private VertexManagerPluginDescriptor vertexManagerPlugin;
 
   private final List<Vertex> inputVertices = new ArrayList<Vertex>();
@@ -329,8 +330,12 @@ public class Vertex {
    * @return this Vertex
    */
   public Vertex addDataSource(String inputName, DataSourceDescriptor dataSourceDescriptor) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(inputName),
+        "InputName should not be null, empty or white space only, inputName=" + inputName);
+    Preconditions.checkArgument(!additionalInputs.containsKey(inputName),
+        "Duplicated input:" + inputName + ", vertexName=" + vertexName);
     additionalInputs
-        .add(new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
+        .put(inputName, new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
             inputName, dataSourceDescriptor.getInputDescriptor(),
             dataSourceDescriptor.getInputInitializerDescriptor()));
     this.dataSources.add(dataSourceDescriptor);
@@ -356,19 +361,27 @@ public class Vertex {
    * @return this Vertex
    */
   public Vertex addDataSink(String outputName, DataSinkDescriptor dataSinkDescriptor) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(outputName),
+        "OutputName should not be null, empty or white space only, outputName=" + outputName);
+    Preconditions.checkArgument(!additionalOutputs.containsKey(outputName),
+        "Duplicated output:" + outputName + ", vertexName=" + vertexName);
     additionalOutputs
-        .add(new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(
+        .put(outputName, new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(
             outputName, dataSinkDescriptor.getOutputDescriptor(),
             dataSinkDescriptor.getOutputCommitterDescriptor()));
     this.dataSinks.add(dataSinkDescriptor);
     return this;
   }
-  
+
   Vertex addAdditionalDataSink(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
-    additionalOutputs.add(output);
+    Preconditions.checkArgument(StringUtils.isNotBlank(output.getName()),
+        "OutputName should not be null, empty or white space only, outputName=" + output.getName());
+    Preconditions.checkArgument(!additionalOutputs.containsKey(output.getName()),
+        "Duplicated output:" + output.getName() + ", vertexName=" + vertexName);
+    additionalOutputs.put(output.getName(), output);
     return this;
   }
-  
+
   /**
    * Specifies a {@link VertexManagerPlugin} for the vertex. This plugin can be
    * used to modify the parallelism or reconfigure the vertex at runtime using
@@ -471,11 +484,11 @@ public class Vertex {
   }
   
   List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> getInputs() {
-    return additionalInputs;
+    return Lists.newArrayList(additionalInputs.values());
   }
 
   List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> getOutputs() {
-    return additionalOutputs;
+    return Lists.newArrayList(additionalOutputs.values());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/aa784be6/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 8e7f80b..0590907 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -177,7 +177,7 @@ public class TestDAG {
     // set invalid DAG level configuration
     try {
       v1.setConf(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false + "");
-      Assert.fail();
+      Assert.fail("should fail due to invalid configuration set");
     } catch (IllegalStateException e) {
       Assert.assertEquals("tez.am.commit-all-outputs-on-dag-success is set at the scope of VERTEX,"
           + " but it is only valid in the scope of DAG",
@@ -186,4 +186,119 @@ public class TestDAG {
     // set valid Vertex level configuration
     v1.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 3 + "");
   }
+
+  @Test(timeout = 5000)
+  public void testDuplicatedInput() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));
+    DataSourceDescriptor dataSource =
+        DataSourceDescriptor.create(InputDescriptor.create("dummyInput"), null, null);
+    try {
+      v1.addDataSource(null, dataSource);
+      Assert.fail("Should fail due to invalid inputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("InputName should not be null, empty or white space only,"));
+    }
+    try {
+      v1.addDataSource("", dataSource);
+      Assert.fail("Should fail due to invalid inputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("InputName should not be null, empty or white space only,"));
+    }
+    try {
+      v1.addDataSource(" ", dataSource);
+      Assert.fail("Should fail due to invalid inputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("InputName should not be null, empty or white space only,"));
+    }
+
+    v1.addDataSource("input_1", dataSource);
+    try {
+      v1.addDataSource("input_1",
+          DataSourceDescriptor.create(InputDescriptor.create("dummyInput"), null, null));
+      Assert.fail("Should fail due to duplicated input");
+    } catch (IllegalArgumentException e) {
+      Assert.assertEquals("Duplicated input:input_1, vertexName=v1", e.getMessage());
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testDuplicatedOutput_1() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));
+    DataSinkDescriptor dataSink =
+        DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null);
+    try {
+      v1.addDataSink(null, dataSink);
+      Assert.fail("Should fail due to invalid outputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("OutputName should not be null, empty or white space only,"));
+    }
+    try {
+      v1.addDataSink("", dataSink);
+      Assert.fail("Should fail due to invalid outputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("OutputName should not be null, empty or white space only,"));
+    }
+    try {
+      v1.addDataSink(" ", dataSink);
+      Assert.fail("Should fail due to invalid outputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("OutputName should not be null, empty or white space only,"));
+    }
+
+    v1.addDataSink("output_1",
+        DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null));
+    try {
+      v1.addDataSink("output_1",
+          DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null));
+      Assert.fail("Should fail due to duplicated output");
+    } catch (IllegalArgumentException e) {
+      Assert.assertEquals("Duplicated output:output_1, vertexName=v1", e.getMessage());
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testDuplicatedOutput_2() {
+    DAG dag = DAG.create("dag1");
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));
+    DataSinkDescriptor dataSink =
+        DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null);
+    try {
+      v1.addDataSink(null, dataSink);
+      Assert.fail("Should fail due to invalid outputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("OutputName should not be null, empty or white space only,"));
+    }
+    try {
+      v1.addDataSink("", dataSink);
+      Assert.fail("Should fail due to invalid outputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("OutputName should not be null, empty or white space only,"));
+    }
+    try {
+      v1.addDataSink(" ", dataSink);
+      Assert.fail("Should fail due to invalid outputName");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("OutputName should not be null, empty or white space only,"));
+    }
+
+    v1.addDataSink("output_1", dataSink);
+    Vertex v2 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));
+    VertexGroup vGroup = dag.createVertexGroup("group_1", v1,v2);
+    try {
+      vGroup.addDataSink("output_1",
+          DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null));
+      Assert.fail("Should fail due to duplicated output");
+    } catch (IllegalArgumentException e) {
+      Assert.assertEquals("Duplicated output:output_1, vertexName=v1", e.getMessage());
+    }
+  }
 }


Mime
View raw message