parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject parquet-mr git commit: PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize
Date Tue, 06 Dec 2016 01:01:45 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 7987a544c -> 4fd34e651


PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize

Rather than resolving a single GET_COUNTER_METHOD up front, the counter method is now resolved (and cached) per context class.
This allows the 'getCounter' method to be used on any TaskAttemptContext that has the correct signature; versions
where TaskAttemptContext does not have an appropriate method/signature are ignored, preserving current behavior.

Author: Reuben Kuhnert <reuben.kuhnert@cloudera.com>

Closes #280 from sircodesalotOfTheRound/context-utils-parquet-220 and squashes the following commits:

f118990 [Reuben Kuhnert] PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/4fd34e65
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/4fd34e65
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/4fd34e65

Branch: refs/heads/master
Commit: 4fd34e6517f2c400a06e3c1d43ec56df2ff5c392
Parents: 7987a54
Author: Reuben Kuhnert <reuben.kuhnert@cloudera.com>
Authored: Mon Dec 5 17:01:38 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Mon Dec 5 17:01:38 2016 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetRecordReader.java     | 10 ++--
 .../apache/parquet/hadoop/util/ContextUtil.java | 57 +++++++++++++++-----
 .../hadoop/util/counters/BenchmarkCounter.java  |  4 +-
 .../mapreduce/MapReduceCounterLoader.java       |  5 +-
 .../hadoop/example/TestInputOutputFormat.java   |  1 +
 pom.xml                                         |  1 +
 6 files changed, 58 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index f2f656d..ebdc686 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -135,11 +135,13 @@ public class ParquetRecordReader<T> extends RecordReader<Void,
T> {
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
-    if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) {
-      BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>)
context);
+
+    if (ContextUtil.hasCounterMethod(context)) {
+      BenchmarkCounter.initCounterFromContext(context);
     } else {
-      LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext,
but is "
-              + context.getClass().getCanonicalName());
+      LOG.error(
+          String.format("Can not initialize counter because the class '%s' does not have
a '.getCounterMethod'",
+               context.getClass().getCanonicalName()));
     }
 
     initializeInternalReader(toParquetSplit(inputSplit), ContextUtil.getConfiguration(context));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
index 106fb0c..b2fec1b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
@@ -61,9 +63,10 @@ public class ContextUtil {
   private static final Field WRAPPED_CONTEXT_FIELD;
 
   private static final Method GET_CONFIGURATION_METHOD;
-  private static final Method GET_COUNTER_METHOD;
   private static final Method INCREMENT_COUNTER_METHOD;
 
+  private static final Map<Class, Method> COUNTER_METHODS_BY_CLASS = new HashMap<Class,
Method>();
+
   static {
     boolean v21 = true;
     final String PACKAGE = "org.apache.hadoop.mapreduce";
@@ -140,15 +143,20 @@ public class ContextUtil {
         WRAPPED_CONTEXT_FIELD =
             innerMapContextCls.getDeclaredField("mapContext");
         WRAPPED_CONTEXT_FIELD.setAccessible(true);
-        Method get_counter_method;
         try {
-          get_counter_method = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter",
String.class,
-                  String.class);
-        } catch (Exception e) {
-          get_counter_method = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter",
-                  String.class, String.class);
+          Class<?> taskAttemptContextClass = Class.forName(PACKAGE + ".TaskAttemptContext");
+          Method getCounterMethodForTaskAttemptContext
+            = taskAttemptContextClass.getMethod("getCounter", String.class, String.class);
+
+          COUNTER_METHODS_BY_CLASS.put(taskAttemptContextClass, getCounterMethodForTaskAttemptContext);
+
+        } catch (ClassNotFoundException e) {
+          Class<?> taskInputOutputContextClass = Class.forName(PACKAGE + ".TaskInputOutputContext");
+          Method getCounterMethodForTaskInputOutputContextClass =
+            taskInputOutputContextClass.getMethod("getCounter", String.class, String.class);
+
+          COUNTER_METHODS_BY_CLASS.put(taskInputOutputContextClass, getCounterMethodForTaskInputOutputContextClass);
         }
-        GET_COUNTER_METHOD=get_counter_method;
       } else {
         MAP_CONTEXT_CONSTRUCTOR =
             innerMapContextCls.getConstructor(mapCls,
@@ -161,7 +169,8 @@ public class ContextUtil {
                 InputSplit.class);
         MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
         WRAPPED_CONTEXT_FIELD = null;
-        GET_COUNTER_METHOD=taskIOContextCls.getMethod("getCounter", String.class, String.class);
+
+        COUNTER_METHODS_BY_CLASS.put(taskIOContextCls, taskIOContextCls.getMethod("getCounter",
String.class, String.class));
       }
       MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
       READER_FIELD = mapContextCls.getDeclaredField("reader");
@@ -251,9 +260,33 @@ public class ContextUtil {
     }
   }
 
