Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8E2CC108D9 for ; Sun, 24 Nov 2013 00:33:37 +0000 (UTC) Received: (qmail 92813 invoked by uid 500); 24 Nov 2013 00:33:33 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 92540 invoked by uid 500); 24 Nov 2013 00:33:32 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 92315 invoked by uid 99); 24 Nov 2013 00:33:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 24 Nov 2013 00:33:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 77F2D904042; Sun, 24 Nov 2013 00:33:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Sun, 24 Nov 2013 00:33:33 -0000 Message-Id: <0cac8f6b97f04c69bbd5201d8bdbcafe@git.apache.org> In-Reply-To: <449e651f9ade4e8fa564570029299af0@git.apache.org> References: <449e651f9ade4e8fa564570029299af0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/31] git commit: Squashed commit of the following: Squashed commit of the following: commit dfbe098fb650d1d1605ac28ff0b195e229ecb345 Author: Josh Elser Date: Wed Nov 20 23:57:18 2013 -0500 ACCUMULO-1843 Add in log4j Level to RangeInputSplit. Add more tests, notably ones that exercise delegation of the input split to the Configuration. 
commit 38fdee9916edd938bea1642de5d4e5cf54a81596 Author: Josh Elser Date: Fri Nov 8 17:47:57 2013 -0500 ACCUMULO-1854 Fix up InputFormatBase to use the information stored on RangeInputSplit and fall back onto the Configuration. commit 0e6d1aba7eacef357e0a17c67a453dd5b50a49dc Author: Josh Elser Date: Fri Nov 8 16:23:49 2013 -0500 ACCUMULO-1854 Clean up constructors. Add a test. commit 2f59f81f6e75f8a90ccfe3df00c6ad3f69174e0c Author: Josh Elser Date: Fri Nov 8 15:46:39 2013 -0500 ACCUMULO-1854 Move RangeInputSplit into its own file and store all connection information into it. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/45ae55fc Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/45ae55fc Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/45ae55fc Branch: refs/heads/master Commit: 45ae55fcb74832983ffc188524790a56e5261ae0 Parents: 3f633bf Author: Josh Elser Authored: Thu Nov 21 00:19:59 2013 -0500 Committer: Josh Elser Committed: Thu Nov 21 00:19:59 2013 -0500 ---------------------------------------------------------------------- .../client/mapreduce/AccumuloInputFormat.java | 11 + .../core/client/mapreduce/InputFormatBase.java | 615 ++++++++++--------- .../core/client/mapreduce/RangeInputSplit.java | 493 +++++++++++++++ .../mapreduce/AccumuloInputFormatTest.java | 253 ++++++-- .../mapreduce/AccumuloRowInputFormatTest.java | 1 - .../client/mapreduce/RangeInputSplitTest.java | 100 +++ .../simple/filedata/ChunkInputFormatTest.java | 4 +- 7 files changed, 1126 insertions(+), 351 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/45ae55fc/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --git 
a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java index 4de131f..c9a70eb 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.util.format.DefaultFormatter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Level; /** * This class allows MapReduce jobs to use Accumulo as the source of data. This input format provides keys and values of type Key and Value to the Map() and @@ -44,6 +45,16 @@ public class AccumuloInputFormat extends InputFormatBase { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { log.setLevel(getLogLevel(context.getConfiguration())); + + // If the RangeInputSplit carries its own log level, let it override the one from the configuration, as it is the more correct one to use. 
+ if (split instanceof RangeInputSplit) { + RangeInputSplit risplit = (RangeInputSplit) split; + Level level = risplit.getLogLevel(); + if (null != level) { + log.setLevel(level); + } + } + return new RecordReaderBase() { @Override public boolean nextKeyValue() throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/45ae55fc/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index 8e238f1..40e09a1 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -110,7 +110,7 @@ import org.apache.log4j.Logger; public abstract class InputFormatBase extends InputFormat { protected static final Logger log = Logger.getLogger(InputFormatBase.class); - + private static final String PREFIX = AccumuloInputFormat.class.getSimpleName(); private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; @@ -118,34 +118,34 @@ public abstract class InputFormatBase extends InputFormat { private static final String PASSWORD = PREFIX + ".password"; private static final String TABLE_NAME = PREFIX + ".tablename"; private static final String AUTHORIZATIONS = PREFIX + ".authorizations"; - + private static final String INSTANCE_NAME = PREFIX + ".instanceName"; private static final String ZOOKEEPERS = PREFIX + ".zooKeepers"; private static final String MOCK = ".useMockInstance"; - + private static final String RANGES = PREFIX + ".ranges"; private static final String AUTO_ADJUST_RANGES = PREFIX + ".ranges.autoAdjust"; - + private static final 
String ROW_REGEX = PREFIX + ".regex.row"; private static final String COLUMN_FAMILY_REGEX = PREFIX + ".regex.cf"; private static final String COLUMN_QUALIFIER_REGEX = PREFIX + ".regex.cq"; private static final String VALUE_REGEX = PREFIX + ".regex.value"; - + private static final String COLUMNS = PREFIX + ".columns"; private static final String LOGLEVEL = PREFIX + ".loglevel"; - + private static final String ISOLATED = PREFIX + ".isolated"; - + private static final String LOCAL_ITERATORS = PREFIX + ".localiters"; - + // Used to specify the maximum # of versions of an Accumulo cell value to return private static final String MAX_VERSIONS = PREFIX + ".maxVersions"; - + // Used for specifying the iterators to be applied private static final String ITERATORS = PREFIX + ".iterators"; private static final String ITERATORS_OPTIONS = PREFIX + ".iterators.options"; private static final String ITERATORS_DELIM = ","; - + private static final String READ_OFFLINE = PREFIX + ".read.offline"; /** @@ -154,7 +154,7 @@ public abstract class InputFormatBase extends InputFormat { public static void setIsolated(JobContext job, boolean enable) { setIsolated(job.getConfiguration(), enable); } - + /** * Enable or disable use of the {@link IsolatedScanner} in this configuration object. By default it is not enabled. * @@ -166,14 +166,14 @@ public abstract class InputFormatBase extends InputFormat { public static void setIsolated(Configuration conf, boolean enable) { conf.setBoolean(ISOLATED, enable); } - + /** * @deprecated Use {@link #setLocalIterators(Configuration,boolean)} instead */ public static void setLocalIterators(JobContext job, boolean enable) { setLocalIterators(job.getConfiguration(), enable); } - + /** * Enable or disable use of the {@link ClientSideIteratorScanner} in this Configuration object. By default it is not enabled. 
* @@ -185,14 +185,14 @@ public abstract class InputFormatBase extends InputFormat { public static void setLocalIterators(Configuration conf, boolean enable) { conf.setBoolean(LOCAL_ITERATORS, enable); } - + /** * @deprecated Use {@link #setInputInfo(Configuration,String,byte[],String,Authorizations)} instead */ public static void setInputInfo(JobContext job, String user, byte[] passwd, String table, Authorizations auths) { setInputInfo(job.getConfiguration(), user, passwd, table, auths); } - + /** * Initialize the user, table, and authorization information for the configuration object that will be used with an Accumulo InputFormat. * @@ -211,7 +211,7 @@ public abstract class InputFormatBase extends InputFormat { if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false)) throw new IllegalStateException("Input info can only be set once per job"); conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true); - + ArgumentChecker.notNull(user, passwd, table); conf.set(USERNAME, user); conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); @@ -219,14 +219,14 @@ public abstract class InputFormatBase extends InputFormat { if (auths != null && !auths.isEmpty()) conf.set(AUTHORIZATIONS, auths.serialize()); } - + /** * @deprecated Use {@link #setZooKeeperInstance(Configuration,String,String)} instead */ public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers) { setZooKeeperInstance(job.getConfiguration(), instanceName, zooKeepers); } - + /** * Configure a {@link ZooKeeperInstance} for this configuration object. 
* @@ -241,19 +241,19 @@ public abstract class InputFormatBase extends InputFormat { if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) throw new IllegalStateException("Instance info can only be set once per job"); conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - + ArgumentChecker.notNull(instanceName, zooKeepers); conf.set(INSTANCE_NAME, instanceName); conf.set(ZOOKEEPERS, zooKeepers); } - + /** * @deprecated Use {@link #setMockInstance(Configuration,String)} instead */ public static void setMockInstance(JobContext job, String instanceName) { setMockInstance(job.getConfiguration(), instanceName); } - + /** * Configure a {@link MockInstance} for this configuration object. * @@ -267,14 +267,14 @@ public abstract class InputFormatBase extends InputFormat { conf.setBoolean(MOCK, true); conf.set(INSTANCE_NAME, instanceName); } - + /** * @deprecated Use {@link #setRanges(Configuration,Collection)} instead */ public static void setRanges(JobContext job, Collection ranges) { setRanges(job.getConfiguration(), ranges); } - + /** * Set the ranges to map over for this configuration object. * @@ -297,14 +297,14 @@ public abstract class InputFormatBase extends InputFormat { } conf.setStrings(RANGES, rangeStrings.toArray(new String[0])); } - + /** * @deprecated Use {@link #disableAutoAdjustRanges(Configuration)} instead */ public static void disableAutoAdjustRanges(JobContext job) { disableAutoAdjustRanges(job.getConfiguration()); } - + /** * Disables the adjustment of ranges for this configuration object. By default, overlapping ranges will be merged and ranges will be fit to existing tablet * boundaries. Disabling this adjustment will cause there to be exactly one mapper per range set using {@link #setRanges(Configuration, Collection)}. 
@@ -315,14 +315,14 @@ public abstract class InputFormatBase extends InputFormat { public static void disableAutoAdjustRanges(Configuration conf) { conf.setBoolean(AUTO_ADJUST_RANGES, false); } - + /** * @deprecated since 1.4 use {@link org.apache.accumulo.core.iterators.user.RegExFilter} and {@link #addIterator(Configuration, IteratorSetting)} */ public static enum RegexType { ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VALUE } - + /** * @deprecated since 1.4 use {@link #addIterator(Configuration, IteratorSetting)} * @see org.apache.accumulo.core.iterators.user.RegExFilter#setRegexs(IteratorSetting, String, String, String, String, boolean) @@ -356,14 +356,14 @@ public abstract class InputFormatBase extends InputFormat { throw new RuntimeException(e); } } - + /** * @deprecated Use {@link #setMaxVersions(Configuration,int)} instead */ public static void setMaxVersions(JobContext job, int maxVersions) throws IOException { setMaxVersions(job.getConfiguration(), maxVersions); } - + /** * Sets the max # of values that may be returned for an individual Accumulo cell. By default, applied before all other Accumulo iterators (highest priority) * leveraged in the scan by the record reader. To adjust priority use setIterator() & setIteratorOptions() w/ the VersioningIterator type explicitly. @@ -380,7 +380,7 @@ public abstract class InputFormatBase extends InputFormat { throw new IOException("Invalid maxVersions: " + maxVersions + ". Must be >= 1"); conf.setInt(MAX_VERSIONS, maxVersions); } - + /** *

* Enable reading offline tables. This will make the map reduce job directly read the tables files. If the table is not offline, then the job will fail. If @@ -407,18 +407,18 @@ public abstract class InputFormatBase extends InputFormat { * @param scanOff * pass true to read offline tables */ - + public static void setScanOffline(Configuration conf, boolean scanOff) { conf.setBoolean(READ_OFFLINE, scanOff); } - + /** * @deprecated Use {@link #fetchColumns(Configuration,Collection)} instead */ public static void fetchColumns(JobContext job, Collection> columnFamilyColumnQualifierPairs) { fetchColumns(job.getConfiguration(), columnFamilyColumnQualifierPairs); } - + /** * Restricts the columns that will be mapped over for this configuration object. * @@ -429,27 +429,33 @@ public abstract class InputFormatBase extends InputFormat { * selected. An empty set is the default and is equivalent to scanning the all columns. */ public static void fetchColumns(Configuration conf, Collection> columnFamilyColumnQualifierPairs) { + String[] columnStrings = serializeColumns(columnFamilyColumnQualifierPairs); + conf.setStrings(COLUMNS, columnStrings); + } + + public static String[] serializeColumns(Collection> columnFamilyColumnQualifierPairs) { ArgumentChecker.notNull(columnFamilyColumnQualifierPairs); ArrayList columnStrings = new ArrayList(columnFamilyColumnQualifierPairs.size()); for (Pair column : columnFamilyColumnQualifierPairs) { if (column.getFirst() == null) throw new IllegalArgumentException("Column family can not be null"); - + String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))); if (column.getSecond() != null) col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))); columnStrings.add(col); } - conf.setStrings(COLUMNS, columnStrings.toArray(new String[0])); + + return columnStrings.toArray(new String[0]); } - + /** * @deprecated Use {@link #setLogLevel(Configuration,Level)} instead */ public static void 
setLogLevel(JobContext job, Level level) { setLogLevel(job.getConfiguration(), level); } - + /** * Sets the log level for this configuration object. * @@ -463,14 +469,14 @@ public abstract class InputFormatBase extends InputFormat { log.setLevel(level); conf.setInt(LOGLEVEL, level.toInt()); } - + /** * @deprecated Use {@link #addIterator(Configuration,IteratorSetting)} instead */ public static void addIterator(JobContext job, IteratorSetting cfg) { addIterator(job.getConfiguration(), cfg); } - + /** * Encode an iterator on the input for this configuration object. * @@ -482,7 +488,7 @@ public abstract class InputFormatBase extends InputFormat { public static void addIterator(Configuration conf, IteratorSetting cfg) { // First check to see if anything has been set already String iterators = conf.get(ITERATORS); - + // No iterators specified yet, create a new string if (iterators == null || iterators.isEmpty()) { iterators = new AccumuloIterator(cfg.getPriority(), cfg.getIteratorClass(), cfg.getName()).toString(); @@ -495,9 +501,9 @@ public abstract class InputFormatBase extends InputFormat { for (Entry entry : cfg.getOptions().entrySet()) { if (entry.getValue() == null) continue; - + String iteratorOptions = conf.get(ITERATORS_OPTIONS); - + // No options specified yet, create a new string if (iteratorOptions == null || iteratorOptions.isEmpty()) { iteratorOptions = new AccumuloIteratorOption(cfg.getName(), entry.getKey(), entry.getValue()).toString(); @@ -505,12 +511,12 @@ public abstract class InputFormatBase extends InputFormat { // append the next option & reset iteratorOptions = iteratorOptions.concat(ITERATORS_DELIM + new AccumuloIteratorOption(cfg.getName(), entry.getKey(), entry.getValue())); } - + // Store the options w/ the job conf.set(ITERATORS_OPTIONS, iteratorOptions); } } - + /** * Specify an Accumulo iterator type to manage the behavior of the underlying table scan this InputFormat's RecordReader will conduct, w/ priority dictating * the order in which 
specified iterators are applied. Repeat calls to specify multiple iterators are allowed. @@ -529,7 +535,7 @@ public abstract class InputFormatBase extends InputFormat { public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName) { // First check to see if anything has been set already String iterators = job.getConfiguration().get(ITERATORS); - + // No iterators specified yet, create a new string if (iterators == null || iterators.isEmpty()) { iterators = new AccumuloIterator(priority, iteratorClass, iteratorName).toString(); @@ -539,9 +545,9 @@ public abstract class InputFormatBase extends InputFormat { } // Store the iterators w/ the job job.getConfiguration().set(ITERATORS, iterators); - + } - + /** * Specify an option for a named Accumulo iterator, further specifying that iterator's behavior. * @@ -559,9 +565,9 @@ public abstract class InputFormatBase extends InputFormat { public static void setIteratorOption(JobContext job, String iteratorName, String key, String value) { if (iteratorName == null || key == null || value == null) return; - + String iteratorOptions = job.getConfiguration().get(ITERATORS_OPTIONS); - + // No options specified yet, create a new string if (iteratorOptions == null || iteratorOptions.isEmpty()) { iteratorOptions = new AccumuloIteratorOption(iteratorName, key, value).toString(); @@ -569,18 +575,18 @@ public abstract class InputFormatBase extends InputFormat { // append the next option & reset iteratorOptions = iteratorOptions.concat(ITERATORS_DELIM + new AccumuloIteratorOption(iteratorName, key, value)); } - + // Store the options w/ the job job.getConfiguration().set(ITERATORS_OPTIONS, iteratorOptions); } - + /** * @deprecated Use {@link #isIsolated(Configuration)} instead */ protected static boolean isIsolated(JobContext job) { return isIsolated(job.getConfiguration()); } - + /** * Determines whether a configuration has isolation enabled. 
* @@ -592,14 +598,14 @@ public abstract class InputFormatBase extends InputFormat { protected static boolean isIsolated(Configuration conf) { return conf.getBoolean(ISOLATED, false); } - + /** * @deprecated Use {@link #usesLocalIterators(Configuration)} instead */ protected static boolean usesLocalIterators(JobContext job) { return usesLocalIterators(job.getConfiguration()); } - + /** * Determines whether a configuration uses local iterators. * @@ -611,14 +617,14 @@ public abstract class InputFormatBase extends InputFormat { protected static boolean usesLocalIterators(Configuration conf) { return conf.getBoolean(LOCAL_ITERATORS, false); } - + /** * @deprecated Use {@link #getUsername(Configuration)} instead */ protected static String getUsername(JobContext job) { return getUsername(job.getConfiguration()); } - + /** * Gets the user name from the configuration. * @@ -630,7 +636,7 @@ public abstract class InputFormatBase extends InputFormat { protected static String getUsername(Configuration conf) { return conf.get(USERNAME); } - + /** * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a * string, and is not intended to be secure. @@ -640,7 +646,7 @@ public abstract class InputFormatBase extends InputFormat { protected static byte[] getPassword(JobContext job) { return getPassword(job.getConfiguration()); } - + /** * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to * provide a charset safe conversion to a string, and is not intended to be secure. 
@@ -653,14 +659,14 @@ public abstract class InputFormatBase extends InputFormat { protected static byte[] getPassword(Configuration conf) { return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); } - + /** * @deprecated Use {@link #getTablename(Configuration)} instead */ protected static String getTablename(JobContext job) { return getTablename(job.getConfiguration()); } - + /** * Gets the table name from the configuration. * @@ -672,14 +678,14 @@ public abstract class InputFormatBase extends InputFormat { protected static String getTablename(Configuration conf) { return conf.get(TABLE_NAME); } - + /** * @deprecated Use {@link #getAuthorizations(Configuration)} instead */ protected static Authorizations getAuthorizations(JobContext job) { return getAuthorizations(job.getConfiguration()); } - + /** * Gets the authorizations to set for the scans from the configuration. * @@ -692,14 +698,14 @@ public abstract class InputFormatBase extends InputFormat { String authString = conf.get(AUTHORIZATIONS); return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.split(",")); } - + /** * @deprecated Use {@link #getInstance(Configuration)} instead */ protected static Instance getInstance(JobContext job) { return getInstance(job.getConfiguration()); } - + /** * Initializes an Accumulo {@link Instance} based on the configuration. * @@ -714,14 +720,14 @@ public abstract class InputFormatBase extends InputFormat { return new MockInstance(conf.get(INSTANCE_NAME)); return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); } - + /** * @deprecated Use {@link #getTabletLocator(Configuration)} instead */ protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException { return getTabletLocator(job.getConfiguration()); } - + /** * Initializes an Accumulo {@link TabletLocator} based on the configuration. 
* @@ -741,14 +747,14 @@ public abstract class InputFormatBase extends InputFormat { return TabletLocator.getInstance(instance, new AuthInfo(username, ByteBuffer.wrap(password), instance.getInstanceID()), new Text(Tables.getTableId(instance, tableName))); } - + /** * @deprecated Use {@link #getRanges(Configuration)} instead */ protected static List getRanges(JobContext job) throws IOException { return getRanges(job.getConfiguration()); } - + /** * Gets the ranges to scan over from a configuration object. * @@ -769,7 +775,7 @@ public abstract class InputFormatBase extends InputFormat { } return ranges; } - + /** * @deprecated since 1.4 use {@link org.apache.accumulo.core.iterators.user.RegExFilter} and {@link #addIterator(Configuration, IteratorSetting)} * @see #setRegex(JobContext, RegexType, String) @@ -802,14 +808,14 @@ public abstract class InputFormatBase extends InputFormat { throw new RuntimeException(e); } } - + /** * @deprecated Use {@link #getFetchedColumns(Configuration)} instead */ protected static Set> getFetchedColumns(JobContext job) { return getFetchedColumns(job.getConfiguration()); } - + /** * Gets the columns to be mapped over from this configuration object. * @@ -819,8 +825,19 @@ public abstract class InputFormatBase extends InputFormat { * @see #fetchColumns(Configuration, Collection) */ protected static Set> getFetchedColumns(Configuration conf) { + ArgumentChecker.notNull(conf); + + return deserializeFetchedColumns(conf.getStrings(COLUMNS)); + } + + public static Set> deserializeFetchedColumns(String[] serialized) { Set> columns = new HashSet>(); - for (String col : conf.getStringCollection(COLUMNS)) { + + if (null == serialized) { + return columns; + } + + for (String col : serialized) { int idx = col.indexOf(":"); Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes())); Text cq = idx < 0 ? 
null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes())); @@ -828,14 +845,14 @@ public abstract class InputFormatBase extends InputFormat { } return columns; } - + /** * @deprecated Use {@link #getAutoAdjustRanges(Configuration)} instead */ protected static boolean getAutoAdjustRanges(JobContext job) { return getAutoAdjustRanges(job.getConfiguration()); } - + /** * Determines whether a configuration has auto-adjust ranges enabled. * @@ -847,14 +864,14 @@ public abstract class InputFormatBase extends InputFormat { protected static boolean getAutoAdjustRanges(Configuration conf) { return conf.getBoolean(AUTO_ADJUST_RANGES, true); } - + /** * @deprecated Use {@link #getLogLevel(Configuration)} instead */ protected static Level getLogLevel(JobContext job) { return getLogLevel(job.getConfiguration()); } - + /** * Gets the log level from this configuration. * @@ -866,7 +883,7 @@ public abstract class InputFormatBase extends InputFormat { protected static Level getLogLevel(Configuration conf) { return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt())); } - + // InputFormat doesn't have the equivalent of OutputFormat's // checkOutputSpecs(JobContext job) /** @@ -875,7 +892,7 @@ public abstract class InputFormatBase extends InputFormat { protected static void validateOptions(JobContext job) throws IOException { validateOptions(job.getConfiguration()); } - + // InputFormat doesn't have the equivalent of OutputFormat's // checkOutputSpecs(JobContext job) /** @@ -898,7 +915,7 @@ public abstract class InputFormatBase extends InputFormat { throw new IOException("Unable to authenticate user"); if (!c.securityOperations().hasTablePermission(getUsername(conf), getTablename(conf), TablePermission.READ)) throw new IOException("Unable to access table"); - + if (!usesLocalIterators(conf)) { // validate that any scan-time iterators can be loaded by the the tablet servers for (AccumuloIterator iter : getIterators(conf)) { @@ -906,21 +923,21 @@ public abstract class 
InputFormatBase extends InputFormat { throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName()); } } - + } catch (AccumuloException e) { throw new IOException(e); } catch (AccumuloSecurityException e) { throw new IOException(e); } } - + /** * @deprecated Use {@link #getMaxVersions(Configuration)} instead */ protected static int getMaxVersions(JobContext job) { return getMaxVersions(job.getConfiguration()); } - + /** * Gets the maxVersions to use for the {@link VersioningIterator} from this configuration. * @@ -932,7 +949,7 @@ public abstract class InputFormatBase extends InputFormat { protected static int getMaxVersions(Configuration conf) { return conf.getInt(MAX_VERSIONS, -1); } - + protected static boolean isOfflineScan(Configuration conf) { return conf.getBoolean(READ_OFFLINE, false); } @@ -945,7 +962,7 @@ public abstract class InputFormatBase extends InputFormat { protected static List getIterators(JobContext job) { return getIterators(job.getConfiguration()); } - + /** * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. 
* @@ -955,13 +972,13 @@ public abstract class InputFormatBase extends InputFormat { * @see #addIterator(Configuration, IteratorSetting) */ protected static List getIterators(Configuration conf) { - + String iterators = conf.get(ITERATORS); - + // If no iterators are present, return an empty list if (iterators == null || iterators.isEmpty()) return new ArrayList(); - + // Compose the set of iterators encoded in the job configuration StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS), ITERATORS_DELIM); List list = new ArrayList(); @@ -971,14 +988,14 @@ public abstract class InputFormatBase extends InputFormat { } return list; } - + /** * @deprecated Use {@link #getIteratorOptions(Configuration)} instead */ protected static List getIteratorOptions(JobContext job) { return getIteratorOptions(job.getConfiguration()); } - + /** * Gets a list of the iterator options specified on this configuration. * @@ -989,11 +1006,11 @@ public abstract class InputFormatBase extends InputFormat { */ protected static List getIteratorOptions(Configuration conf) { String iteratorOptions = conf.get(ITERATORS_OPTIONS); - + // If no options are present, return an empty list if (iteratorOptions == null || iteratorOptions.isEmpty()) return new ArrayList(); - + // Compose the set of options encoded in the job configuration StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS_OPTIONS), ITERATORS_DELIM); List list = new ArrayList(); @@ -1003,13 +1020,13 @@ public abstract class InputFormatBase extends InputFormat { } return list; } - + protected abstract static class RecordReaderBase extends RecordReader { protected long numKeysRead; protected Iterator> scannerIterator; private boolean scannerRegexEnabled = false; protected RangeInputSplit split; - + /** * @deprecated since 1.4, configure {@link org.apache.accumulo.core.iterators.user.RegExFilter} instead. 
*/ @@ -1024,7 +1041,7 @@ public abstract class InputFormatBase extends InputFormat { log.info("Setting " + methodName + " to " + regex); } } - + /** * @deprecated since 1.4, configure {@link org.apache.accumulo.core.iterators.user.RegExFilter} instead. */ @@ -1039,15 +1056,16 @@ public abstract class InputFormatBase extends InputFormat { throw new AccumuloException("Can't set up regex for scanner"); } } - + // Apply the configured iterators from the job to the scanner /** * @deprecated Use {@link #setupIterators(Configuration,Scanner)} instead */ - protected void setupIterators(TaskAttemptContext attempt, Scanner scanner) throws AccumuloException { - setupIterators(attempt.getConfiguration(), scanner); + protected void setupIterators(TaskAttemptContext attempt, Scanner scanner, List iterators, List options) + throws AccumuloException { + setupIterators(attempt.getConfiguration(), scanner, iterators, options); } - + /** * Apply the configured iterators from the configuration to the scanner. 
* @@ -1057,10 +1075,9 @@ public abstract class InputFormatBase extends InputFormat { * the scanner to configure * @throws AccumuloException */ - protected void setupIterators(Configuration conf, Scanner scanner) throws AccumuloException { - List iterators = getIterators(conf); - List options = getIteratorOptions(conf); - + protected void setupIterators(Configuration conf, Scanner scanner, List iterators, List options) + throws AccumuloException { + Map scanIterators = new HashMap(); for (AccumuloIterator iterator : iterators) { scanIterators.put(iterator.getIteratorName(), new IteratorSetting(iterator.getPriority(), iterator.getIteratorName(), iterator.getIteratorClass())); @@ -1072,14 +1089,14 @@ public abstract class InputFormatBase extends InputFormat { scanner.addScanIterator(scanIterators.get(iterator.getIteratorName())); } } - + /** * @deprecated Use {@link #setupMaxVersions(Configuration,Scanner)} instead */ - protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner) { - setupMaxVersions(attempt.getConfiguration(), scanner); + protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner, int maxVersions) { + setupMaxVersions(attempt.getConfiguration(), scanner, maxVersions); } - + /** * If maxVersions has been set, configure a {@link VersioningIterator} at priority 0 for this scanner. 
* @@ -1088,8 +1105,7 @@ public abstract class InputFormatBase extends InputFormat { * @param scanner * the scanner to configure */ - protected void setupMaxVersions(Configuration conf, Scanner scanner) { - int maxVersions = getMaxVersions(conf); + protected void setupMaxVersions(Configuration conf, Scanner scanner, int maxVersions) { // Check to make sure its a legit value if (maxVersions >= 1) { IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class); @@ -1097,54 +1113,128 @@ public abstract class InputFormatBase extends InputFormat { scanner.addScanIterator(vers); } } - + /** * Initialize a scanner over the given input split using this task attempt configuration. */ public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { Scanner scanner; split = (RangeInputSplit) inSplit; - log.debug("Initializing input split: " + split.range); + log.debug("Initializing input split: " + split.getRange()); Configuration conf = attempt.getConfiguration(); - Instance instance = getInstance(conf); - String user = getUsername(conf); - byte[] password = getPassword(conf); - Authorizations authorizations = getAuthorizations(conf); + + Instance instance = split.getInstance(); + if (null == instance) { + instance = getInstance(conf); + } + + String user = split.getUsername(); + if (null == user) { + user = getUsername(conf); + } + + byte[] password = split.getPassword(); + if (null == password) { + password = getPassword(conf); + } + + Authorizations authorizations = split.getAuths(); + if (null == authorizations) { + authorizations = getAuthorizations(conf); + } + + String table = split.getTable(); + if (null == table) { + table = getTablename(conf); + } + + Boolean isOffline = split.isOffline(); + if (null == isOffline) { + isOffline = isOfflineScan(conf); + } + + Boolean isIsolated = split.isIsolatedScan(); + if (null == isIsolated) { + isIsolated = isIsolated(conf); + } + + Boolean usesLocalIterators = 
split.usesLocalIterators(); + if (null == usesLocalIterators) { + usesLocalIterators = usesLocalIterators(conf); + } + + String rowRegex = split.getRowRegex(); + if (null == rowRegex) { + rowRegex = conf.get(ROW_REGEX); + } + + String colfRegex = split.getColfamRegex(); + if (null == colfRegex) { + colfRegex = conf.get(COLUMN_FAMILY_REGEX); + } + + String colqRegex = split.getColqualRegex(); + if (null == colqRegex) { + colqRegex = conf.get(COLUMN_QUALIFIER_REGEX); + } + + String valueRegex = split.getValueRegex(); + if (null == valueRegex) { + valueRegex = conf.get(VALUE_REGEX); + } + + Integer maxVersions = split.getMaxVersions(); + if (null == maxVersions) { + maxVersions = getMaxVersions(conf); + } + + List iterators = split.getIterators(); + if (null == iterators) { + iterators = getIterators(conf); + } + + List options = split.getOptions(); + if (null == options) { + options = getIteratorOptions(conf); + } + Set> columns = split.getFetchedColumns(); + if (null == columns) { + columns = getFetchedColumns(conf); + } + try { log.debug("Creating connector with user: " + user); Connector conn = instance.getConnector(user, password); - log.debug("Creating scanner for table: " + getTablename(conf)); + log.debug("Creating scanner for table: " + table); log.debug("Authorizations are: " + authorizations); - if (isOfflineScan(conf)) { - scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance, - getTablename(conf)), authorizations); + if (isOffline) { + scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance, table), + authorizations); } else { - scanner = conn.createScanner(getTablename(conf), authorizations); + scanner = conn.createScanner(table, authorizations); } - if (isIsolated(conf)) { + if (isIsolated) { log.info("Creating isolated scanner"); scanner = new IsolatedScanner(scanner); } - if 
(usesLocalIterators(conf)) { + if (usesLocalIterators) { log.info("Using local iterators"); scanner = new ClientSideIteratorScanner(scanner); } - setupMaxVersions(conf, scanner); - if (conf.get(ROW_REGEX) != null || conf.get(COLUMN_FAMILY_REGEX) != null || conf.get(COLUMN_QUALIFIER_REGEX) != null || - conf.get(VALUE_REGEX) != null) { + setupMaxVersions(conf, scanner, maxVersions); + if (rowRegex != null || colfRegex != null || colqRegex != null || valueRegex != null) { IteratorSetting is = new IteratorSetting(50, RegExFilter.class); - RegExFilter.setRegexs(is, conf.get(ROW_REGEX), conf.get(COLUMN_FAMILY_REGEX), conf.get(COLUMN_QUALIFIER_REGEX), - conf.get(VALUE_REGEX), false); + RegExFilter.setRegexs(is, rowRegex, colfRegex, colqRegex, valueRegex, false); scanner.addScanIterator(is); } - setupIterators(conf, scanner); + setupIterators(conf, scanner, iterators, options); } catch (Exception e) { throw new IOException(e); } - + // setup a scanner within the bounds of this split - for (Pair c : getFetchedColumns(conf)) { + for (Pair c : columns) { if (c.getSecond() != null) { log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); scanner.fetchColumn(c.getFirst(), c.getSecond()); @@ -1153,48 +1243,48 @@ public abstract class InputFormatBase extends InputFormat { scanner.fetchColumnFamily(c.getFirst()); } } - - scanner.setRange(split.range); - + + scanner.setRange(split.getRange()); + numKeysRead = 0; - + // do this last after setting all scanner options scannerIterator = scanner.iterator(); } - + public void close() {} - + public float getProgress() throws IOException { if (numKeysRead > 0 && currentKey == null) return 1.0f; return split.getProgress(currentKey); } - + protected K currentK = null; protected V currentV = null; protected Key currentKey = null; protected Value currentValue = null; - + @Override public K getCurrentKey() throws IOException, InterruptedException { return currentK; } - + @Override public V getCurrentValue() throws IOException, 
InterruptedException { return currentV; } } - + Map>> binOfflineTable(JobContext job, String tableName, List ranges) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - + Map>> binnedRanges = new HashMap>>(); Instance instance = getInstance(job.getConfiguration()); Connector conn = instance.getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration())); String tableId = Tables.getTableId(instance, tableName); - + if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { Tables.clearCache(instance); if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { @@ -1204,12 +1294,12 @@ public abstract class InputFormatBase extends InputFormat { for (Range range : ranges) { Text startRow; - + if (range.getStartKey() != null) startRow = range.getStartKey().getRow(); else startRow = new Text(); - + Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN); @@ -1217,9 +1307,9 @@ public abstract class InputFormatBase extends InputFormat { scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY); scanner.setRange(metadataRange); - + RowIterator rowIter = new RowIterator(scanner); - + // TODO check that extents match prev extent KeyExtent lastExtent = null; @@ -1229,15 +1319,15 @@ public abstract class InputFormatBase extends InputFormat { String last = ""; KeyExtent extent = null; String location = null; - + while (row.hasNext()) { Entry entry = row.next(); Key key = entry.getKey(); - + if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) { last = entry.getValue().toString(); } - + if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) || 
key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) { location = entry.getValue().toString(); @@ -1246,9 +1336,9 @@ public abstract class InputFormatBase extends InputFormat { if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) { extent = new KeyExtent(key.getRow(), entry.getValue()); } - + } - + if (location != null) return null; @@ -1265,24 +1355,24 @@ public abstract class InputFormatBase extends InputFormat { tabletRanges = new HashMap>(); binnedRanges.put(last, tabletRanges); } - + List rangeList = tabletRanges.get(extent); if (rangeList == null) { rangeList = new ArrayList(); tabletRanges.put(extent, rangeList); } - + rangeList.add(range); if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { break; } - + lastExtent = extent; } } - + return binnedRanges; } @@ -1290,18 +1380,35 @@ public abstract class InputFormatBase extends InputFormat { * Read the metadata table to get tablets and match up ranges to them. */ public List getSplits(JobContext job) throws IOException { - log.setLevel(getLogLevel(job.getConfiguration())); - validateOptions(job.getConfiguration()); - - String tableName = getTablename(job.getConfiguration()); - boolean autoAdjust = getAutoAdjustRanges(job.getConfiguration()); - List ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job.getConfiguration())) : getRanges(job.getConfiguration()); - + Configuration conf = job.getConfiguration(); + + log.setLevel(getLogLevel(conf)); + validateOptions(conf); + + String tableName = getTablename(conf); + boolean autoAdjust = getAutoAdjustRanges(conf); + List ranges = autoAdjust ? 
Range.mergeOverlapping(getRanges(conf)) : getRanges(conf); + boolean offline = isOfflineScan(conf); + boolean isolated = isIsolated(conf); + boolean localIterators = usesLocalIterators(conf); + boolean mockInstance = conf.getBoolean(MOCK, false); + int maxVersions = getMaxVersions(conf); + String rowRegex = conf.get(ROW_REGEX), colfamRegex = conf.get(COLUMN_FAMILY_REGEX), colqualRegex = conf.get(COLUMN_QUALIFIER_REGEX), valueRegex = conf + .get(VALUE_REGEX); + Set> fetchedColumns = getFetchedColumns(conf); + Authorizations auths = getAuthorizations(conf); + byte[] password = getPassword(conf); + String username = getUsername(conf); + Instance instance = getInstance(conf); + List iterators = getIterators(conf); + List options = getIteratorOptions(conf); + Level logLevel = getLogLevel(conf); + if (ranges.isEmpty()) { ranges = new ArrayList(1); ranges.add(new Range()); } - + // get the metadata information for these ranges Map>> binnedRanges = new HashMap>>(); TabletLocator tl; @@ -1314,7 +1421,6 @@ public abstract class InputFormatBase extends InputFormat { binnedRanges = binOfflineTable(job, tableName, ranges); } } else { - Instance instance = getInstance(job.getConfiguration()); String tableId = null; tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... 
so clear it @@ -1337,15 +1443,15 @@ public abstract class InputFormatBase extends InputFormat { } catch (Exception e) { throw new IOException(e); } - + ArrayList splits = new ArrayList(ranges.size()); HashMap> splitsToAdd = null; - + if (!autoAdjust) splitsToAdd = new HashMap>(); - + HashMap hostNameCache = new HashMap(); - + for (Entry>> tserverBin : binnedRanges.entrySet()) { String ip = tserverBin.getKey().split(":", 2)[0]; String location = hostNameCache.get(ip); @@ -1354,14 +1460,14 @@ public abstract class InputFormatBase extends InputFormat { location = inetAddress.getHostName(); hostNameCache.put(ip, location); } - + for (Entry> extentRanges : tserverBin.getValue().entrySet()) { Range ke = extentRanges.getKey().toDataRange(); for (Range r : extentRanges.getValue()) { if (autoAdjust) { // divide ranges into smaller ranges, based on the // tablets - splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location})); + splits.add(new RangeInputSplit(ke.clip(r), new String[] {location})); } else { // don't divide ranges ArrayList locations = splitsToAdd.get(r); @@ -1373,132 +1479,55 @@ public abstract class InputFormatBase extends InputFormat { } } } - + if (!autoAdjust) for (Entry> entry : splitsToAdd.entrySet()) - splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0]))); - return splits; - } - - /** - * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. 
- */ - public static class RangeInputSplit extends InputSplit implements Writable { - private Range range; - private String[] locations; - - public RangeInputSplit() { - range = new Range(); - locations = new String[0]; - } - - public Range getRange() { - return range; - } - - private static byte[] extractBytes(ByteSequence seq, int numBytes) { - byte[] bytes = new byte[numBytes + 1]; - bytes[0] = 0; - for (int i = 0; i < numBytes; i++) { - if (i >= seq.length()) - bytes[i + 1] = 0; - else - bytes[i + 1] = seq.byteAt(i); - } - return bytes; - } - - public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { - int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); - BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); - BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); - BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); - return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); - } - - public float getProgress(Key currentKey) { - if (currentKey == null) - return 0f; - if (range.getStartKey() != null && range.getEndKey() != null) { - if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { - // just look at the row progress - return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); - } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { - // just look at the column family progress - return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); - } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { - // just look at the column qualifier progress - return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), 
currentKey.getColumnQualifierData()); - } - } - // if we can't figure it out, then claim no progress - return 0f; - } - - RangeInputSplit(String table, Range range, String[] locations) { - this.range = range; - this.locations = locations; - } - - /** - * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. - */ - public long getLength() throws IOException { - Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); - Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); - int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); - long diff = 0; - - byte[] start = startRow.getBytes(); - byte[] stop = stopRow.getBytes(); - for (int i = 0; i < maxCommon; ++i) { - diff |= 0xff & (start[i] ^ stop[i]); - diff <<= Byte.SIZE; - } - - if (startRow.getLength() != stopRow.getLength()) - diff |= 0xff; - - return diff + 1; - } - - public String[] getLocations() throws IOException { - return locations; - } - - public void readFields(DataInput in) throws IOException { - range.readFields(in); - int numLocs = in.readInt(); - locations = new String[numLocs]; - for (int i = 0; i < numLocs; ++i) - locations[i] = in.readUTF(); - } - - public void write(DataOutput out) throws IOException { - range.write(out); - out.writeInt(locations.length); - for (int i = 0; i < locations.length; ++i) - out.writeUTF(locations[i]); + splits.add(new RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0]))); + + for (InputSplit inputSplit : splits) { + RangeInputSplit split = (RangeInputSplit) inputSplit; + + split.setTable(tableName); + split.setOffline(offline); + split.setIsolatedScan(isolated); + split.setUsesLocalIterators(localIterators); + split.setMockInstance(mockInstance); + split.setMaxVersions(maxVersions); + split.setRowRegex(rowRegex); + 
split.setColfamRegex(colfamRegex); + split.setColqualRegex(colqualRegex); + split.setValueRegex(valueRegex); + split.setFetchedColumns(fetchedColumns); + split.setUsername(username); + split.setPassword(password); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(iterators); + split.setOptions(options); + split.setLogLevel(logLevel); } + + return splits; } - + /** * The Class IteratorSetting. Encapsulates specifics for an Accumulo iterator's name & priority. */ static class AccumuloIterator { - + private static final String FIELD_SEP = ":"; - + private int priority; private String iteratorClass; private String iteratorName; - + public AccumuloIterator(int priority, String iteratorClass, String iteratorName) { this.priority = priority; this.iteratorClass = iteratorClass; this.iteratorName = iteratorName; } - + // Parses out a setting given an string supplied from an earlier toString() call public AccumuloIterator(String iteratorSetting) { // Parse the string to expand the iterator @@ -1507,42 +1536,42 @@ public abstract class InputFormatBase extends InputFormat { iteratorClass = tokenizer.nextToken(); iteratorName = tokenizer.nextToken(); } - + public int getPriority() { return priority; } - + public String getIteratorClass() { return iteratorClass; } - + public String getIteratorName() { return iteratorName; } - + @Override public String toString() { return new String(priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName); } - + } - + /** * The Class AccumuloIteratorOption. Encapsulates specifics for an Accumulo iterator's optional configuration details - associated via the iteratorName. 
*/ static class AccumuloIteratorOption { private static final String FIELD_SEP = ":"; - + private String iteratorName; private String key; private String value; - + public AccumuloIteratorOption(String iteratorName, String key, String value) { this.iteratorName = iteratorName; this.key = key; this.value = value; } - + // Parses out an option given a string supplied from an earlier toString() call public AccumuloIteratorOption(String iteratorOption) { StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP); @@ -1554,19 +1583,19 @@ public abstract class InputFormatBase extends InputFormat { throw new RuntimeException(e); } } - + public String getIteratorName() { return iteratorName; } - + public String getKey() { return key; } - + public String getValue() { return value; } - + @Override public String toString() { try { @@ -1575,7 +1604,7 @@ public abstract class InputFormatBase extends InputFormat { throw new RuntimeException(e); } } - + } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45ae55fc/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java new file mode 100644 index 0000000..e372801 --- /dev/null +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigInteger; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Level; + +/** + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. 
+ */ +public class RangeInputSplit extends InputSplit implements Writable { + private Range range; + private String[] locations; + private String table, instanceName, zooKeepers, username; + private String rowRegex, colfamRegex, colqualRegex, valueRegex; + private byte[] password; + private Boolean offline, mockInstance, isolatedScan, localIterators; + private Integer maxVersions; + private Authorizations auths; + private Set> fetchedColumns; + private List iterators; + private List options; + private Level level; + + public RangeInputSplit() { + range = new Range(); + locations = new String[0]; + } + + public RangeInputSplit(Range range, String[] locations) { + this.range = range; + this.locations = locations; + } + + public Range getRange() { + return range; + } + + private static byte[] extractBytes(ByteSequence seq, int numBytes) { + byte[] bytes = new byte[numBytes + 1]; + bytes[0] = 0; + for (int i = 0; i < numBytes; i++) { + if (i >= seq.length()) + bytes[i + 1] = 0; + else + bytes[i + 1] = seq.byteAt(i); + } + return bytes; + } + + public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { + int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); + BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); + BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); + BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); + return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); + } + + public float getProgress(Key currentKey) { + if (currentKey == null) + return 0f; + if (range.getStartKey() != null && range.getEndKey() != null) { + if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { + // just look at the row progress + return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), 
PartialKey.ROW_COLFAM) != 0) { + // just look at the column family progress + return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { + // just look at the column qualifier progress + return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + } + } + // if we can't figure it out, then claim no progress + return 0f; + } + + /** + * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. + */ + public long getLength() throws IOException { + Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); + Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); + int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); + long diff = 0; + + byte[] start = startRow.getBytes(); + byte[] stop = stopRow.getBytes(); + for (int i = 0; i < maxCommon; ++i) { + diff |= 0xff & (start[i] ^ stop[i]); + diff <<= Byte.SIZE; + } + + if (startRow.getLength() != stopRow.getLength()) + diff |= 0xff; + + return diff + 1; + } + + public String[] getLocations() throws IOException { + return locations; + } + + public void readFields(DataInput in) throws IOException { + range.readFields(in); + int numLocs = in.readInt(); + locations = new String[numLocs]; + for (int i = 0; i < numLocs; ++i) + locations[i] = in.readUTF(); + + if (in.readBoolean()) { + isolatedScan = in.readBoolean(); + } + + if (in.readBoolean()) { + offline = in.readBoolean(); + } + + if (in.readBoolean()) { + localIterators = in.readBoolean(); + } + + if (in.readBoolean()) { + mockInstance = in.readBoolean(); + } + + if (in.readBoolean()) { + 
maxVersions = in.readInt(); + } + + if (in.readBoolean()) { + rowRegex = in.readUTF(); + } + + if (in.readBoolean()) { + colfamRegex = in.readUTF(); + } + + if (in.readBoolean()) { + colqualRegex = in.readUTF(); + } + + if (in.readBoolean()) { + valueRegex = in.readUTF(); + } + + if (in.readBoolean()) { + int numColumns = in.readInt(); + String[] columns = new String[numColumns]; + for (int i = 0; i < numColumns; i++) { + columns[i] = in.readUTF(); + } + + fetchedColumns = InputFormatBase.deserializeFetchedColumns(columns); + } + + if (in.readBoolean()) { + auths = new Authorizations(StringUtils.split(in.readUTF())); + } + + if (in.readBoolean()) { + username = in.readUTF(); + } + + if (in.readBoolean()) { + password = in.readUTF().getBytes(); + } + + if (in.readBoolean()) { + instanceName = in.readUTF(); + } + + if (in.readBoolean()) { + zooKeepers = in.readUTF(); + } + + if (in.readBoolean()) { + level = Level.toLevel(in.readInt()); + } + } + + public void write(DataOutput out) throws IOException { + range.write(out); + out.writeInt(locations.length); + for (int i = 0; i < locations.length; ++i) + out.writeUTF(locations[i]); + + out.writeBoolean(null != isolatedScan); + if (null != isolatedScan) { + out.writeBoolean(isolatedScan); + } + + out.writeBoolean(null != offline); + if (null != offline) { + out.writeBoolean(offline); + } + + out.writeBoolean(null != localIterators); + if (null != localIterators) { + out.writeBoolean(localIterators); + } + + out.writeBoolean(null != mockInstance); + if (null != mockInstance) { + out.writeBoolean(mockInstance); + } + + out.writeBoolean(null != maxVersions); + if (null != maxVersions) { + out.writeInt(getMaxVersions()); + } + + out.writeBoolean(null != rowRegex); + if (null != rowRegex) { + out.writeUTF(rowRegex); + } + + out.writeBoolean(null != colfamRegex); + if (null != colfamRegex) { + out.writeUTF(colfamRegex); + } + + out.writeBoolean(null != colqualRegex); + if (null != colqualRegex) { + out.writeUTF(colqualRegex); 
+ } + + out.writeBoolean(null != valueRegex); + if (null != valueRegex) { + out.writeUTF(valueRegex); + } + + out.writeBoolean(null != fetchedColumns); + if (null != fetchedColumns) { + String[] cols = InputFormatBase.serializeColumns(fetchedColumns); + out.writeInt(cols.length); + for (String col : cols) { + out.writeUTF(col); + } + } + + out.writeBoolean(null != auths); + if (null != auths) { + out.writeUTF(auths.serialize()); + } + + out.writeBoolean(null != username); + if (null != username) { + out.writeUTF(username); + } + + out.writeBoolean(null != password); + if (null != password) { + out.writeUTF(new String(password)); + } + + out.writeBoolean(null != instanceName); + if (null != instanceName) { + out.writeUTF(instanceName); + } + + out.writeBoolean(null != zooKeepers); + if (null != zooKeepers) { + out.writeUTF(zooKeepers); + } + + out.writeBoolean(null != level); + if (null != level) { + out.writeInt(level.toInt()); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(128); + sb.append("Range: ").append(range); + sb.append(" Locations: ").append(locations); + sb.append(" Table: ").append(table); + // TODO finish building of string + return sb.toString(); + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public Instance getInstance() { + if (null == instanceName) { + return null; + } + + if (isMockInstance()) { + return new MockInstance(getInstanceName()); + } + + if (null == zooKeepers) { + return null; + } + + return new ZooKeeperInstance(getInstanceName(), getZooKeepers()); + } + + public String getInstanceName() { + return instanceName; + } + + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public String getZooKeepers() { + return zooKeepers; + } + + public void setZooKeepers(String zooKeepers) { + this.zooKeepers = zooKeepers; + } + + public String getUsername() { + return username; + } + + 
public void setUsername(String username) { + this.username = username; + } + + public byte[] getPassword() { + return password; + } + + public void setPassword(byte[] password) { + this.password = password; + } + + public Boolean isOffline() { + return offline; + } + + public void setOffline(Boolean offline) { + this.offline = offline; + } + + public void setLocations(String[] locations) { + this.locations = locations; + } + + public String getRowRegex() { + return rowRegex; + } + + public void setRowRegex(String rowRegex) { + this.rowRegex = rowRegex; + } + + public String getColfamRegex() { + return colfamRegex; + } + + public void setColfamRegex(String colfamRegex) { + this.colfamRegex = colfamRegex; + } + + public String getColqualRegex() { + return colqualRegex; + } + + public void setColqualRegex(String colqualRegex) { + this.colqualRegex = colqualRegex; + } + + public String getValueRegex() { + return valueRegex; + } + + public void setValueRegex(String valueRegex) { + this.valueRegex = valueRegex; + } + + public Boolean isMockInstance() { + return mockInstance; + } + + public void setMockInstance(Boolean mockInstance) { + this.mockInstance = mockInstance; + } + + public Boolean isIsolatedScan() { + return isolatedScan; + } + + public void setIsolatedScan(Boolean isolatedScan) { + this.isolatedScan = isolatedScan; + } + + public Integer getMaxVersions() { + return maxVersions; + } + + public void setMaxVersions(Integer maxVersions) { + this.maxVersions = maxVersions; + } + + public Authorizations getAuths() { + return auths; + } + + public void setAuths(Authorizations auths) { + this.auths = auths; + } + + public void setRange(Range range) { + this.range = range; + } + + public Boolean usesLocalIterators() { + return localIterators; + } + + public void setUsesLocalIterators(Boolean localIterators) { + this.localIterators = localIterators; + } + + public Set> getFetchedColumns() { + return fetchedColumns; + } + + public void setFetchedColumns(Set> 
fetchedColumns) { + this.fetchedColumns = fetchedColumns; + } + + public List getIterators() { + return iterators; + } + + public void setIterators(List iterators) { + this.iterators = iterators; + } + + public List getOptions() { + return options; + } + + public void setOptions(List options) { + this.options = options; + } + + public Level getLogLevel() { + return level; + } + + public void setLogLevel(Level level) { + this.level = level; + } +}