parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [46/50] [abbrv] parquet-mr git commit: PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize
Date Thu, 19 Jan 2017 01:27:57 GMT
PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize

Rather than querying the COUNTER_METHOD up front, the counter method is resolved per object.
This allows us to use the 'getCounter' method on any TaskAttemptContext with the correct
signature (ignoring versions where TaskAttemptContext does not have an appropriate
method/signature - preserving current behavior).

Author: Reuben Kuhnert <reuben.kuhnert@cloudera.com>

Closes #280 from sircodesalotOfTheRound/context-utils-parquet-220 and squashes the following commits:

f118990 [Reuben Kuhnert] PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/b55390cf
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/b55390cf
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/b55390cf

Branch: refs/heads/parquet-1.8.x
Commit: b55390cf07e3b0a06e6e8d9831fe35dafc0030f8
Parents: 091ea27
Author: Reuben Kuhnert <reuben.kuhnert@cloudera.com>
Authored: Mon Dec 5 17:01:38 2016 -0800
Committer: Ryan Blue <blue@apache.org>
Committed: Mon Jan 9 16:58:15 2017 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetRecordReader.java     | 10 ++--
 .../apache/parquet/hadoop/util/ContextUtil.java | 57 +++++++++++++++-----
 .../hadoop/util/counters/BenchmarkCounter.java  |  4 +-
 .../mapreduce/MapReduceCounterLoader.java       |  5 +-
 .../hadoop/example/TestInputOutputFormat.java   |  1 +
 pom.xml                                         |  1 +
 6 files changed, 58 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b55390cf/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index f2f656d..ebdc686 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -135,11 +135,13 @@ public class ParquetRecordReader<T> extends RecordReader<Void,
T> {
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
-    if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) {
-      BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>)
context);
+
+    if (ContextUtil.hasCounterMethod(context)) {
+      BenchmarkCounter.initCounterFromContext(context);
     } else {
-      LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext,
but is "
-              + context.getClass().getCanonicalName());
+      LOG.error(
+          String.format("Can not initialize counter because the class '%s' does not have
a '.getCounterMethod'",
+               context.getClass().getCanonicalName()));
     }
 
     initializeInternalReader(toParquetSplit(inputSplit), ContextUtil.getConfiguration(context));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b55390cf/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
index 106fb0c..b2fec1b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
@@ -61,9 +63,10 @@ public class ContextUtil {
   private static final Field WRAPPED_CONTEXT_FIELD;
 
   private static final Method GET_CONFIGURATION_METHOD;
-  private static final Method GET_COUNTER_METHOD;
   private static final Method INCREMENT_COUNTER_METHOD;
 
+  private static final Map<Class, Method> COUNTER_METHODS_BY_CLASS = new HashMap<Class,
Method>();
+
   static {
     boolean v21 = true;
     final String PACKAGE = "org.apache.hadoop.mapreduce";
@@ -140,15 +143,20 @@ public class ContextUtil {
         WRAPPED_CONTEXT_FIELD =
             innerMapContextCls.getDeclaredField("mapContext");
         WRAPPED_CONTEXT_FIELD.setAccessible(true);
-        Method get_counter_method;
         try {
-          get_counter_method = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter",
String.class,
-                  String.class);
-        } catch (Exception e) {
-          get_counter_method = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter",
-                  String.class, String.class);
+          Class<?> taskAttemptContextClass = Class.forName(PACKAGE + ".TaskAttemptContext");
+          Method getCounterMethodForTaskAttemptContext
+            = taskAttemptContextClass.getMethod("getCounter", String.class, String.class);
+
+          COUNTER_METHODS_BY_CLASS.put(taskAttemptContextClass, getCounterMethodForTaskAttemptContext);
+
+        } catch (ClassNotFoundException e) {
+          Class<?> taskInputOutputContextClass = Class.forName(PACKAGE + ".TaskInputOutputContext");
+          Method getCounterMethodForTaskInputOutputContextClass =
+            taskInputOutputContextClass.getMethod("getCounter", String.class, String.class);
+
+          COUNTER_METHODS_BY_CLASS.put(taskInputOutputContextClass, getCounterMethodForTaskInputOutputContextClass);
         }
-        GET_COUNTER_METHOD=get_counter_method;
       } else {
         MAP_CONTEXT_CONSTRUCTOR =
             innerMapContextCls.getConstructor(mapCls,
@@ -161,7 +169,8 @@ public class ContextUtil {
                 InputSplit.class);
         MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
         WRAPPED_CONTEXT_FIELD = null;
-        GET_COUNTER_METHOD=taskIOContextCls.getMethod("getCounter", String.class, String.class);
+
+        COUNTER_METHODS_BY_CLASS.put(taskIOContextCls, taskIOContextCls.getMethod("getCounter",
String.class, String.class));
       }
       MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
       READER_FIELD = mapContextCls.getDeclaredField("reader");
@@ -251,9 +260,33 @@ public class ContextUtil {
     }
   }
 
