From: bikas@apache.org
To: commits@tez.apache.org
Date: Fri, 08 Aug 2014 18:43:20 -0000
Message-Id: <73d610358e484d12be46b77ae9c53a9d@git.apache.org>
Subject: [1/2] git commit: TEZ-1394. Create example code for OrderedWordCount (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 47ccad809 -> 5328978f4

TEZ-1394.
Create example code for OrderedWordCount (bikas)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ed32980f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ed32980f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ed32980f

Branch: refs/heads/master
Commit: ed32980f8352615aee9af8cbc8c865749fa37d4c
Parents: 47ccad8
Author: Bikas Saha
Authored: Fri Aug 8 11:42:51 2014 -0700
Committer: Bikas Saha
Committed: Fri Aug 8 11:42:51 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 INSTALL.txt                                     |  10 +-
 .../apache/tez/examples/OrderedWordCount.java   | 215 ++++++++
 .../tez/mapreduce/examples/ExampleDriver.java   |   3 +
 .../mapreduce/examples/OrderedWordCount.java    | 527 -------------------
 .../tez/mapreduce/examples/WordCount.java       |  82 +--
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 197 +++++++
 .../org/apache/tez/test/TestSecureShuffle.java  |   4 +-
 .../java/org/apache/tez/test/TestTezJobs.java   | 248 +--------
 9 files changed, 454 insertions(+), 833 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1e5450..228e682 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -49,6 +49,7 @@ INCOMPATIBLE CHANGES
   TEZ-1379. Allow EdgeConfigurers to accept Configuration for Comparators. Change the way partitioner, comparator, combiner confs are set (from Hadoop Configuration to Map). Rename specific Input/Output classes from *Configuration to *Configurer.
   TEZ-1382. Change ObjectRegistry API to allow for future extensions
   TEZ-1386. TezGroupedSplitsInputFormat should not need to be setup to enable grouping.
+  TEZ-1394. Create example code for OrderedWordCount

 Release 0.4.0-incubating: 2014-04-05

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/INSTALL.txt
----------------------------------------------------------------------
diff --git a/INSTALL.txt b/INSTALL.txt
index 1328997..6756b8f 100644
--- a/INSTALL.txt
+++ b/INSTALL.txt
@@ -40,25 +40,25 @@ $HADOOP_PREFIX/bin/hadoop jar hadoop-mapreduce-client-jobclient-2.2.0-tests.jar
 This will use the TEZ DAG ApplicationMaster to run the MR job. This can be verified by looking at the AM's logs from the YARN ResourceManager UI.
-7) There is a basic example of using an MRR job in the tez-mapreduce-examples.jar. Refer to OrderedWordCount.java
+7) There is a basic example of using an MRR job in the tez-mapreduce-examples.jar. Refer to TestOrderedWordCount.java
 in the source code. To run this example:
-$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar orderedwordcount
+$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar testorderedwordcount
 This will use the TEZ DAG ApplicationMaster to run the ordered word count job. This job is similar to the word count example except that it also orders all words based on the frequency of occurrence.
-There are multiple variations to run orderedwordcount. You can use it to run multiple
+There are multiple variations to run testorderedwordcount. You can use it to run multiple
 DAGs serially on different inputs/outputs. These DAGs could be run separately as different applications or serially within a single TEZ session.
-$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar orderedwordcount ...
+$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar testorderedwordcount ...
 The above will run multiple DAGs for each input-output pair.
 To use TEZ sessions, set -DUSE_TEZ_SESSION=true
