From: sseth@apache.org
To: commits@tez.incubator.apache.org
Reply-To: dev@tez.incubator.apache.org
Date: Wed, 25 Sep 2013 07:31:33 -0000
Message-Id: <2e9b572112044bc4a49e05e39732c21b@git.apache.org>
In-Reply-To: <951a7e7fa257470e83418fce839114b5@git.apache.org>
References: <951a7e7fa257470e83418fce839114b5@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [26/50] [abbrv] git commit: TEZ-474. Fix TestMapProcessor and TestReduceProcessor unit tests (part of TEZ-398). (sseth)

TEZ-474. Fix TestMapProcessor and TestReduceProcessor unit tests (part of TEZ-398).
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c5a8a3c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c5a8a3c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c5a8a3c6

Branch: refs/heads/master
Commit: c5a8a3c6ee72dcf5ec34e28fd87d2685e8b9bb1d
Parents: d316f72
Author: Siddharth Seth
Authored: Mon Sep 23 23:23:39 2013 -0700
Committer: Siddharth Seth
Committed: Mon Sep 23 23:23:39 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java        |   1 +
 .../apache/hadoop/mapred/YarnOutputFiles.java      | 236 ------------------
 .../tez/engine/lib/input/LocalMergedInput.java     |   8 +-
 .../engine/lib/input/ShuffledMergedInput.java      |   2 +-
 .../lib/input/ShuffledMergedInputLegacy.java       |  30 +++
 .../lib/output/LocalOnFileSorterOutput.java        |   7 +-
 .../engine/lib/output/OnFileSortedOutput.java      |   4 +
 .../LogicalIOProcessorRuntimeTask.java             |  46 ++++
 .../tez/mapreduce/examples/MRRSleepJob.java        |   2 +-
 .../mapreduce/examples/OrderedWordCount.java       |   2 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java    |   2 +-
 .../input/ShuffledMergedInputLegacy.java           |  29 ---
 .../apache/tez/mapreduce/processor/MRTask.java     |   5 -
 .../processor/reduce/ReduceProcessor.java          |   2 +-
 .../mapreduce/task/impl/YarnOutputFiles.java       | 239 -------------------
 .../org/apache/tez/mapreduce/TestUmbilical.java    |  62 +++++
 .../tez/mapreduce/TestUmbilicalProtocol.java       |  91 -------
 .../tez/mapreduce/processor/MapUtils.java          |  28 ++-
 .../processor/map/TestMapProcessor.java            |  31 ++-
 .../processor/reduce/TestReduceProcessor.java      |  59 +++--
 .../org/apache/tez/mapreduce/YARNRunner.java       |   2 +-
 21 files changed, 233 insertions(+), 655 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 7c4540c..2c4b911 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -64,6 +64,7 @@ public class TezJobConfig {
   /**
    * List of directories avialble to the engine.
    */
+  @Private
   public static final String LOCAL_DIRS = "tez.engine.local.dirs";
   public static final String DEFAULT_LOCAL_DIRS = "/tmp";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
deleted file mode 100644
index e43cf47..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.mapred; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.MRConfig; - -/** - * Manipulate the working area for the transient store for maps and reduces. - * - * This class is used by map and reduce tasks to identify the directories that - * they need to write to/read from for intermediate files. The callers of - * these methods are from child space. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class YarnOutputFiles extends MapOutputFile { - - private JobConf conf; - - private static final String JOB_OUTPUT_DIR = "output"; - private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out"; - private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN - + ".index"; - - public YarnOutputFiles() { - } - - // assume configured to $localdir/usercache/$user/appcache/$appId - private LocalDirAllocator lDirAlloc = - new LocalDirAllocator(MRConfig.LOCAL_DIR); - - private Path getAttemptOutputDir() { - return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)); - } - - /** - * Return the path to local map output file created earlier - * - * @return path - * @throws IOException - */ - public Path getOutputFile() throws IOException { - Path attemptOutput = - new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING); - return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf); - } - - /** - * Create a local map output file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getOutputFileForWrite(long size) throws IOException { - Path attemptOutput = - new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING); - return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf); - } - - /** - * Create a local map output file name on the same volume. - */ - public Path getOutputFileForWriteInVolume(Path existing) { - Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR); - Path attemptOutputDir = new Path(outputDir, - conf.get(JobContext.TASK_ATTEMPT_ID)); - return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING); - } - - /** - * Return the path to a local map output index file created earlier - * - * @return path - * @throws IOException - */ - public Path getOutputIndexFile() throws IOException { - Path attemptIndexOutput = - new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING + - MAP_OUTPUT_INDEX_SUFFIX_STRING); - return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf); - } - - /** - * Create a local map output index file name. 
- * - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getOutputIndexFileForWrite(long size) throws IOException { - Path attemptIndexOutput = - new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING + - MAP_OUTPUT_INDEX_SUFFIX_STRING); - return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(), - size, conf); - } - - /** - * Create a local map output index file name on the same volume. - */ - public Path getOutputIndexFileForWriteInVolume(Path existing) { - Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR); - Path attemptOutputDir = new Path(outputDir, - conf.get(JobContext.TASK_ATTEMPT_ID)); - return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING + - MAP_OUTPUT_INDEX_SUFFIX_STRING); - } - - /** - * Return a local map spill file created earlier. - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public Path getSpillFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead( - String.format(SPILL_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf); - } - - /** - * Create a local map spill file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getSpillFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite( - String.format(String.format(SPILL_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber)), size, conf); - } - - /** - * Return a local map spill index file created earlier - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public Path getSpillIndexFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead( - String.format(SPILL_INDEX_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf); - } - - /** - * Create a local map spill index file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getSpillIndexFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite( - String.format(SPILL_INDEX_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf); - } - - /** - * Return a local reduce input file created earlier - * - * @param mapId a map task id - * @return path - * @throws IOException - */ - public Path getInputFile(int mapId) throws IOException { - throw new UnsupportedOperationException("Incompatible with LocalRunner"); - } - - /** - * Create a local reduce input file name. - * - * @param mapId a map task id - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(String.format( - REDUCE_INPUT_FILE_FORMAT_STRING, - getAttemptOutputDir().toString(), mapId.getId()), - size, conf); - } - - /** Removes all of the files related to a task. 
*/ - public void removeAll() throws IOException { - throw new UnsupportedOperationException("Incompatible with LocalRunner"); - } - - @Override - public void setConf(Configuration conf) { - if (conf instanceof JobConf) { - this.conf = (JobConf) conf; - } else { - this.conf = new JobConf(conf); - } - } - - @Override - public Configuration getConf() { - return conf; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java index ed57c61..6371787 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java @@ -31,11 +31,7 @@ import org.apache.tez.engine.common.localshuffle.LocalShuffle; * LocalMergedInput in an {@link LogicalInput} which shuffles intermediate * sorted data, merges them and provides key/ to the consumer. */ -public class LocalMergedInput extends ShuffledMergedInput { - - - // TODO NEWTEZ Fix CombineProcessor - //private CombineInput raw; +public class LocalMergedInput extends ShuffledMergedInputLegacy { @Override public List initialize(TezInputContext inputContext) throws IOException { @@ -43,8 +39,8 @@ public class LocalMergedInput extends ShuffledMergedInput { this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload()); LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs); - // TODO NEWTEZ async run and checkIfComplete methods rawIter = localShuffle.run(); + createValuesIterator(); return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java index 3db0632..a984b0f 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java @@ -156,7 +156,7 @@ public class ShuffledMergedInput implements LogicalInput { } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void createValuesIterator() + protected void createValuesIterator() throws IOException { vIter = new ValuesIterator(rawIter, (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf), http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java new file mode 100644 index 0000000..f2da031 --- /dev/null +++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java @@ -0,0 +1,30 @@ +/** + * ShuffleMergedInput in a {@link LogicalInput} which shuffles + * intermediate sorted data, merges them and provides key/ to the + * consumer. 
+ * + * The Copy and Merge will be triggered by the initialization - which is handled + * by the Tez framework. Input is not consumable until the Copy and Merge are + * complete. Methods are provided to check for this, as well as to wait for + * completion. Attempting to get a reader on a non-complete input will block. + * + */ + +package org.apache.tez.engine.lib.input; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; + +@LimitedPrivate("mapreduce") +public class ShuffledMergedInputLegacy extends ShuffledMergedInput { + + @Private + public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException { + // wait for input so that iterator is available + waitForInputReady(); + return rawIter; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java index b24e10d..7fd26d7 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java @@ -48,11 +48,16 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput { outputContext.getTaskIndex(), localFs.getFileStatus(src).getLen()); + LOG.info("Renaming src = " + src + ", dst = " + dst); if (LOG.isDebugEnabled()) { LOG.debug("Renaming src = " + src + ", dst = " + dst); } localFs.rename(src, dst); - // TODO NEWTEZ Event generation. 
+ return null; + } + + @Override + protected List generateDataMovementEventsOnClose() throws IOException { return null; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java index 685722e..9c9eba0 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java @@ -93,6 +93,10 @@ public class OnFileSortedOutput implements LogicalOutput { sorter.close(); this.endTime = System.nanoTime(); + return generateDataMovementEventsOnClose(); + } + + protected List generateDataMovementEventsOnClose() throws IOException { String host = System.getenv(ApplicationConstants.Environment.NM_HOST .toString()); ByteBuffer shuffleMetadata = outputContext http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java index bfd898b..29063f9 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java @@ -61,6 +61,7 @@ import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.apache.tez.engine.shuffle.common.ShuffleUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @Private @@ -75,6 +76,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final List outputSpecs; private final List outputs; + private List inputContexts; + private List outputContexts; + private TezProcessorContext processorContext; + private final ProcessorDescriptor processorDescriptor; private final LogicalIOProcessor processor; @@ -95,6 +100,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { super(taskSpec, tezConf, tezUmbilical); LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " + taskSpec); + this.inputContexts = new ArrayList(taskSpec.getInputs().size()); + this.outputContexts = new ArrayList(taskSpec.getOutputs().size()); this.inputSpecs = taskSpec.getInputs(); this.inputs = createInputs(inputSpecs); this.outputSpecs = taskSpec.getOutputs(); @@ -185,6 +192,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private void initializeInput(Input input, InputSpec inputSpec) throws Exception { TezInputContext tezInputContext = createInputContext(inputSpec); + inputContexts.add(tezInputContext); if (input instanceof LogicalInput) { ((LogicalInput) input).setNumPhysicalInputs(inputSpec .getPhysicalEdgeCount()); @@ -199,6 +207,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private void initializeOutput(Output output, OutputSpec outputSpec) throws Exception { TezOutputContext tezOutputContext = createOutputContext(outputSpec); + 
outputContexts.add(tezOutputContext); if (output instanceof LogicalOutput) { ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec .getPhysicalEdgeCount()); @@ -215,6 +224,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { LOG.info("Initializing processor" + ", processorClassName=" + processorDescriptor.getClassName()); TezProcessorContext processorContext = createProcessorContext(); + this.processorContext = processorContext; processor.initialize(processorContext); } @@ -425,5 +435,41 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventRouterThread.interrupt(); } } + + @Private + @VisibleForTesting + public List getInputContexts() { + return this.inputContexts; + } + + @Private + @VisibleForTesting + public List getOutputContexts() { + return this.outputContexts; + } + @Private + @VisibleForTesting + public TezProcessorContext getProcessorContext() { + return this.processorContext; + } + + @Private + @VisibleForTesting + public Map getInputs() { + return this.inputMap; + } + + @Private + @VisibleForTesting + public Map getOutputs() { + return this.outputMap; + } + + @Private + @VisibleForTesting + public LogicalIOProcessor getProcessor() { + return this.processor; + } + } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index 429d458..05675b5 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -80,12 +80,12 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle; import org.apache.tez.engine.common.objectregistry.ObjectRegistry; import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory; +import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy; import org.apache.tez.engine.lib.output.OnFileSortedOutput; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; -import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index 07fe58a..ec419c1 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -70,12 +70,12 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.client.DAGClient; 
import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy; import org.apache.tez.engine.lib.output.OnFileSortedOutput; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; -import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index aca5b8e..7e662cb 100644 --- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -68,6 +68,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.DAGStatus.State; +import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy; import org.apache.tez.engine.lib.output.OnFileSortedOutput; import org.apache.tez.mapreduce.examples.MRRSleepJob; import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer; @@ -79,7 +80,6 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; -import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; import org.junit.AfterClass; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java deleted file mode 100644 index 2d230d6..0000000 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/ShuffledMergedInputLegacy.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * ShuffleMergedInput in a {@link LogicalInput} which shuffles - * intermediate sorted data, merges them and provides key/ to the - * consumer. - * - * The Copy and Merge will be triggered by the initialization - which is handled - * by the Tez framework. Input is not consumable until the Copy and Merge are - * complete. Methods are provided to check for this, as well as to wait for - * completion. Attempting to get a reader on a non-complete input will block. 
- * - */ - -package org.apache.tez.mapreduce.input; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; -import org.apache.tez.engine.lib.input.ShuffledMergedInput; - -public class ShuffledMergedInputLegacy extends ShuffledMergedInput { - - @Private - public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException { - // wait for input so that iterator is available - waitForInputReady(); - return rawIter; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java index 1a01466..f7404d4 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java @@ -44,7 +44,6 @@ import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TaskAttemptID; @@ -83,7 +82,6 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl; import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl; import org.apache.tez.mapreduce.output.SimpleOutput; -import org.apache.tez.mapreduce.task.impl.YarnOutputFiles; @SuppressWarnings("deprecation") public abstract class MRTask { @@ -204,9 +202,6 @@ public abstract class MRTask { // Containers. // Set it in conf, so as to be able to be used the the OutputCommitter. - jobConf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class, - MapOutputFile.class); // MR - // Not needed. 
This is probably being set via the source/consumer meta Token jobToken = TokenCache.getJobToken(credentials); if (jobToken != null) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java index 9210187..9274765 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java @@ -46,8 +46,8 @@ import org.apache.tez.engine.api.LogicalOutput; import org.apache.tez.engine.api.TezProcessorContext; import org.apache.tez.engine.common.ConfigUtils; import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; +import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy; import org.apache.tez.engine.lib.output.OnFileSortedOutput; -import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy; import org.apache.tez.mapreduce.output.SimpleOutput; import org.apache.tez.mapreduce.processor.MRTask; import org.apache.tez.mapreduce.processor.MRTaskReporter; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java deleted file mode 100644 index e28e474..0000000 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java +++ /dev/null @@ -1,239 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.mapreduce.task.impl; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapOutputFile; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.tez.common.Constants; - -/** - * Manipulate the working area for the transient store for maps and reduces. - * - * This class is used by map and reduce tasks to identify the directories that - * they need to write to/read from for intermediate files. 
The callers of - * these methods are from child space. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class YarnOutputFiles extends MapOutputFile { - - private JobConf conf; - - private static final String JOB_OUTPUT_DIR = "output"; - private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out"; - private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN - + ".index"; - - public YarnOutputFiles() { - } - - // assume configured to $localdir/usercache/$user/appcache/$appId - private LocalDirAllocator lDirAlloc = - new LocalDirAllocator(MRConfig.LOCAL_DIR); - - private Path getAttemptOutputDir() { - return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)); - } - - /** - * Return the path to local map output file created earlier - * - * @return path - * @throws IOException - */ - public Path getOutputFile() throws IOException { - Path attemptOutput = - new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING); - return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf); - } - - /** - * Create a local map output file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getOutputFileForWrite(long size) throws IOException { - Path attemptOutput = - new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING); - return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf); - } - - /** - * Create a local map output file name on the same volume. - */ - public Path getOutputFileForWriteInVolume(Path existing) { - Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR); - Path attemptOutputDir = new Path(outputDir, - conf.get(JobContext.TASK_ATTEMPT_ID)); - return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING); - } - - /** - * Return the path to a local map output index file created earlier - * - * @return path - * @throws IOException - */ - public Path getOutputIndexFile() throws IOException { - Path attemptIndexOutput = - new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING + - Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING); - return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf); - } - - /** - * Create a local map output index file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getOutputIndexFileForWrite(long size) throws IOException { - Path attemptIndexOutput = - new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING + - Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING); - return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(), - size, conf); - } - - /** - * Create a local map output index file name on the same volume. - */ - public Path getOutputIndexFileForWriteInVolume(Path existing) { - Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR); - Path attemptOutputDir = new Path(outputDir, - conf.get(JobContext.TASK_ATTEMPT_ID)); - return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING + - Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING); - } - - /** - * Return a local map spill file created earlier. - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public Path getSpillFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead( - String.format(SPILL_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf); - } - - /** - * Create a local map spill file name. 
- * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getSpillFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite( - String.format(String.format(SPILL_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber)), size, conf); - } - - /** - * Return a local map spill index file created earlier - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public Path getSpillIndexFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead( - String.format(SPILL_INDEX_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf); - } - - /** - * Create a local map spill index file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getSpillIndexFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite( - String.format(SPILL_INDEX_FILE_PATTERN, - conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf); - } - - /** - * Return a local reduce input file created earlier - * - * @param mapId a map task id - * @return path - * @throws IOException - */ - public Path getInputFile(int mapId) throws IOException { - throw new UnsupportedOperationException("Incompatible with LocalRunner"); - } - - /** - * Create a local reduce input file name. - * - * @param mapId a map task id - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(String.format( - Constants.REDUCE_INPUT_FILE_FORMAT_STRING, - getAttemptOutputDir().toString(), mapId.getId()), - size, conf); - } - - /** Removes all of the files related to a task. */ - public void removeAll() throws IOException { - throw new UnsupportedOperationException("Incompatible with LocalRunner"); - } - - @Override - public void setConf(Configuration conf) { - if (conf instanceof JobConf) { - this.conf = (JobConf) conf; - } else { - this.conf = new JobConf(conf); - } - } - - @Override - public Configuration getConf() { - return conf; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java new file mode 100644 index 0000000..9de2ed1 --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.engine.api.impl.EventMetaData; +import org.apache.tez.engine.api.impl.TezEvent; +import org.apache.tez.engine.api.impl.TezUmbilical; + +public class TestUmbilical implements TezUmbilical { + + private static final Log LOG = LogFactory.getLog(TestUmbilical.class); + + public TestUmbilical() { + } + + @Override + public void addEvents(Collection events) { + if (events != null && events.size() > 0) { + LOG.info("#Events Received: " + events.size()); + for (TezEvent event : events) { + LOG.info("Event: " + event); + } + } + } + + @Override + public void signalFatalError(TezTaskAttemptID taskAttemptID, + String diagnostics, EventMetaData sourceInfo) { + LOG.info("Received fatal error from task: " + taskAttemptID + + ", Diagnostics: " + diagnostics); + + } + + @Override + public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException { + LOG.info("Got canCommit from task: " + taskAttemptID); + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java deleted file mode 100644 index d5823f7..0000000 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java +++ /dev/null @@ -1,91 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.tez.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.tez.common.ContainerContext; -import org.apache.tez.common.ContainerTask; -import org.apache.tez.common.TezTaskUmbilicalProtocol; -import org.apache.tez.common.records.ProceedToCompletionResponse; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.engine.api.impl.TezHeartbeatRequest; -import org.apache.tez.engine.api.impl.TezHeartbeatResponse; - -public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol { - - private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class); - private ProceedToCompletionResponse proceedToCompletionResponse; - - - public TestUmbilicalProtocol() { - proceedToCompletionResponse = new ProceedToCompletionResponse(false, true); - } - - public TestUmbilicalProtocol(boolean shouldLinger) { - if (shouldLinger) { - proceedToCompletionResponse = new ProceedToCompletionResponse(false, false); - } else { - proceedToCompletionResponse = new ProceedToCompletionResponse(false, true); - } - } - - @Override - public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) - throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public long getProtocolVersion(String arg0, long arg1) throws IOException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public ContainerTask getTask(ContainerContext containerContext) - throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean canCommit(TezTaskAttemptID taskid) throws IOException { - LOG.info("Got 'can-commit' from " + taskid); - return true; - } - - @Override - public ProceedToCompletionResponse proceedToCompletion( - TezTaskAttemptID taskAttemptId) throws IOException { - return proceedToCompletionResponse; - } - - @Override - public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) { - // TODO Auto-generated method stub - // TODO TODONEWTEZ - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 85e6653..4b2c0e8 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -43,13 +43,16 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.tez.common.TezJobConfig; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.engine.api.impl.InputSpec; import org.apache.tez.engine.api.impl.OutputSpec; import org.apache.tez.engine.api.impl.TaskSpec; import org.apache.tez.engine.api.impl.TezUmbilical; +import org.apache.tez.engine.common.security.JobTokenIdentifier; import 
org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask; import org.apache.tez.mapreduce.TezTestUtils; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -108,6 +111,7 @@ public class MapUtils { throws IOException { FileInputFormat.setInputPaths(job, workDir); + LOG.info("Generating data at path: " + file); // create a file with length entries @SuppressWarnings("deprecation") SequenceFile.Writer writer = @@ -147,6 +151,7 @@ public class MapUtils { InputSplit split) throws IOException { Path jobSplitFile = new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, TezJobConfig.DEFAULT_TASK_LOCAL_RESOURCE_DIR), MRJobConfig.JOB_SPLIT); + LOG.info("Writing split to: " + jobSplitFile); FSDataOutputStream out = FileSystem.create(fs, jobSplitFile, new FsPermission(JOB_FILE_PERMISSION)); @@ -173,17 +178,23 @@ public class MapUtils { outMeta.close(); } - public static LogicalIOProcessorRuntimeTask runMapProcessor(FileSystem fs, Path workDir, + public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput) throws IOException { + jobConf.setInputFormat(SequenceFileInputFormat.class); + InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput); + writeSplitFiles(fs, jobConf, split); + } + + public static LogicalIOProcessorRuntimeTask createLogicalTask(FileSystem fs, Path workDir, JobConf jobConf, int mapId, Path mapInput, TezUmbilical umbilical, String vertexName, List inputSpecs, List outputSpecs) throws Exception { jobConf.setInputFormat(SequenceFileInputFormat.class); - InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput); ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor( - MapProcessor.class.getName()); - writeSplitFiles(fs, jobConf, split); + MapProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)); + + Token shuffleToken = new Token(); TaskSpec taskSpec = new TaskSpec( TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), @@ -192,16 +203,13 @@ public class MapUtils { mapProcessorDesc, inputSpecs, outputSpecs); - - // TODO NEWTEZ Fix umbilical access + LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask( taskSpec, - 1, + 0, jobConf, umbilical, - null); - task.initialize(); - task.run(); + shuffleToken); return task; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 2ecce8b..06e2f4b 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -30,23 +30,26 @@ import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.tez.common.Constants; import org.apache.tez.common.TezJobConfig; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.engine.api.TezInputContext; import org.apache.tez.engine.api.impl.InputSpec; import org.apache.tez.engine.api.impl.OutputSpec; -import org.apache.tez.engine.api.impl.TezUmbilical; import 
org.apache.tez.engine.common.InputAttemptIdentifier; import org.apache.tez.engine.common.sort.impl.IFile; import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles; import org.apache.tez.engine.common.task.local.output.TezTaskOutput; import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.mapreduce.TestUmbilical; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; import org.apache.tez.mapreduce.input.SimpleInputLegacy; +import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.mapreduce.processor.MapUtils; import org.junit.After; import org.junit.Before; @@ -67,6 +70,7 @@ public class TestMapProcessor { workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestMapProcessor").makeQualified(localFs); + LOG.info("Using workDir: " + workDir); MapUtils.configureLocalDirs(defaultConf, workDir.toString()); } catch (IOException e) { throw new RuntimeException("init failure", e); @@ -79,10 +83,12 @@ public class TestMapProcessor { public void setUpJobConf(JobConf job) { job.set(TezJobConfig.LOCAL_DIRS, workDir.toString()); + job.set(MRConfig.LOCAL_DIR, workDir.toString()); job.setClass( Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, TezLocalTaskOutputFiles.class, TezTaskOutput.class); + job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName()); job.setNumReduceTasks(1); } @@ -97,7 +103,6 @@ public class TestMapProcessor { String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); JobConf jobConf = new JobConf(defaultConf); setUpJobConf(jobConf); - TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId"); Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); @@ -110,15 +115,27 @@ public class TestMapProcessor { job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, "localized-resources").toUri().toString()); - InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 1); + Path mapInput = new Path(workDir, "map0"); + + + MapUtils.generateInputSplit(localFs, workDir, job, mapInput); + + InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0); OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1); - // TODO NEWTEZ FIXME TezUmbilical handling - LogicalIOProcessorRuntimeTask t = MapUtils.runMapProcessor(localFs, workDir, job, 0, - new Path(workDir, "map0"), (TezUmbilical) null, vertexName, + LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0, + new Path(workDir, "map0"), new TestUmbilical(), vertexName, Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec)); - + + task.initialize(); + task.run(); + task.close(); + + TezInputContext inputContext = task.getInputContexts().get(0); + TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, inputContext.getUniqueIdentifier()); + + // TODO NEWTEZ FIXME OutputCommitter verification // MRTask mrTask = (MRTask)t.getProcessor(); // Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask 
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 1d35f9b..a3abd76 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -31,20 +31,24 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.security.token.Token; import org.apache.tez.common.Constants; import org.apache.tez.common.TezJobConfig; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.engine.api.impl.InputSpec; import org.apache.tez.engine.api.impl.OutputSpec; import org.apache.tez.engine.api.impl.TaskSpec; -import org.apache.tez.engine.api.impl.TezUmbilical; +import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles; import org.apache.tez.engine.common.task.local.output.TezTaskOutput; import org.apache.tez.engine.lib.input.LocalMergedInput; import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.mapreduce.TestUmbilical; import org.apache.tez.mapreduce.TezTestUtils; import org.apache.tez.mapreduce.hadoop.IDConverter; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -52,8 +56,8 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; import org.apache.tez.mapreduce.input.SimpleInputLegacy; import org.apache.tez.mapreduce.output.SimpleOutput; +import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.mapreduce.processor.MapUtils; -import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -75,7 +79,7 @@ public class TestReduceProcessor { workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestReduceProcessor").makeQualified(localFs); - + LOG.info("Using workDir: " + workDir); MapUtils.configureLocalDirs(defaultConf, workDir.toString()); } catch (IOException e) { throw new RuntimeException("init failure", e); @@ -84,10 +88,12 @@ public class TestReduceProcessor { public void setUpJobConf(JobConf job) { job.set(TezJobConfig.LOCAL_DIRS, workDir.toString()); + job.set(MRConfig.LOCAL_DIR, workDir.toString()); job.setClass( Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, TezLocalTaskOutputFiles.class, TezTaskOutput.class); + job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName()); job.setNumReduceTasks(1); } @@ -104,10 +110,10 @@ public class TestReduceProcessor { String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName(); JobConf jobConf = new JobConf(defaultConf); setUpJobConf(jobConf); - TezTaskOutput 
mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId"); Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf, mapVertexName); @@ -116,19 +122,24 @@ public class TestReduceProcessor { mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, "localized-resources").toUri().toString()); - InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor( - SimpleInputLegacy.class.getName()), 0); - OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor( - LocalOnFileSorterOutput.class.getName()), 1); + Path mapInput = new Path(workDir, "map0"); + MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput); + + InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0); + OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1); // Run a map - // TODO NEWTEZ FIX Umbilical creation - MapUtils.runMapProcessor(localFs, workDir, mapConf, 0, - new Path(workDir, "map0"), (TezUmbilical) null, mapVertexName, + LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0, + mapInput, new TestUmbilical(), mapVertexName, Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec)); + mapTask.initialize(); + mapTask.run(); + mapTask.close(); + LOG.info("Starting reduce..."); + Token shuffleToken = new Token(); Configuration reduceStageConf = MultiStageMRConfigUtil.getConfForVertex(conf, reduceVertexName); @@ -138,7 +149,7 @@ public class TestReduceProcessor { "localized-resources").toUri().toString()); FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output")); ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor( - ReduceProcessor.class.getName()); + ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf)); InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1); OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1); @@ -151,28 +162,26 @@ public class TestReduceProcessor { reduceProcessorDesc, Collections.singletonList(reduceInputSpec), Collections.singletonList(reduceOutputSpec)); - - // TODO NEWTEZ FIXME Umbilical and jobToken + LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask( taskSpec, - 1, + 0, reduceConf, - (TezUmbilical) null, - null); + new TestUmbilical(), + shuffleToken); task.initialize(); task.run(); - -// MRTask mrTask = (MRTask)t.getProcessor(); -// TODO NEWTEZ Verify the partitioner has been created -// Assert.assertNull(mrTask.getPartitioner()); task.close(); - // Can this be done via some utility class ? MapOutputFile derivative, or - // instantiating the OutputCommitter - + // MRTask mrTask = (MRTask)t.getProcessor(); + // TODO NEWTEZ Verify the partitioner has not been created + // Likely not applicable anymore. + // Assert.assertNull(mrTask.getPartitioner()); + + - // TODO NEWTEZ FIXME uniqueId generation and event generation (mockTaskId will not work here) + // Only a task commit happens, hence the data is still in the temporary directory. 
Path reduceOutputDir = new Path(new Path(workDir, "output"), "_temporary/0/" + IDConverter .toMRTaskId(TezTestUtils.getMockTaskId(0, 1, 0))); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c5a8a3c6/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index 56f9035..6496b55 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -95,13 +95,13 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy; import org.apache.tez.engine.lib.output.OnFileSortedOutput; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; -import org.apache.tez.mapreduce.input.ShuffledMergedInputLegacy; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
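
For readers following the test changes above: the fixed tests now build the task through MapUtils.createLogicalTask() and drive its lifecycle explicitly, with TestUmbilical standing in for a real umbilical connection. A minimal sketch of that flow (assuming the localFs, workDir, job and vertexName fixtures and the imports shown in the TestMapProcessor diff) looks roughly like:

    // Sketch only; mirrors the pattern introduced in TestMapProcessor above.
    Path mapInput = new Path(workDir, "map0");
    MapUtils.generateInputSplit(localFs, workDir, job, mapInput);

    InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
        new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
        new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);

    // createLogicalTask() only constructs the runtime task; the test drives
    // initialize/run/close itself, with TestUmbilical logging any events.
    LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(
        localFs, workDir, job, 0, mapInput, new TestUmbilical(), vertexName,
        Collections.singletonList(mapInputSpec),
        Collections.singletonList(mapOutputSpec));
    task.initialize();
    task.run();
    task.close();

    // Output locations are derived from the attempt's unique identifier,
    // obtained via the new getInputContexts() accessor on the runtime task.
    TezInputContext inputContext = task.getInputContexts().get(0);
    TezTaskOutput mapOutputs =
        new TezLocalTaskOutputFiles(job, inputContext.getUniqueIdentifier());

The reduce-side test in TestReduceProcessor follows the same shape, running a map task first and then constructing a LogicalIOProcessorRuntimeTask around ReduceProcessor with LocalMergedInput as its input.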