From: mdrob@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Mon, 21 Apr 2014 23:48:10 -0000
Subject: [08/11] ACCUMULO-1880 create mapreduce module

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
deleted file mode 100644
index 7657c3c..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.StringTokenizer; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.ClientSideIteratorScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.IsolatedScanner; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.client.impl.TabletLocator; -import org.apache.accumulo.core.client.mapreduce.InputTableConfig; -import org.apache.accumulo.core.client.mock.MockTabletLocator; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.TextUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.collect.Maps; - -/** - * @since 1.6.0 - */ -public class InputConfigurator extends ConfiguratorBase { - - /** - * Configuration keys for {@link Scanner}. - * - * @since 1.6.0 - */ - public static enum ScanOpts { - TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS, TABLE_CONFIGS - } - - /** - * Configuration keys for various features. - * - * @since 1.6.0 - */ - public static enum Features { - AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE - } - - /** - * Sets the name of the input table, over which this job will scan. 
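
[Illustrative sketch, not part of the removed file: callers normally reach these helpers through the public InputFormat classes, which pass themselves as the implementingClass. AccumuloInputFormat and the table name "records" below are assumptions for the example.]

    // Hypothetical usage: store the input table name in a Hadoop Configuration and read it back.
    Configuration conf = new Configuration();
    InputConfigurator.setInputTableName(AccumuloInputFormat.class, conf, "records");      // "records" is a made-up table
    String table = InputConfigurator.getInputTableName(AccumuloInputFormat.class, conf);  // returns "records"
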
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param tableName - * the table to use when the tablename is null in the write call - * @since 1.6.0 - */ - public static void setInputTableName(Class implementingClass, Configuration conf, String tableName) { - checkArgument(tableName != null, "tableName is null"); - conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName); - } - - /** - * Sets the name of the input table, over which this job will scan. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @since 1.6.0 - */ - public static String getInputTableName(Class implementingClass, Configuration conf) { - return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME)); - } - - /** - * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param auths - * the user's authorizations - * @since 1.6.0 - */ - public static void setScanAuthorizations(Class implementingClass, Configuration conf, Authorizations auths) { - if (auths != null && !auths.isEmpty()) - conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize()); - } - - /** - * Gets the authorizations to set for the scans from the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the Accumulo scan authorizations - * @since 1.6.0 - * @see #setScanAuthorizations(Class, Configuration, Authorizations) - */ - public static Authorizations getScanAuthorizations(Class implementingClass, Configuration conf) { - String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS)); - return authString == null ? Authorizations.EMPTY : new Authorizations(authString.getBytes(StandardCharsets.UTF_8)); - } - - /** - * Sets the input ranges to scan on all input tables for this job. If not set, the entire table will be scanned. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param ranges - * the ranges that will be mapped over - * @throws IllegalArgumentException - * if the ranges cannot be encoded into base 64 - * @since 1.6.0 - */ - public static void setRanges(Class implementingClass, Configuration conf, Collection ranges) { - checkArgument(ranges != null, "ranges is null"); - - ArrayList rangeStrings = new ArrayList(ranges.size()); - try { - for (Range r : ranges) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - r.write(new DataOutputStream(baos)); - rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()))); - } - conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0])); - } catch (IOException ex) { - throw new IllegalArgumentException("Unable to encode ranges to Base64", ex); - } - } - - /** - * Gets the ranges to scan over from a job. 
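
[A rough sketch of the setRanges/getRanges round trip described above; the row values are invented and AccumuloInputFormat again stands in for the implementingClass.]

    // Hypothetical usage: serialize two ranges into the configuration and decode them again.
    static void configureRanges(Configuration conf) throws IOException {
      List<Range> ranges = new ArrayList<Range>();
      ranges.add(new Range("row_0000", "row_4999"));   // bounded row range
      ranges.add(new Range(new Text("row_9999")));     // single-row range
      InputConfigurator.setRanges(AccumuloInputFormat.class, conf, ranges);
      List<Range> decoded = InputConfigurator.getRanges(AccumuloInputFormat.class, conf); // Base64-decoded copies
      assert decoded.size() == 2;
    }
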
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the ranges - * @throws IOException - * if the ranges have been encoded improperly - * @since 1.6.0 - * @see #setRanges(Class, Configuration, Collection) - */ - public static List getRanges(Class implementingClass, Configuration conf) throws IOException { - - Collection encodedRanges = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES)); - List ranges = new ArrayList(); - for (String rangeString : encodedRanges) { - ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes())); - Range range = new Range(); - range.readFields(new DataInputStream(bais)); - ranges.add(range); - } - return ranges; - } - - /** - * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return a list of iterators - * @since 1.6.0 - * @see #addIterator(Class, Configuration, IteratorSetting) - */ - public static List getIterators(Class implementingClass, Configuration conf) { - String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS)); - - // If no iterators are present, return an empty list - if (iterators == null || iterators.isEmpty()) - return new ArrayList(); - - // Compose the set of iterators encoded in the job configuration - StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR); - List list = new ArrayList(); - try { - while (tokens.hasMoreTokens()) { - String itstring = tokens.nextToken(); - ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes())); - list.add(new IteratorSetting(new DataInputStream(bais))); - bais.close(); - } - } catch (IOException e) { - throw new IllegalArgumentException("couldn't decode iterator settings"); - } - return list; - } - - /** - * Restricts the columns that will be mapped over for the single input table on this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param columnFamilyColumnQualifierPairs - * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is - * selected. An empty set is the default and is equivalent to scanning the all columns. 
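
[For example (hypothetical column names, AccumuloInputFormat assumed as the implementingClass), restricting the scan to one family/qualifier pair plus one whole family:]

    // Hypothetical usage: a null qualifier selects the entire column family.
    Collection<Pair<Text,Text>> cols = new ArrayList<Pair<Text,Text>>();
    cols.add(new Pair<Text,Text>(new Text("attributes"), new Text("size")));   // family "attributes", qualifier "size"
    cols.add(new Pair<Text,Text>(new Text("metadata"), null));                 // all of family "metadata"
    InputConfigurator.fetchColumns(AccumuloInputFormat.class, conf, cols);
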
- * @throws IllegalArgumentException - * if the column family is null - * @since 1.6.0 - */ - public static void fetchColumns(Class implementingClass, Configuration conf, Collection> columnFamilyColumnQualifierPairs) { - checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null"); - String[] columnStrings = serializeColumns(columnFamilyColumnQualifierPairs); - conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings); - } - - public static String[] serializeColumns(Collection> columnFamilyColumnQualifierPairs) { - checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null"); - ArrayList columnStrings = new ArrayList(columnFamilyColumnQualifierPairs.size()); - for (Pair column : columnFamilyColumnQualifierPairs) { - - if (column.getFirst() == null) - throw new IllegalArgumentException("Column family can not be null"); - - String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), StandardCharsets.UTF_8); - if (column.getSecond() != null) - col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), StandardCharsets.UTF_8); - columnStrings.add(col); - } - - return columnStrings.toArray(new String[0]); - } - - /** - * Gets the columns to be mapped over from this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return a set of columns - * @since 1.6.0 - * @see #fetchColumns(Class, Configuration, Collection) - */ - public static Set> getFetchedColumns(Class implementingClass, Configuration conf) { - checkArgument(conf != null, "conf is null"); - String confValue = conf.get(enumToConfKey(implementingClass, ScanOpts.COLUMNS)); - List serialized = new ArrayList(); - if (confValue != null) { - // Split and include any trailing empty strings to allow empty column families - for (String val : confValue.split(",", -1)) { - serialized.add(val); - } - } - return deserializeFetchedColumns(serialized); - } - - public static Set> deserializeFetchedColumns(Collection serialized) { - Set> columns = new HashSet>(); - - if (null == serialized) { - return columns; - } - - for (String col : serialized) { - int idx = col.indexOf(":"); - Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(StandardCharsets.UTF_8)) : Base64.decodeBase64(col.substring(0, idx).getBytes( - StandardCharsets.UTF_8))); - Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes(StandardCharsets.UTF_8))); - columns.add(new Pair(cf, cq)); - } - return columns; - } - - /** - * Encode an iterator on the input for the single input table associated with this job. 
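
[A sketch of attaching a scan-time iterator through this method; WholeRowIterator is used only as a familiar example, and the priority and name are arbitrary.]

    // Hypothetical usage: each IteratorSetting is Base64-encoded and appended to the ITERATORS key.
    IteratorSetting wholeRows = new IteratorSetting(60, "wholeRows",
        "org.apache.accumulo.core.iterators.user.WholeRowIterator");
    InputConfigurator.addIterator(AccumuloInputFormat.class, conf, wholeRows);
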
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param cfg - * the configuration of the iterator - * @throws IllegalArgumentException - * if the iterator can't be serialized into the configuration - * @since 1.6.0 - */ - public static void addIterator(Class implementingClass, Configuration conf, IteratorSetting cfg) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - String newIter; - try { - cfg.write(new DataOutputStream(baos)); - newIter = new String(Base64.encodeBase64(baos.toByteArray()), StandardCharsets.UTF_8); - baos.close(); - } catch (IOException e) { - throw new IllegalArgumentException("unable to serialize IteratorSetting"); - } - - String confKey = enumToConfKey(implementingClass, ScanOpts.ITERATORS); - String iterators = conf.get(confKey); - // No iterators specified yet, create a new string - if (iterators == null || iterators.isEmpty()) { - iterators = newIter; - } else { - // append the next iterator & reset - iterators = iterators.concat(StringUtils.COMMA_STR + newIter); - } - // Store the iterators w/ the job - conf.set(confKey, iterators); - } - - /** - * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries. - * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. * - * - *

- * By default, this feature is enabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @see #setRanges(Class, Configuration, Collection) - * @since 1.6.0 - */ - public static void setAutoAdjustRanges(Class implementingClass, Configuration conf, boolean enableFeature) { - conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature); - } - - /** - * Determines whether a configuration has auto-adjust ranges enabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return false if the feature is disabled, true otherwise - * @since 1.6.0 - * @see #setAutoAdjustRanges(Class, Configuration, boolean) - */ - public static Boolean getAutoAdjustRanges(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true); - } - - /** - * Controls the use of the {@link IsolatedScanner} in this job. - * - *

- * By default, this feature is disabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public static void setScanIsolation(Class implementingClass, Configuration conf, boolean enableFeature) { - conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature); - } - - /** - * Determines whether a configuration has isolation enabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setScanIsolation(Class, Configuration, boolean) - */ - public static Boolean isIsolated(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false); - } - - /** - * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map - * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task. - * - *

- * By default, this feature is disabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public static void setLocalIterators(Class implementingClass, Configuration conf, boolean enableFeature) { - conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature); - } - - /** - * Determines whether a configuration uses local iterators. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setLocalIterators(Class, Configuration, boolean) - */ - public static Boolean usesLocalIterators(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false); - } - - /** - *

- * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the - * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will - * fail. - * - *

- * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. - * - *

- * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be - * on the mapper's classpath. - * - *

- * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map - * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The - * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file. - * - *

- * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support - * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. - * - *

- * By default, this feature is disabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public static void setOfflineTableScan(Class implementingClass, Configuration conf, boolean enableFeature) { - conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature); - } - - /** - * Determines whether a configuration has the offline table scan feature enabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setOfflineTableScan(Class, Configuration, boolean) - */ - public static Boolean isOfflineScan(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false); - } - - /** - * Sets configurations for multiple tables at a time. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param configs - * an array of {@link InputTableConfig} objects to associate with the job - * @since 1.6.0 - */ - public static void setInputTableConfigs(Class implementingClass, Configuration conf, Map configs) { - MapWritable mapWritable = new MapWritable(); - for (Map.Entry tableConfig : configs.entrySet()) - mapWritable.put(new Text(tableConfig.getKey()), tableConfig.getValue()); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - mapWritable.write(new DataOutputStream(baos)); - } catch (IOException e) { - throw new IllegalStateException("Table configuration could not be serialized."); - } - - String confKey = enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS); - conf.set(confKey, new String(Base64.encodeBase64(baos.toByteArray()))); - } - - /** - * Returns all {@link InputTableConfig} objects associated with this job. 
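
[A sketch of the multi-table path (table names and ranges are invented; AccumuloInputFormat assumed): each table gets its own InputTableConfig, which can also carry per-table options such as an offline scan.]

    // Hypothetical usage: configure two input tables with different per-table settings.
    Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();

    InputTableConfig events = new InputTableConfig();
    events.setRanges(Collections.singletonList(new Range("2014-01", "2014-04")));
    configs.put("events", events);

    InputTableConfig archive = new InputTableConfig();
    archive.setOfflineScan(true);    // read the (cloned, offline) table's files directly
    configs.put("archive_clone", archive);

    InputConfigurator.setInputTableConfigs(AccumuloInputFormat.class, conf, configs);
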
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return all of the table query configs for the job - * @since 1.6.0 - */ - public static Map getInputTableConfigs(Class implementingClass, Configuration conf) { - Map configs = new HashMap(); - Map.Entry defaultConfig = getDefaultInputTableConfig(implementingClass, conf); - if (defaultConfig != null) - configs.put(defaultConfig.getKey(), defaultConfig.getValue()); - String configString = conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS)); - MapWritable mapWritable = new MapWritable(); - if (configString != null) { - try { - byte[] bytes = Base64.decodeBase64(configString.getBytes()); - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - mapWritable.readFields(new DataInputStream(bais)); - bais.close(); - } catch (IOException e) { - throw new IllegalStateException("The table query configurations could not be deserialized from the given configuration"); - } - } - for (Map.Entry entry : mapWritable.entrySet()) - configs.put(((Text) entry.getKey()).toString(), (InputTableConfig) entry.getValue()); - - return configs; - } - - /** - * Returns the {@link InputTableConfig} for the given table - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param tableName - * the table name for which to fetch the table query config - * @return the table query config for the given table name (if it exists) and null if it does not - * @since 1.6.0 - */ - public static InputTableConfig getInputTableConfig(Class implementingClass, Configuration conf, String tableName) { - Map queryConfigs = getInputTableConfigs(implementingClass, conf); - return queryConfigs.get(tableName); - } - - /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param tableId - * The table id for which to initialize the {@link TabletLocator} - * @return an Accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist - * @since 1.6.0 - */ - public static TabletLocator getTabletLocator(Class implementingClass, Configuration conf, String tableId) throws TableNotFoundException { - String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE)); - if ("MockInstance".equals(instanceType)) - return new MockTabletLocator(); - Instance instance = getInstance(implementingClass, conf); - return TabletLocator.getLocator(instance, new Text(tableId)); - } - - // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) - /** - * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. 
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @throws IOException - * if the context is improperly configured - * @since 1.6.0 - */ - public static void validateOptions(Class implementingClass, Configuration conf) throws IOException { - - Map inputTableConfigs = getInputTableConfigs(implementingClass, conf); - if (!isConnectorInfoSet(implementingClass, conf)) - throw new IOException("Input info has not been set."); - String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE)); - if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey)) - throw new IOException("Instance info has not been set."); - // validate that we can connect as configured - try { - String principal = getPrincipal(implementingClass, conf); - AuthenticationToken token = getAuthenticationToken(implementingClass, conf); - Connector c = getInstance(implementingClass, conf).getConnector(principal, token); - if (!c.securityOperations().authenticateUser(principal, token)) - throw new IOException("Unable to authenticate user"); - - if (getInputTableConfigs(implementingClass, conf).size() == 0) - throw new IOException("No table set."); - - for (Map.Entry tableConfig : inputTableConfigs.entrySet()) { - if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), tableConfig.getKey(), TablePermission.READ)) - throw new IOException("Unable to access table"); - } - for (Map.Entry tableConfigEntry : inputTableConfigs.entrySet()) { - InputTableConfig tableConfig = tableConfigEntry.getValue(); - if (!tableConfig.shouldUseLocalIterators()) { - if (tableConfig.getIterators() != null) { - for (IteratorSetting iter : tableConfig.getIterators()) { - if (!c.tableOperations().testClassLoad(tableConfigEntry.getKey(), iter.getIteratorClass(), SortedKeyValueIterator.class.getName())) - throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName()); - } - } - } - } - } catch (AccumuloException e) { - throw new IOException(e); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } catch (TableNotFoundException e) { - throw new IOException(e); - } - } - - /** - * Returns the {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} for the configuration based on the properties set using the single-table - * input methods. 
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop instance for which to retrieve the configuration - * @return the config object built from the single input table properties set on the job - * @since 1.6.0 - */ - protected static Map.Entry getDefaultInputTableConfig(Class implementingClass, Configuration conf) { - String tableName = getInputTableName(implementingClass, conf); - if (tableName != null) { - InputTableConfig queryConfig = new InputTableConfig(); - List itrs = getIterators(implementingClass, conf); - if (itrs != null) - queryConfig.setIterators(itrs); - Set> columns = getFetchedColumns(implementingClass, conf); - if (columns != null) - queryConfig.fetchColumns(columns); - List ranges = null; - try { - ranges = getRanges(implementingClass, conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - if (ranges != null) - queryConfig.setRanges(ranges); - - queryConfig.setAutoAdjustRanges(getAutoAdjustRanges(implementingClass, conf)).setUseIsolatedScanners(isIsolated(implementingClass, conf)) - .setUseLocalIterators(usesLocalIterators(implementingClass, conf)).setOfflineScan(isOfflineScan(implementingClass, conf)); - return Maps.immutableEntry(tableName, queryConfig); - } - return null; - } - - public static Map>> binOffline(String tableId, List ranges, Instance instance, Connector conn) - throws AccumuloException, TableNotFoundException { - Map>> binnedRanges = new HashMap>>(); - - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - Tables.clearCache(instance); - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - throw new AccumuloException("Table is online tableId:" + tableId + " cannot scan table in offline mode "); - } - } - - for (Range range : ranges) { - Text startRow; - - if (range.getStartKey() != null) - startRow = range.getStartKey().getRow(); - else - startRow = new Text(); - - Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); - Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME); - scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); - scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME); - scanner.setRange(metadataRange); - - RowIterator rowIter = new RowIterator(scanner); - KeyExtent lastExtent = null; - while (rowIter.hasNext()) { - Iterator> row = rowIter.next(); - String last = ""; - KeyExtent extent = null; - String location = null; - - while (row.hasNext()) { - Map.Entry entry = row.next(); - Key key = entry.getKey(); - - if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) { - last = entry.getValue().toString(); - } - - if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME) - || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) { - location = entry.getValue().toString(); - } - - if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { - extent = new KeyExtent(key.getRow(), entry.getValue()); - } - - } - - if (location != null) - return null; - - if (!extent.getTableId().toString().equals(tableId)) { - throw new 
AccumuloException("Saw unexpected table Id " + tableId + " " + extent); - } - - if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { - throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent); - } - - Map> tabletRanges = binnedRanges.get(last); - if (tabletRanges == null) { - tabletRanges = new HashMap>(); - binnedRanges.put(last, tabletRanges); - } - - List rangeList = tabletRanges.get(extent); - if (rangeList == null) { - rangeList = new ArrayList(); - tabletRanges.put(extent, rangeList); - } - - rangeList.add(range); - - if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { - break; - } - - lastExtent = extent; - } - - } - return binnedRanges; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java deleted file mode 100644 index 727971a..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.hadoop.conf.Configuration; - -/** - * @since 1.6.0 - */ -public class OutputConfigurator extends ConfiguratorBase { - - /** - * Configuration keys for {@link BatchWriter}. - * - * @since 1.6.0 - */ - public static enum WriteOpts { - DEFAULT_TABLE_NAME, BATCH_WRITER_CONFIG - } - - /** - * Configuration keys for various features. - * - * @since 1.6.0 - */ - public static enum Features { - CAN_CREATE_TABLES, SIMULATION_MODE - } - - /** - * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and - * underscores. 
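
[An illustrative sketch for the output side (AccumuloOutputFormat assumed as the implementingClass; the table name and BatchWriter sizes are made up):]

    // Hypothetical usage: default table, table creation, and BatchWriter tuning for the job's writers.
    BatchWriterConfig bwConfig = new BatchWriterConfig();
    bwConfig.setMaxMemory(50 * 1024 * 1024L);    // 50 MB buffered per writer before flushing
    bwConfig.setMaxWriteThreads(4);
    OutputConfigurator.setDefaultTableName(AccumuloOutputFormat.class, conf, "results");
    OutputConfigurator.setCreateTables(AccumuloOutputFormat.class, conf, true);
    OutputConfigurator.setBatchWriterOptions(AccumuloOutputFormat.class, conf, bwConfig);
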
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param tableName - * the table to use when the tablename is null in the write call - * @since 1.6.0 - */ - public static void setDefaultTableName(Class implementingClass, Configuration conf, String tableName) { - if (tableName != null) - conf.set(enumToConfKey(implementingClass, WriteOpts.DEFAULT_TABLE_NAME), tableName); - } - - /** - * Gets the default table name from the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the default table name - * @since 1.6.0 - * @see #setDefaultTableName(Class, Configuration, String) - */ - public static String getDefaultTableName(Class implementingClass, Configuration conf) { - return conf.get(enumToConfKey(implementingClass, WriteOpts.DEFAULT_TABLE_NAME)); - } - - /** - * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is - * used. Setting the configuration multiple times overwrites any previous configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param bwConfig - * the configuration for the {@link BatchWriter} - * @since 1.6.0 - */ - public static void setBatchWriterOptions(Class implementingClass, Configuration conf, BatchWriterConfig bwConfig) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - String serialized; - try { - bwConfig.write(new DataOutputStream(baos)); - serialized = new String(baos.toByteArray(), StandardCharsets.UTF_8); - baos.close(); - } catch (IOException e) { - throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName()); - } - conf.set(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG), serialized); - } - - /** - * Gets the {@link BatchWriterConfig} settings. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the configuration object - * @since 1.6.0 - * @see #setBatchWriterOptions(Class, Configuration, BatchWriterConfig) - */ - public static BatchWriterConfig getBatchWriterOptions(Class implementingClass, Configuration conf) { - String serialized = conf.get(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG)); - BatchWriterConfig bwConfig = new BatchWriterConfig(); - if (serialized == null || serialized.isEmpty()) { - return bwConfig; - } else { - try { - ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(StandardCharsets.UTF_8)); - bwConfig.readFields(new DataInputStream(bais)); - bais.close(); - return bwConfig; - } catch (IOException e) { - throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName()); - } - } - } - - /** - * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores. - * - *

- * By default, this feature is disabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public static void setCreateTables(Class implementingClass, Configuration conf, boolean enableFeature) { - conf.setBoolean(enumToConfKey(implementingClass, Features.CAN_CREATE_TABLES), enableFeature); - } - - /** - * Determines whether tables are permitted to be created as needed. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return true if the feature is disabled, false otherwise - * @since 1.6.0 - * @see #setCreateTables(Class, Configuration, boolean) - */ - public static Boolean canCreateTables(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, Features.CAN_CREATE_TABLES), false); - } - - /** - * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing. - * - *

- * By default, this feature is disabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public static void setSimulationMode(Class implementingClass, Configuration conf, boolean enableFeature) { - conf.setBoolean(enumToConfKey(implementingClass, Features.SIMULATION_MODE), enableFeature); - } - - /** - * Determines whether this feature is enabled. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setSimulationMode(Class, Configuration, boolean) - */ - public static Boolean getSimulationMode(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, Features.SIMULATION_MODE), false); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/package-info.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/package-info.java deleted file mode 100644 index 243160d..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/package-info.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package exists to store common helpers for configuring MapReduce jobs in a single location. It contains static configurator methods, stored in classes - * separate from the things they configure (typically, {@link org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat}/ - * {@link org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat} and related classes in compatible frameworks), rather than storing them in those - * InputFormats/OutputFormats, so as not to clutter their API with methods that don't match the conventions for that framework. These classes may be useful to - * input/output plugins for other frameworks, so they can reuse the same configuration options and/or serialize them into a - * {@link org.apache.hadoop.conf.Configuration} instance in a standard way. - * - *

- * It is not expected these will change much (except when new features are added), but end users should not use these classes. They should use the static - * configurators on the {@link org.apache.hadoop.mapreduce.InputFormat} or {@link org.apache.hadoop.mapreduce.OutputFormat} they are configuring, which in turn - * may use these classes to implement their own static configurators. Once again, these classes are intended for internal use, but may be useful to developers - * of plugins for other frameworks that read/write to Accumulo. - * - * @since 1.6.0 - */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/KeyRangePartitioner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/KeyRangePartitioner.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/KeyRangePartitioner.java deleted file mode 100644 index c59841d..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/KeyRangePartitioner.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce.lib.partition; - -import org.apache.accumulo.core.data.Key; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Partitioner; - -/** - * Hadoop partitioner that uses ranges based on row keys, and optionally sub-bins based on hashing. 
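
[A sketch of wiring this partitioner into a job; the HDFS path and bin count are invented, and Job.getInstance assumes a Hadoop 2 style API.]

    // Hypothetical usage: partition reducer input by row, with 4 hash sub-bins per range.
    static void configurePartitioner(Configuration conf) throws IOException {
      Job job = Job.getInstance(conf);
      job.setPartitionerClass(KeyRangePartitioner.class);
      KeyRangePartitioner.setSplitFile(job, "/user/hadoop/splits.txt");   // Base64-encoded split points, one per line
      KeyRangePartitioner.setNumSubBins(job, 4);
      // reducers should cover (number of split points + 1) * sub-bins partitions
    }
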
- */ -public class KeyRangePartitioner extends Partitioner implements Configurable { - private RangePartitioner rp = new RangePartitioner(); - - @Override - public int getPartition(Key key, Writable value, int numPartitions) { - return rp.getPartition(key.getRow(), value, numPartitions); - } - - @Override - public Configuration getConf() { - return rp.getConf(); - } - - @Override - public void setConf(Configuration conf) { - rp.setConf(conf); - } - - /** - * Sets the hdfs file name to use, containing a newline separated list of Base64 encoded split points that represent ranges for partitioning - */ - public static void setSplitFile(Job job, String file) { - RangePartitioner.setSplitFile(job, file); - } - - /** - * Sets the number of random sub-bins per range - */ - public static void setNumSubBins(Job job, int num) { - RangePartitioner.setNumSubBins(job, num); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java deleted file mode 100644 index 1b7501c..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce.lib.partition; - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Scanner; -import java.util.TreeSet; - -import org.apache.accumulo.core.client.mapreduce.lib.impl.DistributedCacheHelper; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Partitioner; - -/** - * Hadoop partitioner that uses ranges, and optionally sub-bins based on hashing. 
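
[A sketch of producing the split file this partitioner consumes: one Base64-encoded row per line. It writes a local file purely for illustration; in practice the file would live in HDFS so setSplitFile can push it through the distributed cache.]

    // Hypothetical usage: encode split points the way getCutPoints() expects to decode them.
    static void writeSplitFile(File file, String... splitRows) throws IOException {
      PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8));
      try {
        for (String row : splitRows) {
          out.println(new String(Base64.encodeBase64(row.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8));
        }
      } finally {
        out.close();
      }
    }
    // e.g. writeSplitFile(new File("splits.txt"), "g", "m", "t");
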
- */ -public class RangePartitioner extends Partitioner implements Configurable { - private static final String PREFIX = RangePartitioner.class.getName(); - private static final String CUTFILE_KEY = PREFIX + ".cutFile"; - private static final String NUM_SUBBINS = PREFIX + ".subBins"; - - private Configuration conf; - - @Override - public int getPartition(Text key, Writable value, int numPartitions) { - try { - return findPartition(key, getCutPoints(), getNumSubBins()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - int findPartition(Text key, Text[] array, int numSubBins) { - // find the bin for the range, and guarantee it is positive - int index = Arrays.binarySearch(array, key); - index = index < 0 ? (index + 1) * -1 : index; - - // both conditions work with numSubBins == 1, but this check is to avoid - // hashing, when we don't need to, for speed - if (numSubBins < 2) - return index; - return (key.toString().hashCode() & Integer.MAX_VALUE) % numSubBins + index * numSubBins; - } - - private int _numSubBins = 0; - - private synchronized int getNumSubBins() { - if (_numSubBins < 1) { - // get number of sub-bins and guarantee it is positive - _numSubBins = Math.max(1, getConf().getInt(NUM_SUBBINS, 1)); - } - return _numSubBins; - } - - private Text cutPointArray[] = null; - - private synchronized Text[] getCutPoints() throws IOException { - if (cutPointArray == null) { - String cutFileName = conf.get(CUTFILE_KEY); - Path[] cf = DistributedCacheHelper.getLocalCacheFiles(conf); - - if (cf != null) { - for (Path path : cf) { - if (path.toUri().getPath().endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) { - TreeSet cutPoints = new TreeSet(); - Scanner in = new Scanner(new BufferedReader(new InputStreamReader(new FileInputStream(path.toString()), StandardCharsets.UTF_8))); - try { - while (in.hasNextLine()) - cutPoints.add(new Text(Base64.decodeBase64(in.nextLine().getBytes(StandardCharsets.UTF_8)))); - } finally { - in.close(); - } - cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]); - break; - } - } - } - if (cutPointArray == null) - throw new FileNotFoundException(cutFileName + " not found in distributed cache"); - } - return cutPointArray; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - /** - * Sets the hdfs file name to use, containing a newline separated list of Base64 encoded split points that represent ranges for partitioning - */ - public static void setSplitFile(Job job, String file) { - URI uri = new Path(file).toUri(); - DistributedCacheHelper.addCacheFile(uri, job.getConfiguration()); - job.getConfiguration().set(CUTFILE_KEY, uri.getPath()); - } - - /** - * Sets the number of random sub-bins per range - */ - public static void setNumSubBins(Job job, int num) { - job.getConfiguration().setInt(NUM_SUBBINS, num); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java deleted file mode 100644 index aad544b..0000000 --- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapred; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.IdentityMapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Logger; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class AccumuloFileOutputFormatTest { - private static final String PREFIX = AccumuloFileOutputFormatTest.class.getSimpleName(); - private static final String INSTANCE_NAME = PREFIX + "_mapred_instance"; - private static final String BAD_TABLE = PREFIX + "_mapred_bad_table"; - private static final String TEST_TABLE = PREFIX + "_mapred_test_table"; - private static final String EMPTY_TABLE = PREFIX + "_mapred_empty_table"; - - private static AssertionError e1 = null; - private static AssertionError e2 = null; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); - - @BeforeClass - public static void setup() throws Exception { - MockInstance mockInstance = new MockInstance(INSTANCE_NAME); - Connector c = mockInstance.getConnector("root", new PasswordToken("")); - c.tableOperations().create(EMPTY_TABLE); - c.tableOperations().create(TEST_TABLE); - c.tableOperations().create(BAD_TABLE); - BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig()); - Mutation m = new Mutation("Key"); - m.put("", "", ""); - bw.addMutation(m); - bw.close(); - bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig()); - m = new Mutation("r1"); - m.put("cf1", "cq1", "A&B"); - 
m.put("cf1", "cq1", "A&B"); - m.put("cf1", "cq2", "A&"); - bw.addMutation(m); - bw.close(); - } - - @Test - public void testEmptyWrite() throws Exception { - handleWriteTests(false); - } - - @Test - public void testRealWrite() throws Exception { - handleWriteTests(true); - } - - private static class MRTester extends Configured implements Tool { - private static class BadKeyMapper implements Mapper { - - int index = 0; - - @Override - public void map(Key key, Value value, OutputCollector output, Reporter reporter) throws IOException { - try { - try { - output.collect(key, value); - if (index == 2) - fail(); - } catch (Exception e) { - Logger.getLogger(this.getClass()).error(e, e); - assertEquals(2, index); - } - } catch (AssertionError e) { - e1 = e; - } - index++; - } - - @Override - public void configure(JobConf job) {} - - @Override - public void close() throws IOException { - try { - assertEquals(2, index); - } catch (AssertionError e) { - e2 = e; - } - } - - } - - @Override - public int run(String[] args) throws Exception { - - if (args.length != 4) { - throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " "); - } - - String user = args[0]; - String pass = args[1]; - String table = args[2]; - - JobConf job = new JobConf(getConf()); - job.setJarByClass(this.getClass()); - - job.setInputFormat(AccumuloInputFormat.class); - - AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); - AccumuloInputFormat.setInputTableName(job, table); - AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME); - AccumuloFileOutputFormat.setOutputPath(job, new Path(args[3])); - - job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : IdentityMapper.class); - job.setMapOutputKeyClass(Key.class); - job.setMapOutputValueClass(Value.class); - job.setOutputFormat(AccumuloFileOutputFormat.class); - - job.setNumReduceTasks(0); - - return JobClient.runJob(job).isSuccessful() ? 0 : 1; - } - - public static void main(String[] args) throws Exception { - assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args)); - } - } - - public void handleWriteTests(boolean content) throws Exception { - File f = folder.newFile("handleWriteTests"); - f.delete(); - MRTester.main(new String[] {"root", "", content ? 
TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()}); - - assertTrue(f.exists()); - File[] files = f.listFiles(new FileFilter() { - @Override - public boolean accept(File file) { - return file.getName().startsWith("part-m-"); - } - }); - if (content) { - assertEquals(1, files.length); - assertTrue(files[0].exists()); - } else { - assertEquals(0, files.length); - } - } - - @Test - public void writeBadVisibility() throws Exception { - File f = folder.newFile("writeBadVisibility"); - f.delete(); - MRTester.main(new String[] {"root", "", BAD_TABLE, f.getAbsolutePath()}); - Logger.getLogger(this.getClass()).error(e1, e1); - assertNull(e1); - assertNull(e2); - } - - @Test - public void validateConfiguration() throws IOException, InterruptedException { - - int a = 7; - long b = 300l; - long c = 50l; - long d = 10l; - String e = "snappy"; - - JobConf job = new JobConf(); - AccumuloFileOutputFormat.setReplication(job, a); - AccumuloFileOutputFormat.setFileBlockSize(job, b); - AccumuloFileOutputFormat.setDataBlockSize(job, c); - AccumuloFileOutputFormat.setIndexBlockSize(job, d); - AccumuloFileOutputFormat.setCompressionType(job, e); - - AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job); - - assertEquals(7, acuconf.getCount(Property.TABLE_FILE_REPLICATION)); - assertEquals(300l, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)); - assertEquals(50l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)); - assertEquals(10l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)); - assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); - - a = 17; - b = 1300l; - c = 150l; - d = 110l; - e = "lzo"; - - job = new JobConf(); - AccumuloFileOutputFormat.setReplication(job, a); - AccumuloFileOutputFormat.setFileBlockSize(job, b); - AccumuloFileOutputFormat.setDataBlockSize(job, c); - AccumuloFileOutputFormat.setIndexBlockSize(job, d); - AccumuloFileOutputFormat.setCompressionType(job, e); - - acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job); - - assertEquals(17, acuconf.getCount(Property.TABLE_FILE_REPLICATION)); - assertEquals(1300l, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)); - assertEquals(150l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)); - assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)); - assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); - - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java deleted file mode 100644 index 13490e0..0000000 --- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
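
The validateConfiguration test above checks that the AccumuloFileOutputFormat setters land on the corresponding table.file.* properties. A minimal sketch of a job using those same setters, with an illustrative output path and values:

import org.apache.accumulo.core.client.mapred.AccumuloFileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class FileOutputSetup {
  public static JobConf configure() {
    JobConf job = new JobConf();
    job.setOutputFormat(AccumuloFileOutputFormat.class);
    AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/rfiles")); // hypothetical path
    AccumuloFileOutputFormat.setReplication(job, 7);            // Property.TABLE_FILE_REPLICATION
    AccumuloFileOutputFormat.setFileBlockSize(job, 300l);       // Property.TABLE_FILE_BLOCK_SIZE
    AccumuloFileOutputFormat.setDataBlockSize(job, 50l);        // Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE
    AccumuloFileOutputFormat.setIndexBlockSize(job, 10l);       // Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX
    AccumuloFileOutputFormat.setCompressionType(job, "snappy"); // Property.TABLE_FILE_COMPRESSION_TYPE
    return job;
  }
}
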
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapred; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.List; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.user.RegExFilter; -import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.NullOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class AccumuloInputFormatTest { - - private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName(); - private static final String INSTANCE_NAME = PREFIX + "_mapred_instance"; - private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1"; - - private JobConf job; - - @BeforeClass - public static void setupClass() { - System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp"); - } - - @Before - public void createJob() { - job = new JobConf(); - } - - /** - * Check that the iterator configuration is getting stored in the Job conf correctly. 
- */ - @Test - public void testSetIterator() throws IOException { - IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"); - AccumuloInputFormat.addIterator(job, is); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - is.write(new DataOutputStream(baos)); - String iterators = job.get("AccumuloInputFormat.ScanOpts.Iterators"); - assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators); - } - - @Test - public void testAddIterator() throws IOException { - AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); - AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); - IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); - iter.addOption("v1", "1"); - iter.addOption("junk", "\0omg:!\\xyzzy"); - AccumuloInputFormat.addIterator(job, iter); - - List list = AccumuloInputFormat.getIterators(job); - - // Check the list size - assertTrue(list.size() == 3); - - // Walk the list and make sure our settings are correct - IteratorSetting setting = list.get(0); - assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass()); - assertEquals("WholeRow", setting.getName()); - assertEquals(0, setting.getOptions().size()); - - setting = list.get(1); - assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); - assertEquals("Versions", setting.getName()); - assertEquals(0, setting.getOptions().size()); - - setting = list.get(2); - assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); - assertEquals("Count", setting.getName()); - assertEquals(2, setting.getOptions().size()); - assertEquals("1", setting.getOptions().get("v1")); - assertEquals("\0omg:!\\xyzzy", setting.getOptions().get("junk")); - } - - /** - * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There - * should be no exceptions thrown when trying to parse these types of option entries. - * - * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected. 
- */ - @Test - public void testIteratorOptionEncoding() throws Throwable { - String key = "colon:delimited:key"; - String value = "comma,delimited,value"; - IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); - someSetting.addOption(key, value); - AccumuloInputFormat.addIterator(job, someSetting); - - List list = AccumuloInputFormat.getIterators(job); - assertEquals(1, list.size()); - assertEquals(1, list.get(0).getOptions().size()); - assertEquals(list.get(0).getOptions().get(key), value); - - someSetting.addOption(key + "2", value); - someSetting.setPriority(2); - someSetting.setName("it2"); - AccumuloInputFormat.addIterator(job, someSetting); - list = AccumuloInputFormat.getIterators(job); - assertEquals(2, list.size()); - assertEquals(1, list.get(0).getOptions().size()); - assertEquals(list.get(0).getOptions().get(key), value); - assertEquals(2, list.get(1).getOptions().size()); - assertEquals(list.get(1).getOptions().get(key), value); - assertEquals(list.get(1).getOptions().get(key + "2"), value); - } - - /** - * Test getting iterator settings for multiple iterators set - */ - @Test - public void testGetIteratorSettings() throws IOException { - AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); - AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); - AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); - - List list = AccumuloInputFormat.getIterators(job); - - // Check the list size - assertTrue(list.size() == 3); - - // Walk the list and make sure our settings are correct - IteratorSetting setting = list.get(0); - assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); - assertEquals("WholeRow", setting.getName()); - - setting = list.get(1); - assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); - assertEquals("Versions", setting.getName()); - - setting = list.get(2); - assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); - assertEquals("Count", setting.getName()); - - } - - @Test - public void testSetRegex() throws IOException { - String regex = ">\"*%<>\'\\"; - - IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); - RegExFilter.setRegexs(is, regex, null, null, null, false); - AccumuloInputFormat.addIterator(job, is); - - assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName())); - } - - private static AssertionError e1 = null; - private static AssertionError e2 = null; - - private static class MRTester extends Configured implements Tool { - private static class TestMapper implements Mapper { - Key key = null; - int count = 0; - - @Override - public void map(Key k, Value v, OutputCollector output, Reporter reporter) throws IOException { - try { - if (key != null) - assertEquals(key.getRow().toString(), new String(v.get())); - assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); - assertEquals(new String(v.get()), String.format("%09x", count)); - } catch (AssertionError e) { - e1 = e; - } - key = new Key(k); - count++; - } - - @Override - public void configure(JobConf job) {} - - @Override - public void close() 
throws IOException { - try { - assertEquals(100, count); - } catch (AssertionError e) { - e2 = e; - } - } - - } - - @Override - public int run(String[] args) throws Exception { - - if (args.length != 3) { - throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>
"); - } - - String user = args[0]; - String pass = args[1]; - String table = args[2]; - - JobConf job = new JobConf(getConf()); - job.setJarByClass(this.getClass()); - - job.setInputFormat(AccumuloInputFormat.class); - - AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); - AccumuloInputFormat.setInputTableName(job, table); - AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME); - - job.setMapperClass(TestMapper.class); - job.setMapOutputKeyClass(Key.class); - job.setMapOutputValueClass(Value.class); - job.setOutputFormat(NullOutputFormat.class); - - job.setNumReduceTasks(0); - - return JobClient.runJob(job).isSuccessful() ? 0 : 1; - } - - public static void main(String... args) throws Exception { - assertEquals(0, ToolRunner.run(new Configuration(), new MRTester(), args)); - } - } - - @Test - public void testMap() throws Exception { - MockInstance mockInstance = new MockInstance(INSTANCE_NAME); - Connector c = mockInstance.getConnector("root", new PasswordToken("")); - c.tableOperations().create(TEST_TABLE_1); - BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); - for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); - } - bw.close(); - - MRTester.main("root", "", TEST_TABLE_1); - assertNull(e1); - assertNull(e2); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java deleted file mode 100644 index 2864016..0000000 --- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.core.client.mapred; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mapreduce.InputTableConfig; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.Pair; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.NullOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.Test; - -public class AccumuloMultiTableInputFormatTest { - - private static final String PREFIX = AccumuloMultiTableInputFormatTest.class.getSimpleName(); - private static final String INSTANCE_NAME = PREFIX + "_mapred_instance"; - private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1"; - private static final String TEST_TABLE_2 = PREFIX + "_mapred_table_2"; - - private static AssertionError e1 = null; - private static AssertionError e2 = null; - - private static class MRTester extends Configured implements Tool { - private static class TestMapper implements Mapper { - Key key = null; - int count = 0; - - @Override - public void map(Key k, Value v, OutputCollector output, Reporter reporter) throws IOException { - try { - String tableName = ((RangeInputSplit) reporter.getInputSplit()).getTableName(); - if (key != null) - assertEquals(key.getRow().toString(), new String(v.get())); - assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow()); - assertEquals(String.format("%s_%09x", tableName, count), new String(v.get())); - } catch (AssertionError e) { - e1 = e; - } - key = new Key(k); - count++; - } - - @Override - public void configure(JobConf job) {} - - @Override - public void close() throws IOException { - try { - assertEquals(100, count); - } catch (AssertionError e) { - e2 = e; - } - } - - } - - @Override - public int run(String[] args) throws Exception { - - if (args.length != 4) { - throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " "); - } - - String user = args[0]; - String pass = args[1]; - String table1 = args[2]; - String table2 = args[3]; - - JobConf job = new JobConf(getConf()); - job.setJarByClass(this.getClass()); - - job.setInputFormat(AccumuloInputFormat.class); - - AccumuloMultiTableInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); - AccumuloMultiTableInputFormat.setMockInstance(job, INSTANCE_NAME); - - InputTableConfig tableConfig1 = new InputTableConfig(); - InputTableConfig tableConfig2 = new InputTableConfig(); - - Map configMap = new HashMap(); - configMap.put(table1, tableConfig1); - 
configMap.put(table2, tableConfig2); - - AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap); - - job.setMapperClass(TestMapper.class); - job.setMapOutputKeyClass(Key.class); - job.setMapOutputValueClass(Value.class); - job.setOutputFormat(NullOutputFormat.class); - - job.setNumReduceTasks(0); - - return JobClient.runJob(job).isSuccessful() ? 0 : 1; - } - - public static void main(String[] args) throws Exception { - assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args)); - } - } - - @Test - public void testMap() throws Exception { - MockInstance mockInstance = new MockInstance(INSTANCE_NAME); - Connector c = mockInstance.getConnector("root", new PasswordToken("")); - c.tableOperations().create(TEST_TABLE_1); - c.tableOperations().create(TEST_TABLE_2); - BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); - BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig()); - for (int i = 0; i < 100; i++) { - Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1))); - t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes())); - bw.addMutation(t1m); - Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1))); - t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes())); - bw2.addMutation(t2m); - } - bw.close(); - bw2.close(); - - MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2}); - assertNull(e1); - assertNull(e2); - } - - /** - * Verify {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} objects get correctly serialized in the JobContext. - */ - @Test - public void testTableQueryConfigSerialization() throws IOException { - - JobConf job = new JobConf(); - - InputTableConfig table1 = new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - InputTableConfig table2 = new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - Map configMap = new HashMap(); - configMap.put(TEST_TABLE_1, table1); - configMap.put(TEST_TABLE_2, table2); - AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap); - - assertEquals(table1, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_1)); - assertEquals(table2, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_2)); - } -}
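
The multi-table tests above configure one InputTableConfig per table and rely on setInputTableConfigs to serialize the whole map into the job. A minimal sketch of that pattern; the table names, ranges, columns, and iterator class are illustrative assumptions:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapred.AccumuloMultiTableInputFormat;
import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class MultiTableSetup {
  public static void configure(JobConf job) throws Exception {
    AccumuloMultiTableInputFormat.setConnectorInfo(job, "root", new PasswordToken("")); // hypothetical credentials
    AccumuloMultiTableInputFormat.setMockInstance(job, "example_instance");             // hypothetical instance

    // per-table scan configuration: ranges, fetched columns, and scan-time iterators
    InputTableConfig logs = new InputTableConfig()
        .setRanges(Collections.singletonList(new Range("2014-04-01", "2014-04-30")))
        .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("event"), new Text("status"))))
        .setIterators(Collections.singletonList(new IteratorSetting(50, "filter", "org.example.SomeFilter"))); // hypothetical iterator

    Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
    configs.put("logs_table", logs); // hypothetical table name
    AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
  }
}
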