From: busbey@apache.org
To: commits@accumulo.apache.org
Date: Fri, 25 Apr 2014 23:12:38 -0000
Subject: [09/17] Merge branch '1.6.0-SNAPSHOT'

http://git-wip-us.apache.org/repos/asf/accumulo/blob/592c8b06/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
----------------------------------------------------------------------
diff --cc mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
index 7657c3c,0000000..69bbef2
mode 100644,000000..100644
--- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
+++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
@@@ -1,796 -1,0 +1,796 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.accumulo.core.client.mapreduce.lib.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.mapreduce.InputTableConfig; - import org.apache.accumulo.core.client.mock.MockTabletLocator; ++import org.apache.accumulo.core.client.mock.impl.MockTabletLocator; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.collect.Maps; + +/** + * @since 1.6.0 + */ +public class InputConfigurator extends ConfiguratorBase { + + /** + * Configuration keys for {@link Scanner}. + * + * @since 1.6.0 + */ + public static enum ScanOpts { + TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS, TABLE_CONFIGS + } + + /** + * Configuration keys for various features. + * + * @since 1.6.0 + */ + public static enum Features { + AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE + } + + /** + * Sets the name of the input table, over which this job will scan. 
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @param tableName
+ *          the name of the input table to scan
+ * @since 1.6.0
+ */
+ public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
+   checkArgument(tableName != null, "tableName is null");
+   conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
+ }
+
+ /**
+ * Gets the name of the input table, over which this job will scan.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @return the name of the input table
+ * @since 1.6.0
+ */
+ public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
+   return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
+ }
+
+ /**
+ * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @param auths
+ *          the user's authorizations
+ * @since 1.6.0
+ */
+ public static void setScanAuthorizations(Class<?> implementingClass, Configuration conf, Authorizations auths) {
+   if (auths != null && !auths.isEmpty())
+     conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize());
+ }
+
+ /**
+ * Gets the authorizations to set for the scans from the configuration.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @return the Accumulo scan authorizations
+ * @since 1.6.0
+ * @see #setScanAuthorizations(Class, Configuration, Authorizations)
+ */
+ public static Authorizations getScanAuthorizations(Class<?> implementingClass, Configuration conf) {
+   String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS));
+   return authString == null ? Authorizations.EMPTY : new Authorizations(authString.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Sets the input ranges to scan on all input tables for this job. If not set, the entire table will be scanned.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @param ranges
+ *          the ranges that will be mapped over
+ * @throws IllegalArgumentException
+ *           if the ranges cannot be encoded into base 64
+ * @since 1.6.0
+ */
+ public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
+   checkArgument(ranges != null, "ranges is null");
+
+   ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
+   try {
+     for (Range r : ranges) {
+       ByteArrayOutputStream baos = new ByteArrayOutputStream();
+       r.write(new DataOutputStream(baos));
+       rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
+     }
+     conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
+   } catch (IOException ex) {
+     throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
+   }
+ }
+
+ /**
+ * Gets the ranges to scan over from a job.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @return the ranges
+ * @throws IOException
+ *           if the ranges have been encoded improperly
+ * @since 1.6.0
+ * @see #setRanges(Class, Configuration, Collection)
+ */
+ public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
+
+   Collection<String> encodedRanges = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES));
+   List<Range> ranges = new ArrayList<Range>();
+   for (String rangeString : encodedRanges) {
+     ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
+     Range range = new Range();
+     range.readFields(new DataInputStream(bais));
+     ranges.add(range);
+   }
+   return ranges;
+ }
+
+ /**
+ * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @return a list of iterators
+ * @since 1.6.0
+ * @see #addIterator(Class, Configuration, IteratorSetting)
+ */
+ public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
+   String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
+
+   // If no iterators are present, return an empty list
+   if (iterators == null || iterators.isEmpty())
+     return new ArrayList<IteratorSetting>();
+
+   // Compose the set of iterators encoded in the job configuration
+   StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
+   List<IteratorSetting> list = new ArrayList<IteratorSetting>();
+   try {
+     while (tokens.hasMoreTokens()) {
+       String itstring = tokens.nextToken();
+       ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
+       list.add(new IteratorSetting(new DataInputStream(bais)));
+       bais.close();
+     }
+   } catch (IOException e) {
+     throw new IllegalArgumentException("couldn't decode iterator settings");
+   }
+   return list;
+ }
+
+ /**
+ * Restricts the columns that will be mapped over for the single input table on this job.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @param columnFamilyColumnQualifierPairs
+ *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+ *          selected. An empty set is the default and is equivalent to scanning all the columns.
+ * @throws IllegalArgumentException + * if the column family is null + * @since 1.6.0 + */ + public static void fetchColumns(Class implementingClass, Configuration conf, Collection> columnFamilyColumnQualifierPairs) { + checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null"); + String[] columnStrings = serializeColumns(columnFamilyColumnQualifierPairs); + conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings); + } + + public static String[] serializeColumns(Collection> columnFamilyColumnQualifierPairs) { + checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null"); + ArrayList columnStrings = new ArrayList(columnFamilyColumnQualifierPairs.size()); + for (Pair column : columnFamilyColumnQualifierPairs) { + + if (column.getFirst() == null) + throw new IllegalArgumentException("Column family can not be null"); + + String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), StandardCharsets.UTF_8); + if (column.getSecond() != null) + col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), StandardCharsets.UTF_8); + columnStrings.add(col); + } + + return columnStrings.toArray(new String[0]); + } + + /** + * Gets the columns to be mapped over from this job. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return a set of columns + * @since 1.6.0 + * @see #fetchColumns(Class, Configuration, Collection) + */ + public static Set> getFetchedColumns(Class implementingClass, Configuration conf) { + checkArgument(conf != null, "conf is null"); + String confValue = conf.get(enumToConfKey(implementingClass, ScanOpts.COLUMNS)); + List serialized = new ArrayList(); + if (confValue != null) { + // Split and include any trailing empty strings to allow empty column families + for (String val : confValue.split(",", -1)) { + serialized.add(val); + } + } + return deserializeFetchedColumns(serialized); + } + + public static Set> deserializeFetchedColumns(Collection serialized) { + Set> columns = new HashSet>(); + + if (null == serialized) { + return columns; + } + + for (String col : serialized) { + int idx = col.indexOf(":"); + Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(StandardCharsets.UTF_8)) : Base64.decodeBase64(col.substring(0, idx).getBytes( + StandardCharsets.UTF_8))); + Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes(StandardCharsets.UTF_8))); + columns.add(new Pair(cf, cq)); + } + return columns; + } + + /** + * Encode an iterator on the input for the single input table associated with this job. 
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param cfg + * the configuration of the iterator + * @throws IllegalArgumentException + * if the iterator can't be serialized into the configuration + * @since 1.6.0 + */ + public static void addIterator(Class implementingClass, Configuration conf, IteratorSetting cfg) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + String newIter; + try { + cfg.write(new DataOutputStream(baos)); + newIter = new String(Base64.encodeBase64(baos.toByteArray()), StandardCharsets.UTF_8); + baos.close(); + } catch (IOException e) { + throw new IllegalArgumentException("unable to serialize IteratorSetting"); + } + + String confKey = enumToConfKey(implementingClass, ScanOpts.ITERATORS); + String iterators = conf.get(confKey); + // No iterators specified yet, create a new string + if (iterators == null || iterators.isEmpty()) { + iterators = newIter; + } else { + // append the next iterator & reset + iterators = iterators.concat(StringUtils.COMMA_STR + newIter); + } + // Store the iterators w/ the job + conf.set(confKey, iterators); + } + + /** + * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries. + * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. * + * + *
+ * <p>
+ * By default, this feature is enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @see #setRanges(Class, Configuration, Collection) + * @since 1.6.0 + */ + public static void setAutoAdjustRanges(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature); + } + + /** + * Determines whether a configuration has auto-adjust ranges enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return false if the feature is disabled, true otherwise + * @since 1.6.0 + * @see #setAutoAdjustRanges(Class, Configuration, boolean) + */ + public static Boolean getAutoAdjustRanges(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true); + } + + /** + * Controls the use of the {@link IsolatedScanner} in this job. + * + *
+ * <p>
+ * By default, this feature is disabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.6.0 + */ + public static void setScanIsolation(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature); + } + + /** + * Determines whether a configuration has isolation enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the feature is enabled, false otherwise + * @since 1.6.0 + * @see #setScanIsolation(Class, Configuration, boolean) + */ + public static Boolean isIsolated(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false); + } + + /** + * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map + * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task. + * + *
+ * <p>
+ * By default, this feature is disabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.6.0 + */ + public static void setLocalIterators(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature); + } + + /** + * Determines whether a configuration uses local iterators. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the feature is enabled, false otherwise + * @since 1.6.0 + * @see #setLocalIterators(Class, Configuration, boolean) + */ + public static Boolean usesLocalIterators(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false); + } + + /** + *
+ * <p>
+ * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the + * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will + * fail. + * + *
+ * <p>
+ * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. + * + *
+ * <p>
+ * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be + * on the mapper's classpath. + * + *
+ * <p>
+ * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+ * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+ * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+ * <p>
+ * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
+ * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
+ * <p>
+ * By default, this feature is disabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.6.0 + */ + public static void setOfflineTableScan(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature); + } + + /** + * Determines whether a configuration has the offline table scan feature enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the feature is enabled, false otherwise + * @since 1.6.0 + * @see #setOfflineTableScan(Class, Configuration, boolean) + */ + public static Boolean isOfflineScan(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false); + } + + /** + * Sets configurations for multiple tables at a time. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param configs + * an array of {@link InputTableConfig} objects to associate with the job + * @since 1.6.0 + */ + public static void setInputTableConfigs(Class implementingClass, Configuration conf, Map configs) { + MapWritable mapWritable = new MapWritable(); + for (Map.Entry tableConfig : configs.entrySet()) + mapWritable.put(new Text(tableConfig.getKey()), tableConfig.getValue()); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + mapWritable.write(new DataOutputStream(baos)); + } catch (IOException e) { + throw new IllegalStateException("Table configuration could not be serialized."); + } + + String confKey = enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS); + conf.set(confKey, new String(Base64.encodeBase64(baos.toByteArray()))); + } + + /** + * Returns all {@link InputTableConfig} objects associated with this job. 
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return all of the table query configs for the job + * @since 1.6.0 + */ + public static Map getInputTableConfigs(Class implementingClass, Configuration conf) { + Map configs = new HashMap(); + Map.Entry defaultConfig = getDefaultInputTableConfig(implementingClass, conf); + if (defaultConfig != null) + configs.put(defaultConfig.getKey(), defaultConfig.getValue()); + String configString = conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS)); + MapWritable mapWritable = new MapWritable(); + if (configString != null) { + try { + byte[] bytes = Base64.decodeBase64(configString.getBytes()); + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + mapWritable.readFields(new DataInputStream(bais)); + bais.close(); + } catch (IOException e) { + throw new IllegalStateException("The table query configurations could not be deserialized from the given configuration"); + } + } + for (Map.Entry entry : mapWritable.entrySet()) + configs.put(((Text) entry.getKey()).toString(), (InputTableConfig) entry.getValue()); + + return configs; + } + + /** + * Returns the {@link InputTableConfig} for the given table + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param tableName + * the table name for which to fetch the table query config + * @return the table query config for the given table name (if it exists) and null if it does not + * @since 1.6.0 + */ + public static InputTableConfig getInputTableConfig(Class implementingClass, Configuration conf, String tableName) { + Map queryConfigs = getInputTableConfigs(implementingClass, conf); + return queryConfigs.get(tableName); + } + + /** + * Initializes an Accumulo {@link TabletLocator} based on the configuration. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param tableId + * The table id for which to initialize the {@link TabletLocator} + * @return an Accumulo tablet locator + * @throws TableNotFoundException + * if the table name set on the configuration doesn't exist + * @since 1.6.0 + */ + public static TabletLocator getTabletLocator(Class implementingClass, Configuration conf, String tableId) throws TableNotFoundException { + String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE)); + if ("MockInstance".equals(instanceType)) + return new MockTabletLocator(); + Instance instance = getInstance(implementingClass, conf); + return TabletLocator.getLocator(instance, new Text(tableId)); + } + + // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) + /** + * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. 
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @throws IOException + * if the context is improperly configured + * @since 1.6.0 + */ + public static void validateOptions(Class implementingClass, Configuration conf) throws IOException { + + Map inputTableConfigs = getInputTableConfigs(implementingClass, conf); + if (!isConnectorInfoSet(implementingClass, conf)) + throw new IOException("Input info has not been set."); + String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE)); + if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey)) + throw new IOException("Instance info has not been set."); + // validate that we can connect as configured + try { + String principal = getPrincipal(implementingClass, conf); + AuthenticationToken token = getAuthenticationToken(implementingClass, conf); + Connector c = getInstance(implementingClass, conf).getConnector(principal, token); + if (!c.securityOperations().authenticateUser(principal, token)) + throw new IOException("Unable to authenticate user"); + + if (getInputTableConfigs(implementingClass, conf).size() == 0) + throw new IOException("No table set."); + + for (Map.Entry tableConfig : inputTableConfigs.entrySet()) { + if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), tableConfig.getKey(), TablePermission.READ)) + throw new IOException("Unable to access table"); + } + for (Map.Entry tableConfigEntry : inputTableConfigs.entrySet()) { + InputTableConfig tableConfig = tableConfigEntry.getValue(); + if (!tableConfig.shouldUseLocalIterators()) { + if (tableConfig.getIterators() != null) { + for (IteratorSetting iter : tableConfig.getIterators()) { + if (!c.tableOperations().testClassLoad(tableConfigEntry.getKey(), iter.getIteratorClass(), SortedKeyValueIterator.class.getName())) + throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName()); + } + } + } + } + } catch (AccumuloException e) { + throw new IOException(e); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } catch (TableNotFoundException e) { + throw new IOException(e); + } + } + + /** + * Returns the {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} for the configuration based on the properties set using the single-table + * input methods. 
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop instance for which to retrieve the configuration + * @return the config object built from the single input table properties set on the job + * @since 1.6.0 + */ + protected static Map.Entry getDefaultInputTableConfig(Class implementingClass, Configuration conf) { + String tableName = getInputTableName(implementingClass, conf); + if (tableName != null) { + InputTableConfig queryConfig = new InputTableConfig(); + List itrs = getIterators(implementingClass, conf); + if (itrs != null) + queryConfig.setIterators(itrs); + Set> columns = getFetchedColumns(implementingClass, conf); + if (columns != null) + queryConfig.fetchColumns(columns); + List ranges = null; + try { + ranges = getRanges(implementingClass, conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (ranges != null) + queryConfig.setRanges(ranges); + + queryConfig.setAutoAdjustRanges(getAutoAdjustRanges(implementingClass, conf)).setUseIsolatedScanners(isIsolated(implementingClass, conf)) + .setUseLocalIterators(usesLocalIterators(implementingClass, conf)).setOfflineScan(isOfflineScan(implementingClass, conf)); + return Maps.immutableEntry(tableName, queryConfig); + } + return null; + } + + public static Map>> binOffline(String tableId, List ranges, Instance instance, Connector conn) + throws AccumuloException, TableNotFoundException { + Map>> binnedRanges = new HashMap>>(); + + if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { + Tables.clearCache(instance); + if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { + throw new AccumuloException("Table is online tableId:" + tableId + " cannot scan table in offline mode "); + } + } + + for (Range range : ranges) { + Text startRow; + + if (range.getStartKey() != null) + startRow = range.getStartKey().getRow(); + else + startRow = new Text(); + + Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME); + scanner.setRange(metadataRange); + + RowIterator rowIter = new RowIterator(scanner); + KeyExtent lastExtent = null; + while (rowIter.hasNext()) { + Iterator> row = rowIter.next(); + String last = ""; + KeyExtent extent = null; + String location = null; + + while (row.hasNext()) { + Map.Entry entry = row.next(); + Key key = entry.getKey(); + + if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) { + last = entry.getValue().toString(); + } + + if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME) + || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) { + location = entry.getValue().toString(); + } + + if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { + extent = new KeyExtent(key.getRow(), entry.getValue()); + } + + } + + if (location != null) + return null; + + if (!extent.getTableId().toString().equals(tableId)) { + throw new 
AccumuloException("Saw unexpected table Id " + tableId + " " + extent); + } + + if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { + throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent); + } + + Map> tabletRanges = binnedRanges.get(last); + if (tabletRanges == null) { + tabletRanges = new HashMap>(); + binnedRanges.put(last, tabletRanges); + } + + List rangeList = tabletRanges.get(extent); + if (rangeList == null) { + rangeList = new ArrayList(); + tabletRanges.put(extent, rangeList); + } + + rangeList.add(range); + + if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { + break; + } + + lastExtent = extent; + } + + } + return binnedRanges; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/592c8b06/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/592c8b06/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/592c8b06/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/592c8b06/shell/src/main/java/org/apache/accumulo/shell/mock/MockShell.java ---------------------------------------------------------------------- diff --cc shell/src/main/java/org/apache/accumulo/shell/mock/MockShell.java index 151648a,0000000..38bb501 mode 100644,000000..100644 --- a/shell/src/main/java/org/apache/accumulo/shell/mock/MockShell.java +++ b/shell/src/main/java/org/apache/accumulo/shell/mock/MockShell.java @@@ -1,143 -1,0 +1,182 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.shell.mock; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; ++import java.io.Writer; + ++import org.apache.commons.io.output.WriterOutputStream; ++ ++import org.apache.commons.cli.CommandLine; +import jline.console.ConsoleReader; + +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.shell.Shell; +import org.apache.accumulo.shell.ShellOptionsJC; + +/** + * An Accumulo Shell implementation that allows a developer to attach an InputStream and Writer to the Shell for testing purposes. 
+ */ +public class MockShell extends Shell { + private static final String NEWLINE = "\n"; + + protected InputStream in; + protected OutputStream out; - ++ ++ /** ++ * Will only be set if you use either the Writer constructor or the setWriter(Writer) method ++ * @deprecated since 1.6.0; use out ++ */ ++ @Deprecated ++ protected Writer writer = null; ++ + public MockShell(InputStream in, OutputStream out) throws IOException { + super(); + this.in = in; + this.out = out; ++ // we presume they don't use the writer field unless they use the other constructor. ++ } ++ ++ /** ++ * @deprecated since 1.6.0; use OutputStream version ++ */ ++ @Deprecated ++ public MockShell(InputStream in, Writer out) throws IOException { ++ this(in, new WriterOutputStream(out, Constants.UTF8.name())); ++ this.writer = out; + } + + public boolean config(String... args) { + configError = super.config(args); + + // Update the ConsoleReader with the input and output "redirected" + try { + this.reader = new ConsoleReader(in, out); + } catch (Exception e) { + printException(e); + configError = true; + } + + // Don't need this for testing purposes + this.reader.setHistoryEnabled(false); + this.reader.setPaginationEnabled(false); + + // Make the parsing from the client easier; + this.verbose = false; + return configError; + } + + @Override + protected void setInstance(ShellOptionsJC options) { + // We always want a MockInstance for this test + instance = new MockInstance(); + } ++ ++ /** ++ * @deprecated since 1.6.0; use ShellOptionsJC version ++ */ ++ @Deprecated ++ protected void setInstance(CommandLine cl) { ++ // same result as in previous version ++ setInstance((ShellOptionsJC)null); ++ } + + public int start() throws IOException { + if (configError) + return 1; + + String input; + if (isVerbose()) + printInfo(); + + if (execFile != null) { + java.util.Scanner scanner = new java.util.Scanner(execFile, StandardCharsets.UTF_8.name()); + try { + while (scanner.hasNextLine() && !hasExited()) { + execCommand(scanner.nextLine(), true, isVerbose()); + } + } finally { + scanner.close(); + } + } else if (execCommand != null) { + for (String command : execCommand.split("\n")) { + execCommand(command, true, isVerbose()); + } + return exitCode; + } + + while (true) { + if (hasExited()) + return exitCode; + + reader.setPrompt(getDefaultPrompt()); + input = reader.readLine(); + if (input == null) { + reader.println(); + return exitCode; + } // user canceled + + execCommand(input, false, false); + } + } + + /** + * @param in + * the in to set + */ + public void setConsoleInputStream(InputStream in) { + this.in = in; + } + + /** + * @param out + * the output stream to set + */ + public void setConsoleWriter(OutputStream out) { + this.out = out; + } ++ ++ /** ++ * @deprecated since 1.6.0; use the OutputStream version ++ */ ++ @Deprecated ++ public void setConsoleWriter(Writer out) { ++ setConsoleWriter(new WriterOutputStream(out, Constants.UTF8.name())); ++ this.writer = out; ++ } + + /** + * Convenience method to create the byte-array to hand to the console + * + * @param commands + * An array of commands to run + * @return A byte[] input stream which can be handed to the console. + */ + public static ByteArrayInputStream makeCommands(String... commands) { + StringBuilder sb = new StringBuilder(commands.length * 8); + + for (String command : commands) { + sb.append(command).append(NEWLINE); + } + + return new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8)); + } +}
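
For orientation, here is a minimal, hypothetical sketch of how the InputConfigurator helpers touched in this merge are typically driven when wiring an Accumulo 1.6 MapReduce job. It is not part of this commit: the class name ExampleJobSetup, the table name "mytable", the authorization label, the range bounds, the column family, and the iterator name are made-up placeholders, and production code would normally go through the public static setters on AccumuloInputFormat/InputFormatBase rather than this lib.impl class directly.

import java.util.Collections;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class ExampleJobSetup {

  // Populates a Hadoop Configuration with the scan options that InputConfigurator serializes;
  // every literal value below is a placeholder chosen purely for illustration.
  public static Configuration buildScanConfiguration() {
    Configuration conf = new Configuration();

    // Which table to read, and with which scan authorizations.
    InputConfigurator.setInputTableName(AccumuloInputFormat.class, conf, "mytable");
    InputConfigurator.setScanAuthorizations(AccumuloInputFormat.class, conf, new Authorizations("public"));

    // Restrict the scan to one row range and one column family (a null qualifier selects the whole family).
    InputConfigurator.setRanges(AccumuloInputFormat.class, conf,
        Collections.singleton(new Range(new Text("row_000"), new Text("row_999"))));
    InputConfigurator.fetchColumns(AccumuloInputFormat.class, conf,
        Collections.singleton(new Pair<Text,Text>(new Text("cf"), null)));

    // Attach a scan-time iterator and read the table's files directly (the table must be taken offline first).
    InputConfigurator.addIterator(AccumuloInputFormat.class, conf,
        new IteratorSetting(50, "sampleVersions", VersioningIterator.class));
    InputConfigurator.setOfflineTableScan(AccumuloInputFormat.class, conf, true);

    return conf;
  }
}

In the 1.6 code base the Job-based setters on the mapreduce InputFormat classes are intended to delegate to these helpers, so the keys written above are the same ones that getRanges, getIterators, and getFetchedColumns read back when the job plans its splits.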