tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mlidd...@apache.org
Subject [1/2] TEZ-25: Change DAG representation from Configuration object to structured protobuf message
Date Mon, 13 May 2013 18:21:44 GMT
Updated Branches:
  refs/heads/TEZ-1 97e1fa967 -> cb5758b42


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java b/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
deleted file mode 100644
index eb8c781..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestVertexLocationHint {
-
-  private DataInput in;
-  private DataOutput out;
-  private ByteArrayOutputStream bOut;
-
-  @Before
-  public void setup() {
-    bOut = new ByteArrayOutputStream();
-    out = new DataOutputStream(bOut);
-  }
-
-  @After
-  public void teardown() {
-    in = null;
-    out = null;
-    bOut = null;
-  }
-
-  @Test
-  public void testNullTaskLocationHintSerDes() throws IOException {
-    TaskLocationHint expected = new TaskLocationHint(null, null);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    TaskLocationHint actual = new TaskLocationHint();
-    actual.readFields(in);
-    Assert.assertNull(actual.getDataLocalHosts());
-    Assert.assertNull(actual.getRacks());
-  }
-
-  @Test
-  public void testTaskLocationHintSerDes() throws IOException {
-    String[] hosts = { "h1", "h2", "", null };
-    String[] racks = { "r1", "r2" };
-    TaskLocationHint expected = new TaskLocationHint(hosts, racks);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    TaskLocationHint actual = new TaskLocationHint();
-    actual.readFields(in);
-    Assert.assertNotNull(actual.getDataLocalHosts());
-    Assert.assertNotNull(actual.getRacks());
-    Assert.assertArrayEquals(hosts, actual.getDataLocalHosts());
-    Assert.assertArrayEquals(racks, actual.getRacks());
-  }
-
-  @Test
-  public void testTaskLocationHintSerDes2() throws IOException {
-    String[] hosts = null;
-    String[] racks = { "r1", "r2" };
-    TaskLocationHint expected = new TaskLocationHint(hosts, racks);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    TaskLocationHint actual = new TaskLocationHint();
-    actual.readFields(in);
-    Assert.assertNull(actual.getDataLocalHosts());
-    Assert.assertNotNull(actual.getRacks());
-    Assert.assertArrayEquals(racks, actual.getRacks());
-  }
-
-  @Test
-  public void testEmptyVertexLocationHintSerDes() throws IOException {
-    VertexLocationHint expected = new VertexLocationHint(0);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    VertexLocationHint actual = new VertexLocationHint();
-    actual.readFields(in);
-    Assert.assertEquals(0, actual.getNumTasks());
-    Assert.assertNotNull(actual.getTaskLocationHints());
-    Assert.assertEquals(0, actual.getTaskLocationHints().length);
-  }
-
-  @Test
-  public void testVertexLocationHintSerDes() throws IOException {
-    String[] hosts = { "h1", "h2", "", null };
-    String[] racks = { "r1", "r2" };
-    VertexLocationHint expected = new VertexLocationHint(4);
-    expected.getTaskLocationHints()[0] = new TaskLocationHint(hosts, racks);
-    expected.getTaskLocationHints()[1] = null;
-    expected.getTaskLocationHints()[2] = new TaskLocationHint(null, racks);
-    expected.getTaskLocationHints()[3] = new TaskLocationHint(hosts, null);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    VertexLocationHint actual = new VertexLocationHint();
-    actual.readFields(in);
-
-    Assert.assertEquals(4, actual.getNumTasks());
-    Assert.assertNotNull(actual.getTaskLocationHints());
-    Assert.assertEquals(4, actual.getTaskLocationHints().length);
-
-    Assert.assertNotNull(actual.getTaskLocationHints()[0]);
-    Assert.assertNull(actual.getTaskLocationHints()[1]);
-    Assert.assertNotNull(actual.getTaskLocationHints()[2]);
-    Assert.assertNotNull(actual.getTaskLocationHints()[3]);
-
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[0].getDataLocalHosts(),
-        actual.getTaskLocationHints()[0].getDataLocalHosts());
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[0].getRacks(),
-        actual.getTaskLocationHints()[0].getRacks());
-    Assert.assertNull(
-        actual.getTaskLocationHints()[2].getDataLocalHosts());
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[2].getRacks(),
-        actual.getTaskLocationHints()[2].getRacks());
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[3].getDataLocalHosts(),
-        actual.getTaskLocationHints()[3].getDataLocalHosts());
-    Assert.assertNull(
-        actual.getTaskLocationHints()[3].getRacks());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 715a364..9042846 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -99,7 +99,7 @@ import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -737,6 +737,14 @@ public class YARNRunner implements ClientProtocol {
     List<String> vargs = new ArrayList<String>(8);
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
+//[Debug AppMaster] Current simplest way to attach debugger to AppMaster
+// Uncomment the following, then launch a regular job, eg 
+// >hadoop jar {path}\hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 2            
     
+//     LOG.error(" !!!!!!!!!");
+//     LOG.error(" !!!!!!!!! Launching AppMaster in debug/suspend mode.  Attach to port 8002");
+//     LOG.error(" !!!!!!!!!");
+//     vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y");
+    
     // FIXME set up logging related properties
     // TODO -Dtez.root.logger??
     // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
@@ -795,24 +803,38 @@ public class YARNRunner implements ClientProtocol {
 
     setDAGParamsFromMRConf(dag);
 
-    // FIXME add serialized dag conf
-    DAGConfiguration dagConf = dag.serializeDag();
-    
-    Path dagConfFilePath = new Path(jobSubmitDir,
-        TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
-    FSDataOutputStream dagConfOut =
-        FileSystem.create(fs, dagConfFilePath,
-            new FsPermission(DAG_FILE_PERMISSION));
+    // emit protobuf DAG file style
+    JobPlan dagPB = dag.createDag();
+    FSDataOutputStream dagPBOutBinaryStream = null;
+    FSDataOutputStream dagPBOutTextStream = null;
+    Path binaryPath =  new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_BINARY);
+    Path textPath =  new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_TEXT);
     try {
-      dagConf.writeXml(dagConfOut);
+      //binary output
+      dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
+          new FsPermission(DAG_FILE_PERMISSION));
+      dagPB.writeTo(dagPBOutBinaryStream);
+
+      // text / human-readable output
+      dagPBOutTextStream = FileSystem.create(fs, textPath,
+          new FsPermission(DAG_FILE_PERMISSION));
+      dagPBOutTextStream.writeUTF(dagPB.toString());
     } finally {
-      dagConfOut.close();
+      if(dagPBOutBinaryStream != null){
+        dagPBOutBinaryStream.close();
+      }
+      if(dagPBOutTextStream != null){
+        dagPBOutTextStream.close();
+      }
     }
-    localResources.put(TezConfiguration.DAG_AM_PLAN_CONFIG_XML,
+
+    localResources.put(TezConfiguration.DAG_AM_PLAN_PB_BINARY,
         createApplicationResource(defaultFileContext,
-            dagConfFilePath, LocalResourceType.FILE));
+            binaryPath, LocalResourceType.FILE));
 
-    // FIXME add tez conf if needed
+    localResources.put(TezConfiguration.DAG_AM_PLAN_PB_TEXT,
+        createApplicationResource(defaultFileContext,
+            textPath, LocalResourceType.FILE));
 
     // FIXME are we using MR acls for tez?
     Map<ApplicationAccessType, String> acls


Mime
View raw message