Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 02ED51180B for ; Thu, 18 Sep 2014 19:49:48 +0000 (UTC) Received: (qmail 46599 invoked by uid 500); 18 Sep 2014 19:49:47 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 46537 invoked by uid 500); 18 Sep 2014 19:49:47 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 46075 invoked by uid 99); 18 Sep 2014 19:49:47 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Sep 2014 19:49:47 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 645D8A1B8EF; Thu, 18 Sep 2014 19:49:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeagles@apache.org To: commits@tez.apache.org Date: Thu, 18 Sep 2014 19:50:01 -0000 Message-Id: <10508e0fa4b34531b02edc4bdab12303@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/25] git commit: TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. (hitesh) TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9dd0cb4d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9dd0cb4d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9dd0cb4d Branch: refs/heads/TEZ-8 Commit: 9dd0cb4d8e933d5f57b3d9ae532e7167978aed68 Parents: 5e5683a Author: Hitesh Shah Authored: Fri Sep 12 14:25:17 2014 -0700 Committer: Hitesh Shah Committed: Fri Sep 12 14:25:17 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../examples/TestOrderedWordCount.java | 61 +++++++++++++++----- 2 files changed, 49 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9dd0cb4d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 73a3671..f71c2e2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,7 @@ ALL CHANGES: TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts. TEZ-1578. Remove TeraSort from Tez codebase. TEZ-1569. Add tests for preemption + TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. Release 0.5.1: Unreleased http://git-wip-us.apache.org/repos/asf/tez/blob/9dd0cb4d/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java index 2c5db10..a36d1d2 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java @@ -149,7 +149,9 @@ public class TestOrderedWordCount extends Configured implements Tool { public DAG createDAG(FileSystem fs, Configuration conf, Map commonLocalResources, Path stagingDir, int dagIndex, String inputPath, String outputPath, - boolean generateSplitsInClient) throws Exception { + boolean generateSplitsInClient, + boolean useMRSettings, + int intermediateNumReduceTasks) throws Exception { Configuration mapStageConf = new JobConf(conf); mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, @@ -196,32 +198,62 @@ public class TestOrderedWordCount extends Configured implements Tool { dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build(); } - Vertex mapVertex = Vertex.create("initialmap", ProcessorDescriptor.create( - MapProcessor.class.getName()).setUserPayload( - TezUtils.createUserPayloadFromConf(mapStageConf)) - .setHistoryText(mapStageHistoryText)).addTaskLocalFiles(commonLocalResources); - mapVertex.addDataSource("MRInput", dsd); + Vertex mapVertex; + ProcessorDescriptor mapProcessorDescriptor = + ProcessorDescriptor.create(MapProcessor.class.getName()) + .setUserPayload( + TezUtils.createUserPayloadFromConf(mapStageConf)) + .setHistoryText(mapStageHistoryText); + if (!useMRSettings) { + mapVertex = Vertex.create("initialmap", mapProcessorDescriptor); + } else { + mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1, + MRHelpers.getResourceForMRMapper(mapStageConf)); + mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf)); + } + mapVertex.addTaskLocalFiles(commonLocalResources) + .addDataSource("MRInput", dsd); vertices.add(mapVertex); ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096); iReduceStageConf.writeXml(iROutputStream); String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8"); - Vertex ivertex = Vertex.create("intermediate_reducer", ProcessorDescriptor.create( + + ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create( ReduceProcessor.class.getName()) .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf)) - .setHistoryText(iReduceStageHistoryText), 2); - ivertex.addTaskLocalFiles(commonLocalResources); - vertices.add(ivertex); + .setHistoryText(iReduceStageHistoryText); + + Vertex intermediateVertex; + if (!useMRSettings) { + intermediateVertex = Vertex.create("intermediate_reducer", iReduceProcessorDescriptor, + intermediateNumReduceTasks); + } else { + intermediateVertex = Vertex.create("intermediate_reducer", iReduceProcessorDescriptor, + intermediateNumReduceTasks, MRHelpers.getResourceForMRReducer(iReduceStageConf)); + intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf)); + } + intermediateVertex.addTaskLocalFiles(commonLocalResources); + vertices.add(intermediateVertex); ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096); finalReduceConf.writeXml(finalReduceOutputStream); String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8"); UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf); - Vertex finalReduceVertex = Vertex.create("finalreduce", + Vertex finalReduceVertex; + + ProcessorDescriptor finalReduceProcessorDescriptor = ProcessorDescriptor.create( ReduceProcessor.class.getName()) .setUserPayload(finalReducePayload) - .setHistoryText(finalReduceStageHistoryText), 1); + .setHistoryText(finalReduceStageHistoryText); + if (!useMRSettings) { + finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1); + } else { + finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1, + MRHelpers.getResourceForMRReducer(finalReduceConf)); + finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf)); + } finalReduceVertex.addTaskLocalFiles(commonLocalResources); finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath) @@ -283,6 +315,9 @@ public class TestOrderedWordCount extends Configured implements Tool { * 1000; boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false); + boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true); + // TODO needs to use auto reduce parallelism + int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2); if (((otherArgs.length%2) != 0) || (!useTezSession && otherArgs.length != 2)) { @@ -371,7 +406,7 @@ public class TestOrderedWordCount extends Configured implements Tool { DAG dag = instance.createDAG(fs, conf, localResources, stagingDir, dagIndex, inputPath, outputPath, - generateSplitsInClient); + generateSplitsInClient, useMRSettings, intermediateNumReduceTasks); boolean doPreWarm = dagIndex == 1 && useTezSession && conf.getBoolean("PRE_WARM_SESSION", true);