Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0334911D0F for ; Tue, 15 Jul 2014 00:09:02 +0000 (UTC) Received: (qmail 66844 invoked by uid 500); 15 Jul 2014 00:09:01 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 66807 invoked by uid 500); 15 Jul 2014 00:09:01 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 66798 invoked by uid 99); 15 Jul 2014 00:09:01 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Jul 2014 00:09:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 15 Jul 2014 00:08:57 +0000 Received: (qmail 66202 invoked by uid 99); 15 Jul 2014 00:08:36 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Jul 2014 00:08:36 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9E9FE94B656; Tue, 15 Jul 2014 00:08:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Date: Tue, 15 Jul 2014 00:08:37 -0000 Message-Id: <710ae06406b94a84b3614420ce171458@git.apache.org> In-Reply-To: <85146f54afd54be19e6d55eb64fec3b4@git.apache.org> References: <85146f54afd54be19e6d55eb64fec3b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: TEZ-1272. Change YARNRunner to make use of EdgeConfigurations. (sseth) X-Virus-Checked: Checked by ClamAV on apache.org TEZ-1272. Change YARNRunner to make use of EdgeConfigurations. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/97805278 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/97805278 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/97805278 Branch: refs/heads/master Commit: 97805278ed9b7e9f8a20521574b22515d1c553e7 Parents: 650151a Author: Siddharth Seth Authored: Mon Jul 14 17:08:17 2014 -0700 Committer: Siddharth Seth Committed: Mon Jul 14 17:08:17 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 6 +- .../org/apache/tez/common/TezJobConfig.java | 66 +--- .../tez/mapreduce/examples/MRRSleepJob.java | 30 +- .../mapreduce/examples/OrderedWordCount.java | 19 +- .../apache/tez/mapreduce/client/YARNRunner.java | 40 +-- .../tez/mapreduce/hadoop/DeprecatedKeys.java | 78 +---- .../apache/tez/mapreduce/hadoop/MRHelpers.java | 80 ++++- .../hadoop/MultiStageMRConfToTezTranslator.java | 307 +------------------ .../hadoop/MultiStageMRConfigUtil.java | 150 +-------- .../org/apache/tez/mapreduce/input/MRInput.java | 3 +- .../apache/tez/mapreduce/output/MROutput.java | 4 +- .../hadoop/TestConfigTranslationMRToTez.java | 110 +------ .../mapreduce/hadoop/TestDeprecatedKeys.java | 16 +- .../processor/map/TestMapProcessor.java | 20 +- .../processor/reduce/TestReduceProcessor.java | 40 +-- .../tez/runtime/library/common/ConfigUtils.java | 22 +- .../conf/OnFileSortedOutputConfiguration.java | 10 +- .../OnFileUnorderedKVOutputConfiguration.java | 8 +- ...orderedPartitionedKVOutputConfiguration.java | 8 +- .../OrderedPartitionedKVEdgeConfiguration.java | 3 +- .../conf/ShuffledMergedInputConfiguration.java | 42 ++- .../ShuffledUnorderedKVInputConfiguration.java | 8 +- .../library/input/ShuffledMergedInput.java | 12 +- .../library/input/ShuffledUnorderedKVInput.java | 8 +- .../library/output/OnFileSortedOutput.java | 10 +- .../library/output/OnFileUnorderedKVOutput.java | 8 +- .../OnFileUnorderedPartitionedKVOutput.java | 8 +- .../TestUnorderedPartitionedKVWriter.java | 8 +- .../TestOnFileSortedOutputConfiguration.java | 29 +- ...estOnFileUnorderedKVOutputConfiguration.java | 19 +- .../TestOnFileUnorderedPartitionedKVOutput.java | 23 +- ...stOrderedPartitionedKVEdgeConfiguration.java | 38 ++- .../TestShuffledMergedInputConfiguration.java | 23 +- ...stShuffledUnorderedKVInputConfiguration.java | 19 +- ...UnorderedPartitionedKVEdgeConfiguration.java | 12 +- ...orderedUnpartitionedKVEdgeConfiguration.java | 12 +- .../output/TestOnFileUnorderedKVOutput.java | 5 +- .../src/test/resources/tez-site.xml | 6 +- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 13 +- 39 files changed, 371 insertions(+), 952 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ea0447f..d32ff1c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,8 +4,6 @@ Apache Tez Change Log Release 0.5.0-incubating: Unreleased INCOMPATIBLE CHANGES - TEZ-1213. Fix parameter naming in TezJobConfig. - - Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId=14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14039381 TEZ-960. VertexManagerPluginContext::getTotalAVailableResource() changed to VertexManagerPluginContext::getTotalAvailableResource() TEZ-1025. Rename tez.am.max.task.attempts to tez.am.task.max.failed.attempts @@ -17,6 +15,10 @@ INCOMPATIBLE CHANGES config TEZ-692. Unify job submission in either TezClient or TezSession TEZ-1130. Replace confusing names on Vertex API + TEZ-1213. Fix parameter naming in TezJobConfig. + - Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId=14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14039381 + TEZ-1272. Change YARNRunner to use EdgeConfigs. + - Removes separation of runtime configs into input/ouput configs. Also refactors public methods used for this conversion. Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java index dc8aca7..b36e789 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java @@ -257,51 +257,20 @@ public class TezJobConfig { public static final String TEZ_RUNTIME_INTERNAL_SORTER_CLASS = TEZ_RUNTIME_PREFIX + "internal.sorter.class"; - public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS = - TEZ_RUNTIME_PREFIX + - "intermediate-output.key.comparator.class"; - public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS = - TEZ_RUNTIME_PREFIX + - "intermediate-input.key.comparator.class"; - - public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS = TEZ_RUNTIME_PREFIX + - "intermediate-output.key.class"; - public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS = TEZ_RUNTIME_PREFIX + - "intermediate-input.key.class"; + public static final String TEZ_RUNTIME_KEY_COMPARATOR_CLASS = + TEZ_RUNTIME_PREFIX + "key.comparator.class"; - public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS = TEZ_RUNTIME_PREFIX + - "intermediate-output.value.class"; - public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS = TEZ_RUNTIME_PREFIX + - "intermediate-input.value.class"; + public static final String TEZ_RUNTIME_KEY_CLASS = TEZ_RUNTIME_PREFIX + "key.class"; + public static final String TEZ_RUNTIME_VALUE_CLASS = TEZ_RUNTIME_PREFIX + "value.class"; - /** - * Whether intermediate output should be compressed or not - */ - public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS = TEZ_RUNTIME_PREFIX + - "intermediate-output.should-compress"; - /** - * Whether intermediate input is compressed - */ - public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED = TEZ_RUNTIME_PREFIX + - "intermediate-input.is-compressed"; - /** - * The coded to be used if compressing intermediate output. Only applicable if - * tez.runtime.intermediate-output.should-compress is enabled. - */ - public static final String TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC = TEZ_RUNTIME_PREFIX + - "intermediate-output.compress.codec"; - /** - * The coded to be used when reading intermediate compressed input. Only - * applicable if tez.runtime.intermediate-input.is-compressed is enabled. - */ - public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC = TEZ_RUNTIME_PREFIX + - "intermediate-input.compress.codec"; + public static final String TEZ_RUNTIME_COMPRESS = TEZ_RUNTIME_PREFIX + "compress"; + public static final String TEZ_RUNTIME_COMPRESS_CODEC = TEZ_RUNTIME_PREFIX + "compress.codec"; - public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS = - TEZ_RUNTIME_PREFIX + - "intermediate-input.key.secondary.comparator.class"; + // TODO Move this key to MapReduce + public static final String TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS = + TEZ_RUNTIME_PREFIX + "key.secondary.comparator.class"; public static final String TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED = TEZ_RUNTIME_PREFIX + @@ -430,17 +399,12 @@ public class TezJobConfig { tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_BUFFER_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC); - tezRuntimeKeys.add(TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS); + tezRuntimeKeys.add(TEZ_RUNTIME_KEY_COMPARATOR_CLASS); + tezRuntimeKeys.add(TEZ_RUNTIME_KEY_CLASS); + tezRuntimeKeys.add(TEZ_RUNTIME_VALUE_CLASS); + tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS); + tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS_CODEC); + tezRuntimeKeys.add(TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS); tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); tezRuntimeKeys.add(TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED); tezRuntimeKeys.add(TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index fa76bca..01c6d51 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -75,7 +75,6 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.common.MRInputSplitDistributor; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.mapreduce.processor.map.MapProcessor; @@ -440,8 +439,7 @@ public class MRRSleepJob extends Configured implements Tool { NullOutputFormat.class.getName()); } - MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf, - null); + MRHelpers.translateVertexConfToTez(mapStageConf); Configuration[] intermediateReduceStageConfs = null; if (iReduceStagesCount > 0 @@ -461,14 +459,8 @@ public class MRRSleepJob extends Configured implements Tool { iReduceStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName()); - if (i == 1) { - MultiStageMRConfToTezTranslator.translateVertexConfToTez( - iReduceStageConf, mapStageConf); - } - else { - MultiStageMRConfToTezTranslator.translateVertexConfToTez( - iReduceStageConf, intermediateReduceStageConfs[i-2]); - } + + MRHelpers.translateVertexConfToTez(iReduceStageConf); intermediateReduceStageConfs[i-1] = iReduceStageConf; } } @@ -487,14 +479,7 @@ public class MRRSleepJob extends Configured implements Tool { finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, NullOutputFormat.class.getName()); - if (iReduceStagesCount > 0 - && numIReducer > 0) { - MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf, - intermediateReduceStageConfs[iReduceStagesCount-1]); - } else { - MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf, - mapStageConf); - } + MRHelpers.translateVertexConfToTez(finalReduceConf); } MRHelpers.doJobClientMagic(mapStageConf); @@ -634,13 +619,14 @@ public class MRRSleepJob extends Configured implements Tool { partitionerConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName()); OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration .newBuilder(IntWritable.class.getName(), IntWritable.class.getName()).configureOutput( - MRPartitioner.class.getName(), partitionerConf).done().build(); + MRPartitioner.class.getName(), partitionerConf).done().configureInput().useLegacyInput() + .done().build(); for (int i = 0; i < vertices.size(); ++i) { dag.addVertex(vertices.get(i)); if (i != 0) { - dag.addEdge(new Edge(vertices.get(i-1), - vertices.get(i), edgeConf.createDefaultEdgeProperty())); + dag.addEdge( + new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty())); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index 6e4f22c..9ed6294 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -71,7 +71,6 @@ import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; import org.apache.tez.runtime.api.TezRootInputInitializer; @@ -168,8 +167,7 @@ public class OrderedWordCount extends Configured implements Tool { mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks()); } - MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf, - null); + MRHelpers.translateVertexConfToTez(mapStageConf); Configuration iReduceStageConf = new JobConf(conf); iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); // TODO NEWTEZ - NOT NEEDED NOW??? @@ -181,8 +179,7 @@ public class OrderedWordCount extends Configured implements Tool { Text.class.getName()); iReduceStageConf.setBoolean("mapred.mapper.new-api", true); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(iReduceStageConf, - mapStageConf); + MRHelpers.translateVertexConfToTez(iReduceStageConf); Configuration finalReduceConf = new JobConf(conf); finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1); @@ -193,8 +190,7 @@ public class OrderedWordCount extends Configured implements Tool { finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath); finalReduceConf.setBoolean("mapred.mapper.new-api", true); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf, - iReduceStageConf); + MRHelpers.translateVertexConfToTez(finalReduceConf); MRHelpers.doJobClientMagic(mapStageConf); MRHelpers.doJobClientMagic(iReduceStageConf); @@ -245,14 +241,15 @@ public class OrderedWordCount extends Configured implements Tool { OrderedPartitionedKVEdgeConfiguration edgeConf = OrderedPartitionedKVEdgeConfiguration .newBuilder(IntWritable.class.getName(), Text.class.getName()).configureOutput( - HashPartitioner.class.getName(), null).done().build(); + HashPartitioner.class.getName(), + null).done().configureInput().useLegacyInput().done().build(); DAG dag = new DAG("OrderedWordCount" + dagIndex); for (int i = 0; i < vertices.size(); ++i) { dag.addVertex(vertices.get(i)); if (i != 0) { - dag.addEdge(new Edge(vertices.get(i - 1), - vertices.get(i), edgeConf.createDefaultEdgeProperty())); + dag.addEdge( + new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty())); } } return dag; @@ -272,7 +269,7 @@ public class OrderedWordCount extends Configured implements Tool { Configuration conf = getConf(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); - boolean generateSplitsInClient = false; + boolean generateSplitsInClient; SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser(); try { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index 90f3055..536049a 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -78,14 +78,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.tez.client.TezClient; +import org.apache.tez.common.TezJobConfig; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.EdgeProperty.DataMovementType; -import org.apache.tez.dag.api.EdgeProperty.DataSourceType; -import org.apache.tez.dag.api.EdgeProperty.SchedulingType; -import org.apache.tez.dag.api.InputDescriptor; -import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -100,10 +95,10 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; -import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy; -import org.apache.tez.runtime.library.output.OnFileSortedOutput; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfiguration; import com.google.common.annotations.VisibleForTesting; @@ -475,14 +470,17 @@ public class YARNRunner implements ClientProtocol { for (int i = 0; i < vertices.length; i++) { dag.addVertex(vertices[i]); if (i > 0) { - EdgeProperty edgeProperty = new EdgeProperty( - DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - new OutputDescriptor(OnFileSortedOutput.class.getName()), - new InputDescriptor(ShuffledMergedInputLegacy.class.getName())); - - Edge edge = null; - edge = new Edge(vertices[i - 1], vertices[i], edgeProperty); + // Set edge conf based on Input conf (compression etc properties for MapReduce are + // w.r.t Outputs - MAP_OUTPUT_COMPRESS for example) + OrderedPartitionedKVEdgeConfiguration edgeConf = + OrderedPartitionedKVEdgeConfiguration.newBuilder(stageConfs[i - 1].get( + TezJobConfig.TEZ_RUNTIME_KEY_CLASS), + stageConfs[i - 1].get(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS)) + .configureOutput( + MRPartitioner.class.getName(), stageConfs[i - 1]).done() + .configureInput().useLegacyInput().done() + .setFromConfiguration(stageConfs[i - 1]).build(); + Edge edge = new Edge(vertices[i-1], vertices[i], edgeConf.createDefaultEdgeProperty()); dag.addEdge(edge); } @@ -535,15 +533,11 @@ public class YARNRunner implements ClientProtocol { JobConf jobConf = new JobConf(new TezConfiguration(conf)); // Extract individual raw MR configs. - Configuration[] stageConfs = MultiStageMRConfToTezTranslator - .getStageConfs(jobConf); + Configuration[] stageConfs = MultiStageMRConfToTezTranslator.getStageConfs(jobConf); // Transform all confs to use Tez keys - MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0], - null); - for (int i = 1; i < stageConfs.length; i++) { - MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i], - stageConfs[i - 1]); + for (int i = 0; i < stageConfs.length; i++) { + MRHelpers.translateVertexConfToTez(stageConfs[i]); } // create inputs to tezClient.submit() http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java index 90a5142..6104b0e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java @@ -27,8 +27,6 @@ import org.apache.tez.common.TezJobConfig; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.common.Constants; -import com.google.common.collect.Maps; - public class DeprecatedKeys { @@ -39,17 +37,6 @@ public class DeprecatedKeys { */ private static Map mrParamToDAGParamMap = new HashMap(); - - public static enum MultiStageKeys { - INPUT, OUTPUT - } - /** - * Keys which are used across an edge. i.e. by an Output-Input pair. - */ - private static Map> multiStageParamMap = - new HashMap>(); - - /** * Keys used by the Tez Runtime. */ @@ -61,51 +48,9 @@ public class DeprecatedKeys { static { populateMRToTezRuntimeParamMap(); populateMRToDagParamMap(); - populateMultiStageParamMap(); addDeprecatedKeys(); } - - - private static void populateMultiStageParamMap() { - - multiStageParamMap.put( - MRJobConfig.KEY_COMPARATOR, - getDeprecationMap( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS)); - - multiStageParamMap.put( - MRJobConfig.MAP_OUTPUT_KEY_CLASS, - getDeprecationMap( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS)); - - multiStageParamMap.put( - MRJobConfig.MAP_OUTPUT_VALUE_CLASS, - getDeprecationMap( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS)); - - multiStageParamMap.put( - MRJobConfig.MAP_OUTPUT_COMPRESS, - getDeprecationMap( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS)); - - multiStageParamMap.put( - MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, - getDeprecationMap( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC)); - } - - private static Map getDeprecationMap(String inputKey, String outputKey) { - Map m = Maps.newEnumMap(MultiStageKeys.class); - m.put(MultiStageKeys.INPUT, inputKey); - m.put(MultiStageKeys.OUTPUT, outputKey); - return m; - } - + private static void populateMRToDagParamMap() { // TODO Default value handling. mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT, @@ -192,9 +137,20 @@ public class DeprecatedKeys { registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); - registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS); + registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS); - registerMRToRuntimeKeyTranslation(TezJobConfig.TEZ_CREDENTIALS_PATH, MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY); + registerMRToRuntimeKeyTranslation(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.TEZ_CREDENTIALS_PATH); + + registerMRToRuntimeKeyTranslation(MRJobConfig.KEY_COMPARATOR, TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS); + + registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_KEY_CLASS, TezJobConfig.TEZ_RUNTIME_KEY_CLASS); + + registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, TezJobConfig.TEZ_RUNTIME_VALUE_CLASS); + + registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_COMPRESS, TezJobConfig.TEZ_RUNTIME_COMPRESS); + + registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC); + } private static void addDeprecatedKeys() { @@ -217,10 +173,4 @@ public class DeprecatedKeys { public static Map getMRToTezRuntimeParamMap() { return Collections.unmodifiableMap(mrParamToTezRuntimeParamMap); } - - // TODO Ideally, multi-stage should not be exposed. - public static Map> getMultiStageParamMap() { - return Collections.unmodifiableMap(multiStageParamMap); - } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java index 7f30e7f..f0eda1f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java @@ -30,8 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Vector; -import javax.annotation.Nullable; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -99,6 +97,84 @@ public class MRHelpers { "job.splitmetainfo"; /** + * Translates MR keys to Tez for the provided vertex conf. The conversion is + * done in place. + * + * @param conf + * Configuration for the vertex being configured. + */ + @LimitedPrivate("Hive, Pig") + @Unstable + public static void translateVertexConfToTez(Configuration conf) { + convertVertexConfToTez(conf); + } + + private static void convertVertexConfToTez(Configuration vertexConf) { + setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown"); + processDirectConversion(vertexConf); + } + + /** + * Pulls in specific keys from the base configuration, if they are not set at + * the stage level. An explicit list of keys is copied over (not all), which + * require translation to tez keys. + */ + private static void setStageKeysFromBaseConf(Configuration conf, + Configuration baseConf, String stage) { + JobConf jobConf = new JobConf(baseConf); + // Don't clobber explicit tez config. + if (conf.get(TezJobConfig.TEZ_RUNTIME_KEY_CLASS) == null) { + // If this is set, but the comparator is not set, and their types differ - + // the job will break. + if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) { + // Pull this in from the baseConf + conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf + .getMapOutputKeyClass().getName()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS + + " for stage: " + stage + + " based on job level configuration. Value: " + + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS)); + } + } + } + + if (conf.get(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS) == null) { + if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) { + conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf + .getMapOutputValueClass().getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS + + " for stage: " + stage + + " based on job level configuration. Value: " + + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS)); + } + } + } + } + + private static void processDirectConversion(Configuration conf) { + for (Map.Entry dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) { + if (conf.get(dep.getKey()) != null) { + // TODO Deprecation reason does not seem to reflect in the config ? + // The ordering is important in case of keys which are also deprecated. + // Unset will unset the deprecated keys and all it's variants. + final String mrValue = conf.get(dep.getKey()); + final String tezValue = conf.get(dep.getValue()); + conf.unset(dep.getKey()); + if (tezValue == null) { + conf.set(dep.getValue(), mrValue, "TRANSLATED_TO_TEZ"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value=" + + mrValue + ", tez:" + dep.getValue() + "=" + conf.get(dep.getValue())); + } + } + } + } + + /** * Comparator for org.apache.hadoop.mapreduce.InputSplit */ private static class InputSplitComparator http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java index 3e1980c..a9f3c3f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java @@ -18,193 +18,15 @@ package org.apache.tez.mapreduce.hadoop; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; import org.apache.tez.common.TezJobConfig; import org.apache.tez.mapreduce.combine.MRCombiner; -import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys; import org.apache.tez.mapreduce.partition.MRPartitioner; -import com.google.common.base.Preconditions; public class MultiStageMRConfToTezTranslator { - private static final Log LOG = LogFactory.getLog(MultiStageMRConfToTezTranslator.class); - - private enum DeprecationReason { - DEPRECATED_DIRECT_TRANSLATION, DEPRECATED_MULTI_STAGE - } - - // FIXME Add unit tests. - // This will convert configs to tez.. for - // properties which it understands. Doing this for the initial and final task - // as well to verify functionality. - // - - /** - * Converts a single completely configured MR* job to something that can be - * understood by the Tez MR runtime. - * - * @param srcConf - * the configuration for the entire MR* job. Configs for the first - * and last stage are expected to be set at root level. Configs for - * intermediate stages will be prefixed with the stage number. - * @return A translated MR* config with keys translated over to Tez. - */ - // TODO Set the cause properly. - public static Configuration convertMRToLinearTez(Configuration srcConf) { - Configuration newConf = new Configuration(srcConf); - - int numIntermediateStages = MultiStageMRConfigUtil - .getNumIntermediateStages(srcConf); - boolean hasFinalReduceStage = (srcConf.getInt(MRJobConfig.NUM_REDUCES, 0) > 0); - - // Assuming no 0 map jobs, and the first stage is always a map. - int totalStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1); - int numEdges = totalStages - 1; - - Configuration[] allConfs = extractStageConfs(newConf, numEdges); - - for (int i = 0; i < allConfs.length; i++) { - setStageKeysFromBaseConf(allConfs[i], srcConf, Integer.toString(i)); - processDirectConversion(allConfs[i]); - } - for (int i = 0; i < allConfs.length - 1; i++) { - translateMultiStageWithSuccessor(allConfs[i], allConfs[i + 1]); - - } - // Unset unnecessary keys in the last stage. Will end up being called for - // single stage as well which should be harmless. - translateMultiStageWithSuccessor(allConfs[allConfs.length - 1], null); - - for (int i = 0; i < allConfs.length; i++) { - String vertexName; - if (i == 0) { - vertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); - } else if (i == allConfs.length - 1) { - vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName(); - } else { - // Intermediate vertices start at 1 - vertexName = MultiStageMRConfigUtil.getIntermediateStageVertexName(i); - } - MultiStageMRConfigUtil.addConfigurationForVertex(newConf, vertexName, - allConfs[i]); - } - - return newConf; - } - - - - /** - * Translates MR keys to Tez for the provided vertex conf. The conversion is - * done in place. - * - * This method should be called for each stage config of the MR* job. The call - * for the first vertex should set the predecessorConf as null. - * - * Since there's no separation of input / output params at the moment, the - * config generated by this can be set as the configuration for Tez - * Input/Output and Processor. - * - * @param conf - * Configuration for the vertex being configured. - * @param predecessorConf - * Configuration for the previous vertex in the MR* chain - */ - @LimitedPrivate("Hive, Pig") - @Unstable - public static void translateVertexConfToTez(Configuration conf, - Configuration predecessorConf) { - convertVertexConfToTez(conf, predecessorConf); - } - - /** - * Given a source and destination vertex, returns the config which should be - * used for the Output on this edge. The configs must be configured with tez - * keys - or run through translateVertexConfToTez. - * - * @param srcVertex - * The tez configuration for the source vertex. - * @param destVertex - * The tez configuration for the destination vertex. - * @return the output Configuration object for the edge - */ - @LimitedPrivate("Hive, Pig") - @Unstable - public static Configuration getOutputConfOnEdge(Configuration srcVertex, - Configuration destVertex) { - Preconditions.checkNotNull(srcVertex, "srcVertex cannot be null for an edge"); - Preconditions.checkNotNull(destVertex, "destVertex cannot be null for an edge"); - return srcVertex; - } - - /** - * Given a source and destination vertex, returns the config which should be - * used for the Input on this edge. The configs must be configured with tez - * keys - or run through translateVertexConfToTez. - * - * @param srcVertex - * The tez configuration for the source vertex. - * @param destVertex - * The tez configuration for the destination vertex. - * @return the input Configuration object for the edge - */ - @LimitedPrivate("Hive, Pig") - @Unstable - public static Configuration getInputConfOnEdge(Configuration srcVertex, - Configuration destVertex) { - Preconditions.checkNotNull(srcVertex, "srcVertex cannot be null for an edge"); - Preconditions.checkNotNull(destVertex, "destVertex cannot be null for an edge"); - return destVertex; - } - - private static void convertVertexConfToTez(Configuration vertexConf, - Configuration predecessorConf) { - setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown"); - processDirectConversion(vertexConf); - translateMultiStageWithPredecessor(vertexConf, predecessorConf); - } - - /** - * Constructs a list containing individual configuration for each stage of the - * linear MR job, including the first map and last reduce if applicable. - * - * Generates basic configurations - i.e. without inheriting any keys from the - * top level conf. // TODO Validate this comment. - */ - private static Configuration[] extractStageConfs(Configuration conf, - int totalEdges) { - int numStages = totalEdges + 1; - Configuration confs[] = new Configuration[numStages]; - // TODO Make moer efficient instead of multiple scans. - Configuration nonIntermediateConf = MultiStageMRConfigUtil - .getAndRemoveBasicNonIntermediateStageConf(conf); - if (numStages == 1) { - confs[0] = nonIntermediateConf; - } else { - confs[0] = nonIntermediateConf; - confs[numStages - 1] = new Configuration(nonIntermediateConf); - } - if (numStages > 2) { - for (int i = 1; i < numStages - 1; i++) { - confs[i] = MultiStageMRConfigUtil - .getAndRemoveBasicIntermediateStageConf(conf, i); - } - } - return confs; - } - - - /** * Given a single base MRR config, returns a list of complete stage * configurations. @@ -260,131 +82,4 @@ public class MultiStageMRConfToTezTranslator { } return confs; } - - private static void processDirectConversion(Configuration conf) { - for (Entry dep : DeprecatedKeys.getMRToTezRuntimeParamMap() - .entrySet()) { - if (conf.get(dep.getKey()) != null) { - // TODO Deprecation reason does not seem to reflect in the config ? - // The ordering is important in case of keys which are also deprecated. - // Unset will unset the deprecated keys and all it's variants. - final String mrValue = conf.get(dep.getKey()); - final String tezValue = conf.get(dep.getValue()); - conf.unset(dep.getKey()); - if (tezValue == null) { - conf.set(dep.getValue(), mrValue, - DeprecationReason.DEPRECATED_DIRECT_TRANSLATION.name()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value=" - + mrValue + ", tez:" + dep.getValue() + "=" + conf.get(dep.getValue())); - } - } - } - } - - /** - * Takes as parameters configurations for the vertex and it's predecessor - * (already translated to Tez). Modifies the vertex conf in place. - */ - private static void translateMultiStageWithPredecessor( - Configuration vertexConf, Configuration predecessorConf) { - Preconditions.checkNotNull(vertexConf, - "Configuration for vertex being translated cannot be null"); - for (Entry> dep : DeprecatedKeys - .getMultiStageParamMap().entrySet()) { - if (vertexConf.get(dep.getKey()) != null) { - String value = vertexConf.get(dep.getKey()); - vertexConf.unset(dep.getKey()); - if (vertexConf.get(dep.getValue().get(MultiStageKeys.OUTPUT)) == null) { - vertexConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value, - DeprecationReason.DEPRECATED_MULTI_STAGE.name()); - if (LOG.isDebugEnabled()) { - LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value=" - + value + ", tez:" + dep.getValue().get(MultiStageKeys.OUTPUT) + "=" - + vertexConf.get(dep.getValue().get(MultiStageKeys.OUTPUT))); - } - } - } - // Set keys from the predecessor conf. - if (predecessorConf != null) { - String expPredecessorKey = dep.getValue().get(MultiStageKeys.OUTPUT); - if (predecessorConf.get(expPredecessorKey) != null) { - String value = predecessorConf.get(expPredecessorKey); - vertexConf.set(dep.getValue().get(MultiStageKeys.INPUT), value); - } - } - } - } - - /** - * Takes as parameters configurations for the vertex and it's successor. - * Modifies both in place. - */ - private static void translateMultiStageWithSuccessor(Configuration srcVertexConf, - Configuration destVertexConf) { - // All MR keys which need such translation are specified at src - hence, - // this is ok. - // No key exists in which the map is inferring something based on the reduce - // value. - for (Entry> dep : DeprecatedKeys - .getMultiStageParamMap().entrySet()) { - if (srcVertexConf.get(dep.getKey()) != null) { - if (destVertexConf != null) { - String value = srcVertexConf.get(dep.getKey()); - srcVertexConf.unset(dep.getKey()); - srcVertexConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value, - DeprecationReason.DEPRECATED_MULTI_STAGE.name()); - destVertexConf.set(dep.getValue().get(MultiStageKeys.INPUT), value, - DeprecationReason.DEPRECATED_MULTI_STAGE.name()); - if (LOG.isDebugEnabled()) { - LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value=" - + value + ", tez:" + dep.getValue() + "=" - + destVertexConf.get(dep.getValue().get(MultiStageKeys.INPUT))); - } - } else { // Last stage. Just remove the key reference. - srcVertexConf.unset(dep.getKey()); - } - } - } - } - - /** - * Pulls in specific keys from the base configuration, if they are not set at - * the stage level. An explicit list of keys is copied over (not all), which - * require translation to tez keys. - */ - private static void setStageKeysFromBaseConf(Configuration conf, - Configuration baseConf, String stage) { - JobConf jobConf = new JobConf(baseConf); - // Don't clobber explicit tez config. - if (conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) { - // If this is set, but the comparator is not set, and their types differ - - // the job will break. - if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) { - // Pull this in from the baseConf - conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf - .getMapOutputKeyClass().getName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS - + " for stage: " + stage - + " based on job level configuration. Value: " - + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS)); - } - } - } - - if (conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) { - if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) { - conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf - .getMapOutputValueClass().getName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS - + " for stage: " + stage - + " based on job level configuration. Value: " - + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS)); - } - } - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java index 0a7a940..13e0b86 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java @@ -24,62 +24,21 @@ import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +@Private public class MultiStageMRConfigUtil { ////////////////////////////////////////////////////////////////////////////// // Methods based on Stage Num // ////////////////////////////////////////////////////////////////////////////// - // Returns config settings specific to stage - public static Configuration getBasicIntermediateStageConf( - Configuration baseConf, int i) { - return getBasicIntermediateStageConfInternal(baseConf, - getPropertyNameForIntermediateStage(i, ""), false, true); - } - - // Returns and removes config settings specific to stage - public static Configuration getAndRemoveBasicIntermediateStageConf( - Configuration baseConf, int i) { - return getBasicIntermediateStageConfInternal(baseConf, - getPropertyNameForIntermediateStage(i, ""), true, true); - } - - // TODO Get rid of this once YARNRunner starts using VertexNames. - public static Configuration getIntermediateStageConf(Configuration baseConf, - int i) { - return getBasicIntermediateStageConfInternal(baseConf, - getPropertyNameForIntermediateStage(i, ""), false, false); - } - - // FIXME small perf hit. Change this to parse through all keys once and - // generate objects per - // stage instead of scanning through conf multiple times. - public static Configuration getAndRemoveBasicNonIntermediateStageConf( - Configuration baseConf) { - Configuration newConf = new Configuration(false); - for (String key : DeprecatedKeys.getMRToTezRuntimeParamMap().keySet()) { - if (baseConf.get(key) != null) { - newConf.set(key, baseConf.get(key)); - baseConf.unset(key); - } - } - - for (String key : DeprecatedKeys.getMultiStageParamMap().keySet()) { - if (baseConf.get(key) != null) { - newConf.set(key, baseConf.get(key)); - baseConf.unset(key); - } - } - return newConf; - } - - // TODO MRR FIXME based on conf format. + @Private public static int getNumIntermediateStages(Configuration conf) { return conf.getInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 0); } // TODO MRR FIXME based on conf format. // Intermediate stage numbers should start from 1. + @Private public static String getPropertyNameForIntermediateStage( int intermediateStage, String originalPropertyName) { return MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX + intermediateStage + "." @@ -94,84 +53,20 @@ public class MultiStageMRConfigUtil { private static final String FINAL_REDUCE_VERTEX_NAME = "finalreduce"; private static final String INTERMEDIATE_TASK_VERTEX_NAME_PREFIX = "ivertex"; + @Private public static String getInitialMapVertexName() { return INITIAL_MAP_VERTEX_NAME; } - - public boolean isInitialMapVertex(String vertexName) { - return vertexName.equals(INITIAL_MAP_VERTEX_NAME); - } + @Private public static String getFinalReduceVertexName() { return FINAL_REDUCE_VERTEX_NAME; } - public boolean isFinalReduceVertex(String vertexName) { - return vertexName.equals(FINAL_REDUCE_VERTEX_NAME); - } - + @Private public static String getIntermediateStageVertexName(int stageNum) { return INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + stageNum; } - - public static int getIntermediateStageNum(String vertexName) { - if (vertexName.matches(INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + "\\d+")) { - return Integer.parseInt(vertexName - .substring(INTERMEDIATE_TASK_VERTEX_NAME_PREFIX.length())); - } else { - return -1; - } - } - - // Returns config settings specific to named vertex - public static Configuration getBasicConfForVertex(Configuration baseConf, - String vertexName) { - return getBasicIntermediateStageConfInternal(baseConf, - getPropertyNameForVertex(vertexName, ""), false, true); - } - - // Returns and removes config settings specific to named vertex - public static Configuration getAndRemoveBasicConfForVertex( - Configuration baseConf, String vertexName) { - return getBasicIntermediateStageConfInternal(baseConf, - getPropertyNameForVertex(vertexName, ""), true, true); - } - - // Returns a config with all parameters, and vertex specific params moved to - // the top level. - public static Configuration getConfForVertex(Configuration baseConf, - String vertexName) { - return getBasicIntermediateStageConfInternal(baseConf, - getPropertyNameForVertex(vertexName, ""), false, false); - } - - public static void addConfigurationForVertex(Configuration baseConf, - String vertexName, Configuration vertexConf) { - Iterator> confEntries = vertexConf.iterator(); - while (confEntries.hasNext()) { - Entry entry = confEntries.next(); - baseConf.set(getPropertyNameForVertex(vertexName, entry.getKey()), - entry.getValue()); - } - } - - // TODO This is TezEngineLand - public static String getPropertyNameForVertex(String vertexName, - String originalPropertyName) { - return MRJobConfig.MRR_VERTEX_PREFIX + vertexName + "." - + originalPropertyName; - } - - // TODO Get rid of this. Temporary for testing. - public static void printConf(Configuration conf) { - Iterator> confEntries = conf.iterator(); - while (confEntries.hasNext()) { - Entry entry = confEntries.next(); - String key = entry.getKey(); - String value = entry.getValue(); - System.err.println("Key: " + key + ", Value: " + value); - } - } @Private static Configuration extractStageConf(Configuration baseConf, @@ -209,37 +104,4 @@ public class MultiStageMRConfigUtil { } return conf; } - - // TODO MRR FIXME based on conf format. - private static Configuration getBasicIntermediateStageConfInternal( - Configuration baseConf, String prefix, boolean remove, boolean stageOnly) { - Configuration strippedConf = new Configuration(false); - Configuration conf = new Configuration(false); - Iterator> confEntries = baseConf.iterator(); - while (confEntries.hasNext()) { - Entry entry = confEntries.next(); - String key = entry.getKey(); - if (key.startsWith(prefix)) { - if (remove) { - baseConf.unset(key); - } - String newKey = key.replace(prefix, ""); - strippedConf.set(newKey, entry.getValue()); - } else if (!stageOnly) { - conf.set(key, entry.getValue()); - } - } - // Replace values from strippedConf into the finalConf. Override values - // which may have been copied over from the baseConf root level. - if (stageOnly) { - conf = strippedConf; - } else { - Iterator> entries = strippedConf.iterator(); - while (entries.hasNext()) { - Entry entry = entries.next(); - conf.set(entry.getKey(), entry.getValue()); - } - } - return conf; - } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index e8fbf55..4e3e5e7 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -35,7 +35,6 @@ import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.Vertex; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.input.base.MRInputBase; import org.apache.tez.mapreduce.lib.MRInputUtils; import org.apache.tez.mapreduce.lib.MRReader; @@ -115,7 +114,7 @@ public class MRInput extends MRInputBase { inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, configInputFormatClassName); inputConf.setBoolean("mapred.mapper.new-api", useNewApi); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(inputConf, null); + MRHelpers.translateVertexConfToTez(inputConf); MRHelpers.doJobClientMagic(inputConf); if (groupSplitsInAM) { return MRHelpers.createMRInputPayloadWithGrouping(inputConf, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 987ca09..3cca35c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -43,7 +43,6 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.mapreduce.hadoop.MRConfig; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.mapred.MRReporter; import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl; import org.apache.tez.mapreduce.processor.MRTaskReporter; @@ -94,8 +93,7 @@ public class MROutput extends AbstractLogicalOutput { Configuration outputConf = new JobConf(conf); outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName); outputConf.setBoolean("mapred.mapper.new-api", useNewApi); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(outputConf, - null); + MRHelpers.translateVertexConfToTez(outputConf); MRHelpers.doJobClientMagic(outputConf); return TezUtils.createUserPayloadFromConf(outputConf); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java index 5e3d201..3078e02 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java @@ -19,15 +19,10 @@ package org.apache.tez.mapreduce.hadoop; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.tez.runtime.library.common.ConfigUtils; import org.junit.Test; @@ -40,122 +35,41 @@ public class TestConfigTranslationMRToTez { public void testComplexKeys() { JobConf confVertex1 = new JobConf(); - JobConf confVertex2 = new JobConf(); confVertex1.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, IntWritable.class.getName()); - confVertex2.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, ByteWritable.class.getName()); confVertex1.unset(MRJobConfig.KEY_COMPARATOR); confVertex1.unset(MRJobConfig.GROUP_COMPARATOR_CLASS); - confVertex2.unset(MRJobConfig.KEY_COMPARATOR); - confVertex2.unset(MRJobConfig.GROUP_COMPARATOR_CLASS); - - MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex1, null); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex2, - confVertex1); + MRHelpers.translateVertexConfToTez(confVertex1); + assertEquals(IntWritable.Comparator.class.getName(), ConfigUtils .getIntermediateOutputKeyComparator(confVertex1).getClass().getName()); assertEquals(IntWritable.Comparator.class.getName(), ConfigUtils - .getIntermediateInputKeyComparator(confVertex2).getClass().getName()); + .getIntermediateInputKeyComparator(confVertex1).getClass().getName()); } @Test - public void testMultiStageConversion() { + public void testMRToTezKeyTranslation() { JobConf confVertex1 = new JobConf(); confVertex1.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, IntWritable.class.getName()); confVertex1.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, - IntWritable.class.getName()); - confVertex1.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); - - JobConf confVertex2 = new JobConf(); - confVertex2.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, - LongWritable.class.getName()); - confVertex2.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, LongWritable.class.getName()); - confVertex2.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false); - - JobConf confVertex3 = new JobConf(); - confVertex3.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, - ByteWritable.class.getName()); - confVertex3.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, - ByteWritable.class.getName()); - confVertex3.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); - - JobConf confVertex4 = new JobConf(); - confVertex4.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class.getName()); - confVertex4.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, Text.class.getName()); - - MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex1, null); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex2, - confVertex1); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex3, - confVertex2); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(confVertex4, - confVertex3); + confVertex1.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); - // Verify input params for first vertex. - assertFalse(ConfigUtils.isIntermediateInputCompressed(confVertex1)); - assertNull(ConfigUtils.getIntermediateInputKeyClass(confVertex1)); - assertNull(ConfigUtils.getIntermediateInputValueClass(confVertex1)); + MRHelpers.translateVertexConfToTez(confVertex1); - // Verify edge between v1 and v2 + // Verify translation assertEquals(IntWritable.class.getName(), ConfigUtils .getIntermediateOutputKeyClass(confVertex1).getName()); - assertEquals(IntWritable.class.getName(), ConfigUtils + assertEquals(LongWritable.class.getName(), ConfigUtils .getIntermediateOutputValueClass(confVertex1).getName()); assertEquals(IntWritable.class.getName(), ConfigUtils - .getIntermediateInputKeyClass(confVertex2).getName()); - assertEquals(IntWritable.class.getName(), ConfigUtils - .getIntermediateInputValueClass(confVertex2).getName()); - assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1)); - assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex2)); - - // Verify edge between v2 and v3 + .getIntermediateInputKeyClass(confVertex1).getName()); assertEquals(LongWritable.class.getName(), ConfigUtils - .getIntermediateOutputKeyClass(confVertex2).getName()); - assertEquals(LongWritable.class.getName(), ConfigUtils - .getIntermediateOutputValueClass(confVertex2).getName()); - assertEquals(LongWritable.class.getName(), ConfigUtils - .getIntermediateInputKeyClass(confVertex3).getName()); - assertEquals(LongWritable.class.getName(), ConfigUtils - .getIntermediateInputValueClass(confVertex3).getName()); - assertFalse(ConfigUtils.shouldCompressIntermediateOutput(confVertex2)); - assertFalse(ConfigUtils.isIntermediateInputCompressed(confVertex3)); - - // Verify edge between v3 and v4 - assertEquals(ByteWritable.class.getName(), ConfigUtils - .getIntermediateOutputKeyClass(confVertex3).getName()); - assertEquals(ByteWritable.class.getName(), ConfigUtils - .getIntermediateOutputValueClass(confVertex3).getName()); - assertEquals(ByteWritable.class.getName(), ConfigUtils - .getIntermediateInputKeyClass(confVertex4).getName()); - assertEquals(ByteWritable.class.getName(), ConfigUtils - .getIntermediateInputValueClass(confVertex4).getName()); - assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex3)); - assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex4)); - - // Verify output params for first vertex. - assertFalse(ConfigUtils.shouldCompressIntermediateOutput(confVertex4)); - - // Verify Edge configuration - Configuration edge1OutputConf = MultiStageMRConfToTezTranslator - .getOutputConfOnEdge(confVertex1, confVertex2); - Configuration edge1InputConf = MultiStageMRConfToTezTranslator - .getInputConfOnEdge(confVertex1, confVertex2); - - assertEquals(IntWritable.class.getName(), ConfigUtils - .getIntermediateOutputKeyClass(edge1OutputConf).getName()); - assertEquals(IntWritable.class.getName(), ConfigUtils - .getIntermediateOutputValueClass(edge1OutputConf).getName()); - assertTrue(ConfigUtils.shouldCompressIntermediateOutput(edge1OutputConf)); - - assertEquals(IntWritable.class.getName(), ConfigUtils - .getIntermediateInputKeyClass(edge1InputConf).getName()); - assertEquals(IntWritable.class.getName(), ConfigUtils - .getIntermediateInputValueClass(edge1InputConf).getName()); - assertTrue(ConfigUtils.isIntermediateInputCompressed(edge1InputConf)); - + .getIntermediateInputValueClass(confVertex1).getName()); + assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1)); + assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex1)); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java index 2dde271..bbfdc24 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java @@ -42,7 +42,7 @@ public class TestDeprecatedKeys { jobConf.setBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, true); jobConf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0.33f); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf, null); + MRHelpers.translateVertexConfToTez(jobConf); assertEquals(0.4f, jobConf.getFloat( TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0f), 0.01f); @@ -70,7 +70,6 @@ public class TestDeprecatedKeys { */ public void verifyTezOverridenKeys() { JobConf jobConf = new JobConf(); - JobConf jobConf2 = new JobConf(); jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, 2000); jobConf.setInt(MRJobConfig.IO_SORT_MB, 100); jobConf.setInt(MRJobConfig.COUNTERS_MAX_KEY, 100); @@ -97,14 +96,12 @@ public class TestDeprecatedKeys { jobConf.setFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 10.0f); jobConf.set(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "DefaultSorter"); jobConf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, "groupComparator"); - jobConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS, "SecondaryComparator"); + jobConf.set(TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, "SecondaryComparator"); jobConf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false); - jobConf2.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false); - jobConf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true); + jobConf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf, null); - MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf2, jobConf); + MRHelpers.translateVertexConfToTez(jobConf); assertEquals(1000, jobConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0)); assertEquals(200, jobConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 100)); @@ -127,10 +124,9 @@ public class TestDeprecatedKeys { assertEquals(10.0f, jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f), 0.0f); assertEquals("DefaultSorter", jobConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "")); assertEquals("groupComparator", jobConf.get(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, "")); - assertEquals("SecondaryComparator", jobConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS, "")); + assertEquals("SecondaryComparator", jobConf.get(TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, "")); assertEquals("DefaultSorter", jobConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "")); - assertTrue(jobConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false)); - assertTrue(jobConf2.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false)); + assertTrue(jobConf.getBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, false)); assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD)); assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD_BYTES)); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 1bd24e7..a91ad31 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -23,7 +23,6 @@ import java.util.Collections; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; @@ -39,7 +38,6 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.mapreduce.TestUmbilical; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.partition.MRPartitioner; @@ -108,30 +106,26 @@ public class TestMapProcessor { JobConf jobConf = new JobConf(defaultConf); setUpJobConf(jobConf); - Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + MRHelpers.translateVertexConfToTez(jobConf); + jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); - Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf, - vertexName); - - JobConf job = new JobConf(stageConf); - job.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false); + jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false); - job.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, "localized-resources").toUri().toString()); Path mapInput = new Path(workDir, "map0"); - MapUtils.generateInputSplit(localFs, workDir, job, mapInput); + MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput); InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()) - .setUserPayload(MRHelpers.createMRInputPayload(job, null)), + .setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)), 1); OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1); - LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0, + LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0, new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName, Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec)); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 639cfcd..eff108b 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -26,7 +26,6 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; @@ -50,7 +49,6 @@ import org.apache.tez.mapreduce.TezTestUtils; import org.apache.tez.mapreduce.hadoop.IDConverter; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.output.MROutputLegacy; @@ -121,28 +119,23 @@ public class TestReduceProcessor { JobConf jobConf = new JobConf(defaultConf); setUpJobConf(jobConf); - Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); - - Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf, - mapVertexName); - - JobConf mapConf = new JobConf(mapStageConf); - - mapConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + MRHelpers.translateVertexConfToTez(jobConf); + jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + + jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, "localized-resources").toUri().toString()); - mapConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false); + jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false); Path mapInput = new Path(workDir, "map0"); - MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput); + MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput); InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()) - .setUserPayload(MRHelpers.createMRInputPayload(mapConf, null)), + .setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)), 1); OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1); // Run a map - LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0, + LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0, mapInput, new TestUmbilical(), dagName, mapVertexName, Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec)); @@ -154,16 +147,13 @@ public class TestReduceProcessor { LOG.info("Starting reduce..."); Token shuffleToken = new Token(); - - Configuration reduceStageConf = MultiStageMRConfigUtil.getConfForVertex(conf, - reduceVertexName); - JobConf reduceConf = new JobConf(reduceStageConf); - reduceConf.setOutputFormat(SequenceFileOutputFormat.class); - reduceConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, "localized-resources").toUri().toString()); - FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output")); + FileOutputFormat.setOutputPath(jobConf, new Path(workDir, "output")); ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor( - ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf)); + ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)); InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1); @@ -186,7 +176,7 @@ public class TestReduceProcessor { LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask( taskSpec, 0, - reduceConf, + jobConf, new String[] {workDir.toString()}, new TestUmbilical(), serviceConsumerMetadata, @@ -211,7 +201,7 @@ public class TestReduceProcessor { Path reduceOutputFile = new Path(reduceOutputDir, "part-v001-o000-00000"); SequenceFile.Reader reader = new SequenceFile.Reader(localFs, - reduceOutputFile, reduceConf); + reduceOutputFile, jobConf); LongWritable key = new LongWritable(); Text value = new Text(); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java index 0749b2c..8077d77 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java @@ -45,7 +45,7 @@ public class ConfigUtils { Configuration conf, Class defaultValue) { Class codecClass = defaultValue; String name = conf - .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC); + .get(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC); if (name != null) { try { codecClass = conf.getClassByName(name).asSubclass( @@ -62,7 +62,7 @@ public class ConfigUtils { Configuration conf, Class defaultValue) { Class codecClass = defaultValue; String name = conf - .get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC); + .get(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC); if (name != null) { try { codecClass = conf.getClassByName(name).asSubclass( @@ -80,45 +80,45 @@ public class ConfigUtils { public static boolean shouldCompressIntermediateOutput(Configuration conf) { return conf.getBoolean( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false); + TezJobConfig.TEZ_RUNTIME_COMPRESS, false); } public static boolean isIntermediateInputCompressed(Configuration conf) { return conf.getBoolean( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false); + TezJobConfig.TEZ_RUNTIME_COMPRESS, false); } public static Class getIntermediateOutputValueClass(Configuration conf) { Class retv = (Class) conf.getClass( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, null, + TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, null, Object.class); return retv; } public static Class getIntermediateInputValueClass(Configuration conf) { Class retv = (Class) conf.getClass( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, null, + TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, null, Object.class); return retv; } public static Class getIntermediateOutputKeyClass(Configuration conf) { Class retv = (Class) conf.getClass( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, null, + TezJobConfig.TEZ_RUNTIME_KEY_CLASS, null, Object.class); return retv; } public static Class getIntermediateInputKeyClass(Configuration conf) { Class retv = (Class) conf.getClass( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, null, + TezJobConfig.TEZ_RUNTIME_KEY_CLASS, null, Object.class); return retv; } public static RawComparator getIntermediateOutputKeyComparator(Configuration conf) { Class theClass = conf.getClass( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null, + TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, null, RawComparator.class); if (theClass != null) return ReflectionUtils.newInstance(theClass, conf); @@ -128,7 +128,7 @@ public class ConfigUtils { public static RawComparator getIntermediateInputKeyComparator(Configuration conf) { Class theClass = conf.getClass( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null, + TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, null, RawComparator.class); if (theClass != null) return ReflectionUtils.newInstance(theClass, conf); @@ -143,7 +143,7 @@ public class ConfigUtils { Configuration conf) { Class theClass = conf .getClass( - TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS, + TezJobConfig.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, null, RawComparator.class); if (theClass == null) { return getIntermediateInputKeyComparator(conf); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java index 44ebf2f..e6952c7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java @@ -202,14 +202,14 @@ public class OnFileSortedOutputConfiguration { @InterfaceAudience.Private Builder setKeyClassName(String keyClassName) { Preconditions.checkNotNull(keyClassName, "Key class name cannot be null"); - this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClassName); + this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_CLASS, keyClassName); return this; } @InterfaceAudience.Private Builder setValueClassName(String valueClassName) { Preconditions.checkNotNull(valueClassName, "Value class name cannot be null"); - this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valueClassName); + this.conf.set(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, valueClassName); return this; } @@ -292,16 +292,16 @@ public class OnFileSortedOutputConfiguration { * @return instance of the current builder */ public Builder setKeyComparatorClass(String comparatorClassName) { - this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, + this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, comparatorClassName); return this; } public Builder enableCompression(String compressionCodec) { - this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true); + this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true); if (compressionCodec != null) { this.conf - .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, compressionCodec); + .set(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec); } return this; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java index e13e075..e5a5de9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java @@ -151,14 +151,14 @@ public class OnFileUnorderedKVOutputConfiguration { @InterfaceAudience.Private Builder setKeyClassName(String keyClassName) { Preconditions.checkNotNull(keyClassName, "Key class name cannot be null"); - this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClassName); + this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_CLASS, keyClassName); return this; } @InterfaceAudience.Private Builder setValueClassName(String valueClassName) { Preconditions.checkNotNull(valueClassName, "Value class name cannot be null"); - this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valueClassName); + this.conf.set(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, valueClassName); return this; } @@ -200,10 +200,10 @@ public class OnFileUnorderedKVOutputConfiguration { } public Builder enableCompression(String compressionCodec) { - this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true); + this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true); if (compressionCodec != null) { this.conf - .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, compressionCodec); + .set(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec); } return this; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java index dc8f34d..ce6ae03 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java @@ -171,14 +171,14 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration { @InterfaceAudience.Private Builder setKeyClassName(String keyClassName) { Preconditions.checkNotNull(keyClassName, "Key class name cannot be null"); - this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClassName); + this.conf.set(TezJobConfig.TEZ_RUNTIME_KEY_CLASS, keyClassName); return this; } @InterfaceAudience.Private Builder setValueClassName(String valueClassName) { Preconditions.checkNotNull(valueClassName, "Value class name cannot be null"); - this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valueClassName); + this.conf.set(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS, valueClassName); return this; } @@ -239,10 +239,10 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration { } public Builder enableCompression(String compressionCodec) { - this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, true); + this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_COMPRESS, true); if (compressionCodec != null) { this.conf - .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, compressionCodec); + .set(TezJobConfig.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec); } return this; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97805278/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java index be1a92e..94d1d27 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfiguration.java @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; -import org.apache.tez.runtime.library.input.ShuffledMergedInput; import org.apache.tez.runtime.library.output.OnFileSortedOutput; /** @@ -74,7 +73,7 @@ public class OrderedPartitionedKVEdgeConfiguration extends HadoopKeyValuesBasedB @Override public String getInputClassName() { - return ShuffledMergedInput.class.getName(); + return inputConf.getInputClassName(); } /**