accumulo-commits mailing list archives

From cjno...@apache.org
Subject git commit: Cleaning things up. Merging MultiTableInputFormatTest with AccumuloInputFormatTest. Columns now able to be set globally or per-table. ACCUMULO-391
Date Fri, 13 Sep 2013 03:52:41 GMT
Updated Branches:
  refs/heads/ACCUMULO-391 f8d28e7be -> d344ad6d1


Cleaning things up. Merging MultiTableInputFormatTest with AccumuloInputFormatTest. Columns now able to be set globally or per-table. ACCUMULO-391


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d344ad6d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d344ad6d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d344ad6d

Branch: refs/heads/ACCUMULO-391
Commit: d344ad6d112c9268480b6baca2f8700fd7d5a2a3
Parents: f8d28e7
Author: Corey J. Nolet <cjnolet@gmail.com>
Authored: Thu Sep 12 23:51:31 2013 -0400
Committer: Corey J. Nolet <cjnolet@gmail.com>
Committed: Thu Sep 12 23:51:31 2013 -0400

----------------------------------------------------------------------
 .../core/client/mapred/InputFormatBase.java     | 240 ++++++++--------
 .../core/client/mapreduce/InputFormatBase.java  |  21 +-
 .../mapreduce/lib/util/InputConfigurator.java   |  60 ++--
 .../mapreduce/AccumuloInputFormatTest.java      | 276 +++++++++++++++++--
 .../mapreduce/MultiTableInputFormatTest.java    | 245 ----------------
 5 files changed, 408 insertions(+), 434 deletions(-)
----------------------------------------------------------------------
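
For readers following ACCUMULO-391, here is a minimal sketch of how a MapReduce job could be configured against the multi-table API this commit touches. The instance name, user, tables, and column values are purely illustrative; the methods used (setInputTableNames, the existing collection-based fetchColumns, and the new map-based fetchColumns) are the ones exercised by the diff and tests below, and the global and per-table forms are shown side by side only for comparison.

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class MultiTableJobSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setInputFormatClass(AccumuloInputFormat.class);

    // Connector, instance, and authorizations, as in the existing single-table API
    // (a mock instance is used here purely for illustration).
    AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken(new byte[0]));
    AccumuloInputFormat.setMockInstance(job, "example-instance");
    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());

    // Scan two tables in a single job.
    AccumuloInputFormat.setInputTableNames(job, Arrays.asList("table1", "table2"));

    // Global form: the same columns are applied to every input table configured above.
    AccumuloInputFormat.fetchColumns(job,
        Collections.singleton(new Pair<Text,Text>(new Text("cf"), null)));

    // Per-table form added in this commit: each table gets its own column set.
    // (Shown alongside the global form only for comparison.)
    Map<String,Collection<Pair<Text,Text>>> perTableColumns =
        new HashMap<String,Collection<Pair<Text,Text>>>();
    perTableColumns.put("table1",
        Collections.singleton(new Pair<Text,Text>(new Text("cf1"), new Text("cq1"))));
    perTableColumns.put("table2",
        Collections.singleton(new Pair<Text,Text>(new Text("cf2"), new Text("cq2"))));
    AccumuloInputFormat.fetchColumns(job, perTableColumns);
  }
}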


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d344ad6d/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index bef5b94..6169505 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@ -82,17 +82,17 @@ import static org.apache.accumulo.core.client.security.tokens.AuthenticationToke
  * See {@link AccumuloInputFormat} for an example implementation.
  */
 public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
-  
+
   private static final Class<?> CLASS = AccumuloInputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
-   * 
+   *
    * <p>
    * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
    * conversion to a string, and is not intended to be secure.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param principal
@@ -105,13 +105,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
-   * 
+   *
    * <p>
    * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param principal
@@ -124,10 +124,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
   }
-  
+
   /**
    * Determines if the connector has been configured.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return true if the connector has been configured, false otherwise
@@ -137,10 +137,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Boolean isConnectorInfoSet(JobConf job) {
     return InputConfigurator.isConnectorInfoSet(CLASS, job);
   }
-  
+
   /**
    * Gets the user name from the configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return the user name
@@ -150,10 +150,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getPrincipal(JobConf job) {
     return InputConfigurator.getPrincipal(CLASS, job);
   }
-  
+
   /**
    * Gets the serialized token class from the configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return the user name
@@ -163,11 +163,11 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getTokenClass(JobConf job) {
     return InputConfigurator.getTokenClass(CLASS, job);
   }
-  
+
   /**
    * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
    * provide a charset safe conversion to a string, and is not intended to be secure.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return the decoded user password
@@ -177,11 +177,11 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static byte[] getToken(JobConf job) {
     return InputConfigurator.getToken(CLASS, job);
   }
-  
+
   /**
    * Gets the password file from the configuration. It is BASE64 encoded to provide a charset safe conversion to a string, and is not intended to be secure. If
    * specified, the password will be stored in a file rather than in the Configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return path to the password file as a String
@@ -191,10 +191,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getTokenFile(JobConf job) {
     return InputConfigurator.getTokenFile(CLASS, job);
   }
-  
+
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param instanceName
@@ -206,10 +206,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
     InputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
   }
-  
+
   /**
    * Configures a {@link MockInstance} for this job.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param instanceName
@@ -219,10 +219,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setMockInstance(JobConf job, String instanceName) {
     InputConfigurator.setMockInstance(CLASS, job, instanceName);
   }
-  
+
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return an Accumulo instance
@@ -233,10 +233,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Instance getInstance(JobConf job) {
     return InputConfigurator.getInstance(CLASS, job);
   }
-  
+
   /**
    * Sets the log level for this job.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param level
@@ -246,10 +246,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setLogLevel(JobConf job, Level level) {
     InputConfigurator.setLogLevel(CLASS, job, level);
   }
-  
+
   /**
    * Gets the log level from this configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return the log level
@@ -259,10 +259,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Level getLogLevel(JobConf job) {
     return InputConfigurator.getLogLevel(CLASS, job);
   }
-  
+
   /**
    * Sets the name of the input table, over which this job will scan.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param tableName
@@ -272,10 +272,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setInputTableName(JobConf job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job, tableName);
   }
-  
+
   /**
    * Gets the table name from the configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return the table name
@@ -289,10 +289,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     else
       return null;
   }
-  
+
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param auths
@@ -302,10 +302,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setScanAuthorizations(JobConf job, Authorizations auths) {
     InputConfigurator.setScanAuthorizations(CLASS, job, auths);
   }
-  
+
   /**
    * Gets the authorizations to set for the scans from the configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return the Accumulo scan authorizations
@@ -315,10 +315,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Authorizations getScanAuthorizations(JobConf job) {
     return InputConfigurator.getScanAuthorizations(CLASS, job);
   }
-  
+
   /**
    * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param ranges
@@ -328,10 +328,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setRanges(JobConf job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job, ranges);
   }
-  
+
   /**
    * Gets the ranges to scan over from a job.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return the ranges
@@ -343,10 +343,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Map<String, List<Range>> getRanges(JobConf job) throws IOException {
     return InputConfigurator.getRanges(CLASS, job);
   }
-  
+
   /**
    * Restricts the columns that will be mapped over for this job.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param columnFamilyColumnQualifierPairs
@@ -357,10 +357,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void fetchColumns(JobConf job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job, columnFamilyColumnQualifierPairs);
   }
-  
+
   /**
    * Gets the columns to be mapped over from this job.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return a set of columns
@@ -373,7 +373,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
 
   /**
    * Encode an iterator on the input for this job.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param cfg
@@ -383,10 +383,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void addIterator(JobConf job, IteratorSetting cfg) {
     InputConfigurator.addIterator(CLASS, job, cfg);
   }
-  
+
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return a list of iterators
@@ -396,14 +396,14 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static List<IteratorSetting> getIterators(JobConf job) {
     return InputConfigurator.getDefaultIterators(CLASS, job);
   }
-  
+
   /**
    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
-   * 
+   *
    * <p>
    * By default, this feature is <b>enabled</b>.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param enableFeature
@@ -414,10 +414,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setAutoAdjustRanges(JobConf job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return false if the feature is disabled, true otherwise
@@ -427,13 +427,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean getAutoAdjustRanges(JobConf job) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, job);
   }
-  
+
   /**
    * Controls the use of the {@link IsolatedScanner} in this job.
-   * 
+   *
    * <p>
    * By default, this feature is <b>disabled</b>.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param enableFeature
@@ -443,10 +443,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setScanIsolation(JobConf job, boolean enableFeature) {
     InputConfigurator.setScanIsolation(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has isolation enabled.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return true if the feature is enabled, false otherwise
@@ -456,14 +456,14 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean isIsolated(JobConf job) {
     return InputConfigurator.isIsolated(CLASS, job);
   }
-  
+
   /**
    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
-   * 
+   *
    * <p>
    * By default, this feature is <b>disabled</b>.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param enableFeature
@@ -473,10 +473,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setLocalIterators(JobConf job, boolean enableFeature) {
     InputConfigurator.setLocalIterators(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration uses local iterators.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return true if the feature is enabled, false otherwise
@@ -486,32 +486,32 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean usesLocalIterators(JobConf job) {
     return InputConfigurator.usesLocalIterators(CLASS, job);
   }
-  
+
   /**
    * <p>
    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
    * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
    * fail.
-   * 
+   *
    * <p>
    * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
-   * 
+   *
    * <p>
    * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
    * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
-   * 
+   *
    * <p>
    * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
    * reduce over the data many times, it may be better to the compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
    * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
-   * 
+   *
    * <p>
    * There are two possible advantages to reading a tables file directly out of HDFS. First, you may see better read performance. Second, it will support
    * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
-   * 
+   *
    * <p>
    * By default, this feature is <b>disabled</b>.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param enableFeature
@@ -521,10 +521,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setOfflineTableScan(JobConf job, boolean enableFeature) {
     InputConfigurator.setOfflineTableScan(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has the offline table scan feature enabled.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return true if the feature is enabled, false otherwise
@@ -534,10 +534,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean isOfflineScan(JobConf job) {
     return InputConfigurator.isOfflineScan(CLASS, job);
   }
-  
+
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @return an Accumulo tablet locator
@@ -548,11 +548,11 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException {
     return InputConfigurator.getTabletLocator(CLASS, job, tableName);
   }
-  
+
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
-   * 
+   *
    * @param job
    *          the Hadoop context for the configured job
    * @throws IOException
@@ -562,11 +562,11 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static void validateOptions(JobConf job) throws IOException {
     InputConfigurator.validateOptions(CLASS, job);
   }
-  
+
   /**
    * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
    * types.
-   * 
+   *
    * Subclasses must implement {@link #next(Object, Object)} to update key and value, and also to update the following variables:
    * <ul>
    * <li>Key {@link #currentKey} (used for progress reporting)</li>
@@ -577,10 +577,10 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-    
+
     /**
      * Apply the configured iterators from the configuration to the scanner.
-     * 
+     *
      * @param job
      *          the Hadoop context for the configured job
      * @param scanner
@@ -592,7 +592,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
         scanner.addScanIterator(iterator);
       }
     }
-    
+
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
@@ -605,7 +605,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       String tokenClass = getTokenClass(job);
       byte[] password = getToken(job);
       Authorizations authorizations = getScanAuthorizations(job);
-      
+
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, AuthenticationTokenSerializer.deserialize(tokenClass, password));
@@ -629,7 +629,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       } catch (Exception e) {
         throw new IOException(e);
       }
-      
+
       // setup a scanner within the bounds of this split
       for (Pair<Text,Text> c : getFetchedColumns(job)) {
         if (c.getSecond() != null) {
@@ -640,58 +640,58 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-      
+
       scanner.setRange(split.getRange());
-      
+
       numKeysRead = 0;
-      
+
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-    
+
     @Override
     public void close() {}
-    
+
     @Override
     public long getPos() throws IOException {
       return numKeysRead;
     }
-    
+
     @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-    
+
     protected Key currentKey = null;
-    
+
   }
-  
+
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
       AccumuloSecurityException {
-    
+
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    
+
     Instance instance = getInstance(job);
     Connector conn = instance.getConnector(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job)));
     String tableId = Tables.getTableId(instance, tableName);
-    
+
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-    
+
     for (Range range : ranges) {
       Text startRow;
-      
+
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-      
+
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -699,73 +699,73 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-      
+
       RowIterator rowIter = new RowIterator(scanner);
-      
+
       KeyExtent lastExtent = null;
-      
+
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-        
+
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-          
+
           if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-          
+
           if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-          
+
           if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-          
+
         }
-        
+
         if (location != null)
           return null;
-        
+
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-        
+
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-        
+
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-        
+
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-        
+
         rangeList.add(range);
-        
+
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-        
+
         lastExtent = extent;
       }
-      
+
     }
-    
+
     return binnedRanges;
   }
-  
+
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
@@ -871,19 +871,19 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */
   public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit implements InputSplit {
-    
+
     public RangeInputSplit() {
       super();
     }
-    
+
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       super(split);
     }
-    
+
     protected RangeInputSplit(String table, Range range, String[] locations) {
       super(table, range, locations);
     }
-    
+
   }
-  
+
 }
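
The offline-scan Javadoc in this file suggests cloning a table, taking the clone offline, and scanning the clone. As a hypothetical sketch of that workflow against the mapred API above (the helper name, table name, and the TableOperations clone/offline calls are my own illustration, not part of this commit):

import java.util.Collections;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class OfflineCloneSketch {
  // Clone the source table, take the clone offline, and point the job at the clone.
  static void configureOfflineScan(Connector conn, JobConf job, String table) throws Exception {
    String clone = table + "_offline_clone";
    conn.tableOperations().clone(table, clone, true /* flush before cloning */,
        Collections.<String,String>emptyMap(), Collections.<String>emptySet());
    conn.tableOperations().offline(clone);
    AccumuloInputFormat.setInputTableName(job, clone);
    AccumuloInputFormat.setOfflineTableScan(job, true);
  }
}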

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d344ad6d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 3d870ea..aa23326 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -394,7 +394,23 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
-  
+
+  /**
+   * Restricts the columns that will be mapped over for this job.
+   *
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param columnFamilyColumnQualifierPairs
+   *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+   *          selected. An empty set is the default and is equivalent to scanning all columns.
+   * @since 1.5.0
+   */
+  public static void fetchColumns(Job job, Map<String, Collection<Pair<Text,Text>>> columnFamilyColumnQualifierPairs) {
+    InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
+  }
+
+
+
   /**
    * Gets the columns to be mapped over from this job.
    * 
@@ -416,9 +432,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param cfg
    *          the configuration of the iterator
    * @since 1.5.0
-   * @deprecated since 1.6.0
    */
-  @Deprecated
   public static void addIterator(Job job, IteratorSetting cfg) {
       InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
   }
@@ -432,7 +446,6 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the configuration of the iterator
    * @since 1.6.0
    */
-  @Deprecated
   public static void addIterator(Job job, IteratorSetting cfg, String table) {
     InputConfigurator.addIterator(CLASS, job.getConfiguration(), table, cfg);
   }
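
Since this hunk drops the @Deprecated markers on addIterator and introduces the map-based fetchColumns, a short illustrative sketch of per-table iterator configuration may help. The table names are made up; the per-table addIterator/getIterators overloads are the ones the updated AccumuloInputFormatTest below relies on.

import java.util.Arrays;
import java.util.List;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PerTableIteratorSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    AccumuloInputFormat.setInputTableNames(job, Arrays.asList("t1", "t2"));

    // Attach a versioning iterator to "t1" only; "t2" keeps its table defaults.
    IteratorSetting vers = new IteratorSetting(50, "vers",
        "org.apache.accumulo.core.iterators.VersioningIterator");
    AccumuloInputFormat.addIterator(job, vers, "t1");

    // Read the per-table settings back out of the job configuration.
    List<IteratorSetting> t1Iters = AccumuloInputFormat.getIterators(job, "t1");
    System.out.println("iterators configured for t1: " + t1Iters);
  }
}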

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d344ad6d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 0552dc6..baf5309 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -177,7 +178,7 @@ public class InputConfigurator extends ConfiguratorBase {
   }
   
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * Sets the input ranges to scan on all input tables for this job. If not set, each input table will be scanned in its entirety.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -190,33 +191,18 @@ public class InputConfigurator extends ConfiguratorBase {
    */
   public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
     notNull (ranges);
-    notNull(ranges);
 
     String[] tableNames = getInputTableNames (implementingClass, conf);
-    if(tableNames.length > 0) {
-
-      String tableName = tableNames[0];
-
-      ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
-      try {
-        for (Range r : ranges) {
-          TableRange tblRange = new TableRange(tableName, r);
-          ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          tblRange.write(new DataOutputStream(baos));
-          rangeStrings.add(new String(encodeBase64 (baos.toByteArray ())));
-        }
-      } catch (IOException ex) {
-        throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
-      }
-      conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
-    } else {
-      throw new IllegalStateException ("Ranges can't be added until a table is set.");
-    }
+    Map<String, Collection<Range>> tableRanges = new HashMap<String, Collection<Range>>();
+    for(String table : tableNames)
+      tableRanges.put(table, ranges);
 
+    setRanges (implementingClass, conf, tableRanges);
   }
 
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * Sets the input ranges to scan for specific tables of this job. If no ranges are set for an input table, that
+   * whole table will be scanned.
    *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -280,8 +266,8 @@ public class InputConfigurator extends ConfiguratorBase {
   }
   
   /**
-   * Restricts the columns that will be mapped over for this job. This provides backwards compatibility when single
-   * tables
+   * Restricts the columns that will be mapped over for this job. This applies the columns to all tables that have been
+   * set on the job.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -294,23 +280,12 @@ public class InputConfigurator extends ConfiguratorBase {
    */
   public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     notNull (columnFamilyColumnQualifierPairs);
-    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
-    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-      if (column.getFirst() == null)
-        throw new IllegalArgumentException("Column family can not be null");
-      
-      String col = new String(encodeBase64 (getBytes (column.getFirst ())), UTF8);
-      if (column.getSecond() != null)
-        col += ":" + new String(encodeBase64 (getBytes (column.getSecond ())), UTF8);
-      columnStrings.add(col);
-    }
-
     String[] tables = getInputTableNames (implementingClass, conf);
-    if(tables.length > 0)
-      conf.setStrings(format ("%s.%s", enumToConfKey (implementingClass, COLUMNS), tables[0]),
-              columnStrings.toArray (new String[0]));
-    else
-      throw new IllegalStateException ("Input tables must be set before setting fetched columns");
+    Map<String, Collection<Pair<Text,Text>>> columnTablePairs = new HashMap<String, Collection<Pair<Text,Text>>>();
+    for(String table : tables) {
+      columnTablePairs.put (table, columnFamilyColumnQualifierPairs);
+    }
+    fetchColumns (implementingClass, conf, columnTablePairs);
   }
 
   /**
@@ -328,9 +303,8 @@ public class InputConfigurator extends ConfiguratorBase {
    */
   public static void fetchColumns(Class<?> implementingClass, Configuration conf, Map<String, Collection<Pair<Text,Text>>> columnFamilyColumnQualifierPairs) {
     notNull (columnFamilyColumnQualifierPairs);
-    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
     for (Map.Entry<String, Collection<Pair<Text,Text>>> tableColumns : columnFamilyColumnQualifierPairs.entrySet()) {
-
+      ArrayList<String> columnStrings = new ArrayList<String>();
       String tableName = tableColumns.getKey();
       for(Pair<Text,Text> column : tableColumns.getValue()) {
 
@@ -456,7 +430,7 @@ public class InputConfigurator extends ConfiguratorBase {
   public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf, String table) {
 
     String iterators = conf.get(format ("%s.%s", enumToConfKey (implementingClass, ITERATORS), table));
-    
+
     // If no iterators are present, return an empty list
     if (iterators == null || iterators.isEmpty())
       return new ArrayList<IteratorSetting>();
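
Ranges follow the same global-versus-per-table split as columns. A minimal sketch mirroring the testMultitableRangeSerialization case added further down (table names and row boundaries are illustrative):

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PerTableRangeSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    AccumuloInputFormat.setInputTableNames(job, Arrays.asList("t1", "t2"));

    // Give each table its own scan ranges; a table left out of the map is scanned in full.
    Map<String,Collection<Range>> ranges = new HashMap<String,Collection<Range>>();
    ranges.put("t1", Arrays.asList(new Range("a", "b")));
    ranges.put("t2", Arrays.asList(new Range("m", "n"), new Range("x", "z")));
    AccumuloInputFormat.setRanges(job, ranges);

    // Read the per-table ranges back out of the configuration.
    Map<String,List<Range>> configured = AccumuloInputFormat.getRanges(job);
    System.out.println(configured.keySet());
  }
}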

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d344ad6d/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 14f5775..2c88a01 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -16,15 +16,39 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.addIterator;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.fetchColumns;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.getFetchedColumns;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.getIterators;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.getRanges;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setConnectorInfo;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setInputTableName;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setInputTableNames;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setMockInstance;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setRanges;
+import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setScanAuthorizations;
+import static org.apache.accumulo.core.client.mapreduce.multi.ContextFactory.createMapContext;
+import static org.apache.accumulo.core.client.mapreduce.multi.ContextFactory.createTaskAttemptContext;
+import static org.apache.accumulo.core.iterators.user.RegExFilter.setRegexs;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
+import junit.framework.Assert;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -33,16 +57,22 @@ import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -64,7 +94,7 @@ public class AccumuloInputFormatTest {
     Job job = new Job();
 
     IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
-    AccumuloInputFormat.addIterator(job, is);
+    addIterator (job, is);
     Configuration conf = job.getConfiguration();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
@@ -76,14 +106,14 @@ public class AccumuloInputFormatTest {
   public void testAddIterator() throws IOException {
     Job job = new Job();
 
-    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
-    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    addIterator (job, new IteratorSetting (1, "WholeRow", WholeRowIterator.class));
+    addIterator (job, new IteratorSetting (2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
     IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
     iter.addOption("v1", "1");
     iter.addOption("junk", "\0omg:!\\xyzzy");
-    AccumuloInputFormat.addIterator(job, iter);
+    addIterator (job, iter);
     
-    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
+    List<IteratorSetting> list = getIterators (job);
     
     // Check the list size
     assertTrue(list.size() == 3);
@@ -123,18 +153,18 @@ public class AccumuloInputFormatTest {
     IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class");
     someSetting.addOption(key, value);
     Job job = new Job();
-    AccumuloInputFormat.addIterator(job, someSetting);
+    addIterator (job, someSetting);
     
-    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
+    List<IteratorSetting> list = getIterators (job);
     assertEquals(1, list.size());
     assertEquals(1, list.get(0).getOptions().size());
     assertEquals(list.get(0).getOptions().get(key), value);
     
-    someSetting.addOption(key + "2", value);
-    someSetting.setPriority(2);
-    someSetting.setName("it2");
-    AccumuloInputFormat.addIterator(job, someSetting);
-    list = AccumuloInputFormat.getIterators(job);
+    someSetting.addOption (key + "2", value);
+    someSetting.setPriority (2);
+    someSetting.setName ("it2");
+    addIterator (job, someSetting);
+    list = getIterators (job);
     assertEquals(2, list.size());
     assertEquals(1, list.get(0).getOptions().size());
     assertEquals(list.get(0).getOptions().get(key), value);
@@ -152,11 +182,11 @@ public class AccumuloInputFormatTest {
   public void testGetIteratorSettings() throws IOException {
     Job job = new Job();
 
-    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
-    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
-    AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
+    addIterator (job, new IteratorSetting (1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
+    addIterator (job, new IteratorSetting (2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    addIterator (job, new IteratorSetting (3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
     
-    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
+    List<IteratorSetting> list = getIterators (job);
     
     // Check the list size
     assertTrue(list.size() == 3);
@@ -186,10 +216,10 @@ public class AccumuloInputFormatTest {
     String regex = ">\"*%<>\'\\";
     
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
-    RegExFilter.setRegexs(is, regex, null, null, null, false);
-    AccumuloInputFormat.addIterator(job, is);
+    setRegexs (is, regex, null, null, null, false);
+    addIterator (job, is);
     
-    assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName()));
+    assertTrue(regex.equals(getIterators (job).get(0).getName()));
   }
   
   private static AssertionError e1 = null;
@@ -240,9 +270,9 @@ public class AccumuloInputFormatTest {
       
       job.setInputFormatClass(AccumuloInputFormat.class);
       
-      AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
-      AccumuloInputFormat.setInputTableName(job, table);
-      AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
+      setConnectorInfo (job, user, new PasswordToken (pass));
+      setInputTableName (job, table);
+      setMockInstance (job, INSTANCE_NAME);
       
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -260,6 +290,42 @@ public class AccumuloInputFormatTest {
       assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
     }
   }
+
+  /**
+   * A sample Mapper that verifies aspects of the input.
+   *
+   * This mapper verifies that all keys passed to it are for the expected
+   * table and that it sees exactly 100 keys.
+   *
+   */
+  static class MultitableMapper extends Mapper<Key,Value,Key,Value> {
+    private int count;
+    private Text expectedTable;
+
+    public void expectedTable(Text t) {
+      this.expectedTable = t;
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException,
+            InterruptedException {
+      super.setup(context);
+      count = 0;
+    }
+
+    @Override
+    protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+      assertEquals(expectedTable.toString (), ((RangeInputSplit)context.getInputSplit ()).getTableName ());
+      ++count;
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+            InterruptedException {
+      super.cleanup(context);
+      Assert.assertEquals (100, count);
+    }
+  }
   
   @Test
   public void testMap() throws Exception {
@@ -278,4 +344,170 @@ public class AccumuloInputFormatTest {
     assertNull(e1);
     assertNull(e2);
   }
+
+  /**
+   * Asserts that the configuration contains the expected ranges for the tables.
+   */
+  @Test
+  public void testMultitableRangeSerialization() throws Throwable {
+    List<String> tables = Arrays.asList ("t1", "t2", "t3");
+    Job job = new Job(new Configuration());
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    job.setMapperClass (MultitableMapper.class);
+    job.setNumReduceTasks (0);
+    setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
+    setInputTableNames (job, tables);
+    setScanAuthorizations (job, new Authorizations ());
+    setMockInstance (job, "testmapinstance");
+
+    HashMap<String, Collection<Range>> tblRanges = new HashMap<String, Collection<Range>>();
+    for(String tbl : tables) {
+      List<Range> ranges = Arrays.asList(
+              new Range("a", "b"),
+              new Range("c", "d"),
+              new Range("e", "f") );
+      tblRanges.put(tbl, ranges);
+    }
+
+    setRanges (job, tblRanges);
+    Map<String, List<Range>> configuredRanges = getRanges (job);
+
+    for(Map.Entry<String, List<Range>> cfgRange : configuredRanges.entrySet()) {
+      String tbl = cfgRange.getKey();
+      HashSet<Range> originalRanges = new HashSet<Range>(tblRanges.remove(tbl));
+      HashSet<Range> retrievedRanges = new HashSet<Range>(cfgRange.getValue());
+      assertEquals (originalRanges.size (), retrievedRanges.size ());
+      assertTrue (originalRanges.containsAll (retrievedRanges));
+      assertTrue (retrievedRanges.containsAll (originalRanges));
+    }
+  }
+
+  /**
+   * Asserts that the configuration contains the expected iterators for the tables.
+   */
+  @Test
+  public void testMultitableIteratorSerialization() throws Throwable {
+    HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2"));
+    Job job = new Job(new Configuration());
+    job.setInputFormatClass (AccumuloInputFormat.class);
+    job.setMapperClass (MultitableMapper.class);
+    job.setNumReduceTasks (0);
+    setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
+    setInputTableNames (job, tables);
+    setScanAuthorizations (job, new Authorizations ());
+
+    // create + set iterators on configuration and build expected reference set
+    IteratorSetting isetting1 = new IteratorSetting(1, "name1", "class1");
+    IteratorSetting isetting2 = new IteratorSetting(2, "name2", "class3");
+
+    addIterator (job, isetting1, "t1");
+    addIterator (job, isetting2, "t2");
+
+    // verify per-table iterators
+    List<IteratorSetting> t1iters = getIterators (job, "t1");
+    List<IteratorSetting> t2iters = getIterators (job, "t2");
+    assertFalse (t1iters.isEmpty ());
+    assertEquals(isetting1, t1iters.get(0));
+    assertEquals (isetting2, t2iters.get (0));
+  }
+
+  @Test
+  public void testMultitableColumnSerialization() throws IOException, AccumuloSecurityException {
+    HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2"));
+    Job job = new Job(new Configuration());
+    job.setInputFormatClass (AccumuloInputFormat.class);
+    job.setMapperClass (MultitableMapper.class);
+    job.setNumReduceTasks (0);
+    setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
+    setInputTableNames (job, tables);
+    setScanAuthorizations (job, new Authorizations ());
+
+    Map<String, Collection<Pair<Text,Text>>> columns = new HashMap<String, Collection<Pair<Text,Text>>>();
+    HashSet<Pair<Text,Text>> t1cols = new HashSet<Pair<Text,Text>>();
+    t1cols.add(new Pair(new Text("a"), new Text("b")));
+    HashSet<Pair<Text,Text>> t2cols = new HashSet<Pair<Text, Text>> ();
+    t2cols.add(new Pair(new Text("b"), new Text("c")));
+    columns.put("t1", t1cols);
+    columns.put("t2", t2cols);
+
+    fetchColumns (job, columns);
+
+    Collection<Pair<Text,Text>> t1actual = getFetchedColumns (job, "t1");
+    assertEquals(columns.get("t1"), t1actual);
+    Collection<Pair<Text,Text>> t2actual = getFetchedColumns (job, "t2");
+    assertEquals(columns.get("t2"), t2actual);
+  }
+
+
+  /**
+   * Creates five tables, table0 through table4, that get loaded with 100 keys each.
+   *
+   * This test expects that each table is filled with 100 entries and that a sample
+   * MapReduce job is created to scan all five. We should see five input splits; one for
+   * each table.
+   *
+   * The sample job uses the MultitableMapper class defined locally to this test. Verification
+   * of features such as expected table and number of keys is performed via the MultitableMapper.
+   *
+   * @throws Throwable
+   */
+  @Test
+  public void testMultitableMap() throws Throwable {
+    MockInstance mockInstance = new MockInstance("testmapinstance");
+    Connector c = mockInstance.getConnector("root", new byte[] {});
+    StringBuilder tablesBuilder = new StringBuilder();
+    LinkedList<String> tablesList = new LinkedList<String>();
+    for(int i = 0; i < 5; ++i) {
+      String table = "table" + i;
+      tablesList.add(table);
+      writeData(c, table);
+      tablesBuilder.append(table).append(',');
+    }
+    tablesBuilder.setLength(tablesBuilder.length() - 1);
+
+    Job job = new Job(new Configuration());
+    job.setInputFormatClass (AccumuloInputFormat.class);
+    job.setMapperClass (MultitableMapper.class);
+    job.setNumReduceTasks (0);
+    setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
+    setInputTableNames (job, tablesList);
+    setScanAuthorizations (job, new Authorizations ());
+    setMockInstance (job, "testmapinstance");
+
+    AccumuloInputFormat input = new AccumuloInputFormat ();
+    List<InputSplit> splits = input.getSplits(job);
+    assertEquals(splits.size(), 5);
+
+    MultitableMapper mapper = (MultitableMapper) job.getMapperClass().newInstance();
+    for (InputSplit split : splits) {
+      TaskAttemptContext tac = createTaskAttemptContext (job);
+      RecordReader<Key,Value> reader = input.createRecordReader(split, tac);
+      Mapper<Key,Value,Key,Value>.Context context = createMapContext (mapper, tac, reader, null, split);
+      reader.initialize(split, context);
+      mapper.expectedTable( new Text( ((RangeInputSplit) split).getTableName () ) );
+      mapper.run(context);
+    }
+  }
+
+  /**
+   * Writes data out to a table.
+   *
+   * The data written out is 100 entries, with the row being a number 1-100 and the value
+   * being a number one less than the row (0-99).
+   *
+   * @param c
+   * @param table
+   * @throws Throwable
+   */
+  static void writeData(Connector c, String table) throws Throwable {
+    c.tableOperations().create(table);
+    BatchWriter bw = c.createBatchWriter(table, 10000L, 1000L, 4);
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d344ad6d/core/src/test/java/org/apache/accumulo/core/client/mapreduce/MultiTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/MultiTableInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/MultiTableInputFormatTest.java
deleted file mode 100644
index b03db9b..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/MultiTableInputFormatTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.getRanges;
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setConnectorInfo;
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setInputTableNames;
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setMockInstance;
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setRanges;
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setScanAuthorizations;
-import static org.apache.accumulo.core.client.mapreduce.multi.ContextFactory.createMapContext;
-import static org.apache.accumulo.core.client.mapreduce.multi.ContextFactory.createTaskAttemptContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import junit.framework.Assert;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.junit.Test;
-
-public class MultiTableInputFormatTest {
-  
-  /**
-   * Writes data out to a table.
-   * 
-   * The data written out is 100 entries, with the row being a number 1-100 and the value
-   * being a number one less than the row (0-99).
-   * 
-   * @param c
-   * @param table
-   * @throws Throwable
-   */
-  static void writeData(Connector c, String table) throws Throwable {
-    c.tableOperations().create(table);
-    BatchWriter bw = c.createBatchWriter(table, 10000L, 1000L, 4);
-    for (int i = 0; i < 100; i++) {
-      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
-      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
-      bw.addMutation(m);
-    }
-    bw.close();
-  }
-  
-  /**
-   * Creates five tables, table0 through table4, that get loaded with 100 keys each.
-   * 
-   * This test expects that each table is filled with 100 entries and that a sample
-   * MapReduce job is created to scan all five. We should see five input splits; one for
-   * each table. 
-   * 
-   * The sample job uses the TestMapper class defined locally to this test. Verification
-   * of features such as expected table and number of keys is performed via the TestMapper.
-   * 
-   * @throws Throwable
-   */
-  @Test
-  public void testMap() throws Throwable {
-    MockInstance mockInstance = new MockInstance("testmapinstance");
-    Connector c = mockInstance.getConnector("root", new byte[] {});
-    StringBuilder tablesBuilder = new StringBuilder();
-    LinkedList<String> tablesList = new LinkedList<String>();
-    for(int i = 0; i < 5; ++i) {
-      String table = "table" + i;
-      tablesList.add(table);
-      writeData(c, table);
-      tablesBuilder.append(table).append(',');
-    }
-    tablesBuilder.setLength(tablesBuilder.length() - 1);
-    
-    Job job = new Job(new Configuration());
-    job.setInputFormatClass (AccumuloInputFormat.class);
-    job.setMapperClass (TestMapper.class);
-    job.setNumReduceTasks (0);
-    setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
-    setInputTableNames (job, tablesList);
-    setScanAuthorizations (job, new Authorizations ());
-    setMockInstance (job, "testmapinstance");
-
-    AccumuloInputFormat input = new AccumuloInputFormat ();
-    List<InputSplit> splits = input.getSplits(job);
-    assertEquals(splits.size(), 5);
-    
-    TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
-    for (InputSplit split : splits) {
-      TaskAttemptContext tac = createTaskAttemptContext (job);
-      RecordReader<Key,Value> reader = input.createRecordReader(split, tac);
-      Mapper<Key,Value,Key,Value>.Context context = createMapContext (mapper, tac, reader, null, split);
-      reader.initialize(split, context);
-      mapper.expectedTable( new Text( ((RangeInputSplit) split).getTableName () ) );
-      mapper.run(context);
-    }
-  }
-  
-  /**
-   * Asserts that the configuration contains the expected ranges for the tables.
-   */
-  @Test
-  public void testRangeSerialization() throws Throwable {
-    List<String> tables = Arrays.asList("t1", "t2", "t3");
-    Job job = new Job(new Configuration());
-    job.setInputFormatClass(AccumuloInputFormat.class);
-    job.setMapperClass (TestMapper.class);
-    job.setNumReduceTasks (0);
-    setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
-    setInputTableNames (job, tables);
-    setScanAuthorizations (job, new Authorizations ());
-    setMockInstance (job, "testmapinstance");
-    
-    HashMap<String, Collection<Range>> tblRanges = new HashMap<String, Collection<Range>>();
-    for(String tbl : tables) {
-      List<Range> ranges = Arrays.asList(
-          new Range("a", "b"),
-          new Range("c", "d"),
-          new Range("e", "f") );
-      tblRanges.put(tbl, ranges);
-    }
-    
-    setRanges (job, tblRanges);
-    Map<String, List<Range>> configuredRanges = getRanges (job);
-    
-    for(Entry<String, List<Range>> cfgRange : configuredRanges.entrySet()) {
-      String tbl = cfgRange.getKey();
-      HashSet<Range> originalRanges = new HashSet<Range>(tblRanges.remove(tbl));
-      HashSet<Range> retrievedRanges = new HashSet<Range>(cfgRange.getValue());
-      assertEquals (originalRanges.size (), retrievedRanges.size ());
-      assertTrue (originalRanges.containsAll (retrievedRanges));
-      assertTrue (retrievedRanges.containsAll (originalRanges));
-    }
-  }
-
-//  /**
-//   * Asserts that the configuration contains the expected iterators for the tables.
-//   */
-//  @Test
-//  public void testIteratorSerialization() throws Throwable {
-//    HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2", "t3"));
-//    Job job = new Job(new Configuration());
-//    job.setInputFormatClass(AccumuloInputFormat.class);
-//    job.setMapperClass(TestMapper.class);
-//    job.setNumReduceTasks(0);
-//    AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), tables, new Authorizations());
-//    AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
-//
-//    // create + set iterators on configuration and build expected reference set
-//    HashMap<String, List<IteratorSetting>> expectedIterators = new HashMap<String, List<IteratorSetting>>();
-//    for(String tbl : tables) {
-//      IteratorSetting isetting1 = new IteratorSetting(1, "name1", "class1"),
-//          isetting2 = new IteratorSetting(2, "name2", "class3"),
-//          isetting5 = new IteratorSetting(5, "name5", "class5");
-//
-//      expectedIterators.put(tbl, Arrays.asList(isetting1, isetting2, isetting5));
-//    }
-//
-//    Map<String, List<IteratorSetting>> immutableView = Collections.unmodifiableMap(expectedIterators);
-//    AccumuloInputFormat.setIterators(job.getConfiguration(), immutableView);
-//
-//    // get a list of all the iterators set on the configuration and validate that
-//    // we find settings for all of the tables and assert that we actually configured
-//    // the iterators we get back
-//    List<AccumuloIterator> accItrs = AccumuloInputFormat.getIterators(job.getConfiguration());
-//    Assert.assertFalse(accItrs.isEmpty());
-//    for(AccumuloIterator accItr : accItrs) {
-//      String table = accItr.getTable();
-//      tables.remove( table );
-//      Assert.assertTrue( expectedIterators.containsKey(table) );
-//      Assert.assertTrue( findIteratorMatch( expectedIterators.get(table), accItr ) );
-//    }
-//
-//    Assert.assertTrue(tables.isEmpty());
-//  }
-
-//  /*
-//   * Helper method to do a linear search for the AccumuloIterator in the list of IteratorSettings.
-//   */
-//  static boolean findIteratorMatch(List<IteratorSetting> iterators, AccumuloIterator itr) {
-//    boolean match = false;
-//    for(IteratorSetting setting : iterators) {
-//      match = setting.getPriority() == itr.getPriority() &&
-//          setting.getName().equals( itr.getIteratorName() ) &&
-//          setting.getIteratorClass().equals( itr.getIteratorClass() );
-//      if(match) break;
-//    }
-//    return match;
-//  }
-  
-  /**
-   * A sample Mapper that verifies aspects of the input.
-   * 
-   * This mapper verifies that all keys passed to it are for the expected
-   * table and that it sees exactly 100 keys.
-   *
-   */
-  static class TestMapper extends Mapper<Key,Value,Key,Value> {
-    private int count;
-    private Text expectedTable;
-    
-    public void expectedTable(Text t) {
-      this.expectedTable = t;
-    }
-    
-    @Override
-    protected void setup(Context context) throws IOException,
-        InterruptedException {
-      super.setup(context);
-      count = 0;
-    }
-
-    @Override
-    protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
-      assertEquals(expectedTable.toString (), ((RangeInputSplit)context.getInputSplit ()).getTableName ());
-      ++count;
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException,
-        InterruptedException {
-      super.cleanup(context);
-      Assert.assertEquals(100, count);
-    }
-  }
-}

