tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [3/3] git commit: TEZ-210. Change the DAG constructor to accept DAG name as a parameter. (sseth)
Date Wed, 12 Jun 2013 17:09:39 GMT
TEZ-210. Change the DAG constructor to accept DAG name as a parameter.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2d8c9b6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2d8c9b6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2d8c9b6a

Branch: refs/heads/master
Commit: 2d8c9b6a949ab714ec3f185ac3c31a31d5eae283
Parents: 66c32a4
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jun 12 10:09:00 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jun 12 10:09:00 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/client/TezClient.java  | 10 ++++------
 .../src/main/java/org/apache/tez/dag/api/DAG.java   | 16 ++++++++--------
 .../java/org/apache/tez/dag/api/TestDAGPlan.java    |  2 +-
 .../java/org/apache/tez/dag/api/TestDAGVerify.java  | 12 ++++++------
 .../org/apache/tez/dag/app/MRRExampleHelper.java    |  4 ++--
 .../org/apache/tez/mapreduce/TestMRRJobsDAGApi.java |  6 ++----
 .../java/org/apache/tez/mapreduce/YARNRunner.java   | 14 +++++---------
 7 files changed, 28 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d8c9b6a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index d3b1e41..ff8c486 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -130,7 +130,6 @@ public class TezClient {
    * @param appStagingDir FileSystem path in which resources will be copied
    * @param ts Application credentials
    * @param amQueueName Queue to which the application will be submitted
-   * @param amName Application name
    * @param amArgs Command line Java arguments for the ApplicationMaster
    * @param amEnv Environment to be added to the ApplicationMaster
    * @param amLocalResources YARN local resource for the ApplicationMaster
@@ -139,11 +138,11 @@ public class TezClient {
    * @throws YarnException
    */
   public ApplicationId submitDAGApplication(DAG dag, Path appStagingDir,
-      Credentials ts, String amQueueName, String amName, List<String> amArgs,
+      Credentials ts, String amQueueName, List<String> amArgs,
       Map<String, String> amEnv, Map<String, LocalResource> amLocalResources)
       throws IOException, YarnException {
     ApplicationId appId = createApplication();
-    submitDAGApplication(appId, dag, appStagingDir, ts, amQueueName, amName,
+    submitDAGApplication(appId, dag, appStagingDir, ts, amQueueName,
         amArgs, amEnv, amLocalResources);
     return appId;
   }
@@ -156,7 +155,6 @@ public class TezClient {
    * @param appStagingDir FileSystem path in which resources will be copied
    * @param ts Application credentials
    * @param amQueueName Queue to which the application will be submitted
-   * @param amName Application name
    * @param amArgs Command line Java arguments for the ApplicationMaster
    * @param amEnv Environment to be added to the ApplicationMaster
    * @param amLocalResources YARN local resource for the ApplicationMaster
@@ -165,12 +163,12 @@ public class TezClient {
    * @throws YarnException
    */
   public void submitDAGApplication(ApplicationId appId, DAG dag,
-      Path appStagingDir, Credentials ts, String amQueueName, String amName,
+      Path appStagingDir, Credentials ts, String amQueueName,
       List<String> amArgs, Map<String, String> amEnv,
       Map<String, LocalResource> amLocalResources) throws IOException,
       YarnException {
     ApplicationSubmissionContext appContext = createApplicationSubmissionContext(
-        appId, dag, appStagingDir, ts, amQueueName, amName, amArgs, amEnv,
+        appId, dag, appStagingDir, ts, amQueueName, dag.getName(), amArgs, amEnv,
         amLocalResources);
 
     yarnClient.submitApplication(appContext);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d8c9b6a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 05a22cd..9369c8f 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -40,15 +40,16 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 
 
 public class DAG { // FIXME rename to Topology
-  List<Vertex> vertices;
-  List<Edge> edges;
-  String name;
+  final List<Vertex> vertices;
+  final List<Edge> edges;
+  final String name;
   
   HashMap<String, String> config = new HashMap<String, String>();
   
-  public DAG() {
+  public DAG(String name) {
     this.vertices = new ArrayList<Vertex>();
     this.edges = new ArrayList<Edge>();
+    this.name = name;
   }
 
   public synchronized DAG addVertex(Vertex vertex) {
@@ -87,10 +88,9 @@ public class DAG { // FIXME rename to Topology
     config.put(key, value);
     return this;
   }
-  
-  public DAG setName(String name) {
-    this.name = name;
-    return this;
+
+  public String getName() {
+    return this.name;
   }
   
   // AnnotatedVertex is used by verify() 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d8c9b6a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 1eb047f..7e52329 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -95,7 +95,7 @@ public class TestDAGPlan {
   
   @Test
   public void testUserPayloadSerde() {
-    DAG dag = new DAG().setName("testDag");
+    DAG dag = new DAG("testDag");
     ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1",
         ByteBuffer.wrap("processor1Bytes".getBytes()));
     ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d8c9b6a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 9b534c6..6134f42 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -38,7 +38,7 @@ public class TestDAGVerify {
     Vertex v1 = new Vertex("v1", new ProcessorDescriptor(dummyProcessorClassName, null), dummyTaskCount);
     Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
     Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor(dummyOutputClassName, null), new InputDescriptor(dummyInputClassName, null)));
-    DAG dag = new DAG();
+    DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addEdge(e1);
@@ -61,7 +61,7 @@ public class TestDAGVerify {
     Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
     Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
     Edge e4 = new Edge(v4, v1, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
-    DAG dag = new DAG();
+    DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addVertex(v3);
@@ -97,7 +97,7 @@ public class TestDAGVerify {
     Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
     Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
     Edge e4 = new Edge(v3, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
-    DAG dag = new DAG();
+    DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addVertex(v3);
@@ -122,7 +122,7 @@ public class TestDAGVerify {
     IllegalStateException ex=null;
     Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
     Vertex v1repeat = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
-    DAG dag = new DAG();
+    DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v1repeat);
     try {
@@ -148,7 +148,7 @@ public class TestDAGVerify {
       Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
       Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
       Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
-      DAG dag = new DAG();
+      DAG dag = new DAG("testDag");
       dag.addVertex(v1);
       dag.addVertex(v2);
       dag.addVertex(v3);
@@ -176,7 +176,7 @@ public class TestDAGVerify {
       Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
       Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
       Edge e2 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
-      DAG dag = new DAG();
+      DAG dag = new DAG("testDag");
       dag.addVertex(v1);
       dag.addVertex(v2);
       dag.addVertex(v3);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d8c9b6a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
index 3f5388c..cfd6554 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
@@ -90,7 +90,7 @@ public class MRRExampleHelper {
  //       -> not tested with new DagPB system.
  
  static DAGPlan createDAGConfigurationForMRR() throws IOException {
-   org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+   org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG("examplemrrjob");
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
         "org.apache.tez.mapreduce.task.InitialTask", null), 6);
     Vertex reduce1Vertex = new Vertex("reduce1", new ProcessorDescriptor(
@@ -158,7 +158,7 @@ public class MRRExampleHelper {
 
  // TODO remove once client is in place
  static DAGPlan createDAGConfigurationForMR() throws IOException {
-   org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+   org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG("examplemrjob");
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
         "org.apache.tez.mapreduce.task.InitialTask", null), 6);
     Vertex reduceVertex = new Vertex("reduce", new ProcessorDescriptor(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d8c9b6a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index c210953..9052e02 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -237,8 +237,7 @@ public class TestMRRJobsDAGApi {
     InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
         remoteStagingDir);
 
-    DAG dag = new DAG();
-    dag.setName("testMRRSleepJobDagSubmit");
+    DAG dag = new DAG("testMRRSleepJobDagSubmit");
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName(),
         MRHelpers.createByteBufferFromConf(stage1Conf)),
@@ -319,8 +318,7 @@ public class TestMRRJobsDAGApi {
         mrrTezCluster.getConfig()));
     // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
     ApplicationId appId = tezClient.submitDAGApplication(dag, remoteStagingDir,
-        null, "default", "testMRRSleepJobDagSubmit",
-        Collections.singletonList(""), commonEnv, amLocalResources);
+        null, "default", Collections.singletonList(""), commonEnv, amLocalResources);
     DAGClient dagClient = tezClient.getDAGClient(appId.toString());
 
     while (dagClient == null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d8c9b6a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 48e3f12..1eae3bb 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -538,8 +538,10 @@ public class YARNRunner implements ClientProtocol {
   private DAG createDAG(FileSystem fs, JobID jobId, JobConf jobConf,
       String jobSubmitDir, Credentials ts,
       Map<String, LocalResource> jobLocalResources) throws IOException {
-
-    DAG dag = new DAG();
+    
+    String jobName = jobConf.get(MRJobConfig.JOB_NAME,
+        YarnConfiguration.DEFAULT_APPLICATION_NAME);
+    DAG dag = new DAG(jobName);
 
     int numMaps = jobConf.getInt(MRJobConfig.NUM_MAPS, 0);
     int numReduces = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
@@ -691,11 +693,6 @@ public class YARNRunner implements ClientProtocol {
         dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey()));
       }
     }
-
-    String jobName = mrConf.get(MRJobConfig.JOB_NAME);
-    if(jobName != null) {
-      dag.setName(jobName);
-    }
   }
 
   private void writeTezConf(String jobSubmitDir, FileSystem fs,
@@ -783,8 +780,7 @@ public class YARNRunner implements ClientProtocol {
           dag, 
           appStagingDir, 
           ts,
-          jobConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME), 
-          jobConf.get(JobContext.JOB_NAME, YarnConfiguration.DEFAULT_APPLICATION_NAME), 
+          jobConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME),
           vargs, 
           environment, 
           jobLocalResources);


Mime
View raw message