Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CB653182EF for ; Wed, 29 Apr 2015 10:24:49 +0000 (UTC) Received: (qmail 40823 invoked by uid 500); 29 Apr 2015 10:24:49 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 40783 invoked by uid 500); 29 Apr 2015 10:24:49 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 40774 invoked by uid 99); 29 Apr 2015 10:24:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 10:24:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 89D89E008F; Wed, 29 Apr 2015 10:24:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Message-Id: <03341498b62e400a9ab5357c839d5ea0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path (zjffdu) Date: Wed, 29 Apr 2015 10:24:49 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.5 8b14a12d9 -> d72aab4be TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path (zjffdu) (cherry picked from commit 9ba4b1b18783ea9a7d7acb961fc04e6d47e34a40) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d72aab4b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d72aab4b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d72aab4b Branch: refs/heads/branch-0.5 Commit: d72aab4be532dbc1593b4a0cbe3fd8b32ebe90d1 Parents: 8b14a12 Author: Jeff Zhang Authored: Wed Apr 29 18:23:05 2015 +0800 Committer: Jeff Zhang Committed: Wed Apr 29 18:24:17 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/mapreduce/output/MROutput.java | 16 +- .../mapreduce/output/TestMROutputLegacy.java | 184 +++++++++++++++++++ 3 files changed, 198 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d72aab4b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 88ad211..fc9cf3e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path TEZ-2303. ConcurrentModificationException while processing recovery. TEZ-2334. ContainerManagementProtocolProxy modifies IPC timeout conf without making a copy. TEZ-2317. Event processing backlog can result in task failures for short http://git-wip-us.apache.org/repos/asf/tez/blob/d72aab4b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index ab18dd7..55af06f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -111,7 +111,11 @@ public class MROutput extends AbstractLogicalOutput { } } else { outputFormatProvided = false; - useNewApi = conf.getBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, true); + if (conf.get(MRJobConfig.NEW_API_REDUCER_CONFIG) == null) { + useNewApi = conf.getBoolean(MRJobConfig.NEW_API_MAPPER_CONFIG, true); + } else { + useNewApi = conf.getBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, true); + } try { if (useNewApi) { String outputClass = conf.get(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR); @@ -231,8 +235,10 @@ public class MROutput extends AbstractLogicalOutput { * Creates the user payload to be set on the OutputDescriptor for MROutput */ private UserPayload createUserPayload() { + // set which api is being used always + conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, useNewApi); + conf.setBoolean(MRJobConfig.NEW_API_MAPPER_CONFIG, useNewApi); if (outputFormatProvided) { - conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, useNewApi); if (useNewApi) { conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormat.getName()); } else { @@ -342,9 +348,13 @@ public class MROutput extends AbstractLogicalOutput { this.jobConf = new JobConf(conf); // Add tokens to the jobConf - in case they are accessed within the RW / OF jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); - this.useNewApi = this.jobConf.getUseNewReducer(); this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false); + if (this.isMapperOutput) { + this.useNewApi = this.jobConf.getUseNewMapper(); + } else { + this.useNewApi = this.jobConf.getUseNewReducer(); + } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl http://git-wip-us.apache.org/repos/asf/tez/blob/d72aab4b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java new file mode 100644 index 0000000..e4fa0ea --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.mapreduce.committer.MROutputCommitter; +import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.runtime.api.OutputContext; +import org.junit.Test; + + +public class TestMROutputLegacy { + + // simulate the behavior of translating MR to DAG using MR old API + @Test (timeout = 5000) + public void testOldAPI_MR() throws Exception { + String outputPath = "/tmp/output"; + JobConf conf = new JobConf(); + conf.setOutputKeyClass(NullWritable.class); + conf.setOutputValueClass(Text.class); + conf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class); + org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(conf, new Path(outputPath)); + // the output is attached to reducer + conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false); + UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf); + OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName()) + .setUserPayload(vertexPayload); + DataSinkDescriptor sink = DataSinkDescriptor.create(od, + OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null); + + OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload()); + MROutputLegacy output = new MROutputLegacy(outputContext, 2); + output.initialize(); + assertEquals(false, output.useNewApi); + assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass()); + assertNull(output.newOutputFormat); + assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass()); + assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass()); + assertNull(output.newApiTaskAttemptContext); + assertNotNull(output.oldRecordWriter); + assertNull(output.newRecordWriter); + assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass()); + } + + // simulate the behavior of translating MR to DAG using MR new API + @Test (timeout = 5000) + public void testNewAPI_MR() throws Exception { + String outputPath = "/tmp/output"; + Job job = Job.getInstance(); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath)); + job.getConfiguration().setBoolean("mapred.reducer.new-api", true); + // the output is attached to reducer + job.getConfiguration().setBoolean(MRConfig.IS_MAP_PROCESSOR, false); + UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(job.getConfiguration()); + OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName()) + .setUserPayload(vertexPayload); + DataSinkDescriptor sink = DataSinkDescriptor.create(od, + OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null); + + OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload()); + MROutputLegacy output = new MROutputLegacy(outputContext, 2); + output.initialize(); + assertEquals(true, output.useNewApi); + assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass()); + assertNull(output.oldOutputFormat); + assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass()); + assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass()); + assertNull(output.oldApiTaskAttemptContext); + assertNotNull(output.newRecordWriter); + assertNull(output.oldRecordWriter); + assertEquals(FileOutputCommitter.class, output.committer.getClass()); + } + + // simulate the behavior of translating Mapper-only job to DAG using MR old API + @Test (timeout = 5000) + public void testOldAPI_MapperOnly() throws Exception { + String outputPath = "/tmp/output"; + JobConf conf = new JobConf(); + conf.setOutputKeyClass(NullWritable.class); + conf.setOutputValueClass(Text.class); + conf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class); + org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(conf, new Path(outputPath)); + // the output is attached to mapper + conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true); + UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf); + OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName()) + .setUserPayload(vertexPayload); + DataSinkDescriptor sink = DataSinkDescriptor.create(od, + OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null); + + OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload()); + MROutputLegacy output = new MROutputLegacy(outputContext, 2); + output.initialize(); + assertEquals(false, output.useNewApi); + assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass()); + assertNull(output.newOutputFormat); + assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass()); + assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass()); + assertNull(output.newApiTaskAttemptContext); + assertNotNull(output.oldRecordWriter); + assertNull(output.newRecordWriter); + assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass()); + } + + //simulate the behavior of translating mapper-only job to DAG using MR new API + @Test (timeout = 5000) + public void testNewAPI_MapperOnly() throws Exception { + String outputPath = "/tmp/output"; + Job job = Job.getInstance(); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath)); + job.getConfiguration().setBoolean("mapred.mapper.new-api", true); + // the output is attached to mapper + job.getConfiguration().setBoolean(MRConfig.IS_MAP_PROCESSOR, true); + UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(job.getConfiguration()); + OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName()) + .setUserPayload(vertexPayload); + DataSinkDescriptor sink = DataSinkDescriptor.create(od, + OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null); + + OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload()); + MROutputLegacy output = new MROutputLegacy(outputContext, 2); + output.initialize(); + assertEquals(true, output.useNewApi); + assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass()); + assertNull(output.oldOutputFormat); + assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass()); + assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass()); + assertNull(output.oldApiTaskAttemptContext); + assertNotNull(output.newRecordWriter); + assertNull(output.oldRecordWriter); + assertEquals(FileOutputCommitter.class, output.committer.getClass()); + } + + private OutputContext createMockOutputContext(UserPayload payload) { + OutputContext outputContext = mock(OutputContext.class); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + when(outputContext.getUserPayload()).thenReturn(payload); + when(outputContext.getApplicationId()).thenReturn(appId); + when(outputContext.getTaskVertexIndex()).thenReturn(1); + when(outputContext.getTaskAttemptNumber()).thenReturn(1); + when(outputContext.getCounters()).thenReturn(new TezCounters()); + return outputContext; + } +}