From: elserj@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Sun, 24 Nov 2013 00:33:36 -0000
Message-Id: <99386907c85c41d7bc2daf1a54fccf48@git.apache.org>
In-Reply-To: <449e651f9ade4e8fa564570029299af0@git.apache.org>
References: <449e651f9ade4e8fa564570029299af0@git.apache.org>
Subject: [05/31] WIP Merge branch 'ACCUMULO-1854-merge' into ACCUMULO-1854-1.5-merge

http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index c8731b1,0000000..ff14107
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@@ -1,532 -1,0 +1,549 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.accumulo.core.client.mapreduce.lib.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.StringTokenizer; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.mock.MockTabletLocator; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.CredentialHelper; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.ArgumentChecker; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.StringUtils; + +/** + * @since 1.5.0 + */ +public class InputConfigurator extends ConfiguratorBase { + + /** + * Configuration keys for {@link Scanner}. + * + * @since 1.5.0 + */ + public static enum ScanOpts { + TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS + } + + /** + * Configuration keys for various features. + * + * @since 1.5.0 + */ + public static enum Features { + AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE + } + + /** + * Sets the name of the input table, over which this job will scan. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param tableName + * the table to use when the tablename is null in the write call + * @since 1.5.0 + */ + public static void setInputTableName(Class implementingClass, Configuration conf, String tableName) { + ArgumentChecker.notNull(tableName); + conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName); + } + + /** + * Gets the table name from the configuration. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return the table name + * @since 1.5.0 + * @see #setInputTableName(Class, Configuration, String) + */ + public static String getInputTableName(Class implementingClass, Configuration conf) { + return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME)); + } + + /** + * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set. 
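For illustration, a minimal sketch of the setInputTableName/getInputTableName pair above: the implementing class is only used to prefix the Configuration key, so one Configuration can carry settings for more than one format class. The choice of AccumuloInputFormat.class and the table name "mytable" are assumptions for the example, not part of this patch.

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.hadoop.conf.Configuration;

// Illustrative sketch, not part of this patch.
public class TableNameExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Stores the value under a key prefixed with the implementing class name.
    InputConfigurator.setInputTableName(AccumuloInputFormat.class, conf, "mytable");
    // Reads it back using the same class prefix.
    System.out.println(InputConfigurator.getInputTableName(AccumuloInputFormat.class, conf)); // prints: mytable
  }
}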
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param auths + * the user's authorizations + * @since 1.5.0 + */ + public static void setScanAuthorizations(Class implementingClass, Configuration conf, Authorizations auths) { + if (auths != null && !auths.isEmpty()) + conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize()); + } + + /** + * Gets the authorizations to set for the scans from the configuration. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return the Accumulo scan authorizations + * @since 1.5.0 + * @see #setScanAuthorizations(Class, Configuration, Authorizations) + */ + public static Authorizations getScanAuthorizations(Class implementingClass, Configuration conf) { + String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS)); + return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes()); + } + + /** + * Sets the input ranges to scan for this job. If not set, the entire table will be scanned. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param ranges + * the ranges that will be mapped over + * @since 1.5.0 + */ + public static void setRanges(Class implementingClass, Configuration conf, Collection ranges) { + ArgumentChecker.notNull(ranges); + ArrayList rangeStrings = new ArrayList(ranges.size()); + try { + for (Range r : ranges) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + r.write(new DataOutputStream(baos)); + rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8"))); + } + } catch (IOException ex) { + throw new IllegalArgumentException("Unable to encode ranges to Base64", ex); + } + conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0])); + } + + /** + * Gets the ranges to scan over from a job. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return the ranges + * @throws IOException + * if the ranges have been encoded improperly + * @since 1.5.0 + * @see #setRanges(Class, Configuration, Collection) + */ + public static List getRanges(Class implementingClass, Configuration conf) throws IOException { + ArrayList ranges = new ArrayList(); + for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) { + ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Charset.forName("UTF-8")))); + Range range = new Range(); + range.readFields(new DataInputStream(bais)); + ranges.add(range); + } + return ranges; + } + + /** + * Restricts the columns that will be mapped over for this job. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param columnFamilyColumnQualifierPairs + * a pair of {@link Text} objects corresponding to column family and column qualifier. 
If the column qualifier is null, the entire column family is + * selected. An empty set is the default and is equivalent to scanning the all columns. + * @since 1.5.0 + */ + public static void fetchColumns(Class implementingClass, Configuration conf, Collection> columnFamilyColumnQualifierPairs) { + ArgumentChecker.notNull(columnFamilyColumnQualifierPairs); ++ String[] columnStrings = serializeColumns(columnFamilyColumnQualifierPairs); ++ conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings); ++ } ++ ++ public static String[] serializeColumns(Collection> columnFamilyColumnQualifierPairs) { ++ ArgumentChecker.notNull(columnFamilyColumnQualifierPairs); + ArrayList columnStrings = new ArrayList(columnFamilyColumnQualifierPairs.size()); + for (Pair column : columnFamilyColumnQualifierPairs) { + if (column.getFirst() == null) + throw new IllegalArgumentException("Column family can not be null"); - - String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), Charset.forName("UTF-8")); ++ ++ String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))); + if (column.getSecond() != null) - col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), Charset.forName("UTF-8")); ++ col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))); + columnStrings.add(col); + } - conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings.toArray(new String[0])); ++ ++ return columnStrings.toArray(new String[0]); + } + + /** + * Gets the columns to be mapped over from this job. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return a set of columns + * @since 1.5.0 + * @see #fetchColumns(Class, Configuration, Collection) + */ + public static Set> getFetchedColumns(Class implementingClass, Configuration conf) { ++ ArgumentChecker.notNull(conf); ++ ++ return deserializeFetchedColumns(conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))); ++ } ++ ++ public static Set> deserializeFetchedColumns(Collection serialized) { + Set> columns = new HashSet>(); - for (String col : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))) { ++ ++ if (null == serialized) { ++ return columns; ++ } ++ ++ for (String col : serialized) { + int idx = col.indexOf(":"); - Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Charset.forName("UTF-8"))) : Base64.decodeBase64(col.substring(0, idx).getBytes( - Charset.forName("UTF-8")))); ++ Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Charset.forName("UTF-8"))) : Base64.decodeBase64(col.substring(0, idx).getBytes(Charset.forName("UTF-8")))); + Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes())); + columns.add(new Pair(cf, cq)); + } + return columns; + } + + /** + * Encode an iterator on the input for this job. 
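For illustration, a short sketch that exercises the two serialization paths above: setRanges/getRanges and the new serializeColumns/deserializeFetchedColumns helpers. AccumuloInputFormat.class as the prefixing class, the range bounds, and the family/qualifier names are assumptions for the example only.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

// Illustrative sketch, not part of this patch.
public class RangesAndColumnsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Ranges: each Range is written with its Writable write(), Base64-encoded, and stored as one string value.
    InputConfigurator.setRanges(AccumuloInputFormat.class, conf, Collections.singleton(new Range("a", "m")));
    List<Range> ranges = InputConfigurator.getRanges(AccumuloInputFormat.class, conf);

    // Columns: each pair becomes Base64(cf) or Base64(cf) + ":" + Base64(cq); a null qualifier selects the whole family.
    Pair<Text,Text> wholeFamily = new Pair<Text,Text>(new Text("attributes"), null);
    Pair<Text,Text> oneColumn = new Pair<Text,Text>(new Text("attributes"), new Text("size"));
    String[] serialized = InputConfigurator.serializeColumns(Arrays.asList(wholeFamily, oneColumn));
    Set<Pair<Text,Text>> columns = InputConfigurator.deserializeFetchedColumns(Arrays.asList(serialized));

    System.out.println(ranges.size() + " range(s), " + columns.size() + " column pair(s)");
  }
}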
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param cfg + * the configuration of the iterator + * @since 1.5.0 + */ + public static void addIterator(Class implementingClass, Configuration conf, IteratorSetting cfg) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + String newIter; + try { + cfg.write(new DataOutputStream(baos)); + newIter = new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8")); + baos.close(); + } catch (IOException e) { + throw new IllegalArgumentException("unable to serialize IteratorSetting"); + } + + String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS)); + // No iterators specified yet, create a new string + if (iterators == null || iterators.isEmpty()) { + iterators = newIter; + } else { + // append the next iterator & reset + iterators = iterators.concat(StringUtils.COMMA_STR + newIter); + } + // Store the iterators w/ the job + conf.set(enumToConfKey(implementingClass, ScanOpts.ITERATORS), iterators); + } + + /** + * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return a list of iterators + * @since 1.5.0 + * @see #addIterator(Class, Configuration, IteratorSetting) + */ + public static List getIterators(Class implementingClass, Configuration conf) { + String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS)); + + // If no iterators are present, return an empty list + if (iterators == null || iterators.isEmpty()) + return new ArrayList(); + + // Compose the set of iterators encoded in the job configuration + StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR); + List list = new ArrayList(); + try { + while (tokens.hasMoreTokens()) { + String itstring = tokens.nextToken(); + ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes())); + list.add(new IteratorSetting(new DataInputStream(bais))); + bais.close(); + } + } catch (IOException e) { + throw new IllegalArgumentException("couldn't decode iterator settings"); + } + return list; + } + + /** + * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries. + * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. * + * + *
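For illustration, a sketch of the addIterator/getIterators round trip above, which serializes each IteratorSetting with write(), Base64-encodes it, and appends it to a comma-separated list in the Configuration. The priority, iterator name, and the choice of VersioningIterator are assumptions for the example, not part of this patch.

import java.util.List;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.hadoop.conf.Configuration;

// Illustrative sketch, not part of this patch.
public class IteratorConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    IteratorSetting versions = new IteratorSetting(20, "vers", VersioningIterator.class);
    versions.addOption("maxVersions", "1");
    // Appends the Base64-encoded setting to the comma-separated iterator list in the Configuration.
    InputConfigurator.addIterator(AccumuloInputFormat.class, conf, versions);
    // Decodes the list back into IteratorSetting objects.
    List<IteratorSetting> back = InputConfigurator.getIterators(AccumuloInputFormat.class, conf);
    System.out.println(back.get(0).getName() + " at priority " + back.get(0).getPriority());
  }
}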

+ * By default, this feature is enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @see #setRanges(Class, Configuration, Collection) + * @since 1.5.0 + */ + public static void setAutoAdjustRanges(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature); + } + + /** + * Determines whether a configuration has auto-adjust ranges enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return false if the feature is disabled, true otherwise + * @since 1.5.0 + * @see #setAutoAdjustRanges(Class, Configuration, boolean) + */ + public static Boolean getAutoAdjustRanges(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true); + } + + /** + * Controls the use of the {@link IsolatedScanner} in this job. + * + *

+ * By default, this feature is disabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setScanIsolation(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature); + } + + /** + * Determines whether a configuration has isolation enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setScanIsolation(Class, Configuration, boolean) + */ + public static Boolean isIsolated(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false); + } + + /** + * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map + * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task. + * + *
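For illustration, a sketch of pairing a configured iterator with the local-iterators switch described above: the iterator stack then runs in the mapper's JVM (via ClientSideIteratorScanner), so the iterator class must be on the task classpath rather than on the tablet servers. The RegExFilter and its settings are assumptions for the example.

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.hadoop.conf.Configuration;

// Illustrative sketch, not part of this patch.
public class LocalIteratorsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    IteratorSetting filter = new IteratorSetting(30, "rowFilter", RegExFilter.class);
    RegExFilter.setRegexs(filter, "row1.*", null, null, null, false);
    InputConfigurator.addIterator(AccumuloInputFormat.class, conf, filter);
    // Run the iterator stack inside the map task instead of in the tablet server;
    // RegExFilter must then be on the mapper's classpath.
    InputConfigurator.setLocalIterators(AccumuloInputFormat.class, conf, true);
    System.out.println(InputConfigurator.usesLocalIterators(AccumuloInputFormat.class, conf)); // true
  }
}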

+ * By default, this feature is disabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setLocalIterators(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature); + } + + /** + * Determines whether a configuration uses local iterators. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setLocalIterators(Class, Configuration, boolean) + */ + public static Boolean usesLocalIterators(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false); + } + + /** + *

+ * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the + * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will + * fail. + * + *

+ * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. + * + *

+ * Reading the offline table will create the scan-time iterator stack in the map process, so any iterators that are configured for the table will need to be + * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS is non-standard. + * + *

+ * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map + * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The + * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file. + * + *

+ * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support + * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. + * + *

+ * By default, this feature is disabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setOfflineTableScan(Class implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature); + } + + /** + * Determines whether a configuration has the offline table scan feature enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setOfflineTableScan(Class, Configuration, boolean) + */ + public static Boolean isOfflineScan(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false); + } + + /** + * Initializes an Accumulo {@link TabletLocator} based on the configuration. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return an Accumulo tablet locator + * @throws TableNotFoundException + * if the table name set on the configuration doesn't exist + * @since 1.5.0 + */ + public static TabletLocator getTabletLocator(Class implementingClass, Configuration conf) throws TableNotFoundException { + String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE)); + if ("MockInstance".equals(instanceType)) + return new MockTabletLocator(); + Instance instance = getInstance(implementingClass, conf); + String tableName = getInputTableName(implementingClass, conf); + return TabletLocator.getInstance(instance, new Text(Tables.getTableId(instance, tableName))); + } + + // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) + /** + * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. 
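For illustration, a sketch of the clone-then-offline pattern described above, wired up with the offline-scan switch. The table names are assumptions, and the TableOperations clone/offline calls come from the general Accumulo client API rather than from this patch.

import java.util.Collections;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.hadoop.mapreduce.Job;

// Illustrative sketch, not part of this patch.
public class OfflineScanSetup {
  static void configure(Connector conn, Job job) throws Exception {
    // Clone the source table, take the clone offline, and point the job at the clone.
    conn.tableOperations().clone("orders", "orders_offline_copy", true,
        Collections.<String,String> emptyMap(), Collections.<String> emptySet());
    conn.tableOperations().offline("orders_offline_copy");
    InputConfigurator.setInputTableName(AccumuloInputFormat.class, job.getConfiguration(), "orders_offline_copy");
    // Read the clone's files directly out of HDFS instead of scanning through tablet servers.
    InputConfigurator.setOfflineTableScan(AccumuloInputFormat.class, job.getConfiguration(), true);
  }
}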
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @throws IOException + * if the context is improperly configured + * @since 1.5.0 + */ + public static void validateOptions(Class implementingClass, Configuration conf) throws IOException { + if (!isConnectorInfoSet(implementingClass, conf)) + throw new IOException("Input info has not been set."); + String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE)); + if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey)) + throw new IOException("Instance info has not been set."); + // validate that we can connect as configured + try { + Connector c = getInstance(implementingClass, conf).getConnector(getPrincipal(implementingClass, conf), + CredentialHelper.extractToken(getTokenClass(implementingClass, conf), getToken(implementingClass, conf))); + if (!c.securityOperations().authenticateUser(getPrincipal(implementingClass, conf), + CredentialHelper.extractToken(getTokenClass(implementingClass, conf), getToken(implementingClass, conf)))) + throw new IOException("Unable to authenticate user"); + if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), getInputTableName(implementingClass, conf), TablePermission.READ)) + throw new IOException("Unable to access table"); + + if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) { + // validate that any scan-time iterators can be loaded by the the tablet servers + for (IteratorSetting iter : getIterators(implementingClass, conf)) { + if (!c.tableOperations().testClassLoad(getInputTableName(implementingClass, conf), iter.getIteratorClass(), SortedKeyValueIterator.class.getName())) + throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName()); + } + } + + } catch (AccumuloException e) { + throw new IOException(e); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } catch (TableNotFoundException e) { + throw new IOException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java index 69e3ba1,0000000..5ebab3f mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java +++ b/core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java @@@ -1,131 -1,0 +1,131 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.security; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.log4j.Logger; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +public class CredentialHelper { + static Logger log = Logger.getLogger(CredentialHelper.class); + + public static TCredentials create(String principal, AuthenticationToken token, String instanceID) throws AccumuloSecurityException { + String className = token.getClass().getName(); + return new TCredentials(principal, className, ByteBuffer.wrap(toBytes(token)), instanceID); + } + + public static String asBase64String(TCredentials cred) throws AccumuloSecurityException { + return new String(Base64.encodeBase64(asByteArray(cred)), Charset.forName("UTF-8")); + } + + public static byte[] asByteArray(TCredentials cred) throws AccumuloSecurityException { + TSerializer ts = new TSerializer(); + try { + return ts.serialize(cred); + } catch (TException e) { + // This really shouldn't happen + log.error(e, e); + throw new AccumuloSecurityException(cred.getPrincipal(), SecurityErrorCode.SERIALIZATION_ERROR); + } + } + + public static TCredentials fromBase64String(String string) throws AccumuloSecurityException { + return fromByteArray(Base64.decodeBase64(string.getBytes(Charset.forName("UTF-8")))); + } + + public static TCredentials fromByteArray(byte[] serializedCredential) throws AccumuloSecurityException { + if (serializedCredential == null) + return null; + TDeserializer td = new TDeserializer(); + try { + TCredentials toRet = new TCredentials(); + td.deserialize(toRet, serializedCredential); + return toRet; + } catch (TException e) { + // This really shouldn't happen + log.error(e, e); + throw new AccumuloSecurityException("unknown", SecurityErrorCode.SERIALIZATION_ERROR); + } + } + + public static AuthenticationToken extractToken(TCredentials toAuth) throws AccumuloSecurityException { + return extractToken(toAuth.tokenClassName, toAuth.getToken()); + } + + public static TCredentials createSquelchError(String principal, AuthenticationToken token, String instanceID) { + try { + return create(principal, token, instanceID); + } catch (AccumuloSecurityException e) { + log.error(e, e); + return null; + } + } + + public static String tokenAsBase64(AuthenticationToken token) throws AccumuloSecurityException { + return new String(Base64.encodeBase64(toBytes(token)), Charset.forName("UTF-8")); + } + - private static byte[] toBytes(AuthenticationToken token) throws AccumuloSecurityException { ++ public static byte[] toBytes(AuthenticationToken token) throws AccumuloSecurityException { + try { + ByteArrayOutputStream bais = new ByteArrayOutputStream(); + token.write(new DataOutputStream(bais)); + byte[] serializedToken = bais.toByteArray(); + bais.close(); + return serializedToken; + } catch (IOException e) { + log.error(e, e); 
+ throw new AccumuloSecurityException("unknown", SecurityErrorCode.SERIALIZATION_ERROR); + } + + } + + public static AuthenticationToken extractToken(String tokenClass, byte[] token) throws AccumuloSecurityException { + try { + Object obj = Class.forName(tokenClass).newInstance(); + if (obj instanceof AuthenticationToken) { + AuthenticationToken toRet = (AuthenticationToken) obj; + toRet.readFields(new DataInputStream(new ByteArrayInputStream(token))); + return toRet; + } + } catch (ClassNotFoundException cnfe) { + log.error(cnfe, cnfe); + } catch (InstantiationException e) { + log.error(e, e); + } catch (IllegalAccessException e) { + log.error(e, e); + } catch (IOException e) { + log.error(e, e); + throw new AccumuloSecurityException("unknown", SecurityErrorCode.SERIALIZATION_ERROR); + } + throw new AccumuloSecurityException("unknown", SecurityErrorCode.INVALID_TOKEN); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java ---------------------------------------------------------------------- diff --cc core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java index 0000000,0000000..7239b01 new file mode 100644 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java @@@ -1,0 -1,0 +1,534 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
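For illustration, a sketch of the CredentialHelper round trip above, using the now-public toBytes together with extractToken(String, byte[]); PasswordToken and the literal password are assumptions for the example.

import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.CredentialHelper;

// Illustrative sketch, not part of this patch.
public class TokenRoundTrip {
  public static void main(String[] args) throws Exception {
    AuthenticationToken token = new PasswordToken("secret");
    // Serialize the token with its Writable write(), as the MapReduce code does before stashing credentials.
    byte[] raw = CredentialHelper.toBytes(token);
    // Rebuild it from the token class name plus the serialized bytes.
    AuthenticationToken back = CredentialHelper.extractToken(PasswordToken.class.getName(), raw);
    System.out.println(back instanceof PasswordToken); // true
  }
}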
++ */ ++package org.apache.accumulo.core.client.mapreduce; ++ ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertTrue; ++ ++import java.io.IOException; ++import java.util.Collection; ++import java.util.Collections; ++import java.util.List; ++import java.util.regex.Pattern; ++ ++import org.apache.accumulo.core.client.BatchWriter; ++import org.apache.accumulo.core.client.Connector; ++import org.apache.accumulo.core.client.Instance; ++import org.apache.accumulo.core.client.IteratorSetting; ++import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator; ++import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption; ++import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RegexType; ++import org.apache.accumulo.core.client.mock.MockInstance; ++import org.apache.accumulo.core.data.Key; ++import org.apache.accumulo.core.data.Mutation; ++import org.apache.accumulo.core.data.Value; ++import org.apache.accumulo.core.iterators.user.WholeRowIterator; ++import org.apache.accumulo.core.security.Authorizations; ++import org.apache.accumulo.core.util.Pair; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.io.Text; ++import org.apache.hadoop.mapreduce.InputSplit; ++import org.apache.hadoop.mapreduce.Job; ++import org.apache.hadoop.mapreduce.JobContext; ++import org.apache.hadoop.mapreduce.JobID; ++import org.apache.hadoop.mapreduce.Mapper; ++import org.apache.hadoop.mapreduce.RecordReader; ++import org.apache.hadoop.mapreduce.TaskAttemptContext; ++import org.apache.hadoop.mapreduce.TaskAttemptID; ++import org.apache.log4j.Level; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Test; ++ ++public class AccumuloInputFormatTest { ++ ++ @After ++ public void tearDown() throws Exception {} ++ ++ /** ++ * Test basic setting & getting of max versions. ++ * ++ * @throws IOException ++ * Signals that an I/O exception has occurred. ++ */ ++ @Test ++ public void testMaxVersions() throws IOException { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1); ++ int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration()); ++ assertEquals(1, version); ++ } ++ ++ /** ++ * Test max versions with an invalid value. ++ * ++ * @throws IOException ++ * Signals that an I/O exception has occurred. ++ */ ++ @Test(expected = IOException.class) ++ public void testMaxVersionsLessThan1() throws IOException { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0); ++ } ++ ++ /** ++ * Test no max version configured. ++ */ ++ @Test ++ public void testNoMaxVersion() { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration())); ++ } ++ ++ /** ++ * Check that the iterator configuration is getting stored in the Job conf correctly. 
++ */ ++ @SuppressWarnings("deprecation") ++ @Test ++ public void testSetIterator() { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ ++ AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow"); ++ Configuration conf = job.getConfiguration(); ++ String iterators = conf.get("AccumuloInputFormat.iterators"); ++ assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators); ++ } ++ ++ @Test ++ public void testAddIterator() { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ ++ AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); ++ AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); ++ IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); ++ iter.addOption("v1", "1"); ++ iter.addOption("junk", "\0omg:!\\xyzzy"); ++ AccumuloInputFormat.addIterator(job.getConfiguration(), iter); ++ ++ List list = AccumuloInputFormat.getIterators(job.getConfiguration()); ++ ++ // Check the list size ++ assertTrue(list.size() == 3); ++ ++ // Walk the list and make sure our settings are correct ++ AccumuloIterator setting = list.get(0); ++ assertEquals(1, setting.getPriority()); ++ assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass()); ++ assertEquals("WholeRow", setting.getIteratorName()); ++ ++ setting = list.get(1); ++ assertEquals(2, setting.getPriority()); ++ assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); ++ assertEquals("Versions", setting.getIteratorName()); ++ ++ setting = list.get(2); ++ assertEquals(3, setting.getPriority()); ++ assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); ++ assertEquals("Count", setting.getIteratorName()); ++ ++ List iteratorOptions = AccumuloInputFormat.getIteratorOptions(job.getConfiguration()); ++ assertEquals(2, iteratorOptions.size()); ++ assertEquals("Count", iteratorOptions.get(0).getIteratorName()); ++ assertEquals("Count", iteratorOptions.get(1).getIteratorName()); ++ assertEquals("v1", iteratorOptions.get(0).getKey()); ++ assertEquals("1", iteratorOptions.get(0).getValue()); ++ assertEquals("junk", iteratorOptions.get(1).getKey()); ++ assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue()); ++ } ++ ++ /** ++ * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There ++ * should be no exceptions thrown when trying to parse these types of option entries. ++ * ++ * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected. 
++ */ ++ @Test ++ public void testIteratorOptionEncoding() throws Throwable { ++ String key = "colon:delimited:key"; ++ String value = "comma,delimited,value"; ++ IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); ++ someSetting.addOption(key, value); ++ Job job = new Job(); ++ AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting); ++ ++ final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString(); ++ ++ assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options")); ++ ++ List opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration()); ++ assertEquals(1, opts.size()); ++ assertEquals(opts.get(0).getKey(), key); ++ assertEquals(opts.get(0).getValue(), value); ++ ++ someSetting.addOption(key + "2", value); ++ someSetting.setPriority(2); ++ someSetting.setName("it2"); ++ AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting); ++ opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration()); ++ assertEquals(3, opts.size()); ++ for (AccumuloIteratorOption opt : opts) { ++ assertEquals(opt.getKey().substring(0, key.length()), key); ++ assertEquals(opt.getValue(), value); ++ } ++ } ++ ++ /** ++ * Test getting iterator settings for multiple iterators set ++ */ ++ @SuppressWarnings("deprecation") ++ @Test ++ public void testGetIteratorSettings() { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ ++ AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow"); ++ AccumuloInputFormat.setIterator(job, 2, "org.apache.accumulo.core.iterators.VersioningIterator", "Versions"); ++ AccumuloInputFormat.setIterator(job, 3, "org.apache.accumulo.core.iterators.CountingIterator", "Count"); ++ ++ List list = AccumuloInputFormat.getIterators(job); ++ ++ // Check the list size ++ assertTrue(list.size() == 3); ++ ++ // Walk the list and make sure our settings are correct ++ AccumuloIterator setting = list.get(0); ++ assertEquals(1, setting.getPriority()); ++ assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); ++ assertEquals("WholeRow", setting.getIteratorName()); ++ ++ setting = list.get(1); ++ assertEquals(2, setting.getPriority()); ++ assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); ++ assertEquals("Versions", setting.getIteratorName()); ++ ++ setting = list.get(2); ++ assertEquals(3, setting.getPriority()); ++ assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); ++ assertEquals("Count", setting.getIteratorName()); ++ ++ } ++ ++ /** ++ * Check that the iterator options are getting stored in the Job conf correctly. 
++ */ ++ @SuppressWarnings("deprecation") ++ @Test ++ public void testSetIteratorOption() { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue"); ++ ++ Configuration conf = job.getConfiguration(); ++ String options = conf.get("AccumuloInputFormat.iterators.options"); ++ assertEquals(new String("someIterator:aKey:aValue"), options); ++ } ++ ++ /** ++ * Test getting iterator options for multiple options set ++ */ ++ @SuppressWarnings("deprecation") ++ @Test ++ public void testGetIteratorOption() { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ ++ AccumuloInputFormat.setIteratorOption(job, "iterator1", "key1", "value1"); ++ AccumuloInputFormat.setIteratorOption(job, "iterator2", "key2", "value2"); ++ AccumuloInputFormat.setIteratorOption(job, "iterator3", "key3", "value3"); ++ ++ List list = AccumuloInputFormat.getIteratorOptions(job); ++ ++ // Check the list size ++ assertEquals(3, list.size()); ++ ++ // Walk the list and make sure all the options are correct ++ AccumuloIteratorOption option = list.get(0); ++ assertEquals("iterator1", option.getIteratorName()); ++ assertEquals("key1", option.getKey()); ++ assertEquals("value1", option.getValue()); ++ ++ option = list.get(1); ++ assertEquals("iterator2", option.getIteratorName()); ++ assertEquals("key2", option.getKey()); ++ assertEquals("value2", option.getValue()); ++ ++ option = list.get(2); ++ assertEquals("iterator3", option.getIteratorName()); ++ assertEquals("key3", option.getKey()); ++ assertEquals("value3", option.getValue()); ++ } ++ ++ @SuppressWarnings("deprecation") ++ @Test ++ public void testSetRegex() { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ ++ String regex = ">\"*%<>\'\\"; ++ ++ AccumuloInputFormat.setRegex(job, RegexType.ROW, regex); ++ ++ assertTrue(regex.equals(AccumuloInputFormat.getRegex(job, RegexType.ROW))); ++ } ++ ++ static class TestMapper extends Mapper { ++ Key key = null; ++ int count = 0; ++ ++ @Override ++ protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { ++ if (key != null) ++ assertEquals(key.getRow().toString(), new String(v.get())); ++ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); ++ assertEquals(new String(v.get()), String.format("%09x", count)); ++ key = new Key(k); ++ count++; ++ } ++ } ++ ++ @Test ++ public void testMap() throws Exception { ++ MockInstance mockInstance = new MockInstance("testmapinstance"); ++ Connector c = mockInstance.getConnector("root", new byte[] {}); ++ c.tableOperations().create("testtable"); ++ BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4); ++ for (int i = 0; i < 100; i++) { ++ Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); ++ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); ++ bw.addMutation(m); ++ } ++ bw.close(); ++ ++ Job job = new Job(new Configuration()); ++ job.setInputFormatClass(AccumuloInputFormat.class); ++ job.setMapperClass(TestMapper.class); ++ job.setNumReduceTasks(0); ++ AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations()); ++ AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); ++ ++ AccumuloInputFormat input = new AccumuloInputFormat(); ++ List splits = input.getSplits(job); ++ assertEquals(splits.size(), 1); ++ ++ TestMapper mapper = (TestMapper) 
job.getMapperClass().newInstance(); ++ for (InputSplit split : splits) { ++ RangeInputSplit risplit = (RangeInputSplit) split; ++ Assert.assertEquals("root", risplit.getUsername()); ++ Assert.assertArrayEquals(new byte[0], risplit.getPassword()); ++ Assert.assertEquals("testtable", risplit.getTable()); ++ Assert.assertEquals(new Authorizations(), risplit.getAuths()); ++ Assert.assertEquals("testmapinstance", risplit.getInstanceName()); ++ ++ TaskAttemptID id = new TaskAttemptID(); ++ TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); ++ RecordReader reader = input.createRecordReader(split, attempt); ++ Mapper.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split); ++ reader.initialize(split, context); ++ mapper.run(context); ++ } ++ } ++ ++ @Test ++ public void testSimple() throws Exception { ++ MockInstance mockInstance = new MockInstance("testmapinstance"); ++ Connector c = mockInstance.getConnector("root", new byte[] {}); ++ c.tableOperations().create("testtable2"); ++ BatchWriter bw = c.createBatchWriter("testtable2", 10000L, 1000L, 4); ++ for (int i = 0; i < 100; i++) { ++ Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); ++ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); ++ bw.addMutation(m); ++ } ++ bw.close(); ++ ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations()); ++ AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); ++ AccumuloInputFormat input = new AccumuloInputFormat(); ++ RangeInputSplit ris = new RangeInputSplit(); ++ TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); ++ RecordReader rr = input.createRecordReader(ris, tac); ++ rr.initialize(ris, tac); ++ ++ TestMapper mapper = new TestMapper(); ++ Mapper.Context context = mapper.new Context(job.getConfiguration(), tac.getTaskAttemptID(), rr, null, null, null, ris); ++ while (rr.nextKeyValue()) { ++ mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context); ++ } ++ } ++ ++ @SuppressWarnings("deprecation") ++ @Test ++ public void testRegex() throws Exception { ++ MockInstance mockInstance = new MockInstance("testmapinstance"); ++ Connector c = mockInstance.getConnector("root", new byte[] {}); ++ c.tableOperations().create("testtable3"); ++ BatchWriter bw = c.createBatchWriter("testtable3", 10000L, 1000L, 4); ++ for (int i = 0; i < 100; i++) { ++ Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); ++ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); ++ bw.addMutation(m); ++ } ++ bw.close(); ++ ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable3", new Authorizations()); ++ AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); ++ final String regex = ".*1.*"; ++ AccumuloInputFormat.setRegex(job, RegexType.ROW, regex); ++ AccumuloInputFormat input = new AccumuloInputFormat(); ++ RangeInputSplit ris = new RangeInputSplit(); ++ TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); ++ RecordReader rr = input.createRecordReader(ris, tac); ++ rr.initialize(ris, tac); ++ ++ Pattern p = Pattern.compile(regex); ++ while (rr.nextKeyValue()) { ++ 
Assert.assertTrue(p.matcher(rr.getCurrentKey().getRow().toString()).matches()); ++ } ++ } ++ ++ @SuppressWarnings("deprecation") ++ @Test ++ public void testCorrectRangeInputSplits() throws Exception { ++ JobContext job = new JobContext(new Configuration(), new JobID()); ++ ++ String username = "user", table = "table", rowRegex = "row.*", colfRegex = "colf.*", colqRegex = "colq.*"; ++ String valRegex = "val.*", instance = "instance"; ++ byte[] password = "password".getBytes(); ++ Authorizations auths = new Authorizations("foo"); ++ Collection> fetchColumns = Collections.singleton(new Pair(new Text("foo"), new Text("bar"))); ++ boolean isolated = true, localIters = true; ++ int maxVersions = 5; ++ Level level = Level.WARN; ++ ++ Instance inst = new MockInstance(instance); ++ Connector connector = inst.getConnector(username, password); ++ connector.tableOperations().create(table); ++ ++ AccumuloInputFormat.setInputInfo(job, username, password, table, auths); ++ AccumuloInputFormat.setMockInstance(job, instance); ++ AccumuloInputFormat.setRegex(job, RegexType.ROW, rowRegex); ++ AccumuloInputFormat.setRegex(job, RegexType.COLUMN_FAMILY, colfRegex); ++ AccumuloInputFormat.setRegex(job, RegexType.COLUMN_QUALIFIER, colqRegex); ++ AccumuloInputFormat.setRegex(job, RegexType.VALUE, valRegex); ++ AccumuloInputFormat.setIsolated(job, isolated); ++ AccumuloInputFormat.setLocalIterators(job, localIters); ++ AccumuloInputFormat.setMaxVersions(job, maxVersions); ++ AccumuloInputFormat.fetchColumns(job, fetchColumns); ++ AccumuloInputFormat.setLogLevel(job, level); ++ ++ AccumuloInputFormat aif = new AccumuloInputFormat(); ++ ++ List splits = aif.getSplits(job); ++ ++ Assert.assertEquals(1, splits.size()); ++ ++ InputSplit split = splits.get(0); ++ ++ Assert.assertEquals(RangeInputSplit.class, split.getClass()); ++ ++ RangeInputSplit risplit = (RangeInputSplit) split; ++ ++ Assert.assertEquals(username, risplit.getUsername()); ++ Assert.assertEquals(table, risplit.getTable()); ++ Assert.assertArrayEquals(password, risplit.getPassword()); ++ Assert.assertEquals(auths, risplit.getAuths()); ++ Assert.assertEquals(instance, risplit.getInstanceName()); ++ Assert.assertEquals(rowRegex, risplit.getRowRegex()); ++ Assert.assertEquals(colfRegex, risplit.getColfamRegex()); ++ Assert.assertEquals(colqRegex, risplit.getColqualRegex()); ++ Assert.assertEquals(valRegex, risplit.getValueRegex()); ++ Assert.assertEquals(isolated, risplit.isIsolatedScan()); ++ Assert.assertEquals(localIters, risplit.usesLocalIterators()); ++ Assert.assertEquals(maxVersions, risplit.getMaxVersions().intValue()); ++ Assert.assertEquals(fetchColumns, risplit.getFetchedColumns()); ++ Assert.assertEquals(level, risplit.getLogLevel()); ++ } ++ ++ @Test ++ public void testPartialInputSplitDelegationToConfiguration() throws Exception { ++ MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration"); ++ Connector c = mockInstance.getConnector("root", new byte[] {}); ++ c.tableOperations().create("testtable"); ++ BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4); ++ for (int i = 0; i < 100; i++) { ++ Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); ++ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); ++ bw.addMutation(m); ++ } ++ bw.close(); ++ ++ Job job = new Job(new Configuration()); ++ job.setInputFormatClass(AccumuloInputFormat.class); ++ job.setMapperClass(TestMapper.class); ++ job.setNumReduceTasks(0); ++ 
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations()); ++ AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialInputSplitDelegationToConfiguration"); ++ ++ AccumuloInputFormat input = new AccumuloInputFormat(); ++ List splits = input.getSplits(job); ++ assertEquals(splits.size(), 1); ++ ++ TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); ++ ++ RangeInputSplit emptySplit = new RangeInputSplit(); ++ ++ // Using an empty split should fall back to the information in the Job's Configuration ++ TaskAttemptID id = new TaskAttemptID(); ++ TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); ++ RecordReader reader = input.createRecordReader(emptySplit, attempt); ++ Mapper.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); ++ reader.initialize(emptySplit, context); ++ mapper.run(context); ++ } ++ ++ @Test(expected = IOException.class) ++ public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception { ++ MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration"); ++ Connector c = mockInstance.getConnector("root", new byte[] {}); ++ c.tableOperations().create("testtable"); ++ BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4); ++ for (int i = 0; i < 100; i++) { ++ Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); ++ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); ++ bw.addMutation(m); ++ } ++ bw.close(); ++ ++ Job job = new Job(new Configuration()); ++ job.setInputFormatClass(AccumuloInputFormat.class); ++ job.setMapperClass(TestMapper.class); ++ job.setNumReduceTasks(0); ++ AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations()); ++ AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialFailedInputSplitDelegationToConfiguration"); ++ ++ AccumuloInputFormat input = new AccumuloInputFormat(); ++ List splits = input.getSplits(job); ++ assertEquals(splits.size(), 1); ++ ++ TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); ++ ++ RangeInputSplit emptySplit = new RangeInputSplit(); ++ emptySplit.setUsername("root"); ++ emptySplit.setPassword("anythingelse".getBytes()); ++ ++ // Using an empty split should fall back to the information in the Job's Configuration ++ TaskAttemptID id = new TaskAttemptID(); ++ TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); ++ RecordReader reader = input.createRecordReader(emptySplit, attempt); ++ Mapper.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); ++ reader.initialize(emptySplit, context); ++ mapper.run(context); ++ } ++} http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java ---------------------------------------------------------------------- diff --cc core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java index 0000000,0000000..22fb6e1 new file mode 100644 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java @@@ -1,0 -1,0 +1,100 @@@ ++package org.apache.accumulo.core.client.mapreduce; ++ ++import java.io.ByteArrayInputStream; ++import java.io.ByteArrayOutputStream; ++import java.io.DataInputStream; ++import 
java.io.DataOutputStream; ++import java.io.IOException; ++import java.util.Arrays; ++import java.util.HashSet; ++import java.util.Set; ++ ++import org.apache.accumulo.core.data.Key; ++import org.apache.accumulo.core.data.Range; ++import org.apache.accumulo.core.security.Authorizations; ++import org.apache.accumulo.core.util.Pair; ++import org.apache.hadoop.io.Text; ++import org.apache.log4j.Level; ++import org.junit.Assert; ++import org.junit.Test; ++ ++public class RangeInputSplitTest { ++ ++ @Test ++ public void testSimpleWritable() throws IOException { ++ RangeInputSplit split = new RangeInputSplit(new Range(new Key("a"), new Key("b")), new String[]{"localhost"}); ++ ++ ByteArrayOutputStream baos = new ByteArrayOutputStream(); ++ DataOutputStream dos = new DataOutputStream(baos); ++ split.write(dos); ++ ++ RangeInputSplit newSplit = new RangeInputSplit(); ++ ++ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); ++ DataInputStream dis = new DataInputStream(bais); ++ newSplit.readFields(dis); ++ ++ Assert.assertEquals(split.getRange(), newSplit.getRange()); ++ Assert.assertTrue(Arrays.equals(split.getLocations(), newSplit.getLocations())); ++ } ++ ++ ++ ++ @Test ++ public void testAllFieldsWritable() throws IOException { ++ RangeInputSplit split = new RangeInputSplit(new Range(new Key("a"), new Key("b")), new String[]{"localhost"}); ++ ++ Set> fetchedColumns = new HashSet>(); ++ ++ fetchedColumns.add(new Pair(new Text("colf1"), new Text("colq1"))); ++ fetchedColumns.add(new Pair(new Text("colf2"), new Text("colq2"))); ++ ++ split.setAuths(new Authorizations("foo")); ++ split.setOffline(true); ++ split.setIsolatedScan(true); ++ split.setUsesLocalIterators(true); ++ split.setMaxVersions(5); ++ split.setRowRegex("row"); ++ split.setColfamRegex("colf"); ++ split.setColqualRegex("colq"); ++ split.setValueRegex("value"); ++ split.setFetchedColumns(fetchedColumns); ++ split.setPassword("password".getBytes()); ++ split.setUsername("root"); ++ split.setInstanceName("instance"); ++ split.setMockInstance(true); ++ split.setZooKeepers("localhost"); ++ split.setLogLevel(Level.WARN); ++ ++ ByteArrayOutputStream baos = new ByteArrayOutputStream(); ++ DataOutputStream dos = new DataOutputStream(baos); ++ split.write(dos); ++ ++ RangeInputSplit newSplit = new RangeInputSplit(); ++ ++ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); ++ DataInputStream dis = new DataInputStream(bais); ++ newSplit.readFields(dis); ++ ++ Assert.assertEquals(split.getRange(), newSplit.getRange()); ++ Assert.assertArrayEquals(split.getLocations(), newSplit.getLocations()); ++ ++ Assert.assertEquals(split.getAuths(), newSplit.getAuths()); ++ Assert.assertEquals(split.isOffline(), newSplit.isOffline()); ++ Assert.assertEquals(split.isIsolatedScan(), newSplit.isOffline()); ++ Assert.assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators()); ++ Assert.assertEquals(split.getMaxVersions(), newSplit.getMaxVersions()); ++ Assert.assertEquals(split.getRowRegex(), newSplit.getRowRegex()); ++ Assert.assertEquals(split.getColfamRegex(), newSplit.getColfamRegex()); ++ Assert.assertEquals(split.getColqualRegex(), newSplit.getColqualRegex()); ++ Assert.assertEquals(split.getValueRegex(), newSplit.getValueRegex()); ++ Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns()); ++ Assert.assertEquals(new String(split.getPassword()), new String(newSplit.getPassword())); ++ Assert.assertEquals(split.getUsername(), newSplit.getUsername()); ++ 
Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName()); ++ Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance()); ++ Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers()); ++ Assert.assertEquals(split.getLogLevel(), newSplit.getLogLevel()); ++ } ++ ++}
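The two tests above repeat the same write()/readFields() round trip inline; the following sketch (an assumed refactoring for illustration, not part of the patch) captures that pattern for any Writable.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Illustrative sketch, not part of this patch.
public class WritableRoundTrip {
  // Serialize with write(), then rehydrate an empty instance with readFields(), as the tests above do inline.
  public static <T extends Writable> T copy(T source, T empty) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    source.write(new DataOutputStream(baos));
    empty.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    return empty;
  }
}

Usage in the tests would then reduce to: RangeInputSplit newSplit = WritableRoundTrip.copy(split, new RangeInputSplit());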