tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path (zjffdu)
Date Wed, 29 Apr 2015 10:23:15 GMT
Repository: tez
Updated Branches:
  refs/heads/master 5b2f011f1 -> 9ba4b1b18


TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9ba4b1b1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9ba4b1b1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9ba4b1b1

Branch: refs/heads/master
Commit: 9ba4b1b18783ea9a7d7acb961fc04e6d47e34a40
Parents: 5b2f011
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Wed Apr 29 18:23:05 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Wed Apr 29 18:23:05 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/mapreduce/output/MROutput.java   |  16 +-
 .../mapreduce/output/TestMROutputLegacy.java    | 184 +++++++++++++++++++
 3 files changed, 198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9ba4b1b1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e676cde..e248286 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -321,6 +321,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path
   TEZ-2303. ConcurrentModificationException while processing recovery.
   TEZ-2334. ContainerManagementProtocolProxy modifies IPC timeout conf without making a copy.
   TEZ-2317. Event processing backlog can result in task failures for short

http://git-wip-us.apache.org/repos/asf/tez/blob/9ba4b1b1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 483c92b..d19f707 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -111,7 +111,11 @@ public class MROutput extends AbstractLogicalOutput {
         }
       } else {
         outputFormatProvided = false;
-        useNewApi = conf.getBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, true);
+        if (conf.get(MRJobConfig.NEW_API_REDUCER_CONFIG) == null) {
+          useNewApi = conf.getBoolean(MRJobConfig.NEW_API_MAPPER_CONFIG, true);
+        } else {
+          useNewApi = conf.getBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, true);
+        }
         try {
           if (useNewApi) {
             String outputClass = conf.get(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR);
@@ -236,8 +240,10 @@ public class MROutput extends AbstractLogicalOutput {
      * Creates the user payload to be set on the OutputDescriptor for MROutput
      */
     private UserPayload createUserPayload() {
+      // set which api is being used always
+      conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, useNewApi);
+      conf.setBoolean(MRJobConfig.NEW_API_MAPPER_CONFIG, useNewApi);
       if (outputFormatProvided) {
-        conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, useNewApi);
         if (useNewApi) {
           conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormat.getName());
         } else {
@@ -347,9 +353,13 @@ public class MROutput extends AbstractLogicalOutput {
     this.jobConf = new JobConf(conf);
     // Add tokens to the jobConf - in case they are accessed within the RW / OF
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
-    this.useNewApi = this.jobConf.getUseNewReducer();
     this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
         false);
+    if (this.isMapperOutput) {
+      this.useNewApi = this.jobConf.getUseNewMapper();
+    } else {
+      this.useNewApi = this.jobConf.getUseNewReducer();
+    }
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         getContext().getDAGAttemptNumber());
     TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl

http://git-wip-us.apache.org/repos/asf/tez/blob/9ba4b1b1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java
new file mode 100644
index 0000000..e4fa0ea
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.committer.MROutputCommitter;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.runtime.api.OutputContext;
+import org.junit.Test;
+
+
+public class TestMROutputLegacy {
+
+  // simulate the behavior of translating MR to DAG using MR old API
+  @Test (timeout = 5000)
+  public void testOldAPI_MR() throws Exception {
+    String outputPath = "/tmp/output";
+    JobConf conf = new JobConf();
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+    org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(conf, new Path(outputPath));
+    // the output is attached to reducer
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+    UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf);
+    OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
+        .setUserPayload(vertexPayload);
+    DataSinkDescriptor sink = DataSinkDescriptor.create(od,
+        OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
+
+    OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
+    MROutputLegacy output = new MROutputLegacy(outputContext, 2);
+    output.initialize();
+    assertEquals(false, output.useNewApi);
+    assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
+    assertNull(output.newOutputFormat);
+    assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
+    assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
+    assertNull(output.newApiTaskAttemptContext);
+    assertNotNull(output.oldRecordWriter);
+    assertNull(output.newRecordWriter);
+    assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  // simulate the behavior of translating MR to DAG using MR new API
+  @Test (timeout = 5000)
+  public void testNewAPI_MR() throws Exception {
+    String outputPath = "/tmp/output";
+    Job job = Job.getInstance();
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Text.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
+    job.getConfiguration().setBoolean("mapred.reducer.new-api", true);
+    // the output is attached to reducer
+    job.getConfiguration().setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+    UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(job.getConfiguration());
+    OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
+        .setUserPayload(vertexPayload);
+    DataSinkDescriptor sink = DataSinkDescriptor.create(od,
+        OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
+
+    OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
+    MROutputLegacy output = new MROutputLegacy(outputContext, 2);
+    output.initialize();
+    assertEquals(true, output.useNewApi);
+    assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
+    assertNull(output.oldOutputFormat);
+    assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
+    assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
+    assertNull(output.oldApiTaskAttemptContext);
+    assertNotNull(output.newRecordWriter);
+    assertNull(output.oldRecordWriter);
+    assertEquals(FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  // simulate the behavior of translating Mapper-only job to DAG using MR old API
+  @Test (timeout = 5000)
+  public void testOldAPI_MapperOnly() throws Exception {
+    String outputPath = "/tmp/output";
+    JobConf conf = new JobConf();
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+    org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(conf, new Path(outputPath));
+    // the output is attached to mapper
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
+    UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf);
+    OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
+        .setUserPayload(vertexPayload);
+    DataSinkDescriptor sink = DataSinkDescriptor.create(od,
+        OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
+
+    OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
+    MROutputLegacy output = new MROutputLegacy(outputContext, 2);
+    output.initialize();
+    assertEquals(false, output.useNewApi);
+    assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
+    assertNull(output.newOutputFormat);
+    assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
+    assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
+    assertNull(output.newApiTaskAttemptContext);
+    assertNotNull(output.oldRecordWriter);
+    assertNull(output.newRecordWriter);
+    assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  //simulate the behavior of translating mapper-only job to DAG using MR new API
+  @Test (timeout = 5000)
+  public void testNewAPI_MapperOnly() throws Exception {
+    String outputPath = "/tmp/output";
+    Job job = Job.getInstance();
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Text.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
+    job.getConfiguration().setBoolean("mapred.mapper.new-api", true);
+    // the output is attached to mapper
+    job.getConfiguration().setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
+    UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(job.getConfiguration());
+    OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
+        .setUserPayload(vertexPayload);
+    DataSinkDescriptor sink = DataSinkDescriptor.create(od,
+        OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
+
+    OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
+    MROutputLegacy output = new MROutputLegacy(outputContext, 2);
+    output.initialize();
+    assertEquals(true, output.useNewApi);
+    assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
+    assertNull(output.oldOutputFormat);
+    assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
+    assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
+    assertNull(output.oldApiTaskAttemptContext);
+    assertNotNull(output.newRecordWriter);
+    assertNull(output.oldRecordWriter);
+    assertEquals(FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  private OutputContext createMockOutputContext(UserPayload payload) {
+    OutputContext outputContext = mock(OutputContext.class);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    when(outputContext.getUserPayload()).thenReturn(payload);
+    when(outputContext.getApplicationId()).thenReturn(appId);
+    when(outputContext.getTaskVertexIndex()).thenReturn(1);
+    when(outputContext.getTaskAttemptNumber()).thenReturn(1);
+    when(outputContext.getCounters()).thenReturn(new TezCounters());
+    return outputContext;
+  }
+}


Mime
View raw message