accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From md...@apache.org
Subject [04/19] git commit: ACCUMULO-2564 Backport changes to unify Hadoop 1/2
Date Fri, 28 Mar 2014 21:26:08 GMT
ACCUMULO-2564 Backport changes to unify Hadoop 1/2

This is a backport of the changes made for Accumulo 1.5.0 and 1.5.1 to
establish binary compatibility with both Hadoop 1 and 2 branches. This
work was originally done under several commits and issues, as outlined
below. Additionally, some new work needed to be done here that is not
applicable to later branches.

ACCUMULO-1421, commit c9c0d45 by Adam Fuchs
ACCUMULO-1421, commit d7ba6ca by Christopher Tubbs
ACCUMULO-1421, commit 261cf36 by Eric Newton
ACCUMULO-1421, commit cc3c2d8 by Eric Newton

Issue: ACCUMULO-1809
Author: Eric Newton
Reason: use reflection tricks to update counters
Commit: a7e159219a29ca6f127616fd965aa857900e3f9c

Issue: n/a
Reason: Delegate reflection to ContextFactory

The InsertWithOutputFormat functional test uses ContextFactory, so we
have to update that to use the reflection hack as well. And while we're
there, might as well keep the code in one place.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9e5854fb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9e5854fb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9e5854fb

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 9e5854fb3ff90c0e80523394401f4103eb4212a2
Parents: 1258b4e
Author: Mike Drob <mdrob@cloudera.com>
Authored: Thu Mar 27 02:31:04 2014 -0400
Committer: Mike Drob <mdrob@cloudera.com>
Committed: Fri Mar 28 16:56:54 2014 -0400

----------------------------------------------------------------------
 .../mapreduce/AccumuloFileOutputFormat.java     | 12 +--
 .../client/mapreduce/AccumuloInputFormat.java   |  2 +-
 .../client/mapreduce/AccumuloOutputFormat.java  | 38 ++++-----
 .../core/client/mapreduce/InputFormatBase.java  | 82 ++++++++++----------
 .../lib/partition/RangePartitioner.java         |  7 +-
 .../accumulo/core/util/ContextFactory.java      | 24 +++++-
 .../org/apache/accumulo/server/Accumulo.java    | 16 +---
 .../apache/accumulo/server/master/LogSort.java  |  7 +-
 .../server/test/continuous/ContinuousMoru.java  |  2 +-
 .../test/continuous/ContinuousVerify.java       | 29 ++++++-
 10 files changed, 127 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index 7cfab8b..8b03ace 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -56,7 +56,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
   @Override
   public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
     // get the path of the temporary output file
-    final Configuration conf = job.getConfiguration();
+    final Configuration conf = InputFormatBase.getConfiguration(job);
     
     String extension = conf.get(FILE_TYPE);
     if (extension == null || extension.isEmpty())