-  public static Counter getCounter(TaskInputOutputContext context,
-                                   String groupName, String counterName) {
-    return (Counter) invoke(GET_COUNTER_METHOD, context, groupName, counterName);
+  public static Counter getCounter(TaskAttemptContext context, String groupName, String counterName)
{
+    Method counterMethod = findCounterMethod(context);
+    return (Counter)invoke(counterMethod, context, groupName, counterName);
+  }
+
+  public static boolean hasCounterMethod(TaskAttemptContext context) {
+    return findCounterMethod(context) != null;
+  }
+
+  private static Method findCounterMethod(TaskAttemptContext context) {
+    if (context != null) {
+      if (COUNTER_METHODS_BY_CLASS.containsKey(context.getClass())) {
+        return COUNTER_METHODS_BY_CLASS.get(context.getClass());
+      }
+
+      try {
+        Method method = context.getClass().getMethod("getCounter", String.class, String.class);
+        if (method.getReturnType().isAssignableFrom(Counter.class)) {
+          COUNTER_METHODS_BY_CLASS.put(context.getClass(), method);
+          return method;
+        }
+      } catch (NoSuchMethodException e) {
+        return null;
+      }
+    }
+
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b55390cf/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
index e537783..b8521b3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
@@ -20,7 +20,7 @@ package org.apache.parquet.hadoop.util.counters;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.hadoop.util.counters.mapred.MapRedCounterLoader;
 import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader;
 
@@ -48,7 +48,7 @@ public class BenchmarkCounter {
    *
    * @param context
    */
-  public static void initCounterFromContext(TaskInputOutputContext<?, ?, ?, ?> context)
{
+  public static void initCounterFromContext(TaskAttemptContext context) {
     counterLoader = new MapReduceCounterLoader(context);
     loadCounters();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b55390cf/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
index 1540f03..1bf4b97 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop.util.counters.mapreduce;
 
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
@@ -30,9 +31,9 @@ import org.apache.parquet.hadoop.util.counters.ICounter;
  * @author Tianshuo Deng
  */
 public class MapReduceCounterLoader implements CounterLoader {
-  private TaskInputOutputContext<?, ?, ?, ?> context;
+  private TaskAttemptContext context;
 
-  public MapReduceCounterLoader(TaskInputOutputContext<?, ?, ?, ?> context) {
+  public MapReduceCounterLoader(TaskAttemptContext context) {
     this.context = context;
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b55390cf/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
index d1b5267..c829dc1 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
@@ -329,6 +329,7 @@ public class TestInputOutputFormat {
   @Test
   public void testReadWriteWithCounter() throws Exception {
     runMapReduceJob(CompressionCodecName.GZIP);
+
     assertTrue(value(readJob, "parquet", "bytesread") > 0L);
     assertTrue(value(readJob, "parquet", "bytestotal") > 0L);
     assertTrue(value(readJob, "parquet", "bytesread")

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b55390cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e6977fb..17c87c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -221,6 +221,7 @@
                    <dumpDetails>true</dumpDetails>
                    <previousVersion>${previous.version}</previousVersion>
                    <excludes>
+                     <exclude>org/apache/parquet/hadoop/util/**</exclude>
                      <exclude>org/apache/parquet/thrift/projection/**</exclude>
                      <exclude>org/apache/parquet/thrift/ThriftSchemaConverter</exclude>
                      <exclude>org/apache/parquet/filter2/**</exclude>


Mime
View raw message