Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D50BF11467 for ; Mon, 13 May 2013 18:22:06 +0000 (UTC) Received: (qmail 94488 invoked by uid 500); 13 May 2013 18:22:06 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 94468 invoked by uid 500); 13 May 2013 18:22:06 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 94461 invoked by uid 99); 13 May 2013 18:22:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 May 2013 18:22:06 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 13 May 2013 18:22:05 +0000 Received: (qmail 93764 invoked by uid 99); 13 May 2013 18:21:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 May 2013 18:21:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E0C1C88FB56; Mon, 13 May 2013 18:21:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mliddell@apache.org To: commits@tez.incubator.apache.org Date: Mon, 13 May 2013 18:21:44 -0000 Message-Id: <56217bddeb8e4ab9be61c0df00154afa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] TEZ-25: Change DAG representation from Configuration object to structured protobuf message X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/TEZ-1 97e1fa967 -> cb5758b42 http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java b/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java deleted file mode 100644 index eb8c781..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class TestVertexLocationHint { - - private DataInput in; - private DataOutput out; - private ByteArrayOutputStream bOut; - - @Before - public void setup() { - bOut = new ByteArrayOutputStream(); - out = new DataOutputStream(bOut); - } - - @After - public void teardown() { - in = null; - out = null; - bOut = null; - } - - @Test - public void testNullTaskLocationHintSerDes() throws IOException { - TaskLocationHint expected = new TaskLocationHint(null, null); - expected.write(out); - in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray())); - TaskLocationHint actual = new TaskLocationHint(); - actual.readFields(in); - Assert.assertNull(actual.getDataLocalHosts()); - Assert.assertNull(actual.getRacks()); - } - - @Test - public void testTaskLocationHintSerDes() throws IOException { - String[] hosts = { "h1", "h2", "", null }; - String[] racks = { "r1", "r2" }; - TaskLocationHint expected = new TaskLocationHint(hosts, racks); - expected.write(out); - in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray())); - TaskLocationHint actual = new TaskLocationHint(); - actual.readFields(in); - Assert.assertNotNull(actual.getDataLocalHosts()); - Assert.assertNotNull(actual.getRacks()); - Assert.assertArrayEquals(hosts, actual.getDataLocalHosts()); - Assert.assertArrayEquals(racks, actual.getRacks()); - } - - @Test - public void testTaskLocationHintSerDes2() throws IOException { - String[] hosts = null; - String[] racks = { "r1", "r2" }; - TaskLocationHint expected = new TaskLocationHint(hosts, racks); - expected.write(out); - in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray())); - TaskLocationHint actual = new TaskLocationHint(); - actual.readFields(in); - Assert.assertNull(actual.getDataLocalHosts()); - Assert.assertNotNull(actual.getRacks()); - Assert.assertArrayEquals(racks, actual.getRacks()); - } - - @Test - public void testEmptyVertexLocationHintSerDes() throws IOException { - VertexLocationHint expected = new VertexLocationHint(0); - expected.write(out); - in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray())); - VertexLocationHint actual = new VertexLocationHint(); - actual.readFields(in); - Assert.assertEquals(0, actual.getNumTasks()); - Assert.assertNotNull(actual.getTaskLocationHints()); - Assert.assertEquals(0, actual.getTaskLocationHints().length); - } - - @Test - public void testVertexLocationHintSerDes() throws IOException { - String[] hosts = { "h1", "h2", "", null }; - String[] racks = { "r1", "r2" }; - VertexLocationHint expected = new VertexLocationHint(4); - expected.getTaskLocationHints()[0] = new TaskLocationHint(hosts, racks); - expected.getTaskLocationHints()[1] = null; - expected.getTaskLocationHints()[2] = new TaskLocationHint(null, racks); - expected.getTaskLocationHints()[3] = new TaskLocationHint(hosts, null); - expected.write(out); - in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray())); - VertexLocationHint actual = new VertexLocationHint(); - actual.readFields(in); - - Assert.assertEquals(4, actual.getNumTasks()); - Assert.assertNotNull(actual.getTaskLocationHints()); - Assert.assertEquals(4, actual.getTaskLocationHints().length); - - Assert.assertNotNull(actual.getTaskLocationHints()[0]); - Assert.assertNull(actual.getTaskLocationHints()[1]); - Assert.assertNotNull(actual.getTaskLocationHints()[2]); - Assert.assertNotNull(actual.getTaskLocationHints()[3]); - - Assert.assertArrayEquals( - expected.getTaskLocationHints()[0].getDataLocalHosts(), - actual.getTaskLocationHints()[0].getDataLocalHosts()); - Assert.assertArrayEquals( - expected.getTaskLocationHints()[0].getRacks(), - actual.getTaskLocationHints()[0].getRacks()); - Assert.assertNull( - actual.getTaskLocationHints()[2].getDataLocalHosts()); - Assert.assertArrayEquals( - expected.getTaskLocationHints()[2].getRacks(), - actual.getTaskLocationHints()[2].getRacks()); - Assert.assertArrayEquals( - expected.getTaskLocationHints()[3].getDataLocalHosts(), - actual.getTaskLocationHints()[3].getDataLocalHosts()); - Assert.assertNull( - actual.getTaskLocationHints()[3].getRacks()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index 715a364..9042846 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -99,7 +99,7 @@ import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.DAGConfiguration; +import org.apache.tez.dag.api.DAGPlan.JobPlan; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.TezConfiguration; @@ -737,6 +737,14 @@ public class YARNRunner implements ClientProtocol { List vargs = new ArrayList(8); vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); +//[Debug AppMaster] Current simplest way to attach debugger to AppMaster +// Uncomment the following, then launch a regular job, eg +// >hadoop jar {path}\hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 2 +// LOG.error(" !!!!!!!!!"); +// LOG.error(" !!!!!!!!! Launching AppMaster in debug/suspend mode. Attach to port 8002"); +// LOG.error(" !!!!!!!!!"); +// vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y"); + // FIXME set up logging related properties // TODO -Dtez.root.logger?? // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); @@ -795,24 +803,38 @@ public class YARNRunner implements ClientProtocol { setDAGParamsFromMRConf(dag); - // FIXME add serialized dag conf - DAGConfiguration dagConf = dag.serializeDag(); - - Path dagConfFilePath = new Path(jobSubmitDir, - TezConfiguration.DAG_AM_PLAN_CONFIG_XML); - FSDataOutputStream dagConfOut = - FileSystem.create(fs, dagConfFilePath, - new FsPermission(DAG_FILE_PERMISSION)); + // emit protobuf DAG file style + JobPlan dagPB = dag.createDag(); + FSDataOutputStream dagPBOutBinaryStream = null; + FSDataOutputStream dagPBOutTextStream = null; + Path binaryPath = new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_BINARY); + Path textPath = new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_TEXT); try { - dagConf.writeXml(dagConfOut); + //binary output + dagPBOutBinaryStream = FileSystem.create(fs, binaryPath, + new FsPermission(DAG_FILE_PERMISSION)); + dagPB.writeTo(dagPBOutBinaryStream); + + // text / human-readable output + dagPBOutTextStream = FileSystem.create(fs, textPath, + new FsPermission(DAG_FILE_PERMISSION)); + dagPBOutTextStream.writeUTF(dagPB.toString()); } finally { - dagConfOut.close(); + if(dagPBOutBinaryStream != null){ + dagPBOutBinaryStream.close(); + } + if(dagPBOutTextStream != null){ + dagPBOutTextStream.close(); + } } - localResources.put(TezConfiguration.DAG_AM_PLAN_CONFIG_XML, + + localResources.put(TezConfiguration.DAG_AM_PLAN_PB_BINARY, createApplicationResource(defaultFileContext, - dagConfFilePath, LocalResourceType.FILE)); + binaryPath, LocalResourceType.FILE)); - // FIXME add tez conf if needed + localResources.put(TezConfiguration.DAG_AM_PLAN_PB_TEXT, + createApplicationResource(defaultFileContext, + textPath, LocalResourceType.FILE)); // FIXME are we using MR acls for tez? Map acls