From: cjnolet@apache.org
To: commits@accumulo.apache.org
Date: Wed, 16 Oct 2013 00:22:10 -0000
Subject: [2/5] git commit: ACCUMULO-391 Adding AccumuloMultiTableInputFormat and tests. Reverting AccumuloInputFormatTest back to pre-multi-table version.

ACCUMULO-391 Adding AccumuloMultiTableInputFormat and tests. Reverting AccumuloInputFormatTest back to pre-multi-table version.

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5c496552
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5c496552
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5c496552

Branch: refs/heads/master
Commit: 5c496552e024817bbd197514c3a37c84b6e28f50
Parents: 6114639
Author: Corey J. Nolet
Authored: Sun Oct 13 00:14:38 2013 -0400
Committer: Corey J.
Nolet Committed: Tue Oct 15 20:21:45 2013 -0400 ---------------------------------------------------------------------- .../client/mapreduce/AbstractInputFormat.java | 804 +++++++++++++++++++ .../AccumuloMultiTableInputFormat.java | 53 ++ .../core/client/mapreduce/InputFormatBase.java | 796 +----------------- .../mapreduce/AccumuloInputFormatTest.java | 89 +- .../AccumuloMultiTableInputFormatTest.java | 163 ++++ 5 files changed, 1035 insertions(+), 870 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c496552/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java new file mode 100644 index 0000000..6568f35 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -0,0 +1,804 @@ +package org.apache.accumulo.core.client.mapreduce; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.math.BigInteger; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.impl.OfflineScanner; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.conf.TableQueryConfig; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import 
org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public abstract class AbstractInputFormat extends InputFormat { + + protected static final Class CLASS = AccumuloInputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + *
+ * <p>
+ * WARNING: The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission) + * @param token + * the user's password + * @throws org.apache.accumulo.core.client.AccumuloSecurityException + * @since 1.5.0 + */ + public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { + InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token); + } + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + *
+ * <p>
+ * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission) + * @param tokenFile + * the path to the token file + * @throws AccumuloSecurityException + * @since 1.6.0 + */ + public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException { + InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile); + } + + /** + * Determines if the connector has been configured. + * + * @param context + * the Hadoop context for the configured job + * @return true if the connector has been configured, false otherwise + * @since 1.5.0 + * @see #setConnectorInfo(Job, String, AuthenticationToken) + */ + protected static Boolean isConnectorInfoSet(JobContext context) { + return InputConfigurator.isConnectorInfoSet(CLASS, getConfiguration(context)); + } + + /** + * Gets the user name from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the user name + * @since 1.5.0 + * @see #setConnectorInfo(Job, String, AuthenticationToken) + */ + protected static String getPrincipal(JobContext context) { + return InputConfigurator.getPrincipal(CLASS, getConfiguration(context)); + } + + /** + * Gets the serialized token class from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. + */ + @Deprecated + protected static String getTokenClass(JobContext context) { + return getAuthenticationToken(context).getClass().getName(); + } + + /** + * Gets the serialized token from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. + */ + @Deprecated + protected static byte[] getToken(JobContext context) { + return AuthenticationToken.AuthenticationTokenSerializer.serialize(getAuthenticationToken(context)); + } + + /** + * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. + * + * @param context + * the Hadoop context for the configured job + * @return the principal's authentication token + * @since 1.6.0 + * @see #setConnectorInfo(Job, String, AuthenticationToken) + * @see #setConnectorInfo(Job, String, String) + */ + protected static AuthenticationToken getAuthenticationToken(JobContext context) { + return InputConfigurator.getAuthenticationToken(CLASS, getConfiguration(context)); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @param zooKeepers + * a comma-separated list of zookeeper servers + * @since 1.5.0 + */ + public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) { + InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.mock.MockInstance} for this job. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @since 1.5.0 + */ + public static void setMockInstance(Job job, String instanceName) { + InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName); + } + + /** + * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return an Accumulo instance + * @since 1.5.0 + * @see #setZooKeeperInstance(Job, String, String) + * @see #setMockInstance(Job, String) + */ + protected static Instance getInstance(JobContext context) { + return InputConfigurator.getInstance(CLASS, getConfiguration(context)); + } + + /** + * Sets the log level for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param level + * the logging level + * @since 1.5.0 + */ + public static void setLogLevel(Job job, Level level) { + InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); + } + + /** + * Gets the log level from this configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the log level + * @since 1.5.0 + * @see #setLogLevel(Job, Level) + */ + protected static Level getLogLevel(JobContext context) { + return InputConfigurator.getLogLevel(CLASS, getConfiguration(context)); + } + + /** + * Sets the {@link org.apache.accumulo.core.security.Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set. + * + * @param job + * the Hadoop job instance to be configured + * @param auths + * the user's authorizations + */ + public static void setScanAuthorizations(Job job, Authorizations auths) { + InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths); + } + + /** + * Gets the authorizations to set for the scans from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the Accumulo scan authorizations + * @since 1.5.0 + * @see #setScanAuthorizations(Job, Authorizations) + */ + protected static Authorizations getScanAuthorizations(JobContext context) { + return InputConfigurator.getScanAuthorizations(CLASS, getConfiguration(context)); + } + + /** + * Fetches all {@link TableQueryConfig}s that have been set on the given Hadoop configuration. + * + * @param job + * the Hadoop job instance to be configured + * @return + * @since 1.6.0 + */ + protected static List getBatchScanConfigs(JobContext job) { + return InputConfigurator.getTableQueryConfigs(CLASS, getConfiguration(job)); + } + + /** + * Fetches a {@link TableQueryConfig} that has been set on the configuration for a specific table. + * + *
+ * <p>
+ * null is returned in the event that the table doesn't exist. + * + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table name for which to grab the config object + * @return the {@link TableQueryConfig} for the given table + * @since 1.6.0 + */ + protected static TableQueryConfig getBatchScanConfig(JobContext job, String tableName) { + return InputConfigurator.getTableQueryConfig(CLASS, getConfiguration(job), tableName); + } + + /** + * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. + * + * @param context + * the Hadoop context for the configured job + * @param table + * the table for which to initialize the locator + * @return an Accumulo tablet locator + * @throws org.apache.accumulo.core.client.TableNotFoundException + * if the table name set on the configuration doesn't exist + * @since 1.6.0 + */ + protected static TabletLocator getTabletLocator(JobContext context, String table) throws TableNotFoundException { + return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), table); + } + + // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) + /** + * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. + * + * @param context + * the Hadoop context for the configured job + * @throws java.io.IOException + * if the context is improperly configured + * @since 1.5.0 + */ + protected static void validateOptions(JobContext context) throws IOException { + InputConfigurator.validateOptions(CLASS, getConfiguration(context)); + } + + /** + * An abstract base class to be used to create {@link org.apache.hadoop.mapreduce.RecordReader} instances that convert from Accumulo + * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to the user's K/V types. + * + * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables: + *

+ * <ul>
+ * <li>K {@link #currentK}</li>
+ * <li>V {@link #currentV}</li>
+ * <li>Key {@link #currentKey} (used for progress reporting)</li>
+ * <li>int {@link #numKeysRead} (used for progress reporting)</li>
+ * </ul>
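+ * <p>
+ * For a reader where K is {@link Key} and V is {@link Value}, a minimal {@code nextKeyValue()} sketch
+ * (illustrative only; it mirrors the anonymous reader created by {@code AccumuloMultiTableInputFormat} later
+ * in this commit) would update these fields as follows:
+ * <pre>
+ * // sketch: assumes K = Key and V = Value
+ * if (scannerIterator.hasNext()) {
+ *   ++numKeysRead;
+ *   Map.Entry&lt;Key,Value&gt; entry = scannerIterator.next();
+ *   currentK = currentKey = entry.getKey();
+ *   currentV = currentValue = entry.getValue();
+ *   return true;
+ * }
+ * return false;
+ * </pre>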
+ */ + protected abstract static class RecordReaderBase extends RecordReader { + protected long numKeysRead; + protected Iterator> scannerIterator; + protected RangeInputSplit split; + + /** + * Apply the configured iterators from the configuration to the scanner. This applies both the default iterators and the per-table iterators. + * + * @param context + * the Hadoop context for the configured job + * @param scanner + * the scanner to configure + * @param tableName + * the table name for which to set up the iterators + */ + protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName) { + TableQueryConfig config = getBatchScanConfig(context, tableName); + List iterators = config.getIterators(); + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); + } + + /** + * Initialize a scanner over the given input split using this task attempt configuration. + */ + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { + + Scanner scanner; + split = (RangeInputSplit) inSplit; + log.debug("Initializing input split: " + split.getRange()); + Instance instance = getInstance(attempt); + String principal = getPrincipal(attempt); + + TableQueryConfig tableConfig = getBatchScanConfig(attempt, split.getTableName()); + + // in case the table name changed, we can still use the previous name for terms of configuration, + // but for the scanner, we'll need to reference the new table name. + String actualNameForId = split.getTableName(); + if (!(instance instanceof MockInstance)) { + try { + actualNameForId = Tables.getTableName(instance, split.getTableId()); + if (!actualNameForId.equals(split.getTableName())) + log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId); + } catch (TableNotFoundException e) { + throw new IOException("The specified table was not found for id=" + split.getTableId()); + } + } + + AuthenticationToken token = getAuthenticationToken(attempt); + Authorizations authorizations = getScanAuthorizations(attempt); + try { + log.debug("Creating connector with user: " + principal); + + Connector conn = instance.getConnector(principal, token); + log.debug("Creating scanner for table: " + split.getTableName()); + log.debug("Authorizations are: " + authorizations); + if (tableConfig.isOfflineScan()) { + scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations); + } else { + scanner = conn.createScanner(actualNameForId, authorizations); + } + if (tableConfig.shouldUseIsolatedScanners()) { + log.info("Creating isolated scanner"); + scanner = new IsolatedScanner(scanner); + } + if (tableConfig.shouldUseLocalIterators()) { + log.info("Using local iterators"); + scanner = new ClientSideIteratorScanner(scanner); + } + setupIterators(attempt, scanner, split.getTableName()); + } catch (Exception e) { + throw new IOException(e); + } + + // setup a scanner within the bounds of this split + for (Pair c : tableConfig.getFetchedColumns()) { + if (c.getSecond() != null) { + log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); + scanner.fetchColumn(c.getFirst(), c.getSecond()); + } else { + log.debug("Fetching column family " + c.getFirst()); + scanner.fetchColumnFamily(c.getFirst()); + } + } + + scanner.setRange(split.getRange()); + numKeysRead = 0; + + // do this last after setting all scanner options + scannerIterator = scanner.iterator(); + } + + @Override + public void close() {} + + @Override + public 
float getProgress() throws IOException { + if (numKeysRead > 0 && currentKey == null) + return 1.0f; + return split.getProgress(currentKey); + } + + protected K currentK = null; + protected V currentV = null; + protected Key currentKey = null; + protected Value currentValue = null; + + @Override + public K getCurrentKey() throws IOException, InterruptedException { + return currentK; + } + + @Override + public V getCurrentValue() throws IOException, InterruptedException { + return currentV; + } + } + + Map>> binOfflineTable(JobContext context, String tableName, List ranges) throws TableNotFoundException, + AccumuloException, AccumuloSecurityException { + + Map>> binnedRanges = new HashMap>>(); + + Instance instance = getInstance(context); + Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context)); + String tableId = Tables.getTableId(instance, tableName); + + if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { + Tables.clearCache(instance); + if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { + throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode "); + } + } + + for (Range range : ranges) { + Text startRow; + + if (range.getStartKey() != null) + startRow = range.getStartKey().getRow(); + else + startRow = new Text(); + + Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME); + scanner.setRange(metadataRange); + + RowIterator rowIter = new RowIterator(scanner); + KeyExtent lastExtent = null; + while (rowIter.hasNext()) { + Iterator> row = rowIter.next(); + String last = ""; + KeyExtent extent = null; + String location = null; + + while (row.hasNext()) { + Map.Entry entry = row.next(); + Key key = entry.getKey(); + + if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) { + last = entry.getValue().toString(); + } + + if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME) + || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) { + location = entry.getValue().toString(); + } + + if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { + extent = new KeyExtent(key.getRow(), entry.getValue()); + } + + } + + if (location != null) + return null; + + if (!extent.getTableId().toString().equals(tableId)) { + throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent); + } + + if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { + throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent); + } + + Map> tabletRanges = binnedRanges.get(last); + if (tabletRanges == null) { + tabletRanges = new HashMap>(); + binnedRanges.put(last, tabletRanges); + } + + List rangeList = tabletRanges.get(extent); + if (rangeList == null) { + rangeList = new ArrayList(); + tabletRanges.put(extent, rangeList); + } + + rangeList.add(range); + + if (extent.getEndRow() == null || 
range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { + break; + } + + lastExtent = extent; + } + + } + + return binnedRanges; + } + + /** + * Gets the splits of the tables that have been set on the job. + * + * @param conf + * the configuration of the job + * @return the splits from the tables based on the ranges. + * @throws java.io.IOException + * if a table set on the job doesn't exist or an error occurs initializing the tablet locator + */ + public List getSplits(JobContext conf) throws IOException { + log.setLevel(getLogLevel(conf)); + validateOptions(conf); + + LinkedList splits = new LinkedList(); + List tableConfigs = getBatchScanConfigs(conf); + for (TableQueryConfig tableConfig : tableConfigs) { + + boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); + String tableId = null; + List ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); + if (ranges.isEmpty()) { + ranges = new ArrayList(1); + ranges.add(new Range()); + } + + // get the metadata information for these ranges + Map>> binnedRanges = new HashMap>>(); + TabletLocator tl; + try { + if (tableConfig.isOfflineScan()) { + binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges); + while (binnedRanges == null) { + // Some tablets were still online, try again + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges); + + } + } else { + Instance instance = getInstance(conf); + tl = getTabletLocator(conf, tableConfig.getTableName()); + // its possible that the cache could contain complete, but old information about a tables tablets... so clear it + tl.invalidateCache(); + Credentials creds = new Credentials(getPrincipal(conf), getAuthenticationToken(conf)); + + while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) { + if (!(instance instanceof MockInstance)) { + if (!Tables.exists(instance, tableId)) + throw new TableDeletedException(tableId); + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new TableOfflineException(instance, tableId); + tableId = Tables.getTableId(instance, tableConfig.getTableName()); + } + binnedRanges.clear(); + log.warn("Unable to locate bins for specified ranges. 
Retrying."); + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + tl.invalidateCache(); + } + } + } catch (Exception e) { + throw new IOException(e); + } + + HashMap> splitsToAdd = null; + + if (!autoAdjust) + splitsToAdd = new HashMap>(); + + HashMap hostNameCache = new HashMap(); + for (Map.Entry>> tserverBin : binnedRanges.entrySet()) { + String ip = tserverBin.getKey().split(":", 2)[0]; + String location = hostNameCache.get(ip); + if (location == null) { + InetAddress inetAddress = InetAddress.getByName(ip); + location = inetAddress.getHostName(); + hostNameCache.put(ip, location); + } + for (Map.Entry> extentRanges : tserverBin.getValue().entrySet()) { + Range ke = extentRanges.getKey().toDataRange(); + for (Range r : extentRanges.getValue()) { + if (autoAdjust) { + // divide ranges into smaller ranges, based on the tablets + splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, ke.clip(r), new String[] {location})); + } else { + // don't divide ranges + ArrayList locations = splitsToAdd.get(r); + if (locations == null) + locations = new ArrayList(1); + locations.add(location); + splitsToAdd.put(r, locations); + } + } + } + } + + if (!autoAdjust) + for (Map.Entry> entry : splitsToAdd.entrySet()) + splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, entry.getKey(), entry.getValue().toArray(new String[0]))); + } + return splits; + } + + /** + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. + */ + public static class RangeInputSplit extends InputSplit implements Writable { + private Range range; + private String[] locations; + private String tableId; + private String tableName; + + public RangeInputSplit() { + range = new Range(); + locations = new String[0]; + tableId = ""; + tableName = ""; + } + + public RangeInputSplit(RangeInputSplit split) throws IOException { + this.setRange(split.getRange()); + this.setLocations(split.getLocations()); + this.setTableName(split.getTableName()); + } + + protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { + this.range = range; + this.locations = locations; + this.tableName = table; + this.tableId = tableId; + } + + public Range getRange() { + return range; + } + + public void setRange(Range range) { + this.range = range; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setTableId(String tableId) { + this.tableId = tableId; + } + + public String getTableId() { + return tableId; + } + + private static byte[] extractBytes(ByteSequence seq, int numBytes) { + byte[] bytes = new byte[numBytes + 1]; + bytes[0] = 0; + for (int i = 0; i < numBytes; i++) { + if (i >= seq.length()) + bytes[i + 1] = 0; + else + bytes[i + 1] = seq.byteAt(i); + } + return bytes; + } + + public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { + int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); + BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); + BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); + BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); + return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); + } + + public float getProgress(Key currentKey) { + if (currentKey == null) + return 0f; + if (range.getStartKey() != null && 
range.getEndKey() != null) { + if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW)) { + // just look at the row progress + return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM)) { + // just look at the column family progress + return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)) { + // just look at the column qualifier progress + return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + } + } + // if we can't figure it out, then claim no progress + return 0f; + } + + /** + * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. + */ + @Override + public long getLength() throws IOException { + Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); + Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); + int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); + long diff = 0; + + byte[] start = startRow.getBytes(); + byte[] stop = stopRow.getBytes(); + for (int i = 0; i < maxCommon; ++i) { + diff |= 0xff & (start[i] ^ stop[i]); + diff <<= Byte.SIZE; + } + + if (startRow.getLength() != stopRow.getLength()) + diff |= 0xff; + + return diff + 1; + } + + @Override + public String[] getLocations() throws IOException { + return locations; + } + + public void setLocations(String[] locations) { + this.locations = locations; + } + + @Override + public void readFields(DataInput in) throws IOException { + range.readFields(in); + tableName = in.readUTF(); + int numLocs = in.readInt(); + locations = new String[numLocs]; + for (int i = 0; i < numLocs; ++i) + locations[i] = in.readUTF(); + } + + @Override + public void write(DataOutput out) throws IOException { + range.write(out); + out.writeUTF(tableName); + out.writeInt(locations.length); + for (int i = 0; i < locations.length; ++i) + out.writeUTF(locations[i]); + } + } + + // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility + static Configuration getConfiguration(JobContext context) { + try { + Class c = AbstractInputFormat.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext"); + Method m = c.getMethod("getConfiguration"); + Object o = m.invoke(context, new Object[0]); + return (Configuration) o; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c496552/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java new file mode 100644 index 0000000..2123ab9 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java @@ -0,0 +1,53 @@ +package org.apache.accumulo.core.client.mapreduce; + +import 
org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; +import org.apache.accumulo.core.conf.TableQueryConfig; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class AccumuloMultiTableInputFormat extends AbstractInputFormat{ + + /** + * Sets the {@link org.apache.accumulo.core.conf.TableQueryConfig} objects on the given Hadoop configuration + * + * @param job + * the Hadoop job instance to be configured + * @param configs + * the table query configs to be set on the configuration. + * @since 1.6.0 + */ + public static void setBatchScanConfigs(Job job, TableQueryConfig... configs) { + checkNotNull(configs); + InputConfigurator.setTableQueryConfigs(CLASS, getConfiguration(job), configs); + } + + @Override + public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { + log.setLevel(getLogLevel(context)); + return new RecordReaderBase() { + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (scannerIterator.hasNext()) { + ++numKeysRead; + Map.Entry entry = scannerIterator.next(); + currentK = currentKey = entry.getKey(); + currentV = currentValue = entry.getValue(); + if (log.isTraceEnabled()) + log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true)); + return true; + } + return false; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c496552/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index e2d6b33..ad9c454 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -16,69 +16,28 @@ */ package org.apache.accumulo.core.client.mapreduce; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; - -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; -import java.lang.reflect.Method; -import java.math.BigInteger; -import java.net.InetAddress; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientSideIteratorScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.RowIterator; -import 
org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.impl.OfflineScanner; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.conf.TableQueryConfig; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; /** * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs. @@ -90,75 +49,7 @@ import org.apache.log4j.Logger; *
* <p>
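* A typical single-table job configuration through the concrete {@link AccumuloInputFormat} might look like the
* following (an illustrative sketch only; the user, password, instance name, zookeeper hosts, and table name are
* placeholders, not values from this commit):
* <pre>
* // sketch: placeholder credentials and connection settings
* Job job = new Job(new Configuration());
* job.setInputFormatClass(AccumuloInputFormat.class);
* AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("pass"));
* AccumuloInputFormat.setZooKeeperInstance(job, "instanceName", "zkhost1:2181");
* AccumuloInputFormat.setInputTableName(job, "table");
* AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());
* </pre>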
* See {@link AccumuloInputFormat} for an example implementation. */ -public abstract class InputFormatBase extends InputFormat { - - private static final Class CLASS = AccumuloInputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *
- * <p>
- * WARNING: The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe - * conversion to a string, and is not intended to be secure. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission) - * @param token - * the user's password - * @throws AccumuloSecurityException - * @since 1.5.0 - */ - public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { - InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *
- * <p>
- * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission) - * @param tokenFile - * the path to the token file - * @throws AccumuloSecurityException - * @since 1.6.0 - */ - public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException { - InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param context - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobContext context) { - return InputConfigurator.isConnectorInfoSet(CLASS, getConfiguration(context)); - } - - /** - * Gets the user name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static String getPrincipal(JobContext context) { - return InputConfigurator.getPrincipal(CLASS, getConfiguration(context)); - } +public abstract class InputFormatBase extends AbstractInputFormat { /** * Gets the table name from the configuration. @@ -174,110 +65,6 @@ public abstract class InputFormatBase extends InputFormat { } /** - * Gets the serialized token class from either the configuration or the token file. - * - * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. - */ - @Deprecated - protected static String getTokenClass(JobContext context) { - return getAuthenticationToken(context).getClass().getName(); - } - - /** - * Gets the serialized token from either the configuration or the token file. - * - * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. - */ - @Deprecated - protected static byte[] getToken(JobContext context) { - return AuthenticationTokenSerializer.serialize(getAuthenticationToken(context)); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. - * - * @param context - * the Hadoop context for the configured job - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - * @see #setConnectorInfo(Job, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobContext context) { - return InputConfigurator.getAuthenticationToken(CLASS, getConfiguration(context)); - } - - /** - * Configures a {@link ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param instanceName - * the Accumulo instance name - * @param zooKeepers - * a comma-separated list of zookeeper servers - * @since 1.5.0 - */ - public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) { - InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers); - } - - /** - * Configures a {@link MockInstance} for this job. 
- * - * @param job - * the Hadoop job instance to be configured - * @param instanceName - * the Accumulo instance name - * @since 1.5.0 - */ - public static void setMockInstance(Job job, String instanceName) { - InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName); - } - - /** - * Initializes an Accumulo {@link Instance} based on the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @see #setZooKeeperInstance(Job, String, String) - * @see #setMockInstance(Job, String) - */ - protected static Instance getInstance(JobContext context) { - return InputConfigurator.getInstance(CLASS, getConfiguration(context)); - } - - /** - * Sets the log level for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(Job job, Level level) { - InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); - } - - /** - * Gets the log level from this configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(Job, Level) - */ - protected static Level getLogLevel(JobContext context) { - return InputConfigurator.getLogLevel(CLASS, getConfiguration(context)); - } - - /** * Sets the name of the input table, over which this job will scan. * * @param job @@ -291,31 +78,6 @@ public abstract class InputFormatBase extends InputFormat { } /** - * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set. - * - * @param job - * the Hadoop job instance to be configured - * @param auths - * the user's authorizations - */ - public static void setScanAuthorizations(Job job, Authorizations auths) { - InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths); - } - - /** - * Gets the authorizations to set for the scans from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the Accumulo scan authorizations - * @since 1.5.0 - * @see #setScanAuthorizations(Job, Authorizations) - */ - protected static Authorizations getScanAuthorizations(JobContext context) { - return InputConfigurator.getScanAuthorizations(CLASS, getConfiguration(context)); - } - - /** * Sets the input ranges to scan for the single input table associated with this job. * * @param job @@ -427,55 +189,6 @@ public abstract class InputFormatBase extends InputFormat { } /** - * Sets the {@link TableQueryConfig} objects on the given Hadoop configuration - * - * @param job - * the Hadoop job instance to be configured - * @param configs - * the table query configs to be set on the configuration. - * @since 1.6.0 - */ - public static void setTableQueryConfigs(Job job, TableQueryConfig... configs) { - checkNotNull(configs); - InputConfigurator.setTableQueryConfigs(CLASS, getConfiguration(job), configs); - } - - /** - * Fetches all {@link TableQueryConfig}s that have been set on the given Hadoop configuration. - * - *
- * <p>
- * Note this also returns the {@link TableQueryConfig} representing the table configurations set through the single table input methods like - * {@link #setInputTableName(org.apache.hadoop.mapreduce.Job, String)}, {@link #setRanges(org.apache.hadoop.mapreduce.Job, java.util.Collection)}, - * {@link #fetchColumns(org.apache.hadoop.mapreduce.Job, java.util.Collection)}, - * {@link #addIterator(org.apache.hadoop.mapreduce.Job, org.apache.accumulo.core.client.IteratorSetting)}, etc...) - * - * @param job - * the Hadoop job instance to be configured - * @return - * @since 1.6.0 - */ - public static List getTableQueryConfigs(JobContext job) { - return InputConfigurator.getTableQueryConfigs(CLASS, getConfiguration(job)); - } - - /** - * Fetches a {@link TableQueryConfig} that has been set on the configuration for a specific table. - * - *
- * <p>
- * null is returned in the event that the table doesn't exist. - * - * @param job - * the Hadoop job instance to be configured - * @param tableName - * the table name for which to grab the config object - * @return the {@link TableQueryConfig} for the given table - * @since 1.6.0 - */ - public static TableQueryConfig getTableQueryConfig(JobContext job, String tableName) { - return InputConfigurator.getTableQueryConfig(CLASS,getConfiguration(job),tableName); - } - - /** * Controls the use of the {@link IsolatedScanner} in this job. * *

@@ -583,516 +296,17 @@ public abstract class InputFormatBase extends InputFormat { } /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. + * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. * * @param context * the Hadoop context for the configured job - * @param table - * the table for which to initialize the locator * @return an Accumulo tablet locator - * @throws TableNotFoundException + * @throws org.apache.accumulo.core.client.TableNotFoundException * if the table name set on the configuration doesn't exist * @since 1.5.0 */ - protected static TabletLocator getTabletLocator(JobContext context, String table) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), table); - } - - // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) - /** - * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. - * - * @param context - * the Hadoop context for the configured job - * @throws IOException - * if the context is improperly configured - * @since 1.5.0 - */ - protected static void validateOptions(JobContext context) throws IOException { - InputConfigurator.validateOptions(CLASS, getConfiguration(context)); - } - - /** - * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V - * types. - * - * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables: - *

- * <ul>
- * <li>K {@link #currentK}</li>
- * <li>V {@link #currentV}</li>
- * <li>Key {@link #currentKey} (used for progress reporting)</li>
- * <li>int {@link #numKeysRead} (used for progress reporting)</li>
- * </ul>
- */ - protected abstract static class RecordReaderBase extends RecordReader { - protected long numKeysRead; - protected Iterator> scannerIterator; - protected RangeInputSplit split; - - /** - * Apply the configured iterators from the configuration to the scanner. This applies both the default iterators and the per-table iterators. - * - * @param context - * the Hadoop context for the configured job - * @param scanner - * the scanner to configure - * @param tableName - * the table name for which to set up the iterators - */ - protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName) { - TableQueryConfig config = getTableQueryConfig(context, tableName); - List iterators = config.getIterators(); - for (IteratorSetting iterator : iterators) - scanner.addScanIterator(iterator); - } - - /** - * Initialize a scanner over the given input split using this task attempt configuration. - */ - @Override - public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { - - Scanner scanner; - split = (RangeInputSplit) inSplit; - log.debug("Initializing input split: " + split.getRange()); - Instance instance = getInstance(attempt); - String principal = getPrincipal(attempt); - - TableQueryConfig tableConfig = getTableQueryConfig(attempt, split.getTableName()); - - // in case the table name changed, we can still use the previous name for terms of configuration, - // but for the scanner, we'll need to reference the new table name. - String actualNameForId = split.getTableName(); - if (!(instance instanceof MockInstance)) { - try { - actualNameForId = Tables.getTableName(instance, split.getTableId()); - if (!actualNameForId.equals(split.getTableName())) - log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId); - } catch (TableNotFoundException e) { - throw new IOException("The specified table was not found for id=" + split.getTableId()); - } - } - - AuthenticationToken token = getAuthenticationToken(attempt); - Authorizations authorizations = getScanAuthorizations(attempt); - try { - log.debug("Creating connector with user: " + principal); - - Connector conn = instance.getConnector(principal, token); - log.debug("Creating scanner for table: " + split.getTableName()); - log.debug("Authorizations are: " + authorizations); - if (tableConfig.isOfflineScan()) { - scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations); - } else { - scanner = conn.createScanner(actualNameForId, authorizations); - } - if (tableConfig.shouldUseIsolatedScanners()) { - log.info("Creating isolated scanner"); - scanner = new IsolatedScanner(scanner); - } - if (tableConfig.shouldUseLocalIterators()) { - log.info("Using local iterators"); - scanner = new ClientSideIteratorScanner(scanner); - } - setupIterators(attempt, scanner, split.getTableName()); - } catch (Exception e) { - throw new IOException(e); - } - - // setup a scanner within the bounds of this split - for (Pair c : tableConfig.getFetchedColumns()) { - if (c.getSecond() != null) { - log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); - scanner.fetchColumn(c.getFirst(), c.getSecond()); - } else { - log.debug("Fetching column family " + c.getFirst()); - scanner.fetchColumnFamily(c.getFirst()); - } - } - - scanner.setRange(split.getRange()); - numKeysRead = 0; - - // do this last after setting all scanner options - scannerIterator = scanner.iterator(); - } - - @Override - public void close() {} - - @Override - public 
float getProgress() throws IOException { - if (numKeysRead > 0 && currentKey == null) - return 1.0f; - return split.getProgress(currentKey); - } - - protected K currentK = null; - protected V currentV = null; - protected Key currentKey = null; - protected Value currentValue = null; - - @Override - public K getCurrentKey() throws IOException, InterruptedException { - return currentK; - } - - @Override - public V getCurrentValue() throws IOException, InterruptedException { - return currentV; - } - } - - Map>> binOfflineTable(JobContext context, String tableName, List ranges) throws TableNotFoundException, - AccumuloException, AccumuloSecurityException { - - Map>> binnedRanges = new HashMap>>(); - - Instance instance = getInstance(context); - Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context)); - String tableId = Tables.getTableId(instance, tableName); - - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - Tables.clearCache(instance); - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode "); - } - } - - for (Range range : ranges) { - Text startRow; - - if (range.getStartKey() != null) - startRow = range.getStartKey().getRow(); - else - startRow = new Text(); - - Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); - Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME); - scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); - scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME); - scanner.setRange(metadataRange); - - RowIterator rowIter = new RowIterator(scanner); - KeyExtent lastExtent = null; - while (rowIter.hasNext()) { - Iterator> row = rowIter.next(); - String last = ""; - KeyExtent extent = null; - String location = null; - - while (row.hasNext()) { - Entry entry = row.next(); - Key key = entry.getKey(); - - if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) { - last = entry.getValue().toString(); - } - - if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME) - || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) { - location = entry.getValue().toString(); - } - - if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { - extent = new KeyExtent(key.getRow(), entry.getValue()); - } - - } - - if (location != null) - return null; - - if (!extent.getTableId().toString().equals(tableId)) { - throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent); - } - - if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { - throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent); - } - - Map> tabletRanges = binnedRanges.get(last); - if (tabletRanges == null) { - tabletRanges = new HashMap>(); - binnedRanges.put(last, tabletRanges); - } - - List rangeList = tabletRanges.get(extent); - if (rangeList == null) { - rangeList = new ArrayList(); - tabletRanges.put(extent, rangeList); - } - - rangeList.add(range); - - if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { - break; - } - - lastExtent = extent; - } - - } - - 
return binnedRanges; - } - - /** - * Gets the splits of the tables that have been set on the job. - * - * @param conf - * the configuration of the job - * @return the splits from the tables based on the ranges. - * @throws IOException - * if a table set on the job doesn't exist or an error occurs initializing the tablet locator - */ - public List getSplits(JobContext conf) throws IOException { - log.setLevel(getLogLevel(conf)); - validateOptions(conf); - - LinkedList splits = new LinkedList(); - List tableConfigs = getTableQueryConfigs(conf); - for (TableQueryConfig tableConfig : tableConfigs) { - - boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); - String tableId = null; - List ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); - if (ranges.isEmpty()) { - ranges = new ArrayList(1); - ranges.add(new Range()); - } - - // get the metadata information for these ranges - Map>> binnedRanges = new HashMap>>(); - TabletLocator tl; - try { - if (tableConfig.isOfflineScan()) { - binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges); - while (binnedRanges == null) { - // Some tablets were still online, try again - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges); - - } - } else { - Instance instance = getInstance(conf); - tl = getTabletLocator(conf, tableConfig.getTableName()); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); - Credentials creds = new Credentials(getPrincipal(conf), getAuthenticationToken(conf)); - - while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) { - if (!(instance instanceof MockInstance)) { - if (!Tables.exists(instance, tableId)) - throw new TableDeletedException(tableId); - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); - tableId = Tables.getTableId(instance, tableConfig.getTableName()); - } - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. 
Retrying."); - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - tl.invalidateCache(); - } - } - } catch (Exception e) { - throw new IOException(e); - } - - HashMap> splitsToAdd = null; - - if (!autoAdjust) - splitsToAdd = new HashMap>(); - - HashMap hostNameCache = new HashMap(); - for (Entry>> tserverBin : binnedRanges.entrySet()) { - String ip = tserverBin.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } - for (Entry> extentRanges : tserverBin.getValue().entrySet()) { - Range ke = extentRanges.getKey().toDataRange(); - for (Range r : extentRanges.getValue()) { - if (autoAdjust) { - // divide ranges into smaller ranges, based on the tablets - splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, ke.clip(r), new String[] {location})); - } else { - // don't divide ranges - ArrayList locations = splitsToAdd.get(r); - if (locations == null) - locations = new ArrayList(1); - locations.add(location); - splitsToAdd.put(r, locations); - } - } - } - } - - if (!autoAdjust) - for (Entry> entry : splitsToAdd.entrySet()) - splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, entry.getKey(), entry.getValue().toArray(new String[0]))); - } - return splits; + protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException { + return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context))); } - /** - * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. - */ - public static class RangeInputSplit extends InputSplit implements Writable { - private Range range; - private String[] locations; - private String tableId; - private String tableName; - - public RangeInputSplit() { - range = new Range(); - locations = new String[0]; - tableId = ""; - tableName = ""; - } - - public RangeInputSplit(RangeInputSplit split) throws IOException { - this.setRange(split.getRange()); - this.setLocations(split.getLocations()); - this.setTableName(split.getTableName()); - } - - protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { - this.range = range; - this.locations = locations; - this.tableName = table; - this.tableId = tableId; - } - - public Range getRange() { - return range; - } - - public void setRange(Range range) { - this.range = range; - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - public void setTableId(String tableId) { - this.tableId = tableId; - } - - public String getTableId() { - return tableId; - } - - private static byte[] extractBytes(ByteSequence seq, int numBytes) { - byte[] bytes = new byte[numBytes + 1]; - bytes[0] = 0; - for (int i = 0; i < numBytes; i++) { - if (i >= seq.length()) - bytes[i + 1] = 0; - else - bytes[i + 1] = seq.byteAt(i); - } - return bytes; - } - - public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { - int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); - BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); - BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); - BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); - return 
(float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); - } - - public float getProgress(Key currentKey) { - if (currentKey == null) - return 0f; - if (range.getStartKey() != null && range.getEndKey() != null) { - if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW)) { - // just look at the row progress - return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); - } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM)) { - // just look at the column family progress - return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); - } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)) { - // just look at the column qualifier progress - return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); - } - } - // if we can't figure it out, then claim no progress - return 0f; - } - - /** - * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. - */ - @Override - public long getLength() throws IOException { - Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); - Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); - int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); - long diff = 0; - - byte[] start = startRow.getBytes(); - byte[] stop = stopRow.getBytes(); - for (int i = 0; i < maxCommon; ++i) { - diff |= 0xff & (start[i] ^ stop[i]); - diff <<= Byte.SIZE; - } - - if (startRow.getLength() != stopRow.getLength()) - diff |= 0xff; - - return diff + 1; - } - - @Override - public String[] getLocations() throws IOException { - return locations; - } - - public void setLocations(String[] locations) { - this.locations = locations; - } - - @Override - public void readFields(DataInput in) throws IOException { - range.readFields(in); - tableName = in.readUTF(); - int numLocs = in.readInt(); - locations = new String[numLocs]; - for (int i = 0; i < numLocs; ++i) - locations[i] = in.readUTF(); - } - - @Override - public void write(DataOutput out) throws IOException { - range.write(out); - out.writeUTF(tableName); - out.writeInt(locations.length); - for (int i = 0; i < locations.length; ++i) - out.writeUTF(locations[i]); - } - } - - // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility - static Configuration getConfiguration(JobContext context) { - try { - Class c = InputFormatBase.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext"); - Method m = c.getMethod("getConfiguration"); - Object o = m.invoke(context, new Object[0]); - return (Configuration) o; - } catch (Exception e) { - throw new RuntimeException(e); - } - } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c496552/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java index 6f92dec..c9539c4 100644 --- 
a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.Collections; import java.util.List; import org.apache.accumulo.core.client.BatchWriter; @@ -32,15 +31,12 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.TableQueryConfig; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.RegExFilter; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.Pair; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -57,7 +53,6 @@ public class AccumuloInputFormatTest { private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName(); private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance"; private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1"; - private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2"; /** * Check that the iterator configuration is getting stored in the Job conf correctly. @@ -201,7 +196,6 @@ public class AccumuloInputFormatTest { private static AssertionError e2 = null; private static class MRTester extends Configured implements Tool { - private static class TestMapper extends Mapper { Key key = null; int count = 0; @@ -209,11 +203,10 @@ public class AccumuloInputFormatTest { @Override protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { try { - String tableName = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName(); if (key != null) assertEquals(key.getRow().toString(), new String(v.get())); - assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow()); - assertEquals(String.format("%s_%09x", tableName, count), new String(v.get())); + assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); + assertEquals(new String(v.get()), String.format("%09x", count)); } catch (AssertionError e) { e1 = e; } @@ -234,14 +227,13 @@ public class AccumuloInputFormatTest { @Override public int run(String[] args) throws Exception { - if (args.length != 4) { - throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " "); + if (args.length != 3) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " "); } String user = args[0]; String pass = args[1]; - String table1 = args[2]; - String table2 = args[3]; + String table = args[2]; Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); job.setJarByClass(this.getClass()); @@ -249,11 +241,7 @@ public class AccumuloInputFormatTest { job.setInputFormatClass(AccumuloInputFormat.class); AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); - - TableQueryConfig tableConfig1 = new TableQueryConfig(table1); - 
TableQueryConfig tableConfig2 = new TableQueryConfig(table2); - - AccumuloInputFormat.setTableQueryConfigs(job, tableConfig1, tableConfig2); + AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME); job.setMapperClass(TestMapper.class); @@ -273,78 +261,21 @@ public class AccumuloInputFormatTest { } } - /** - * Generate incrementing counts and attach table name to the key/value so that order and multi-table data can be verified. - */ @Test public void testMap() throws Exception { MockInstance mockInstance = new MockInstance(INSTANCE_NAME); Connector c = mockInstance.getConnector("root", new PasswordToken("")); c.tableOperations().create(TEST_TABLE_1); - c.tableOperations().create(TEST_TABLE_2); BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); - BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig()); for (int i = 0; i < 100; i++) { - Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1))); - t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes())); - bw.addMutation(t1m); - Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1))); - t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes())); - bw2.addMutation(t2m); + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); } bw.close(); - bw2.close(); - MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2}); + MRTester.main(new String[] {"root", "", TEST_TABLE_1}); assertNull(e1); assertNull(e2); } - - /** - * Verify {@link TableQueryConfig} objects get correctly serialized in the JobContext. - */ - @Test - public void testTableQueryConfigSerialization() throws IOException { - - Job job = new Job(); - - TableQueryConfig table1 = new TableQueryConfig(TEST_TABLE_1).setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - TableQueryConfig table2 = new TableQueryConfig(TEST_TABLE_2).setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - AccumuloInputFormat.setTableQueryConfigs(job, table1, table2); - - assertEquals(table1, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_1)); - assertEquals(table2, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_2)); - } - - /** - * Verify that union of legacy input and new multi-table input get returned for backwards compatibility. 
- */ - @Test - public void testTableQueryConfigSingleAndMultitableMethods() throws IOException { - - Job job = new Job(); - - TableQueryConfig table1 = new TableQueryConfig(TEST_TABLE_1).setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - TableQueryConfig table2 = new TableQueryConfig(TEST_TABLE_2).setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - AccumuloInputFormat.setTableQueryConfigs(job, table1); - AccumuloInputFormat.setInputTableName(job, table2.getTableName()); - AccumuloInputFormat.setRanges(job, table2.getRanges()); - AccumuloInputFormat.fetchColumns(job, table2.getFetchedColumns()); - AccumuloInputFormat.addIterator(job, table2.getIterators().get(0)); - - assertEquals(table1, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_1)); - assertEquals(table2, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_2)); - } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c496552/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java new file mode 100644 index 0000000..b3b6d8b --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java @@ -0,0 +1,163 @@ +package org.apache.accumulo.core.client.mapreduce; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.TableQueryConfig; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class AccumuloMultiTableInputFormatTest { + + private static final String PREFIX = AccumuloMultiTableInputFormatTest.class.getSimpleName(); + private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance"; + private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1"; + private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2"; + + private static AssertionError e1 = null; + private static AssertionError e2 = null; + + 
private static class MRTester extends Configured implements Tool { + + private static class TestMapper extends Mapper { + Key key = null; + int count = 0; + + @Override + protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { + try { + String tableName = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName(); + if (key != null) + assertEquals(key.getRow().toString(), new String(v.get())); + assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow()); + assertEquals(String.format("%s_%09x", tableName, count), new String(v.get())); + } catch (AssertionError e) { + e1 = e; + } + key = new Key(k); + count++; + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + assertEquals(100, count); + } catch (AssertionError e) { + e2 = e; + } + } + } + + @Override + public int run(String[] args) throws Exception { + + if (args.length != 4) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " "); + } + + String user = args[0]; + String pass = args[1]; + String table1 = args[2]; + String table2 = args[3]; + + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(AccumuloMultiTableInputFormat.class); + + AccumuloMultiTableInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); + + TableQueryConfig tableConfig1 = new TableQueryConfig(table1); + TableQueryConfig tableConfig2 = new TableQueryConfig(table2); + + AccumuloMultiTableInputFormat.setBatchScanConfigs(job, tableConfig1, tableConfig2); + AccumuloMultiTableInputFormat.setMockInstance(job, INSTANCE_NAME); + + job.setMapperClass(TestMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + job.setOutputFormatClass(NullOutputFormat.class); + + job.setNumReduceTasks(0); + + job.waitForCompletion(true); + + return job.isSuccessful() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args)); + } + } + + /** + * Generate incrementing counts and attach table name to the key/value so that order and multi-table data can be verified. + */ + @Test + public void testMap() throws Exception { + MockInstance mockInstance = new MockInstance(INSTANCE_NAME); + Connector c = mockInstance.getConnector("root", new PasswordToken("")); + c.tableOperations().create(TEST_TABLE_1); + c.tableOperations().create(TEST_TABLE_2); + BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); + BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1))); + t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes())); + bw.addMutation(t1m); + Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1))); + t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes())); + bw2.addMutation(t2m); + } + bw.close(); + bw2.close(); + + MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2}); + assertNull(e1); + assertNull(e2); + } + + /** + * Verify {@link TableQueryConfig} objects get correctly serialized in the JobContext. 
+   */
+  @Test
+  public void testTableQueryConfigSerialization() throws IOException {
+
+    Job job = new Job();
+
+    TableQueryConfig table1 = new TableQueryConfig(TEST_TABLE_1).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+
+    TableQueryConfig table2 = new TableQueryConfig(TEST_TABLE_2).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+
+    AccumuloMultiTableInputFormat.setBatchScanConfigs(job, table1, table2);
+
+    assertEquals(table1, AccumuloMultiTableInputFormat.getBatchScanConfig(job, TEST_TABLE_1));
+    assertEquals(table2, AccumuloMultiTableInputFormat.getBatchScanConfig(job, TEST_TABLE_2));
+  }
+
+}
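----------------------------------------------------------------------

The getSplits logic moved in this commit either merges overlapping ranges and clips each merged range to the tablet that will serve it (auto-adjust enabled, one split per tablet/range intersection), or leaves the caller's ranges untouched and only accumulates candidate locations (auto-adjust disabled). A minimal sketch of the merge-and-clip half, using only the public Range API; the row values and tablet boundary below are made-up illustrations, not anything from this commit:

import java.util.Arrays;
import java.util.List;

import org.apache.accumulo.core.data.Range;

public class SplitSketch {
  public static void main(String[] args) {
    // With auto-adjust, two overlapping caller ranges are first merged into one.
    List<Range> merged = Range.mergeOverlapping(Arrays.asList(new Range("a", "k"), new Range("f", "p")));
    System.out.println(merged); // a single range spanning a..p

    // Each merged range is then clipped to a tablet's data range (KeyExtent.toDataRange() in the
    // real code), producing one RangeInputSplit per tablet/range intersection. The end row "h"
    // stands in for a hypothetical tablet boundary.
    Range tabletExtent = new Range("a", false, "h", true);
    System.out.println(tabletExtent.clip(merged.get(0)));
  }
}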
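RangeInputSplit.getProgress estimates how far a reader has advanced by padding the split's start key, end key, and the current key to a common byte length and linearly interpolating them as BigIntegers. A small self-contained sketch of that arithmetic, simplified to plain byte arrays rather than Accumulo's ByteSequence type:

import java.math.BigInteger;

public class ProgressSketch {

  // Pad (or truncate) to numBytes and prepend a zero sign byte, as extractBytes does in the split code.
  static byte[] extract(byte[] seq, int numBytes) {
    byte[] bytes = new byte[numBytes + 1];
    bytes[0] = 0; // keep the BigInteger non-negative
    for (int i = 0; i < numBytes; i++) {
      if (i >= seq.length)
        bytes[i + 1] = 0;
      else
        bytes[i + 1] = seq[i];
    }
    return bytes;
  }

  // Mirrors RangeInputSplit.getProgress(ByteSequence start, ByteSequence end, ByteSequence position).
  static float progress(byte[] start, byte[] end, byte[] position) {
    int maxDepth = Math.min(Math.max(end.length, start.length), position.length);
    BigInteger startBI = new BigInteger(extract(start, maxDepth));
    BigInteger endBI = new BigInteger(extract(end, maxDepth));
    BigInteger positionBI = new BigInteger(extract(position, maxDepth));
    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
  }

  public static void main(String[] args) {
    // 'm' (0x6d) sits between 'a' (0x61) and 'z' (0x7a): prints 0.48.
    System.out.println(progress("a".getBytes(), "z".getBytes(), "m".getBytes()));
  }
}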
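The reflective getConfiguration helper kept at the bottom of InputFormatBase exists because JobContext is a class in Hadoop 1 and an interface in Hadoop 2, so a direct call to getConfiguration() compiled against one will not link against the other. The same pattern in isolation, assuming only the Hadoop mapreduce API:

import java.lang.reflect.Method;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;

public class JobContextCompat {

  // Load JobContext by name and call getConfiguration() reflectively, so the call site is not
  // compiled against a specific (class vs. interface) declaration of the method.
  static Configuration getConfiguration(JobContext context) {
    try {
      Class<?> c = JobContextCompat.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext");
      Method m = c.getMethod("getConfiguration");
      return (Configuration) m.invoke(context);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}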
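Finally, a hypothetical driver showing how the new AccumuloMultiTableInputFormat could be wired into a job, mirroring MRTester in AccumuloMultiTableInputFormatTest. The instance name, principal, password, and table names are placeholders; the tests run against a MockInstance, so a real deployment would substitute its own instance configuration for setMockInstance:

import java.io.IOException;

import org.apache.accumulo.core.client.mapreduce.AccumuloMultiTableInputFormat;
import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.TableQueryConfig;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MultiTableDriver {

  // The input split carries the originating table name, so one mapper can branch per table.
  public static class ExampleMapper extends Mapper<Key,Value,Text,Text> {
    @Override
    protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
      String table = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName();
      context.write(new Text(table), new Text(k.getRow().toString()));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "multi-table example");
    job.setJarByClass(MultiTableDriver.class);
    job.setInputFormatClass(AccumuloMultiTableInputFormat.class);

    AccumuloMultiTableInputFormat.setConnectorInfo(job, "someUser", new PasswordToken("somePass"));
    AccumuloMultiTableInputFormat.setMockInstance(job, "someInstance");

    // One TableQueryConfig per input table; ranges, fetched columns, and iterators are set per table.
    TableQueryConfig records = new TableQueryConfig("records_table");
    TableQueryConfig index = new TableQueryConfig("index_table");
    AccumuloMultiTableInputFormat.setBatchScanConfigs(job, records, index);

    job.setMapperClass(ExampleMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}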