-$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar orderedwordcount -DUSE_TEZ_SESSION=true
+$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar testorderedwordcount -DUSE_TEZ_SESSION=true

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
new file mode 100644
index 0000000..d0fd83b
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.examples.WordCount.TokenProcessor;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+
+public class OrderedWordCount extends Configured implements Tool {
+
+  public static class SumProcessor extends SimpleProcessor {
+    public SumProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+      KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
+          .getReader();
+      while (kvReader.next()) {
+        Text word = (Text) kvReader.getCurrentKey();
+        int sum = 0;
+        for (Object value : kvReader.getCurrentValues()) {
+          sum += ((IntWritable) value).get();
+        }
+        kvWriter.write(new IntWritable(sum), word);
+      }
+    }
+  }
+
+  /**
+   * OrderPartitionedEdge ensures that we get the data sorted by
+   * the sum key which means that the result is totally ordered
+   */
+  public static class NoOpSorter extends SimpleMRProcessor {
+
+    public NoOpSorter(TezProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+      KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
+          .getReader();
+      while (kvReader.next()) {
+        Object sum = kvReader.getCurrentKey();
+        for (Object word : kvReader.getCurrentValues()) {
+          kvWriter.write(word, sum);
+        }
+      }
+      // deriving from SimpleMRProcessor takes care of committing the output
+    }
+  }
+
+  private DAG createDAG(TezConfiguration tezConf, Map localResources,
+      String inputPath, String outputPath, int numPartitions) throws IOException {
+
+    DataSourceDescriptor dataSource = MRInput.createConfigurer(new Configuration(tezConf),
+        TextInputFormat.class, inputPath).create();
+
+    DataSinkDescriptor dataSink = MROutput.createConfigurer(new Configuration(tezConf),
+        TextOutputFormat.class, outputPath).create();
+
+    Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
+        TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
+    tokenizerVertex.addDataSource("MRInput", dataSource);
+
+    Vertex summationVertex = new Vertex("Summation",
+        new ProcessorDescriptor(
+            SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
+
+    // 1 task for global sorted order
+    Vertex sorterVertex = new Vertex("Sorter",
+        new ProcessorDescriptor(
+            NoOpSorter.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
+    sorterVertex.addDataSink("MROutput", dataSink);
+
+    OrderedPartitionedKVEdgeConfigurer summationEdgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    OrderedPartitionedKVEdgeConfigurer sorterEdgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    // No need to add jar containing this class as assumed to be part of the tez jars.
+
+    DAG dag = new DAG("OrderedWordCount");
+    dag.addVertex(tokenizerVertex)
+        .addVertex(summationVertex)
+        .addVertex(sorterVertex)
+        .addEdge(
+            new Edge(tokenizerVertex, summationVertex, summationEdgeConf.createDefaultEdgeProperty()))
+        .addEdge(
+            new Edge(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
+    return dag;
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: " + " orderedwordcount in out [numPartitions]");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  public boolean run(String inputPath, String outputPath, Configuration conf,
+      int numPartitions) throws Exception {
+    System.out.println("Running OrderedWordCount");
+    TezConfiguration tezConf;
+    if (conf != null) {
+      tezConf = new TezConfiguration(conf);
+    } else {
+      tezConf = new TezConfiguration();
+    }
+
+    TezClient tezClient = new TezClient("OrderedWordCount", tezConf);
+    tezClient.start();
+
+    try {
+        Map localResources =
+          new TreeMap();
+
+        DAG dag = createDAG(tezConf, localResources, inputPath, outputPath, numPartitions);
+
+        tezClient.waitTillReady();
+        DAGClient dagClient = tezClient.submitDAG(dag);
+
+        // monitoring
+        DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+          System.out.println("OrderedWordCount failed with diagnostics: " + dagStatus.getDiagnostics());
+          return false;
+        }
+        return true;
+    } finally {
+      tezClient.stop();
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    if (otherArgs.length < 2 || otherArgs.length > 3) {
+      printUsage();
+      return 2;
+    }
+    OrderedWordCount job = new OrderedWordCount();
+    if (job.run(otherArgs[0], otherArgs[1], conf,
+        (otherArgs.length == 3 ? Integer.parseInt(otherArgs[2]) : 1))) {
+      return 0;
+    }
+    return 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 636e83b..daebf7c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.mapreduce.examples.terasort.TeraGen;
 import org.apache.tez.mapreduce.examples.terasort.TeraSort;
 import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
@@ -79,6 +80,8 @@ public class ExampleDriver {
           "MRR Sleep Job");
       pgd.addClass("orderedwordcount", OrderedWordCount.class,
           "Word Count with words sorted on frequency");
+      pgd.addClass("testorderedwordcount", TestOrderedWordCount.class,
+          "Word Count with words sorted on frequency");
       pgd.addClass("unionexample", UnionExample.class,
           "Union example");
       pgd.addClass("broadcastAndOneToOneExample", BroadcastAndOneToOneExample.class,

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java deleted file mode 100644 index 1d72745..0000000 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ /dev/null @@ -1,527 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tez.mapreduce.examples; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.StringTokenizer; -import java.util.TreeMap; - -import org.apache.commons.cli.ParseException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.client.PreWarmContext; -import org.apache.tez.client.TezClientUtils; -import org.apache.tez.client.TezClient; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.InputInitializerDescriptor; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.Vertex; -import 
org.apache.tez.dag.api.VertexLocationHint; -import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.dag.api.client.StatusGetOpts; -import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; -import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser; -import org.apache.tez.mapreduce.hadoop.InputSplitInfo; -import org.apache.tez.mapreduce.hadoop.MRHelpers; -import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.processor.map.MapProcessor; -import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; -import org.apache.tez.runtime.api.TezRootInputInitializer; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; -import org.apache.tez.runtime.library.partitioner.HashPartitioner; -import org.apache.tez.runtime.library.processor.SleepProcessor; - -import com.google.common.annotations.VisibleForTesting; - -/** - * An MRR job built on top of word count to return words sorted by - * their frequency of occurrence. - * - * Use -DUSE_TEZ_SESSION=true to run jobs in a session mode. - * If multiple input/outputs are provided, this job will process each pair - * as a separate DAG in a sequential manner. - * Use -DINTER_JOB_SLEEP_INTERVAL= where N is the sleep interval in seconds - * between the sequential DAGs. 
- */ -public class OrderedWordCount extends Configured implements Tool { - - private static Log LOG = LogFactory.getLog(OrderedWordCount.class); - - public static class TokenizerMapper - extends Mapper{ - - private final static IntWritable one = new IntWritable(1); - private Text word = new Text(); - - public void map(Object key, Text value, Context context - ) throws IOException, InterruptedException { - StringTokenizer itr = new StringTokenizer(value.toString()); - while (itr.hasMoreTokens()) { - word.set(itr.nextToken()); - context.write(word, one); - } - } - } - - public static class IntSumReducer - extends Reducer { - private IntWritable result = new IntWritable(); - - public void reduce(Text key, Iterable values, - Context context - ) throws IOException, InterruptedException { - int sum = 0; - for (IntWritable val : values) { - sum += val.get(); - } - result.set(sum); - context.write(result, key); - } - } - - /** - * Shuffle ensures ordering based on count of employees per department - * hence the final reducer is a no-op and just emits the department name - * with the employee count per department. 
- */ - public static class MyOrderByNoOpReducer - extends Reducer { - - public void reduce(IntWritable key, Iterable values, - Context context - ) throws IOException, InterruptedException { - for (Text word : values) { - context.write(word, key); - } - } - } - - private Credentials credentials = new Credentials(); - - @VisibleForTesting - public DAG createDAG(FileSystem fs, Configuration conf, - Map commonLocalResources, Path stagingDir, - int dagIndex, String inputPath, String outputPath, - boolean generateSplitsInClient) throws Exception { - - Configuration mapStageConf = new JobConf(conf); - mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, - TokenizerMapper.class.getName()); - mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, - TextInputFormat.class.getName()); - mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath); - mapStageConf.setBoolean("mapred.mapper.new-api", true); - - InputSplitInfo inputSplitInfo = null; - if (generateSplitsInClient) { - inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf, stagingDir); - mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks()); - } - - MRHelpers.translateVertexConfToTez(mapStageConf); - - Configuration iReduceStageConf = new JobConf(conf); - // TODO replace with auto-reduce parallelism - iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); - iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR, - IntSumReducer.class.getName()); - iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); - iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, - IntWritable.class.getName()); - iReduceStageConf.setBoolean("mapred.mapper.new-api", true); - MRHelpers.translateVertexConfToTez(iReduceStageConf); - - Configuration finalReduceConf = new JobConf(conf); - finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1); - finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR, - MyOrderByNoOpReducer.class.getName()); - finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, - 
TextOutputFormat.class.getName()); - finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath); - finalReduceConf.setBoolean("mapred.mapper.new-api", true); - finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); - finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); - MRHelpers.translateVertexConfToTez(finalReduceConf); - - MRHelpers.doJobClientMagic(mapStageConf); - MRHelpers.doJobClientMagic(iReduceStageConf); - MRHelpers.doJobClientMagic(finalReduceConf); - - List vertices = new ArrayList(); - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096); - mapStageConf.writeXml(outputStream); - String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8"); - byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf); - byte[] mapInputPayload; - if (generateSplitsInClient) { - mapInputPayload = MRHelpers.createMRInputPayload(mapPayload); - } else { - mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload); - } - int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1; - Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor( - MapProcessor.class.getName()).setUserPayload(mapPayload) - .setHistoryText(mapStageHistoryText), - numMaps, MRHelpers.getMapResource(mapStageConf)); - if (generateSplitsInClient) { - mapVertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints())); - Map mapLocalResources = - new HashMap(); - mapLocalResources.putAll(commonLocalResources); - MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, - mapLocalResources); - mapVertex.setTaskLocalFiles(mapLocalResources); - } else { - mapVertex.setTaskLocalFiles(commonLocalResources); - } - - Class initializerClazz = generateSplitsInClient ? null - : MRInputAMSplitGenerator.class; - MRHelpers.addMRInput(mapVertex, mapInputPayload, - (initializerClazz==null) ? 
null : - new InputInitializerDescriptor(initializerClazz.getName())); - vertices.add(mapVertex); - - ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096); - iReduceStageConf.writeXml(iROutputStream); - String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8"); - Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor( - ReduceProcessor.class.getName()) - .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)) - .setHistoryText(iReduceStageHistoryText), - 2, MRHelpers.getReduceResource(iReduceStageConf)); - ivertex.setTaskLocalFiles(commonLocalResources); - vertices.add(ivertex); - - ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096); - finalReduceConf.writeXml(finalReduceOutputStream); - String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8"); - byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf); - Vertex finalReduceVertex = new Vertex("finalreduce", - new ProcessorDescriptor( - ReduceProcessor.class.getName()) - .setUserPayload(finalReducePayload) - .setHistoryText(finalReduceStageHistoryText), 1, - MRHelpers.getReduceResource(finalReduceConf)); - finalReduceVertex.setTaskLocalFiles(commonLocalResources); - MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload); - vertices.add(finalReduceVertex); - - DAG dag = new DAG("OrderedWordCount" + dagIndex); - for (int i = 0; i < vertices.size(); ++i) { - dag.addVertex(vertices.get(i)); - } - - OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer - .newBuilder(Text.class.getName(), IntWritable.class.getName(), - HashPartitioner.class.getName()).setFromConfiguration(conf) - .configureInput().useLegacyInput().done().build(); - dag.addEdge( - new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"), - edgeConf1.createDefaultEdgeProperty())); - - OrderedPartitionedKVEdgeConfigurer edgeConf2 = 
OrderedPartitionedKVEdgeConfigurer - .newBuilder(IntWritable.class.getName(), Text.class.getName(), - HashPartitioner.class.getName()).setFromConfiguration(conf) - .configureInput().useLegacyInput().done().build(); - dag.addEdge( - new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"), - edgeConf2.createDefaultEdgeProperty())); - - return dag; - } - - private static void printUsage() { - String options = " [-generateSplitsInClient true/]"; - System.err.println("Usage: orderedwordcount " + options); - System.err.println("Usage (In Session Mode):" - + " orderedwordcount ... " + options); - ToolRunner.printGenericCommandUsage(System.err); - } - - - @Override - public int run(String[] args) throws Exception { - Configuration conf = getConf(); - String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); - - boolean generateSplitsInClient; - - SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser(); - try { - generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false); - otherArgs = splitCmdLineParser.getRemainingArgs(); - } catch (ParseException e1) { - System.err.println("Invalid options"); - printUsage(); - return 2; - } - - boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true); - long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0) - * 1000; - - boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false); - - if (((otherArgs.length%2) != 0) - || (!useTezSession && otherArgs.length != 2)) { - printUsage(); - return 2; - } - - List inputPaths = new ArrayList(); - List outputPaths = new ArrayList(); - - for (int i = 0; i < otherArgs.length; i+=2) { - inputPaths.add(otherArgs[i]); - outputPaths.add(otherArgs[i+1]); - } - - UserGroupInformation.setConfiguration(conf); - - TezConfiguration tezConf = new TezConfiguration(conf); - OrderedWordCount instance = new OrderedWordCount(); - - FileSystem fs = FileSystem.get(conf); - - String stagingDirStr = 
conf.get(TezConfiguration.TEZ_AM_STAGING_DIR, - TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR + - Long.toString(System.currentTimeMillis()); - Path stagingDir = new Path(stagingDirStr); - FileSystem pathFs = stagingDir.getFileSystem(tezConf); - pathFs.mkdirs(new Path(stagingDirStr)); - - tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr); - stagingDir = pathFs.makeQualified(new Path(stagingDirStr)); - - TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf); - TezClientUtils.ensureStagingDirExists(tezConf, stagingDir); - - // No need to add jar containing this class as assumed to be part of - // the tez jars. - - // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir - // is the same filesystem as the one used for Input/Output. - - if (useTezSession) { - LOG.info("Creating Tez Session"); - tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); - } else { - tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false); - } - TezClient tezSession = new TezClient("OrderedWordCountSession", tezConf, - null, instance.credentials); - tezSession.start(); - - DAGStatus dagStatus = null; - DAGClient dagClient = null; - String[] vNames = { "initialmap", "intermediate_reducer", - "finalreduce" }; - - Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - try { - for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) { - if (dagIndex != 1 - && interJobSleepTimeout > 0) { - try { - LOG.info("Sleeping between jobs, sleepInterval=" - + (interJobSleepTimeout/1000)); - Thread.sleep(interJobSleepTimeout); - } catch (InterruptedException e) { - LOG.info("Main thread interrupted. 
Breaking out of job loop"); - break; - } - } - - String inputPath = inputPaths.get(dagIndex-1); - String outputPath = outputPaths.get(dagIndex-1); - - if (fs.exists(new Path(outputPath))) { - throw new FileAlreadyExistsException("Output directory " - + outputPath + " already exists"); - } - LOG.info("Running OrderedWordCount DAG" - + ", dagIndex=" + dagIndex - + ", inputPath=" + inputPath - + ", outputPath=" + outputPath); - - Map localResources = - new TreeMap(); - - DAG dag = instance.createDAG(fs, conf, localResources, - stagingDir, dagIndex, inputPath, outputPath, - generateSplitsInClient); - - boolean doPreWarm = dagIndex == 1 && useTezSession - && conf.getBoolean("PRE_WARM_SESSION", true); - int preWarmNumContainers = 0; - if (doPreWarm) { - preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0); - if (preWarmNumContainers <= 0) { - doPreWarm = false; - } - } - if (doPreWarm) { - LOG.info("Pre-warming Session"); - VertexLocationHint vertexLocationHint = - new VertexLocationHint(null); - ProcessorDescriptor sleepProcDescriptor = - new ProcessorDescriptor(SleepProcessor.class.getName()); - SleepProcessor.SleepProcessorConfig sleepProcessorConfig = - new SleepProcessor.SleepProcessorConfig(4000); - sleepProcDescriptor.setUserPayload( - sleepProcessorConfig.toUserPayload()); - PreWarmContext context = new PreWarmContext(sleepProcDescriptor, - dag.getVertex("initialmap").getTaskResource(), preWarmNumContainers, - vertexLocationHint); - - Map contextLocalRsrcs = - new TreeMap(); - contextLocalRsrcs.putAll( - dag.getVertex("initialmap").getTaskLocalFiles()); - Map contextEnv = new TreeMap(); - contextEnv.putAll(dag.getVertex("initialmap").getTaskEnvironment()); - String contextJavaOpts = - dag.getVertex("initialmap").getTaskLaunchCmdOpts(); - context - .setLocalResources(contextLocalRsrcs) - .setJavaOpts(contextJavaOpts) - .setEnvironment(contextEnv); - - tezSession.preWarm(context); - } - - if (useTezSession) { - LOG.info("Waiting for TezSession to get 
into ready state");
-          waitForTezSessionReady(tezSession);
-          LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
-          dagClient = tezSession.submitDAG(dag);
-          LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
-        } else {
-          LOG.info("Submitting DAG as a new Tez Application");
-          dagClient = tezSession.submitDAG(dag);
-        }
-
-        while (true) {
-          dagStatus = dagClient.getDAGStatus(statusGetOpts);
-          if (dagStatus.getState() == DAGStatus.State.RUNNING ||
-              dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-              dagStatus.getState() == DAGStatus.State.FAILED ||
-              dagStatus.getState() == DAGStatus.State.KILLED ||
-              dagStatus.getState() == DAGStatus.State.ERROR) {
-            break;
-          }
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            // continue;
-          }
-        }
-
-
-        while (dagStatus.getState() != DAGStatus.State.SUCCEEDED &&
-            dagStatus.getState() != DAGStatus.State.FAILED &&
-            dagStatus.getState() != DAGStatus.State.KILLED &&
-            dagStatus.getState() != DAGStatus.State.ERROR) {
-          if (dagStatus.getState() == DAGStatus.State.RUNNING) {
-            ExampleDriver.printDAGStatus(dagClient, vNames);
-          }
-          try {
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException e) {
-              // continue;
-            }
-            dagStatus = dagClient.getDAGStatus(statusGetOpts);
-          } catch (TezException e) {
-            LOG.fatal("Failed to get application progress. Exiting");
-            return -1;
-          }
-        }
-        ExampleDriver.printDAGStatus(dagClient, vNames,
-            true, true);
-        LOG.info("DAG " + dagIndex + " completed. "
-            + "FinalState=" + dagStatus.getState());
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          LOG.info("DAG " + dagIndex + " diagnostics: "
-              + dagStatus.getDiagnostics());
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Error occurred when submitting/running DAGs", e);
-      throw e;
-    } finally {
-      if (!retainStagingDir) {
-        pathFs.delete(stagingDir, true);
-      }
-      LOG.info("Shutting down session");
-      tezSession.stop();
-    }
-
-    if (!useTezSession) {
-      ExampleDriver.printDAGStatus(dagClient, vNames);
-      LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    }
-    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
-  }
-
-  private static void waitForTezSessionReady(TezClient tezSession)
-    throws IOException, TezException, InterruptedException {
-    tezSession.waitTillReady();
-  }
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
-    System.exit(res);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index cb13025..593eeff 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -18,24 +18,17 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.StringTokenizer;
-import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import 
org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -60,10 +53,11 @@ import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 
 import com.google.common.base.Preconditions;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
 public class WordCount extends Configured implements Tool {
 
-  public static class TokenProcessor extends SimpleMRProcessor {
+  public static class TokenProcessor extends SimpleProcessor {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
@@ -98,8 +92,8 @@ public class WordCount extends Configured implements Tool {
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(getInputs().size() == 1);
-      MROutput out = (MROutput) getOutputs().values().iterator().next();
-      KeyValueWriter kvWriter = out.getWriter();
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
       KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
           .getReader();
       while (kvReader.next()) {
@@ -110,12 +104,12 @@ public class WordCount extends Configured implements Tool {
         }
         kvWriter.write(word, new IntWritable(sum));
       }
+      // deriving from SimpleMRProcessor takes care of committing the output
     }
   }
 
-  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
-      Map localResources, Path stagingDir,
-      String inputPath, String outputPath) throws IOException {
+  private DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,
+      int numPartitions) throws IOException {
 
     DataSourceDescriptor dataSource = MRInput.createConfigurer(new Configuration(tezConf),
         TextInputFormat.class, inputPath).create();
@@ -123,19 +117,21 @@ public class WordCount extends Configured implements Tool {
     DataSinkDescriptor dataSink = MROutput.createConfigurer(new Configuration(tezConf),
         TextOutputFormat.class, outputPath).create();
 
-    Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
+    Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
         TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
-    tokenizerVertex.addDataSource("MRInput", dataSource);
+    tokenizerVertex.addDataSource("Input", dataSource);
 
-    Vertex summerVertex = new Vertex("summer",
+    Vertex summerVertex = new Vertex("Summer",
         new ProcessorDescriptor(
-            SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
-    summerVertex.addDataSink("MROutput", dataSink);
+            SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
+    summerVertex.addDataSink("Output", dataSink);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),
             HashPartitioner.class.getName()).build();
 
+    // No need to add jar containing this class as assumed to be part of
+    // the tez jars.
     DAG dag = new DAG("WordCount");
     dag.addVertex(tokenizerVertex)
         .addVertex(summerVertex)
@@ -145,66 +141,38 @@ public class WordCount extends Configured implements Tool {
   }
 
   private static void printUsage() {
-    System.err.println("Usage: " + " wordcount ");
+    System.err.println("Usage: " + " wordcount in out [numPartitions]");
     ToolRunner.printGenericCommandUsage(System.err);
   }
 
-  public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
+  public boolean run(String inputPath, String outputPath, Configuration conf,
+      int numPartitions) throws Exception {
     System.out.println("Running WordCount");
-    // conf and UGI
     TezConfiguration tezConf;
     if (conf != null) {
       tezConf = new TezConfiguration(conf);
     } else {
       tezConf = new TezConfiguration();
     }
-    UserGroupInformation.setConfiguration(tezConf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    // staging dir
-    FileSystem fs = FileSystem.get(tezConf);
-    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
-        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
-        + Path.SEPARATOR + Long.toString(System.currentTimeMillis());
-    Path stagingDir = new Path(stagingDirStr);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = fs.makeQualified(stagingDir);
-
-    // No need to add jar containing this class as assumed to be part of
-    // the tez jars.
-    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
-    // is the same filesystem as the one used for Input/Output.
-
     TezClient tezSession = new TezClient("WordCountSession", tezConf);
     tezSession.start();
-    DAGClient dagClient = null;
     try {
-        if (fs.exists(new Path(outputPath))) {
-          throw new FileAlreadyExistsException("Output directory "
-              + outputPath + " already exists");
-        }
-
-        Map localResources =
-          new TreeMap();
-
-        DAG dag = createDAG(fs, tezConf, localResources,
-            stagingDir, inputPath, outputPath);
+        DAG dag = createDAG(tezConf, inputPath, outputPath, numPartitions);
         tezSession.waitTillReady();
-        dagClient = tezSession.submitDAG(dag);
+        DAGClient dagClient = tezSession.submitDAG(dag);
         // monitoring
         DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
         if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
+          System.out.println("WordCount failed with diagnostics: " + dagStatus.getDiagnostics());
           return false;
         }
         return true;
     } finally {
-      fs.delete(stagingDir, true);
       tezSession.stop();
     }
   }
@@ -213,14 +181,16 @@ public class WordCount extends Configured implements Tool {
   public int run(String[] args) throws Exception {
     Configuration conf = getConf();
     String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
-    if (otherArgs.length != 2) {
+    if (otherArgs.length < 2 || otherArgs.length > 3) {
       printUsage();
       return 2;
     }
     WordCount job = new WordCount();
-    job.run(otherArgs[0], otherArgs[1], conf);
-    return 0;
+    if (job.run(otherArgs[0], otherArgs[1], conf,
+        (otherArgs.length == 3 ? Integer.parseInt(otherArgs[2]) : 1))) {
+      return 0;
+    }
+    return 1;
   }
 
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 7f7bd1e..4228382 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -18,6 +18,11 @@
 
 package org.apache.tez.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -68,6 +73,8 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -85,9 +92,12 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.BroadcastAndOneToOneExample;
+import org.apache.tez.mapreduce.examples.ExampleDriver;
 import 
org.apache.tez.mapreduce.examples.MRRSleepJob;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
@@ -106,6 +116,8 @@ import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -113,6 +125,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 public class TestMRRJobsDAGApi {
 
@@ -163,6 +176,190 @@ public class TestMRRJobsDAGApi {
     }
     // TODO Add cleanup code.
   }
+
+  @Test(timeout = 60000)
+  public void testSleepJob() throws TezException, IOException, InterruptedException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
+    tezSession.start();
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+    tezSession.stop();
+  }
+
+  @Test(timeout = 100000)
+  public void testMultipleDAGsWithDuplicateName() throws TezException, IOException,
+      InterruptedException {
+    TezClient tezSession = null;
+    try {
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+      tezSession = new TezClient("OrderedWordCountSession", tezConf, true);
+      tezSession.start();
+
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+      for (int dagIndex = 1; dagIndex <= 2; dagIndex++) {
+        DAG dag = new DAG("TezSleepProcessor");
+        Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+            Resource.newInstance(1024, 1));
+        dag.addVertex(vertex);
+
+        DAGClient dagClient = null;
+        try {
+          dagClient = tezSession.submitDAG(dag);
+          if (dagIndex > 1) {
+            fail("Should fail due to duplicate dag name for dagIndex: " + dagIndex);
+          }
+        } catch (TezException tex) {
+          if (dagIndex > 1) {
+            assertTrue(tex.getMessage().contains("Duplicate dag name "));
+            continue;
+          }
+          fail("DuplicateDAGName exception thrown for 1st DAG submission");
+        }
+        DAGStatus dagStatus = 
dagClient.getDAGStatus(null);
+        while (!dagStatus.isCompleted()) {
+          LOG.debug("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+              + dagStatus.getState());
+          Thread.sleep(500l);
+          dagStatus = dagClient.getDAGStatus(null);
+        }
+      }
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+
+
+  @Test
+  public void testNonDefaultFSStagingDir() throws Exception {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
+        + String.valueOf(random.nextInt(100000)));
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    stagingDir = localFs.makeQualified(stagingDir);
+    localFs.mkdirs(stagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
+    tezSession.start();
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+    tezSession.stop();
+  }
+
+  // Submits a simple 5 stage sleep job using tez session. Then kills it.
+  @Test(timeout = 60000)
+  public void testHistoryLogging() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessorHistoryLogging");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
+    localFs.mkdirs(historyLogDir);
+
+    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR,
+        localFs.makeQualified(historyLogDir).toString());
+
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
+    TezClient tezSession = new TezClient("TezSleepProcessorHistoryLogging", tezConf);
+    tezSession.start();
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+
+    DAGStatus dagStatus = 
dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+
+    FileStatus historyLogFileStatus = null;
+    for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
+      if (fileStatus.isDirectory()) {
+        continue;
+      }
+      Path p = fileStatus.getPath();
+      if (p.getName().startsWith(SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)) {
+        historyLogFileStatus = fileStatus;
+        break;
+      }
+    }
+    Assert.assertNotNull(historyLogFileStatus);
+    Assert.assertTrue(historyLogFileStatus.getLen() > 0);
+    tezSession.stop();
+  }
 
   // Submits a simple 5 stage sleep job using the DAG submit API instead of job
   // client.

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
index 9df1580..a947d4b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
-import org.apache.tez.mapreduce.examples.OrderedWordCount;
+import org.apache.tez.mapreduce.examples.TestOrderedWordCount;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.After;
 import org.junit.Before;
@@ -141,7 +141,7 @@ public class TestSecureShuffle {
           TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, true);
     }
 
-    OrderedWordCount wordCount = new OrderedWordCount();
+    TestOrderedWordCount wordCount = new TestOrderedWordCount();
     wordCount.setConf(new Configuration(miniTezCluster.getConfig()));
 
     String[] args = new String[] { inputLoc.toString(), outputLoc.toString() };

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index e35bc07..2515ecb 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -19,7 +19,6 @@
 package org.apache.tez.test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -30,10 +29,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,38 +41,20 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
-import org.apache.tez.mapreduce.examples.ExampleDriver;
+import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.mapreduce.examples.IntersectDataGen;
 import org.apache.tez.mapreduce.examples.IntersectExample;
 import org.apache.tez.mapreduce.examples.IntersectValidate;
-import org.apache.tez.mapreduce.examples.OrderedWordCount;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
 
 /**
- * Tests which do not rely on Map/Reduce processor
+ * Tests for Tez example jobs
  *
  */
 public class TestTezJobs {
@@ -88,7 +66,6 @@ public class TestTezJobs {
   private static Configuration conf = new Configuration();
   private static FileSystem remoteFs;
 
-  private Random random = new Random();
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestTezJobs.class.getName()
       + "-tmpDir";
 
@@ -128,94 +105,6 @@ public class TestTezJobs {
   }
 
   @Test(timeout = 60000)
-  public void testSleepJob() throws TezException, IOException, InterruptedException {
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = new DAG("TezSleepProcessor");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-        Resource.newInstance(1024, 1));
-    dag.addVertex(vertex);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
-    tezSession.start();
-
-    DAGClient dagClient = tezSession.submitDAG(dag);
-
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    assertNotNull(dagStatus.getDAGCounters());
-    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
-    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
-    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
-    tezSession.stop();
-  }
-
-  @Test(timeout = 100000)
-  public void testMultipleDAGsWithDuplicateName() throws TezException, IOException,
-      InterruptedException {
-    TezClient tezSession = null;
-    try {
-      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-          .nextInt(100000))));
-      remoteFs.mkdirs(remoteStagingDir);
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-      tezSession = new TezClient("OrderedWordCountSession", tezConf, true);
-      tezSession.start();
-
-      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-      for (int dagIndex = 1; dagIndex <= 2; dagIndex++) {
-        DAG dag = new DAG("TezSleepProcessor");
-        Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-            Resource.newInstance(1024, 1));
-        dag.addVertex(vertex);
-
-        DAGClient dagClient = null;
-        try {
-          dagClient = 
tezSession.submitDAG(dag);
-          if (dagIndex > 1) {
-            fail("Should fail due to duplicate dag name for dagIndex: " + dagIndex);
-          }
-        } catch (TezException tex) {
-          if (dagIndex > 1) {
-            assertTrue(tex.getMessage().contains("Duplicate dag name "));
-            continue;
-          }
-          fail("DuplicateDAGName exception thrown for 1st DAG submission");
-        }
-        DAGStatus dagStatus = dagClient.getDAGStatus(null);
-        while (!dagStatus.isCompleted()) {
-          LOG.debug("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-              + dagStatus.getState());
-          Thread.sleep(500l);
-          dagStatus = dagClient.getDAGStatus(null);
-        }
-      }
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-  }
-
-
-  @Test(timeout = 60000)
   public void testIntersectExample() throws Exception {
     IntersectExample intersectExample = new IntersectExample();
     intersectExample.setConf(new Configuration(mrrTezCluster.getConfig()));
@@ -313,102 +202,6 @@ public class TestTezJobs {
       }
     }
   }
-
-  @Test
-  public void testNonDefaultFSStagingDir() throws Exception {
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = new DAG("TezSleepProcessor");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-        Resource.newInstance(1024, 1));
-    dag.addVertex(vertex);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
-        + String.valueOf(random.nextInt(100000)));
-    FileSystem localFs = FileSystem.getLocal(tezConf);
-    stagingDir = localFs.makeQualified(stagingDir);
-    localFs.mkdirs(stagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-
-    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
-    tezSession.start();
-
-    DAGClient dagClient = tezSession.submitDAG(dag);
-
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    assertNotNull(dagStatus.getDAGCounters());
-    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
-    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
-    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
-    tezSession.stop();
-  }
-
-  // Submits a simple 5 stage sleep job using tez session. Then kills it.
-  @Test(timeout = 60000)
-  public void testHistoryLogging() throws IOException,
-      InterruptedException, TezException, ClassNotFoundException, YarnException {
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = new DAG("TezSleepProcessorHistoryLogging");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
-        Resource.newInstance(1024, 1));
-    dag.addVertex(vertex);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-    FileSystem localFs = FileSystem.getLocal(tezConf);
-    Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
-    localFs.mkdirs(historyLogDir);
-
-    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR,
-        localFs.makeQualified(historyLogDir).toString());
-
-    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
-    TezClient tezSession = new TezClient("TezSleepProcessorHistoryLogging", tezConf);
-    tezSession.start();
-
-    DAGClient dagClient 
= tezSession.submitDAG(dag);
-
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-
-    FileStatus historyLogFileStatus = null;
-    for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
-      if (fileStatus.isDirectory()) {
-        continue;
-      }
-      Path p = fileStatus.getPath();
-      if (p.getName().startsWith(SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)) {
-        historyLogFileStatus = fileStatus;
-        break;
-      }
-    }
-    Assert.assertNotNull(historyLogFileStatus);
-    Assert.assertTrue(historyLogFileStatus.getLen() > 0);
-    tezSession.stop();
-  }
 
   private void generateOrderedWordCountInput(Path inputDir) throws IOException {
     Path dataPath1 = new Path(inputDir, "inPath1");
@@ -481,40 +274,14 @@ public class TestTezJobs {
     Path outputDir = new Path(outputDirStr);
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path simpleLogPath = new Path("/tmp/owc-logging/");
-    remoteFs.mkdirs(simpleLogPath);
-    simpleLogPath = remoteFs.resolvePath(simpleLogPath);
-    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, simpleLogPath.toString());
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     TezClient tezSession = null;
     try {
-      tezSession = new TezClient("OrderedWordCountSession", tezConf);
-      tezSession.start();
-      tezSession.waitTillReady();
-
-      Map localResourceMap = new TreeMap();
-      OrderedWordCount orderedWordCount = new OrderedWordCount();
-      DAG dag = orderedWordCount.createDAG(remoteFs, tezConf, localResourceMap, stagingDirPath,
-          1, inputDirStr, outputDirStr, false);
+      OrderedWordCount job = new OrderedWordCount();
+      Assert.assertTrue("OrderedWordCount failed", job.run(inputDirStr, outputDirStr, tezConf, 2));
-      DAGClient dagClient = tezSession.submitDAG(dag);
-      DAGStatus dagStatus = dagClient.getDAGStatus(null);
-      while (!dagStatus.isCompleted()) {
-        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-            + dagStatus.getState());
-        Thread.sleep(500l);
-        dagStatus = dagClient.getDAGStatus(null);
-      }
-      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-
-      assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-      assertTrue(remoteFs.exists(outputDir));
-      if (tezSession != null) {
-        tezSession.stop();
-        tezSession = null;
-      }
       FileStatus[] fileStatuses = remoteFs.listStatus(outputDir);
       Path resultFile = null;
@@ -542,13 +309,8 @@ public class TestTezJobs {
       assertTrue(resultFile != null);
       assertTrue(foundSuccessFile);
       verifyOrderedWordCountOutput(resultFile);
-
-      // check simple history log exists
-      FileStatus[] fileStatuses1 = remoteFs.listStatus(simpleLogPath);
-      Assert.assertEquals(1, fileStatuses1.length);
-      Assert.assertTrue(fileStatuses1[0].getPath().getName().startsWith(
-          SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX));
     } finally {
+      remoteFs.delete(stagingDirPath, true);
       if (tezSession != null) {
         tezSession.stop();
       }