@@ -92,7 +92,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value>
{
    */
   @Deprecated
   protected static void handleBlockSize(JobContext job) {
-    handleBlockSize(job.getConfiguration());
+    handleBlockSize(InputFormatBase.getConfiguration(job));
   }
   
   protected static void handleBlockSize(Configuration conf) {
@@ -111,7 +111,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value>
{
    */
   @Deprecated
   public static void setFileType(JobContext job, String type) {
-    setFileType(job.getConfiguration(), type);
+    setFileType(InputFormatBase.getConfiguration(job), type);
   }
   
   public static void setFileType(Configuration conf, String type) {
@@ -123,7 +123,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value>
{
    */
   @Deprecated
   public static void setBlockSize(JobContext job, int blockSize) {
-    setBlockSize(job.getConfiguration(), blockSize);
+    setBlockSize(InputFormatBase.getConfiguration(job), blockSize);
   }
   
   public static void setBlockSize(Configuration conf, int blockSize) {
@@ -139,7 +139,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value>
{
    */
   @Deprecated
   public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers)
{
-    setZooKeeperInstance(job.getConfiguration(), instanceName, zooKeepers);
+    setZooKeeperInstance(InputFormatBase.getConfiguration(job), instanceName, zooKeepers);
   }
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String
zooKeepers) {
@@ -159,7 +159,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value>
{
    */
   @Deprecated
   protected static Instance getInstance(JobContext job) {
-    return getInstance(job.getConfiguration());
+    return getInstance(InputFormatBase.getConfiguration(job));
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
index c9a70eb..5c5c18d 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
@@ -44,7 +44,7 @@ import org.apache.log4j.Level;
 public class AccumuloInputFormat extends InputFormatBase<Key,Value> {
   @Override
   public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-    log.setLevel(getLogLevel(context.getConfiguration()));
+    log.setLevel(getLogLevel(InputFormatBase.getConfiguration(context)));
     
     // Override the log level from the configuration as if the RangeInputSplit has one it's the more correct one to use.
     if (split instanceof RangeInputSplit) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index ed0aebf..7aee8b6 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -105,7 +105,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setOutputInfo(JobContext job, String user, byte[] passwd, boolean createTables,
String defaultTable) {
-    setOutputInfo(job.getConfiguration(), user, passwd, createTables, defaultTable);
+    setOutputInfo(InputFormatBase.getConfiguration(job), user, passwd, createTables, defaultTable);
   }
   
   /**
@@ -140,7 +140,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers)
{
-    setZooKeeperInstance(job.getConfiguration(), instanceName, zooKeepers);
+    setZooKeeperInstance(InputFormatBase.getConfiguration(job), instanceName, zooKeepers);
   }
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String
zooKeepers) {
@@ -158,7 +158,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setMockInstance(JobContext job, String instanceName) {
-    setMockInstance(job.getConfiguration(), instanceName);
+    setMockInstance(InputFormatBase.getConfiguration(job), instanceName);
   }
   
   public static void setMockInstance(Configuration conf, String instanceName) {
@@ -172,7 +172,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setMaxMutationBufferSize(JobContext job, long numberOfBytes) {
-    setMaxMutationBufferSize(job.getConfiguration(), numberOfBytes);
+    setMaxMutationBufferSize(InputFormatBase.getConfiguration(job), numberOfBytes);
   }
   
   public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
@@ -184,7 +184,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setMaxLatency(JobContext job, int numberOfMilliseconds) {
-    setMaxLatency(job.getConfiguration(), numberOfMilliseconds);
+    setMaxLatency(InputFormatBase.getConfiguration(job), numberOfMilliseconds);
   }
   
   public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
@@ -196,7 +196,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setMaxWriteThreads(JobContext job, int numberOfThreads) {
-    setMaxWriteThreads(job.getConfiguration(), numberOfThreads);
+    setMaxWriteThreads(InputFormatBase.getConfiguration(job), numberOfThreads);
   }
   
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
@@ -208,7 +208,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setLogLevel(JobContext job, Level level) {
-    setLogLevel(job.getConfiguration(), level);
+    setLogLevel(InputFormatBase.getConfiguration(job), level);
   }
   
   public static void setLogLevel(Configuration conf, Level level) {
@@ -221,7 +221,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   public static void setSimulationMode(JobContext job) {
-    setSimulationMode(job.getConfiguration());
+    setSimulationMode(InputFormatBase.getConfiguration(job));
   }
   
   public static void setSimulationMode(Configuration conf) {
@@ -233,7 +233,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static String getUsername(JobContext job) {
-    return getUsername(job.getConfiguration());
+    return getUsername(InputFormatBase.getConfiguration(job));
   }
   
   protected static String getUsername(Configuration conf) {
@@ -248,7 +248,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static byte[] getPassword(JobContext job) {
-    return getPassword(job.getConfiguration());
+    return getPassword(InputFormatBase.getConfiguration(job));
   }
   
   /**
@@ -264,7 +264,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static boolean canCreateTables(JobContext job) {
-    return canCreateTables(job.getConfiguration());
+    return canCreateTables(InputFormatBase.getConfiguration(job));
   }
   
   protected static boolean canCreateTables(Configuration conf) {
@@ -276,7 +276,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static String getDefaultTableName(JobContext job) {
-    return getDefaultTableName(job.getConfiguration());
+    return getDefaultTableName(InputFormatBase.getConfiguration(job));
   }
   
   protected static String getDefaultTableName(Configuration conf) {
@@ -288,7 +288,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static Instance getInstance(JobContext job) {
-    return getInstance(job.getConfiguration());
+    return getInstance(InputFormatBase.getConfiguration(job));
   }
   
   protected static Instance getInstance(Configuration conf) {
@@ -302,7 +302,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static long getMaxMutationBufferSize(JobContext job) {
-    return getMaxMutationBufferSize(job.getConfiguration());
+    return getMaxMutationBufferSize(InputFormatBase.getConfiguration(job));
   }
   
   protected static long getMaxMutationBufferSize(Configuration conf) {
@@ -314,7 +314,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static int getMaxLatency(JobContext job) {
-    return getMaxLatency(job.getConfiguration());
+    return getMaxLatency(InputFormatBase.getConfiguration(job));
   }
   
   protected static int getMaxLatency(Configuration conf) {
@@ -326,7 +326,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static int getMaxWriteThreads(JobContext job) {
-    return getMaxWriteThreads(job.getConfiguration());
+    return getMaxWriteThreads(InputFormatBase.getConfiguration(job));
   }
   
   protected static int getMaxWriteThreads(Configuration conf) {
@@ -338,7 +338,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static Level getLogLevel(JobContext job) {
-    return getLogLevel(job.getConfiguration());
+    return getLogLevel(InputFormatBase.getConfiguration(job));
   }
   
   protected static Level getLogLevel(Configuration conf) {
@@ -352,7 +352,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   @Deprecated
   protected static boolean getSimulationMode(JobContext job) {
-    return getSimulationMode(job.getConfiguration());
+    return getSimulationMode(InputFormatBase.getConfiguration(job));
   }
   
   protected static boolean getSimulationMode(Configuration conf) {
@@ -513,7 +513,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    Configuration conf = job.getConfiguration();
+    Configuration conf = InputFormatBase.getConfiguration(job);
     if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
       throw new IOException("Output info has not been set.");
     if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index a11096c..b71f592 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -72,6 +72,7 @@ import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.ContextFactory;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -148,7 +149,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setIsolated(JobContext job, boolean enable) {
-    setIsolated(job.getConfiguration(), enable);
+    setIsolated(InputFormatBase.getConfiguration(job), enable);
   }
 
   /**
@@ -168,7 +169,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setLocalIterators(JobContext job, boolean enable) {
-    setLocalIterators(job.getConfiguration(), enable);
+    setLocalIterators(InputFormatBase.getConfiguration(job), enable);
   }
 
   /**
@@ -188,7 +189,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setInputInfo(JobContext job, String user, byte[] passwd, String table,
Authorizations auths) {
-    setInputInfo(job.getConfiguration(), user, passwd, table, auths);
+    setInputInfo(InputFormatBase.getConfiguration(job), user, passwd, table, auths);
   }
 
   /**
@@ -223,7 +224,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers)
{
-    setZooKeeperInstance(job.getConfiguration(), instanceName, zooKeepers);
+    setZooKeeperInstance(InputFormatBase.getConfiguration(job), instanceName, zooKeepers);
   }
 
   /**
@@ -251,7 +252,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setMockInstance(JobContext job, String instanceName) {
-    setMockInstance(job.getConfiguration(), instanceName);
+    setMockInstance(InputFormatBase.getConfiguration(job), instanceName);
   }
 
   /**
@@ -273,7 +274,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setRanges(JobContext job, Collection<Range> ranges) {
-    setRanges(job.getConfiguration(), ranges);
+    setRanges(InputFormatBase.getConfiguration(job), ranges);
   }
 
   /**
@@ -304,7 +305,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void disableAutoAdjustRanges(JobContext job) {
-    disableAutoAdjustRanges(job.getConfiguration());
+    disableAutoAdjustRanges(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -354,7 +355,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
         throw new NoSuchElementException();
     }
     try {
-      job.getConfiguration().set(key, URLEncoder.encode(regex, "UTF-8"));
+      InputFormatBase.getConfiguration(job).set(key, URLEncoder.encode(regex, "UTF-8"));
     } catch (UnsupportedEncodingException e) {
       log.error("Failedd to encode regular expression", e);
       throw new RuntimeException(e);
@@ -366,7 +367,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setMaxVersions(JobContext job, int maxVersions) throws IOException {
-    setMaxVersions(job.getConfiguration(), maxVersions);
+    setMaxVersions(InputFormatBase.getConfiguration(job), maxVersions);
   }
 
   /**
@@ -422,7 +423,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void fetchColumns(JobContext job, Collection<Pair<Text,Text>>
columnFamilyColumnQualifierPairs) {
-    fetchColumns(job.getConfiguration(), columnFamilyColumnQualifierPairs);
+    fetchColumns(InputFormatBase.getConfiguration(job), columnFamilyColumnQualifierPairs);
   }
 
   /**
@@ -460,7 +461,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void setLogLevel(JobContext job, Level level) {
-    setLogLevel(job.getConfiguration(), level);
+    setLogLevel(InputFormatBase.getConfiguration(job), level);
   }
 
   /**
@@ -482,7 +483,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   public static void addIterator(JobContext job, IteratorSetting cfg) {
-    addIterator(job.getConfiguration(), cfg);
+    addIterator(InputFormatBase.getConfiguration(job), cfg);
   }
 
   /**
@@ -543,7 +544,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
   @Deprecated
   public static void setIterator(JobContext job, int priority, String iteratorClass, String
iteratorName) {
     // First check to see if anything has been set already
-    String iterators = job.getConfiguration().get(ITERATORS);
+    String iterators = InputFormatBase.getConfiguration(job).get(ITERATORS);
 
     // No iterators specified yet, create a new string
     if (iterators == null || iterators.isEmpty()) {
@@ -553,7 +554,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
       iterators = iterators.concat(ITERATORS_DELIM + new AccumuloIterator(priority, iteratorClass,
iteratorName).toString());
     }
     // Store the iterators w/ the job
-    job.getConfiguration().set(ITERATORS, iterators);
+    InputFormatBase.getConfiguration(job).set(ITERATORS, iterators);
 
   }
 
@@ -576,7 +577,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
     if (iteratorName == null || key == null || value == null)
       return;
 
-    String iteratorOptions = job.getConfiguration().get(ITERATORS_OPTIONS);
+    String iteratorOptions = InputFormatBase.getConfiguration(job).get(ITERATORS_OPTIONS);
 
     // No options specified yet, create a new string
     if (iteratorOptions == null || iteratorOptions.isEmpty()) {
@@ -587,7 +588,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
     }
 
     // Store the options w/ the job
-    job.getConfiguration().set(ITERATORS_OPTIONS, iteratorOptions);
+    InputFormatBase.getConfiguration(job).set(ITERATORS_OPTIONS, iteratorOptions);
   }
 
   /**
@@ -595,7 +596,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static boolean isIsolated(JobContext job) {
-    return isIsolated(job.getConfiguration());
+    return isIsolated(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -615,7 +616,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static boolean usesLocalIterators(JobContext job) {
-    return usesLocalIterators(job.getConfiguration());
+    return usesLocalIterators(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -635,7 +636,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static String getUsername(JobContext job) {
-    return getUsername(job.getConfiguration());
+    return getUsername(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -658,7 +659,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static byte[] getPassword(JobContext job) {
-    return getPassword(job.getConfiguration());
+    return getPassword(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -679,7 +680,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static String getTablename(JobContext job) {
-    return getTablename(job.getConfiguration());
+    return getTablename(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -699,7 +700,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static Authorizations getAuthorizations(JobContext job) {
-    return getAuthorizations(job.getConfiguration());
+    return getAuthorizations(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -720,7 +721,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static Instance getInstance(JobContext job) {
-    return getInstance(job.getConfiguration());
+    return getInstance(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -743,7 +744,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException
{
-    return getTabletLocator(job.getConfiguration());
+    return getTabletLocator(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -771,7 +772,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static List<Range> getRanges(JobContext job) throws IOException {
-    return getRanges(job.getConfiguration());
+    return getRanges(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -819,7 +820,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
         throw new NoSuchElementException();
     }
     try {
-      String s = job.getConfiguration().get(key);
+      String s = InputFormatBase.getConfiguration(job).get(key);
       if (s == null)
         return null;
       return URLDecoder.decode(s, "UTF-8");
@@ -834,7 +835,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext job) {
-    return getFetchedColumns(job.getConfiguration());
+    return getFetchedColumns(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -872,7 +873,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static boolean getAutoAdjustRanges(JobContext job) {
-    return getAutoAdjustRanges(job.getConfiguration());
+    return getAutoAdjustRanges(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -892,7 +893,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static Level getLogLevel(JobContext job) {
-    return getLogLevel(job.getConfiguration());
+    return getLogLevel(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -914,7 +915,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static void validateOptions(JobContext job) throws IOException {
-    validateOptions(job.getConfiguration());
+    validateOptions(InputFormatBase.getConfiguration(job));
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's
@@ -960,7 +961,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static int getMaxVersions(JobContext job) {
-    return getMaxVersions(job.getConfiguration());
+    return getMaxVersions(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -986,7 +987,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static List<AccumuloIterator> getIterators(JobContext job) {
-    return getIterators(job.getConfiguration());
+    return getIterators(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -1020,7 +1021,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   @Deprecated
   protected static List<AccumuloIteratorOption> getIteratorOptions(JobContext job)
{
-    return getIteratorOptions(job.getConfiguration());
+    return getIteratorOptions(InputFormatBase.getConfiguration(job));
   }
 
   /**
@@ -1136,7 +1137,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split.getRange());
-      Configuration conf = attempt.getConfiguration();
+      Configuration conf = InputFormatBase.getConfiguration(attempt);
 
       Instance instance = split.getInstance();
       if (null == instance) {
@@ -1296,8 +1297,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
 
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 
-    Instance instance = getInstance(job.getConfiguration());
-    Connector conn = instance.getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration()));
+    Instance instance = getInstance(InputFormatBase.getConfiguration(job));
+    Connector conn = instance.getConnector(getUsername(InputFormatBase.getConfiguration(job)), getPassword(InputFormatBase.getConfiguration(job)));
     String tableId = Tables.getTableId(instance, tableName);
 
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
@@ -1395,7 +1396,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * Read the metadata table to get tablets and match up ranges to them.
    */
   public List<InputSplit> getSplits(JobContext job) throws IOException {
-    Configuration conf = job.getConfiguration();
+    Configuration conf = InputFormatBase.getConfiguration(job);
 
     log.setLevel(getLogLevel(conf));
     validateOptions(conf);
@@ -1428,7 +1429,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
     try {
-      if (isOfflineScan(job.getConfiguration())) {
+      if (isOfflineScan(InputFormatBase.getConfiguration(job))) {
         binnedRanges = binOfflineTable(job, tableName, ranges);
         while (binnedRanges == null) {
           // Some tablets were still online, try again
@@ -1437,7 +1438,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
         }
       } else {
         String tableId = null;
-        tl = getTabletLocator(job.getConfiguration());
+        tl = getTabletLocator(InputFormatBase.getConfiguration(job));
         // its possible that the cache could contain complete, but old information about
a tables tablets... so clear it
         tl.invalidateCache();
         while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
@@ -1622,4 +1623,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
 
   }
 
+  public static Configuration getConfiguration(JobContext context) {
+    return ContextFactory.getConfiguration(context);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
index ae537f9..2a39dc7 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Scanner;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -119,14 +120,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
    */
   public static void setSplitFile(JobContext job, String file) {
     URI uri = new Path(file).toUri();
-    DistributedCache.addCacheFile(uri, job.getConfiguration());
-    job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
+    DistributedCache.addCacheFile(uri, InputFormatBase.getConfiguration(job));
+    InputFormatBase.getConfiguration(job).set(CUTFILE_KEY, uri.getPath());
   }
   
   /**
    * Sets the number of random sub-bins per range
    */
   public static void setNumSubBins(JobContext job, int num) {
-    job.getConfiguration().setInt(NUM_SUBBINS, num);
+    InputFormatBase.getConfiguration(job).setInt(NUM_SUBBINS, num);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
index 5a1c2ef..4979dfa 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.util;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,6 +47,7 @@ public class ContextFactory {
   private static final Constructor<?> MAP_CONSTRUCTOR;
   private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
   private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+  private static final Method GET_CONFIGURATION_METHOD;
   private static final Class<?> TASK_TYPE_CLASS;
   private static final boolean useV21;
   
@@ -63,6 +65,8 @@ public class ContextFactory {
     Class<?> mapCls;
     Class<?> mapContextCls;
     Class<?> innerMapContextCls;
+    Class<?> jobContextRoot;
+
     try {
       if (v21) {
         jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
@@ -79,6 +83,7 @@ public class ContextFactory {
         mapCls = Class.forName(PACKAGE + ".Mapper");
         innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context");
       }
+      jobContextRoot = Class.forName(PACKAGE + ".JobContext");
     } catch (ClassNotFoundException e) {
       throw new IllegalArgumentException("Can't find class", e);
     }
@@ -104,6 +109,7 @@ public class ContextFactory {
         MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
       }
       MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      GET_CONFIGURATION_METHOD = jobContextRoot.getMethod("getConfiguration");
     } catch (SecurityException e) {
       throw new IllegalArgumentException("Can't run constructor ", e);
     } catch (NoSuchMethodException e) {
@@ -127,8 +133,20 @@ public class ContextFactory {
     }
   }
   
+  public static Configuration getConfiguration(JobContext context) {
+    try {
+      return (Configuration) GET_CONFIGURATION_METHOD.invoke(context, new Object[0]);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Can't invoke method", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't invoke method", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't invoke method", e);
+    }
+  }
+
   public static TaskAttemptContext createTaskAttemptContext(JobContext job) {
-    return createTaskAttemptContext(job.getConfiguration());
+    return createTaskAttemptContext(getConfiguration(job));
   }
   
   public static TaskAttemptContext createTaskAttemptContext(Configuration conf) {
@@ -157,10 +175,10 @@ public class ContextFactory {
       RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
     try {
       if (useV21) {
-        Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
+        Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(getConfiguration(tac), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
         return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
       } else {
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
+        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, getConfiguration(tac), tac.getTaskAttemptID(), reader, writer, committer, reporter,
             split);
       }
     } catch (InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index 184692c..7747ea3 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -250,28 +250,18 @@ public class Accumulo {
     DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(CachedConfiguration.getInstance());
     // So this:  if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
     // Becomes this:
-    Class<?> constantClass;
+    Class<?> safeModeAction;
     try {
       // hadoop 2.0
-      constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants");
+      safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
     } catch (ClassNotFoundException ex) {
       // hadoop 1.0
       try {
-        constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants");
+        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
       } catch (ClassNotFoundException e) {
         throw new RuntimeException("Cannot figure out the right class for Constants");
       }
     }
-    Class<?> safeModeAction = null;
-    for (Class<?> klass : constantClass.getDeclaredClasses()) {
-      if (klass.getSimpleName().equals("SafeModeAction")) {
-        safeModeAction = klass;
-        break;
-      }
-    }
-    if (safeModeAction == null) {
-      throw new RuntimeException("Cannot find SafeModeAction in constants class");
-    }
     
     Object get = null;
     for (Object obj : safeModeAction.getEnumConstants()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
index 1c384a3..10f9151 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.logger.IdentityReducer;
@@ -76,7 +77,7 @@ public class LogSort extends Configured implements Tool {
     public SortCommit(Path outputPath, TaskAttemptContext context) throws IOException {
       super(outputPath, context);
       this.outputPath = outputPath;
-      outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
+      outputFileSystem = outputPath.getFileSystem(InputFormatBase.getConfiguration(context));
     }
 
     @Override
@@ -207,7 +208,7 @@ public class LogSort extends Configured implements Tool {
       // get the path of the temporary output file
       Path file = getDefaultWorkFile(job, "");
 
-      FileSystem fs = file.getFileSystem(job.getConfiguration());
+      FileSystem fs = file.getFileSystem(InputFormatBase.getConfiguration(job));
       CompressionCodec codec = null;
       CompressionType compressionType = CompressionType.NONE;
       if (getCompressOutput(job)) {
@@ -216,7 +217,7 @@ public class LogSort extends Configured implements Tool {
 
         // find the right codec
         Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, DefaultCodec.class);
-        codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
+        codec = ReflectionUtils.newInstance(codecClass, InputFormatBase.getConfiguration(job));
       }
 
       Progressable progress = new Progressable() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
index 1c384cc..88fbb25 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
@@ -102,7 +102,7 @@ public class ContinuousMoru extends Configured implements Tool {
         }
         
       } else {
-        context.getCounter(Counts.SELF_READ).increment(1);
+        ContinuousVerify.increment(context.getCounter(Counts.SELF_READ));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
index 9441cf5..4b465a8 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.server.test.continuous;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -50,6 +52,25 @@ import org.apache.hadoop.util.ToolRunner;
  */
 
 public class ContinuousVerify extends Configured implements Tool {
+  // work around hadoop-1/hadoop-2 runtime incompatibility
+  static private Method INCREMENT;
+  static {
+    try {
+      Class<Counter> counter = Counter.class;
+
+      INCREMENT = counter.getMethod("increment", Long.TYPE);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  static void increment(Object obj) {
+    try {
+      INCREMENT.invoke(obj, 1L);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 
   public static final VLongWritable DEF = new VLongWritable(-1);
 
@@ -69,7 +90,7 @@ public class ContinuousVerify extends Configured implements Tool {
       try {
         ContinuousWalk.validate(key, data);
       } catch (BadChecksumException bce) {
-        context.getCounter(Counts.CORRUPT).increment(1);
+        increment(context.getCounter(Counts.CORRUPT));
         if (corrupt < 1000) {
           System.out.println("ERROR Bad checksum : " + key);
         } else if (corrupt == 1000) {
@@ -123,12 +144,12 @@ public class ContinuousVerify extends Configured implements Tool {
         }
 
         context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
-        context.getCounter(Counts.UNDEFINED).increment(1);
+        increment(context.getCounter(Counts.UNDEFINED));
 
       } else if (defCount > 0 && refs.size() == 0) {
-        context.getCounter(Counts.UNREFERENCED).increment(1);
+        increment(context.getCounter(Counts.UNREFERENCED));
       } else {
-        context.getCounter(Counts.REFERENCED).increment(1);
+        increment(context.getCounter(Counts.REFERENCED));
       }
 
     }


Mime
View raw message