Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C8BAE17D6A for ; Mon, 29 Sep 2014 00:35:08 +0000 (UTC) Received: (qmail 59278 invoked by uid 500); 29 Sep 2014 00:35:08 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 59162 invoked by uid 500); 29 Sep 2014 00:35:08 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 58377 invoked by uid 99); 29 Sep 2014 00:35:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Sep 2014 00:35:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DC5A79B9C5C; Mon, 29 Sep 2014 00:35:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Date: Mon, 29 Sep 2014 00:35:24 -0000 Message-Id: <70aad6f7fdd5477cb99ed26edab96116@git.apache.org> In-Reply-To: <82bc55d5adc141e4922f88c793b00754@git.apache.org> References: <82bc55d5adc141e4922f88c793b00754@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/50] [abbrv] git commit: TEZ-1581. GroupByOrderByMRRTest no longer functional. (hitesh) TEZ-1581. GroupByOrderByMRRTest no longer functional. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7d1303fa Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7d1303fa Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7d1303fa Branch: refs/heads/branch-0.5 Commit: 7d1303fa606e700bd4d4b2a122a73a5badbbc889 Parents: edb841c Author: Hitesh Shah Authored: Fri Sep 12 15:18:05 2014 -0700 Committer: Hitesh Shah Committed: Fri Sep 12 15:18:05 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../examples/GroupByOrderByMRRTest.java | 283 +++++++++++++------ 2 files changed, 205 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7d1303fa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 59be260..3198323 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,7 @@ ALL CHANGES: TEZ-1569. Add tests for preemption TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. TEZ-1524. Resolve user group information only if ACLs are enabled. + TEZ-1581. GroupByOrderByMRRTest no longer functional. Release 0.5.1: Unreleased http://git-wip-us.apache.org/repos/asf/tez/blob/7d1303fa/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java index 939bea0..393faea 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java @@ -18,34 +18,54 @@ package org.apache.tez.mapreduce.examples; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.StringTokenizer; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.client.MRTezClient; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.TezClient; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.apache.tez.mapreduce.output.MROutputLegacy; +import org.apache.tez.mapreduce.processor.map.MapProcessor; +import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; /** * Simple example that does a GROUP BY ORDER BY in an MRR job @@ -94,7 +114,7 @@ public class GroupByOrderByMRRTest extends Configured implements Tool { public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); - String empName = ""; + String empName; String deptName = ""; if (itr.hasMoreTokens()) { empName = itr.nextToken(); @@ -149,25 +169,141 @@ public class GroupByOrderByMRRTest extends Configured implements Tool { } } + private static DAG createDAG(Configuration conf, Map commonLocalResources, + Path stagingDir, String inputPath, String outputPath, boolean useMRSettings) + throws Exception { + + Configuration mapStageConf = new JobConf(conf); + mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, + MyMapper.class.getName()); + + MRHelpers.translateMRConfToTez(mapStageConf); + + Configuration iReduceStageConf = new JobConf(conf); + // TODO replace with auto-reduce parallelism + iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); + iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR, + MyGroupByReducer.class.getName()); + iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, + IntWritable.class.getName()); + iReduceStageConf.setBoolean("mapred.mapper.new-api", true); + MRHelpers.translateMRConfToTez(iReduceStageConf); + + Configuration finalReduceConf = new JobConf(conf); + finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1); + finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR, + MyOrderByNoOpReducer.class.getName()); + finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); + finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + MRHelpers.translateMRConfToTez(finalReduceConf); + + MRHelpers.configureMRApiUsage(mapStageConf); + MRHelpers.configureMRApiUsage(iReduceStageConf); + MRHelpers.configureMRApiUsage(finalReduceConf); + + List vertices = new ArrayList(); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096); + mapStageConf.writeXml(outputStream); + String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8"); + mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, + TextInputFormat.class.getName()); + mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath); + mapStageConf.setBoolean("mapred.mapper.new-api", true); + DataSourceDescriptor dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration( + mapStageConf, stagingDir, true); + + Vertex mapVertex; + ProcessorDescriptor mapProcessorDescriptor = + ProcessorDescriptor.create(MapProcessor.class.getName()) + .setUserPayload( + TezUtils.createUserPayloadFromConf(mapStageConf)) + .setHistoryText(mapStageHistoryText); + if (!useMRSettings) { + mapVertex = Vertex.create("initialmap", mapProcessorDescriptor); + } else { + mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1, + MRHelpers.getResourceForMRMapper(mapStageConf)); + mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf)); + } + mapVertex.addTaskLocalFiles(commonLocalResources) + .addDataSource("MRInput", dsd); + vertices.add(mapVertex); + + ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096); + iReduceStageConf.writeXml(iROutputStream); + String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8"); + + ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create( + ReduceProcessor.class.getName()) + .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf)) + .setHistoryText(iReduceStageHistoryText); + + Vertex intermediateVertex; + if (!useMRSettings) { + intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor, 1); + } else { + intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor, + 1, MRHelpers.getResourceForMRReducer(iReduceStageConf)); + intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf)); + } + intermediateVertex.addTaskLocalFiles(commonLocalResources); + vertices.add(intermediateVertex); + + ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096); + finalReduceConf.writeXml(finalReduceOutputStream); + String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8"); + UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf); + Vertex finalReduceVertex; + + ProcessorDescriptor finalReduceProcessorDescriptor = + ProcessorDescriptor.create( + ReduceProcessor.class.getName()) + .setUserPayload(finalReducePayload) + .setHistoryText(finalReduceStageHistoryText); + if (!useMRSettings) { + finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1); + } else { + finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1, + MRHelpers.getResourceForMRReducer(finalReduceConf)); + finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf)); + } + finalReduceVertex.addTaskLocalFiles(commonLocalResources); + finalReduceVertex.addDataSink("MROutput", + MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath) + .build()); + vertices.add(finalReduceVertex); + + DAG dag = DAG.create("groupbyorderbymrrtest"); + for (Vertex v : vertices) { + dag.addVertex(v); + } + + OrderedPartitionedKVEdgeConfig edgeConf1 = OrderedPartitionedKVEdgeConfig + .newBuilder(Text.class.getName(), IntWritable.class.getName(), + HashPartitioner.class.getName()).setFromConfiguration(conf) + .configureInput().useLegacyInput().done().build(); + dag.addEdge( + Edge.create(dag.getVertex("initialmap"), dag.getVertex("ireduce1"), + edgeConf1.createDefaultEdgeProperty())); + + OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig + .newBuilder(IntWritable.class.getName(), Text.class.getName(), + HashPartitioner.class.getName()).setFromConfiguration(conf) + .configureInput().useLegacyInput().done().build(); + dag.addEdge( + Edge.create(dag.getVertex("ireduce1"), dag.getVertex("finalreduce"), + edgeConf2.createDefaultEdgeProperty())); + + return dag; + } + + @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); - // Configure intermediate reduces - conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1); - - // Set reducer class for intermediate reduce - conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1, - "mapreduce.job.reduce.class"), MyGroupByReducer.class, Reducer.class); - // Set reducer output key class - conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1, - "mapreduce.map.output.key.class"), IntWritable.class, Object.class); - // Set reducer output value class - conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1, - "mapreduce.map.output.value.class"), Text.class, Object.class); - conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1, - "mapreduce.job.reduces"), 2); - String[] otherArgs = new GenericOptionsParser(conf, args). getRemainingArgs(); if (otherArgs.length != 2) { @@ -176,66 +312,55 @@ public class GroupByOrderByMRRTest extends Configured implements Tool { return 2; } - @SuppressWarnings("deprecation") - Job job = new Job(conf, "groupbyorderbymrrtest"); - - job.setJarByClass(GroupByOrderByMRRTest.class); - - // Configure map - job.setMapperClass(MyMapper.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - - // Configure reduce - job.setReducerClass(MyOrderByNoOpReducer.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - job.setNumReduceTasks(1); - - FileInputFormat.addInputPath(job, new Path(otherArgs[0])); - FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); - - job.submit(); - JobID jobId = job.getJobID(); - ApplicationId appId = TypeConverter.toYarn(jobId).getAppId(); - - DAGClient dagClient = MRTezClient.getDAGClient(appId, new TezConfiguration(conf), null); - DAGStatus dagStatus; - String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" }; - while (true) { - dagStatus = dagClient.getDAGStatus(null); - if(dagStatus.getState() == DAGStatus.State.RUNNING || - dagStatus.getState() == DAGStatus.State.SUCCEEDED || - dagStatus.getState() == DAGStatus.State.FAILED || - dagStatus.getState() == DAGStatus.State.KILLED || - dagStatus.getState() == DAGStatus.State.ERROR) { - break; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // continue; - } + String inputPath = otherArgs[0]; + String outputPath = otherArgs[1]; + + UserGroupInformation.setConfiguration(conf); + + TezConfiguration tezConf = new TezConfiguration(conf); + FileSystem fs = FileSystem.get(conf); + + if (fs.exists(new Path(outputPath))) { + throw new FileAlreadyExistsException("Output directory " + + outputPath + " already exists"); } - while (dagStatus.getState() == DAGStatus.State.RUNNING) { - try { - ExampleDriver.printDAGStatus(dagClient, vNames); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // continue; - } - dagStatus = dagClient.getDAGStatus(null); - } catch (TezException e) { - LOG.fatal("Failed to get application progress. Exiting"); + Map localResources = + new TreeMap(); + + String stagingDirStr = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR, + TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR + + Long.toString(System.currentTimeMillis()); + Path stagingDir = new Path(stagingDirStr); + FileSystem pathFs = stagingDir.getFileSystem(tezConf); + pathFs.mkdirs(new Path(stagingDirStr)); + + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr); + stagingDir = pathFs.makeQualified(new Path(stagingDirStr)); + + TezClient tezClient = TezClient.create("groupbyorderbymrrtest", tezConf); + tezClient.start(); + + LOG.info("Submitting groupbyorderbymrrtest DAG as a new Tez Application"); + + try { + DAG dag = createDAG(conf, localResources, stagingDir, inputPath, outputPath, true); + + tezClient.waitTillReady(); + + DAGClient dagClient = tezClient.submitDAG(dag); + + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null); + if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { + LOG.error("groupbyorderbymrrtest failed, state=" + dagStatus.getState() + + ", diagnostics=" + dagStatus.getDiagnostics()); return -1; } + LOG.info("Application completed. " + "FinalState=" + dagStatus.getState()); + return 0; + } finally { + tezClient.stop(); } - - ExampleDriver.printDAGStatus(dagClient, vNames); - LOG.info("Application completed. " + "FinalState=" + dagStatus.getState()); - return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1; } public static void main(String[] args) throws Exception {