Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 250F211A16 for ; Fri, 28 Mar 2014 21:26:29 +0000 (UTC) Received: (qmail 78811 invoked by uid 500); 28 Mar 2014 21:26:18 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 78451 invoked by uid 500); 28 Mar 2014 21:26:10 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 78126 invoked by uid 99); 28 Mar 2014 21:26:05 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Mar 2014 21:26:05 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2EB9E915931; Fri, 28 Mar 2014 21:26:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mdrob@apache.org To: commits@accumulo.apache.org Date: Fri, 28 Mar 2014 21:26:08 -0000 Message-Id: <8e22a9a8738c4e109671dd0eaab5c3ec@git.apache.org> In-Reply-To: <0b3952b892d5440cab12037f19dc4691@git.apache.org> References: <0b3952b892d5440cab12037f19dc4691@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/19] git commit: ACCUMULO-2564 Backport changes to unify Hadoop 1/2 ACCUMULO-2564 Backport changes to unify Hadoop 1/2 This is a backport of the changes made for Accumulo 1.5.0 and 1.5.1 to establish binary compatibility with both Hadoop 1 and 2 branches. This work was originally done under several commits and issues, as outlined below. Additionally, there was some new work that needed to be done, but is not applicable to later branches. ACCUMULO-1421, commit c9c0d45 by Adam Fuchs ACCUMULO-1421, commit d7ba6ca by Christopher Tubbs ACCUMULO-1421, commit 261cf36 by Eric Newton ACCUMULO-1421, commit cc3c2d8 by Eric Newton Issue: ACCUMULO-1809 Author: Eric Newton Reason: use reflection tricks to update counters Commit: a7e159219a29ca6f127616fd965aa857900e3f9c Issue: n/a Reason: Delegate reflection to ContextFactory The InsertWithOutputFormat functional test uses ContextFactory, so we have to update that to use the reflection hack as well. And while we're there, might as well keep the code in one place. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9e5854fb Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9e5854fb Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9e5854fb Branch: refs/heads/1.5.2-SNAPSHOT Commit: 9e5854fb3ff90c0e80523394401f4103eb4212a2 Parents: 1258b4e Author: Mike Drob Authored: Thu Mar 27 02:31:04 2014 -0400 Committer: Mike Drob Committed: Fri Mar 28 16:56:54 2014 -0400 ---------------------------------------------------------------------- .../mapreduce/AccumuloFileOutputFormat.java | 12 +-- .../client/mapreduce/AccumuloInputFormat.java | 2 +- .../client/mapreduce/AccumuloOutputFormat.java | 38 ++++----- .../core/client/mapreduce/InputFormatBase.java | 82 ++++++++++---------- .../lib/partition/RangePartitioner.java | 7 +- .../accumulo/core/util/ContextFactory.java | 24 +++++- .../org/apache/accumulo/server/Accumulo.java | 16 +--- .../apache/accumulo/server/master/LogSort.java | 7 +- .../server/test/continuous/ContinuousMoru.java | 2 +- .../test/continuous/ContinuousVerify.java | 29 ++++++- 10 files changed, 127 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java index 7cfab8b..8b03ace 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java @@ -56,7 +56,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // get the path of the temporary output file - final Configuration conf = job.getConfiguration(); + final Configuration conf = InputFormatBase.getConfiguration(job); String extension = conf.get(FILE_TYPE); if (extension == null || extension.isEmpty()) @@ -92,7 +92,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { */ @Deprecated protected static void handleBlockSize(JobContext job) { - handleBlockSize(job.getConfiguration()); + handleBlockSize(InputFormatBase.getConfiguration(job)); } protected static void handleBlockSize(Configuration conf) { @@ -111,7 +111,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { */ @Deprecated public static void setFileType(JobContext job, String type) { - setFileType(job.getConfiguration(), type); + setFileType(InputFormatBase.getConfiguration(job), type); } public static void setFileType(Configuration conf, String type) { @@ -123,7 +123,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { */ @Deprecated public static void setBlockSize(JobContext job, int blockSize) { - setBlockSize(job.getConfiguration(), blockSize); + setBlockSize(InputFormatBase.getConfiguration(job), blockSize); } public static void setBlockSize(Configuration conf, int blockSize) { @@ -139,7 +139,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { */ @Deprecated public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers) { - setZooKeeperInstance(job.getConfiguration(), instanceName, zooKeepers); + setZooKeeperInstance(InputFormatBase.getConfiguration(job), instanceName, zooKeepers); } public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { @@ -159,7 +159,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { */ @Deprecated protected static Instance getInstance(JobContext job) { - return getInstance(job.getConfiguration()); + return getInstance(InputFormatBase.getConfiguration(job)); } /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java index c9a70eb..5c5c18d 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java @@ -44,7 +44,7 @@ import org.apache.log4j.Level; public class AccumuloInputFormat extends InputFormatBase { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - log.setLevel(getLogLevel(context.getConfiguration())); + log.setLevel(getLogLevel(InputFormatBase.getConfiguration(context))); // Override the log level from the configuration as if the RangeInputSplit has one it's the more correct one to use. if (split instanceof RangeInputSplit) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index ed0aebf..7aee8b6 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -105,7 +105,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setOutputInfo(JobContext job, String user, byte[] passwd, boolean createTables, String defaultTable) { - setOutputInfo(job.getConfiguration(), user, passwd, createTables, defaultTable); + setOutputInfo(InputFormatBase.getConfiguration(job), user, passwd, createTables, defaultTable); } /** @@ -140,7 +140,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers) { - setZooKeeperInstance(job.getConfiguration(), instanceName, zooKeepers); + setZooKeeperInstance(InputFormatBase.getConfiguration(job), instanceName, zooKeepers); } public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { @@ -158,7 +158,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setMockInstance(JobContext job, String instanceName) { - setMockInstance(job.getConfiguration(), instanceName); + setMockInstance(InputFormatBase.getConfiguration(job), instanceName); } public static void setMockInstance(Configuration conf, String instanceName) { @@ -172,7 +172,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setMaxMutationBufferSize(JobContext job, long numberOfBytes) { - setMaxMutationBufferSize(job.getConfiguration(), numberOfBytes); + setMaxMutationBufferSize(InputFormatBase.getConfiguration(job), numberOfBytes); } public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { @@ -184,7 +184,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setMaxLatency(JobContext job, int numberOfMilliseconds) { - setMaxLatency(job.getConfiguration(), numberOfMilliseconds); + setMaxLatency(InputFormatBase.getConfiguration(job), numberOfMilliseconds); } public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { @@ -196,7 +196,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setMaxWriteThreads(JobContext job, int numberOfThreads) { - setMaxWriteThreads(job.getConfiguration(), numberOfThreads); + setMaxWriteThreads(InputFormatBase.getConfiguration(job), numberOfThreads); } public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { @@ -208,7 +208,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setLogLevel(JobContext job, Level level) { - setLogLevel(job.getConfiguration(), level); + setLogLevel(InputFormatBase.getConfiguration(job), level); } public static void setLogLevel(Configuration conf, Level level) { @@ -221,7 +221,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated public static void setSimulationMode(JobContext job) { - setSimulationMode(job.getConfiguration()); + setSimulationMode(InputFormatBase.getConfiguration(job)); } public static void setSimulationMode(Configuration conf) { @@ -233,7 +233,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static String getUsername(JobContext job) { - return getUsername(job.getConfiguration()); + return getUsername(InputFormatBase.getConfiguration(job)); } protected static String getUsername(Configuration conf) { @@ -248,7 +248,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static byte[] getPassword(JobContext job) { - return getPassword(job.getConfiguration()); + return getPassword(InputFormatBase.getConfiguration(job)); } /** @@ -264,7 +264,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static boolean canCreateTables(JobContext job) { - return canCreateTables(job.getConfiguration()); + return canCreateTables(InputFormatBase.getConfiguration(job)); } protected static boolean canCreateTables(Configuration conf) { @@ -276,7 +276,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static String getDefaultTableName(JobContext job) { - return getDefaultTableName(job.getConfiguration()); + return getDefaultTableName(InputFormatBase.getConfiguration(job)); } protected static String getDefaultTableName(Configuration conf) { @@ -288,7 +288,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static Instance getInstance(JobContext job) { - return getInstance(job.getConfiguration()); + return getInstance(InputFormatBase.getConfiguration(job)); } protected static Instance getInstance(Configuration conf) { @@ -302,7 +302,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static long getMaxMutationBufferSize(JobContext job) { - return getMaxMutationBufferSize(job.getConfiguration()); + return getMaxMutationBufferSize(InputFormatBase.getConfiguration(job)); } protected static long getMaxMutationBufferSize(Configuration conf) { @@ -314,7 +314,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static int getMaxLatency(JobContext job) { - return getMaxLatency(job.getConfiguration()); + return getMaxLatency(InputFormatBase.getConfiguration(job)); } protected static int getMaxLatency(Configuration conf) { @@ -326,7 +326,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static int getMaxWriteThreads(JobContext job) { - return getMaxWriteThreads(job.getConfiguration()); + return getMaxWriteThreads(InputFormatBase.getConfiguration(job)); } protected static int getMaxWriteThreads(Configuration conf) { @@ -338,7 +338,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static Level getLogLevel(JobContext job) { - return getLogLevel(job.getConfiguration()); + return getLogLevel(InputFormatBase.getConfiguration(job)); } protected static Level getLogLevel(Configuration conf) { @@ -352,7 +352,7 @@ public class AccumuloOutputFormat extends OutputFormat { */ @Deprecated protected static boolean getSimulationMode(JobContext job) { - return getSimulationMode(job.getConfiguration()); + return getSimulationMode(InputFormatBase.getConfiguration(job)); } protected static boolean getSimulationMode(Configuration conf) { @@ -513,7 +513,7 @@ public class AccumuloOutputFormat extends OutputFormat { @Override public void checkOutputSpecs(JobContext job) throws IOException { - Configuration conf = job.getConfiguration(); + Configuration conf = InputFormatBase.getConfiguration(job); if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) throw new IOException("Output info has not been set."); if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index a11096c..b71f592 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -72,6 +72,7 @@ import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.ContextFactory; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.UtilWaitThread; @@ -148,7 +149,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setIsolated(JobContext job, boolean enable) { - setIsolated(job.getConfiguration(), enable); + setIsolated(InputFormatBase.getConfiguration(job), enable); } /** @@ -168,7 +169,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setLocalIterators(JobContext job, boolean enable) { - setLocalIterators(job.getConfiguration(), enable); + setLocalIterators(InputFormatBase.getConfiguration(job), enable); } /** @@ -188,7 +189,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setInputInfo(JobContext job, String user, byte[] passwd, String table, Authorizations auths) { - setInputInfo(job.getConfiguration(), user, passwd, table, auths); + setInputInfo(InputFormatBase.getConfiguration(job), user, passwd, table, auths); } /** @@ -223,7 +224,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers) { - setZooKeeperInstance(job.getConfiguration(), instanceName, zooKeepers); + setZooKeeperInstance(InputFormatBase.getConfiguration(job), instanceName, zooKeepers); } /** @@ -251,7 +252,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setMockInstance(JobContext job, String instanceName) { - setMockInstance(job.getConfiguration(), instanceName); + setMockInstance(InputFormatBase.getConfiguration(job), instanceName); } /** @@ -273,7 +274,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setRanges(JobContext job, Collection ranges) { - setRanges(job.getConfiguration(), ranges); + setRanges(InputFormatBase.getConfiguration(job), ranges); } /** @@ -304,7 +305,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void disableAutoAdjustRanges(JobContext job) { - disableAutoAdjustRanges(job.getConfiguration()); + disableAutoAdjustRanges(InputFormatBase.getConfiguration(job)); } /** @@ -354,7 +355,7 @@ public abstract class InputFormatBase extends InputFormat { throw new NoSuchElementException(); } try { - job.getConfiguration().set(key, URLEncoder.encode(regex, "UTF-8")); + InputFormatBase.getConfiguration(job).set(key, URLEncoder.encode(regex, "UTF-8")); } catch (UnsupportedEncodingException e) { log.error("Failedd to encode regular expression", e); throw new RuntimeException(e); @@ -366,7 +367,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setMaxVersions(JobContext job, int maxVersions) throws IOException { - setMaxVersions(job.getConfiguration(), maxVersions); + setMaxVersions(InputFormatBase.getConfiguration(job), maxVersions); } /** @@ -422,7 +423,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void fetchColumns(JobContext job, Collection> columnFamilyColumnQualifierPairs) { - fetchColumns(job.getConfiguration(), columnFamilyColumnQualifierPairs); + fetchColumns(InputFormatBase.getConfiguration(job), columnFamilyColumnQualifierPairs); } /** @@ -460,7 +461,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void setLogLevel(JobContext job, Level level) { - setLogLevel(job.getConfiguration(), level); + setLogLevel(InputFormatBase.getConfiguration(job), level); } /** @@ -482,7 +483,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated public static void addIterator(JobContext job, IteratorSetting cfg) { - addIterator(job.getConfiguration(), cfg); + addIterator(InputFormatBase.getConfiguration(job), cfg); } /** @@ -543,7 +544,7 @@ public abstract class InputFormatBase extends InputFormat { @Deprecated public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName) { // First check to see if anything has been set already - String iterators = job.getConfiguration().get(ITERATORS); + String iterators = InputFormatBase.getConfiguration(job).get(ITERATORS); // No iterators specified yet, create a new string if (iterators == null || iterators.isEmpty()) { @@ -553,7 +554,7 @@ public abstract class InputFormatBase extends InputFormat { iterators = iterators.concat(ITERATORS_DELIM + new AccumuloIterator(priority, iteratorClass, iteratorName).toString()); } // Store the iterators w/ the job - job.getConfiguration().set(ITERATORS, iterators); + InputFormatBase.getConfiguration(job).set(ITERATORS, iterators); } @@ -576,7 +577,7 @@ public abstract class InputFormatBase extends InputFormat { if (iteratorName == null || key == null || value == null) return; - String iteratorOptions = job.getConfiguration().get(ITERATORS_OPTIONS); + String iteratorOptions = InputFormatBase.getConfiguration(job).get(ITERATORS_OPTIONS); // No options specified yet, create a new string if (iteratorOptions == null || iteratorOptions.isEmpty()) { @@ -587,7 +588,7 @@ public abstract class InputFormatBase extends InputFormat { } // Store the options w/ the job - job.getConfiguration().set(ITERATORS_OPTIONS, iteratorOptions); + InputFormatBase.getConfiguration(job).set(ITERATORS_OPTIONS, iteratorOptions); } /** @@ -595,7 +596,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static boolean isIsolated(JobContext job) { - return isIsolated(job.getConfiguration()); + return isIsolated(InputFormatBase.getConfiguration(job)); } /** @@ -615,7 +616,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static boolean usesLocalIterators(JobContext job) { - return usesLocalIterators(job.getConfiguration()); + return usesLocalIterators(InputFormatBase.getConfiguration(job)); } /** @@ -635,7 +636,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static String getUsername(JobContext job) { - return getUsername(job.getConfiguration()); + return getUsername(InputFormatBase.getConfiguration(job)); } /** @@ -658,7 +659,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static byte[] getPassword(JobContext job) { - return getPassword(job.getConfiguration()); + return getPassword(InputFormatBase.getConfiguration(job)); } /** @@ -679,7 +680,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static String getTablename(JobContext job) { - return getTablename(job.getConfiguration()); + return getTablename(InputFormatBase.getConfiguration(job)); } /** @@ -699,7 +700,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static Authorizations getAuthorizations(JobContext job) { - return getAuthorizations(job.getConfiguration()); + return getAuthorizations(InputFormatBase.getConfiguration(job)); } /** @@ -720,7 +721,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static Instance getInstance(JobContext job) { - return getInstance(job.getConfiguration()); + return getInstance(InputFormatBase.getConfiguration(job)); } /** @@ -743,7 +744,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException { - return getTabletLocator(job.getConfiguration()); + return getTabletLocator(InputFormatBase.getConfiguration(job)); } /** @@ -771,7 +772,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static List getRanges(JobContext job) throws IOException { - return getRanges(job.getConfiguration()); + return getRanges(InputFormatBase.getConfiguration(job)); } /** @@ -819,7 +820,7 @@ public abstract class InputFormatBase extends InputFormat { throw new NoSuchElementException(); } try { - String s = job.getConfiguration().get(key); + String s = InputFormatBase.getConfiguration(job).get(key); if (s == null) return null; return URLDecoder.decode(s, "UTF-8"); @@ -834,7 +835,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static Set> getFetchedColumns(JobContext job) { - return getFetchedColumns(job.getConfiguration()); + return getFetchedColumns(InputFormatBase.getConfiguration(job)); } /** @@ -872,7 +873,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static boolean getAutoAdjustRanges(JobContext job) { - return getAutoAdjustRanges(job.getConfiguration()); + return getAutoAdjustRanges(InputFormatBase.getConfiguration(job)); } /** @@ -892,7 +893,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static Level getLogLevel(JobContext job) { - return getLogLevel(job.getConfiguration()); + return getLogLevel(InputFormatBase.getConfiguration(job)); } /** @@ -914,7 +915,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static void validateOptions(JobContext job) throws IOException { - validateOptions(job.getConfiguration()); + validateOptions(InputFormatBase.getConfiguration(job)); } // InputFormat doesn't have the equivalent of OutputFormat's @@ -960,7 +961,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static int getMaxVersions(JobContext job) { - return getMaxVersions(job.getConfiguration()); + return getMaxVersions(InputFormatBase.getConfiguration(job)); } /** @@ -986,7 +987,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static List getIterators(JobContext job) { - return getIterators(job.getConfiguration()); + return getIterators(InputFormatBase.getConfiguration(job)); } /** @@ -1020,7 +1021,7 @@ public abstract class InputFormatBase extends InputFormat { */ @Deprecated protected static List getIteratorOptions(JobContext job) { - return getIteratorOptions(job.getConfiguration()); + return getIteratorOptions(InputFormatBase.getConfiguration(job)); } /** @@ -1136,7 +1137,7 @@ public abstract class InputFormatBase extends InputFormat { Scanner scanner; split = (RangeInputSplit) inSplit; log.debug("Initializing input split: " + split.getRange()); - Configuration conf = attempt.getConfiguration(); + Configuration conf = InputFormatBase.getConfiguration(attempt); Instance instance = split.getInstance(); if (null == instance) { @@ -1296,8 +1297,8 @@ public abstract class InputFormatBase extends InputFormat { Map>> binnedRanges = new HashMap>>(); - Instance instance = getInstance(job.getConfiguration()); - Connector conn = instance.getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration())); + Instance instance = getInstance(InputFormatBase.getConfiguration(job)); + Connector conn = instance.getConnector(getUsername(InputFormatBase.getConfiguration(job)), getPassword(InputFormatBase.getConfiguration(job))); String tableId = Tables.getTableId(instance, tableName); if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { @@ -1395,7 +1396,7 @@ public abstract class InputFormatBase extends InputFormat { * Read the metadata table to get tablets and match up ranges to them. */ public List getSplits(JobContext job) throws IOException { - Configuration conf = job.getConfiguration(); + Configuration conf = InputFormatBase.getConfiguration(job); log.setLevel(getLogLevel(conf)); validateOptions(conf); @@ -1428,7 +1429,7 @@ public abstract class InputFormatBase extends InputFormat { Map>> binnedRanges = new HashMap>>(); TabletLocator tl; try { - if (isOfflineScan(job.getConfiguration())) { + if (isOfflineScan(InputFormatBase.getConfiguration(job))) { binnedRanges = binOfflineTable(job, tableName, ranges); while (binnedRanges == null) { // Some tablets were still online, try again @@ -1437,7 +1438,7 @@ public abstract class InputFormatBase extends InputFormat { } } else { String tableId = null; - tl = getTabletLocator(job.getConfiguration()); + tl = getTabletLocator(InputFormatBase.getConfiguration(job)); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it tl.invalidateCache(); while (!tl.binRanges(ranges, binnedRanges).isEmpty()) { @@ -1622,4 +1623,7 @@ public abstract class InputFormatBase extends InputFormat { } + public static Configuration getConfiguration(JobContext context) { + return ContextFactory.getConfiguration(context); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java index ae537f9..2a39dc7 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Scanner; import java.util.TreeSet; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -119,14 +120,14 @@ public class RangePartitioner extends Partitioner implements Conf */ public static void setSplitFile(JobContext job, String file) { URI uri = new Path(file).toUri(); - DistributedCache.addCacheFile(uri, job.getConfiguration()); - job.getConfiguration().set(CUTFILE_KEY, uri.getPath()); + DistributedCache.addCacheFile(uri, InputFormatBase.getConfiguration(job)); + InputFormatBase.getConfiguration(job).set(CUTFILE_KEY, uri.getPath()); } /** * Sets the number of random sub-bins per range */ public static void setNumSubBins(JobContext job, int num) { - job.getConfiguration().setInt(NUM_SUBBINS, num); + InputFormatBase.getConfiguration(job).setInt(NUM_SUBBINS, num); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java index 5a1c2ef..4979dfa 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java +++ b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java @@ -20,6 +20,7 @@ package org.apache.accumulo.core.util; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; @@ -46,6 +47,7 @@ public class ContextFactory { private static final Constructor MAP_CONSTRUCTOR; private static final Constructor MAP_CONTEXT_CONSTRUCTOR; private static final Constructor MAP_CONTEXT_IMPL_CONSTRUCTOR; + private static final Method GET_CONFIGURATION_METHOD; private static final Class TASK_TYPE_CLASS; private static final boolean useV21; @@ -63,6 +65,8 @@ public class ContextFactory { Class mapCls; Class mapContextCls; Class innerMapContextCls; + Class jobContextRoot; + try { if (v21) { jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl"); @@ -79,6 +83,7 @@ public class ContextFactory { mapCls = Class.forName(PACKAGE + ".Mapper"); innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context"); } + jobContextRoot = Class.forName(PACKAGE + ".JobContext"); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Can't find class", e); } @@ -104,6 +109,7 @@ public class ContextFactory { MAP_CONTEXT_IMPL_CONSTRUCTOR = null; } MAP_CONTEXT_CONSTRUCTOR.setAccessible(true); + GET_CONFIGURATION_METHOD = jobContextRoot.getMethod("getConfiguration"); } catch (SecurityException e) { throw new IllegalArgumentException("Can't run constructor ", e); } catch (NoSuchMethodException e) { @@ -127,8 +133,20 @@ public class ContextFactory { } } + public static Configuration getConfiguration(JobContext context) { + try { + return (Configuration) GET_CONFIGURATION_METHOD.invoke(context, new Object[0]); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Can't invoke method", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't invoke method", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't invoke method", e); + } + } + public static TaskAttemptContext createTaskAttemptContext(JobContext job) { - return createTaskAttemptContext(job.getConfiguration()); + return createTaskAttemptContext(getConfiguration(job)); } public static TaskAttemptContext createTaskAttemptContext(Configuration conf) { @@ -157,10 +175,10 @@ public class ContextFactory { RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) { try { if (useV21) { - Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split); + Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(getConfiguration(tac), tac.getTaskAttemptID(), reader, writer, committer, reporter, split); return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper) MAP_CONSTRUCTOR.newInstance(), basis); } else { - return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, + return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, getConfiguration(tac), tac.getTaskAttemptID(), reader, writer, committer, reporter, split); } } catch (InstantiationException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java index 184692c..7747ea3 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java @@ -250,28 +250,18 @@ public class Accumulo { DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(CachedConfiguration.getInstance()); // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)) // Becomes this: - Class constantClass; + Class safeModeAction; try { // hadoop 2.0 - constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants"); + safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"); } catch (ClassNotFoundException ex) { // hadoop 1.0 try { - constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants"); + safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"); } catch (ClassNotFoundException e) { throw new RuntimeException("Cannot figure out the right class for Constants"); } } - Class safeModeAction = null; - for (Class klass : constantClass.getDeclaredClasses()) { - if (klass.getSimpleName().equals("SafeModeAction")) { - safeModeAction = klass; - break; - } - } - if (safeModeAction == null) { - throw new RuntimeException("Cannot find SafeModeAction in constants class"); - } Object get = null; for (Object obj : safeModeAction.getEnumConstants()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java index 1c384a3..10f9151 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java +++ b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.logger.IdentityReducer; @@ -76,7 +77,7 @@ public class LogSort extends Configured implements Tool { public SortCommit(Path outputPath, TaskAttemptContext context) throws IOException { super(outputPath, context); this.outputPath = outputPath; - outputFileSystem = outputPath.getFileSystem(context.getConfiguration()); + outputFileSystem = outputPath.getFileSystem(InputFormatBase.getConfiguration(context)); } @Override @@ -207,7 +208,7 @@ public class LogSort extends Configured implements Tool { // get the path of the temporary output file Path file = getDefaultWorkFile(job, ""); - FileSystem fs = file.getFileSystem(job.getConfiguration()); + FileSystem fs = file.getFileSystem(InputFormatBase.getConfiguration(job)); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { @@ -216,7 +217,7 @@ public class LogSort extends Configured implements Tool { // find the right codec Class codecClass = getOutputCompressorClass(job, DefaultCodec.class); - codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration()); + codec = ReflectionUtils.newInstance(codecClass, InputFormatBase.getConfiguration(job)); } Progressable progress = new Progressable() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java index 1c384cc..88fbb25 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java @@ -102,7 +102,7 @@ public class ContinuousMoru extends Configured implements Tool { } } else { - context.getCounter(Counts.SELF_READ).increment(1); + ContinuousVerify.increment(context.getCounter(Counts.SELF_READ)); } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e5854fb/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java index 9441cf5..4b465a8 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java @@ -17,6 +17,7 @@ package org.apache.accumulo.server.test.continuous; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.VLongWritable; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; @@ -50,6 +52,25 @@ import org.apache.hadoop.util.ToolRunner; */ public class ContinuousVerify extends Configured implements Tool { + // work around hadoop-1/hadoop-2 runtime incompatibility + static private Method INCREMENT; + static { + try { + Class counter = Counter.class; + + INCREMENT = counter.getMethod("increment", Long.TYPE); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + static void increment(Object obj) { + try { + INCREMENT.invoke(obj, 1L); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } public static final VLongWritable DEF = new VLongWritable(-1); @@ -69,7 +90,7 @@ public class ContinuousVerify extends Configured implements Tool { try { ContinuousWalk.validate(key, data); } catch (BadChecksumException bce) { - context.getCounter(Counts.CORRUPT).increment(1); + increment(context.getCounter(Counts.CORRUPT)); if (corrupt < 1000) { System.out.println("ERROR Bad checksum : " + key); } else if (corrupt == 1000) { @@ -123,12 +144,12 @@ public class ContinuousVerify extends Configured implements Tool { } context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString())); - context.getCounter(Counts.UNDEFINED).increment(1); + increment(context.getCounter(Counts.UNDEFINED)); } else if (defCount > 0 && refs.size() == 0) { - context.getCounter(Counts.UNREFERENCED).increment(1); + increment(context.getCounter(Counts.UNREFERENCED)); } else { - context.getCounter(Counts.REFERENCED).increment(1); + increment(context.getCounter(Counts.REFERENCED)); } }