-  public static Counter getCounter(TaskInputOutputContext context,
-                                   String groupName, String counterName) {
-    return (Counter) invoke(GET_COUNTER_METHOD, context, groupName, counterName);
+  public static Counter getCounter(TaskAttemptContext context, String groupName, String counterName)
{
+    Method counterMethod = findCounterMethod(context);
+    return (Counter)invoke(counterMethod, context, groupName, counterName);
+  }
+
+  public static boolean hasCounterMethod(TaskAttemptContext context) {
+    return findCounterMethod(context) != null;
+  }
+
+  private static Method findCounterMethod(TaskAttemptContext context) {
+    if (context != null) {
+      if (COUNTER_METHODS_BY_CLASS.containsKey(context.getClass())) {
+        return COUNTER_METHODS_BY_CLASS.get(context.getClass());
+      }
+
+      try {
+        Method method = context.getClass().getMethod("getCounter", String.class, String.class);
+        if (method.getReturnType().isAssignableFrom(Counter.class)) {
+          COUNTER_METHODS_BY_CLASS.put(context.getClass(), method);
+          return method;
+        }
+      } catch (NoSuchMethodException e) {
+        return null;
+      }
+    }
+
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
index e537783..b8521b3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
@@ -20,7 +20,7 @@ package org.apache.parquet.hadoop.util.counters;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.hadoop.util.counters.mapred.MapRedCounterLoader;
 import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader;
 
@@ -48,7 +48,7 @@ public class BenchmarkCounter {
    *
    * @param context
    */
-  public static void initCounterFromContext(TaskInputOutputContext<?, ?, ?, ?> context)
{
+  public static void initCounterFromContext(TaskAttemptContext context) {
     counterLoader = new MapReduceCounterLoader(context);
     loadCounters();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
index 1540f03..1bf4b97 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop.util.counters.mapreduce;
 
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
@@ -30,9 +31,9 @@ import org.apache.parquet.hadoop.util.counters.ICounter;
  * @author Tianshuo Deng
  */
 public class MapReduceCounterLoader implements CounterLoader {
-  private TaskInputOutputContext<?, ?, ?, ?> context;
+  private TaskAttemptContext context;
 
-  public MapReduceCounterLoader(TaskInputOutputContext<?, ?, ?, ?> context) {
+  public MapReduceCounterLoader(TaskAttemptContext context) {
     this.context = context;
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
index d1b5267..c829dc1 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
@@ -329,6 +329,7 @@ public class TestInputOutputFormat {
   @Test
   public void testReadWriteWithCounter() throws Exception {
     runMapReduceJob(CompressionCodecName.GZIP);
+
     assertTrue(value(readJob, "parquet", "bytesread") > 0L);
     assertTrue(value(readJob, "parquet", "bytestotal") > 0L);
     assertTrue(value(readJob, "parquet", "bytesread")

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index beb1e93..7d6187d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,6 +225,7 @@
                    <dumpDetails>true</dumpDetails>
                    <previousVersion>${previous.version}</previousVersion>
                    <excludes>
+                     <exclude>org/apache/parquet/hadoop/util/**</exclude>
                      <exclude>org/apache/parquet/thrift/projection/**</exclude>
                      <exclude>org/apache/parquet/thrift/ThriftSchemaConverter</exclude>
                      <exclude>org/apache/parquet/filter2/**</exclude>


Mime
View raw message