tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2162. org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat is not recognized (zjffdu)
Date Tue, 17 Mar 2015 02:20:30 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 7c1727a2b -> 97472e2a5


TEZ-2162. org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat is not recognized (zjffdu)

(cherry picked from commit 2960fd1c2a6823f5fe03fbf993e925fff0de0eb0)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/97472e2a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/97472e2a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/97472e2a

Branch: refs/heads/branch-0.6
Commit: 97472e2a53526869a23e161585869cbc55e06f83
Parents: 7c1727a
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Tue Mar 17 10:19:07 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Tue Mar 17 10:20:10 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/mapreduce/output/MROutput.java   |  34 +++--
 .../tez/mapreduce/output/TestMROutput.java      | 147 +++++++++++++++++++
 .../output/TestMROutputConfigBuilder.java       | 141 ++++++++++++++++++
 4 files changed, 313 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/97472e2a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d2f84ae..fafbca8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -168,6 +168,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2162. org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat is not recognized
   TEZ-2193. Check returned value from EdgeManagerPlugin before using it
   TEZ-2133. Secured Impersonation: Failed to delete tez scratch data dir
   TEZ-2058. Flaky test: TestTezJobs::testInvalidQueueSubmission.

http://git-wip-us.apache.org/repos/asf/tez/blob/97472e2a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 94a3c1f..88c192d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -28,8 +28,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -45,7 +47,6 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -114,13 +115,23 @@ public class MROutput extends AbstractLogicalOutput {
         useNewApi = conf.getBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, true);
         try {
           if (useNewApi) {
-            this.outputFormat = conf.getClassByName(conf.get(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR));
+            String outputClass = conf.get(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR);
+            if (StringUtils.isEmpty(outputClass)) {
+              throw new TezUncheckedException("no outputFormat setting on Configuration, useNewAPI:" + useNewApi);
+            }
+            this.outputFormat = conf.getClassByName(outputClass);
             Preconditions.checkState(org.apache.hadoop.mapreduce.OutputFormat.class
-                .isAssignableFrom(this.outputFormat));
+                .isAssignableFrom(this.outputFormat), "outputFormat must be assignable from "
+                    + "org.apache.hadoop.mapreduce.OutputFormat");
           } else {
-            this.outputFormat = conf.getClassByName(conf.get("mapred.output.format.class"));
+            String outputClass = conf.get("mapred.output.format.class");
+            if (StringUtils.isEmpty(outputClass)) {
+              throw new TezUncheckedException("no outputFormat setting on Configuration, useNewAPI:" + useNewApi);
+            }
+            this.outputFormat = conf.getClassByName(outputClass);
             Preconditions.checkState(org.apache.hadoop.mapred.OutputFormat.class
-                .isAssignableFrom(this.outputFormat));
+                .isAssignableFrom(this.outputFormat), "outputFormat must be assignable from "
+                    + "org.apache.hadoop.mapred.OutputFormat");
           }
         } catch (ClassNotFoundException e) {
           throw new TezUncheckedException(e);
@@ -132,7 +143,7 @@ public class MROutput extends AbstractLogicalOutput {
     private MROutputConfigBuilder setOutputPath(String outputPath) {
       if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom(outputFormat) || 
           FileOutputFormat.class.isAssignableFrom(outputFormat))) {
-        throw new TezUncheckedException("When setting outputPath the outputFormat must " + 
+        throw new TezUncheckedException("When setting outputPath the outputFormat must " +
             "be assignable from either org.apache.hadoop.mapred.FileOutputFormat or " +
             "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat. " +
             "Otherwise use the non-path config builder." +
@@ -311,10 +322,13 @@ public class MROutput extends AbstractLogicalOutput {
 
   private TezCounter outputRecordCounter;
 
-  private TaskAttemptContext newApiTaskAttemptContext;
-  private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
+  @VisibleForTesting
+  TaskAttemptContext newApiTaskAttemptContext;
+  @VisibleForTesting
+  org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
 
-  private boolean isMapperOutput;
+  @VisibleForTesting
+  boolean isMapperOutput;
 
   protected OutputCommitter committer;
 
@@ -334,7 +348,7 @@ public class MROutput extends AbstractLogicalOutput {
     this.jobConf = new JobConf(conf);
     // Add tokens to the jobConf - in case they are accessed within the RW / OF
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
-    this.useNewApi = this.jobConf.getUseNewMapper();
+    this.useNewApi = this.jobConf.getUseNewReducer();
     this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
         false);
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/97472e2a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
new file mode 100644
index 0000000..b898fe0
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.output;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.runtime.api.OutputContext;
+import org.junit.Test;
+
+
+public class TestMROutput {
+
+  @Test(timeout = 5000)
+  public void testNewAPI_TextOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
+    DataSinkDescriptor dataSink = MROutput
+        .createConfigBuilder(conf, TextOutputFormat.class, outputPath)
+        .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+
+    assertEquals(true, output.isMapperOutput);
+    assertEquals(true, output.useNewApi);
+    assertEquals(TextOutputFormat.class, output.newOutputFormat.getClass());
+    assertNull(output.oldOutputFormat);
+    assertNotNull(output.newApiTaskAttemptContext);
+    assertNull(output.oldApiTaskAttemptContext);
+    assertNotNull(output.newRecordWriter);
+    assertNull(output.oldRecordWriter);
+    assertEquals(FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_TextOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+    DataSinkDescriptor dataSink = MROutput
+        .createConfigBuilder(conf, org.apache.hadoop.mapred.TextOutputFormat.class, outputPath)
+        .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+
+    assertEquals(false, output.isMapperOutput);
+    assertEquals(false, output.useNewApi);
+    assertEquals(org.apache.hadoop.mapred.TextOutputFormat.class, output.oldOutputFormat.getClass());
+    assertNull(output.newOutputFormat);
+    assertNotNull(output.oldApiTaskAttemptContext);
+    assertNull(output.newApiTaskAttemptContext);
+    assertNotNull(output.oldRecordWriter);
+    assertNull(output.newRecordWriter);
+    assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_SequenceFileOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    JobConf conf = new JobConf();
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(Text.class);
+    DataSinkDescriptor dataSink = MROutput
+        .createConfigBuilder(conf, SequenceFileOutputFormat.class, outputPath)
+        .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+    assertEquals(true, output.useNewApi);
+    assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
+    assertNull(output.oldOutputFormat);
+    assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
+    assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
+    assertNull(output.oldApiTaskAttemptContext);
+    assertNotNull(output.newRecordWriter);
+    assertNull(output.oldRecordWriter);
+    assertEquals(FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_SequenceFileOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    JobConf conf = new JobConf();
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(Text.class);
+    DataSinkDescriptor dataSink = MROutput
+        .createConfigBuilder(conf, org.apache.hadoop.mapred.SequenceFileOutputFormat.class, outputPath)
+        .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+    assertEquals(false, output.useNewApi);
+    assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
+    assertNull(output.newOutputFormat);
+    assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
+    assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
+    assertNull(output.newApiTaskAttemptContext);
+    assertNotNull(output.oldRecordWriter);
+    assertNull(output.newRecordWriter);
+    assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  private OutputContext createMockOutputContext(UserPayload payload) {
+    OutputContext outputContext = mock(OutputContext.class);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    when(outputContext.getUserPayload()).thenReturn(payload);
+    when(outputContext.getApplicationId()).thenReturn(appId);
+    when(outputContext.getTaskVertexIndex()).thenReturn(1);
+    when(outputContext.getTaskAttemptNumber()).thenReturn(1);
+    when(outputContext.getCounters()).thenReturn(new TezCounters());
+    return outputContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/97472e2a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java
new file mode 100644
index 0000000..b54ec3a
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.output;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestMROutputConfigBuilder {
+
+  @Test(timeout = 5000)
+  public void testNewAPI() {
+    Configuration conf = new Configuration();
+    try {
+      MROutput.createConfigBuilder(conf, TextOutputFormat.class).build();
+      fail();
+    } catch (TezUncheckedException e) {
+      assertEquals("OutputPaths must be specified for OutputFormats based "
+          + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat "
+          +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage());
+    }
+    MROutput.createConfigBuilder(conf, TextOutputFormat.class, "/tmp/output").build();
+
+    // no outputPaths needs to be specified
+    MROutput.createConfigBuilder(conf, DBOutputFormat.class).build();
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_ThroughConf() {
+    Configuration conf = new Configuration();
+    try {
+      MROutput.createConfigBuilder(conf, null).build();
+      fail();
+    } catch (TezUncheckedException e) {
+      assertEquals("no outputFormat setting on Configuration, useNewAPI:true",  e.getMessage());
+    }
+
+    // wrong output_format class
+    conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        org.apache.hadoop.mapred.TextOutputFormat.class.getName());
+    try {
+      MROutput.createConfigBuilder(conf, null).build();
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals("outputFormat must be assignable from org.apache.hadoop.mapreduce.OutputFormat",
+          e.getMessage());
+    }
+
+    // correct output_format class, but no output_dir specified
+    conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        TextOutputFormat.class.getName());
+    try {
+      MROutput.createConfigBuilder(conf, null).build();
+      fail();
+    } catch (TezUncheckedException e) {
+      assertEquals("OutputPaths must be specified for OutputFormats based "
+          + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat "
+          +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage());
+    }
+
+    conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/output");
+    MROutput.createConfigBuilder(conf, null).build();
+  }
+
+  @Test(timeout = 5000 )
+  public void testOldAPI() {
+    Configuration conf = new Configuration();
+    try {
+      MROutput.createConfigBuilder(conf, org.apache.hadoop.mapred.TextOutputFormat.class).build();
+      fail();
+    } catch (TezUncheckedException e) {
+      assertEquals("OutputPaths must be specified for OutputFormats based "
+          + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat "
+          +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage());
+    }
+    MROutput.createConfigBuilder(conf, org.apache.hadoop.mapred.TextOutputFormat.class,
+        "/tmp/output").build();
+
+    // no outputPaths needs to be specified
+    MROutput.createConfigBuilder(conf, org.apache.hadoop.mapred.lib.db.DBOutputFormat.class).build();
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_ThroughConf() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, false);
+    try {
+      MROutput.createConfigBuilder(conf, null).build();
+      fail();
+    } catch (TezUncheckedException e) {
+      assertEquals("no outputFormat setting on Configuration, useNewAPI:false",  e.getMessage());
+    }
+
+    // wrong output_format class
+    conf.set("mapred.output.format.class",
+        TextOutputFormat.class.getName());
+    try {
+      MROutput.createConfigBuilder(conf, null).build();
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals("outputFormat must be assignable from org.apache.hadoop.mapred.OutputFormat",
+          e.getMessage());
+    }
+
+    // correct output_format class, but no output_dir specified
+    conf.set("mapred.output.format.class",
+        org.apache.hadoop.mapred.TextOutputFormat.class.getName());
+    try {
+      MROutput.createConfigBuilder(conf, null).build();
+      fail();
+    } catch (TezUncheckedException e) {
+      assertEquals("OutputPaths must be specified for OutputFormats based "
+          + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat "
+          +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage());
+    }
+
+    conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/output");
+    MROutput.createConfigBuilder(conf, null).build();
+  }
+}


Mime
View raw message