accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] branch master updated: Multi table input for new MR. Closes #749 (#821)
Date Wed, 19 Dec 2018 20:01:18 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 14e0bd6  Multi table input for new MR. Closes #749 (#821)
14e0bd6 is described below

commit 14e0bd6f671ad12eada7a62aaa49243ca7eb000d
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Wed Dec 19 15:01:13 2018 -0500

    Multi table input for new MR. Closes #749 (#821)
    
    * Modified new MapReduce builder and implementation to allow multiple
    tables through the same fluent API as AccumuloInputFormat
    * Replaced setIterators method with addIterator
---
 .../hadoop/mapred/AccumuloInputFormat.java         |   4 +-
 .../hadoop/mapred/AccumuloRowInputFormat.java      |   2 +-
 .../hadoop/mapreduce/AccumuloInputFormat.java      |  11 ++
 .../hadoop/mapreduce/InputFormatBuilder.java       |  15 +-
 .../hadoopImpl/mapred/AbstractInputFormat.java     |  41 +----
 .../hadoopImpl/mapreduce/AbstractInputFormat.java  |  35 +---
 .../mapreduce/InputFormatBuilderImpl.java          | 198 +++++++++++----------
 .../hadoopImpl/mapreduce/InputTableConfig.java     |  98 +++++-----
 .../mapreduce/lib/InputConfigurator.java           |   2 +-
 .../hadoop/its/mapred/MultiTableInputFormatIT.java | 152 ++++++++++++++++
 .../its/mapreduce/MultiTableInputFormatIT.java     | 152 ++++++++++++++++
 .../hadoop/mapred/AccumuloInputFormatTest.java     |  11 ++
 .../hadoop/mapred/MultiTableInputFormatTest.java   | 123 +++++++++++++
 .../hadoop/mapreduce/AccumuloInputFormatTest.java  |  13 ++
 .../mapreduce/MultiTableInputFormatTest.java       | 125 +++++++++++++
 .../hadoopImpl/mapreduce/InputTableConfigTest.java |   2 +-
 16 files changed, 769 insertions(+), 215 deletions(-)

diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
index 6fabcf9..2523a0d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
@@ -54,7 +54,7 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return AbstractInputFormat.getSplits(job, numSplits);
+    return AbstractInputFormat.getSplits(job);
   }
 
   @Override
@@ -96,6 +96,6 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
    * Sets all the information required for this map reduce job.
    */
   public static InputFormatBuilder.ClientParams<JobConf> configure() {
-    return new InputFormatBuilderImpl<JobConf>(CLASS);
+    return new InputFormatBuilderImpl<>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
index bdcfbdb..cb1d650 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
@@ -53,7 +53,7 @@ public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return AbstractInputFormat.getSplits(job, numSplits);
+    return AbstractInputFormat.getSplits(job);
   }
 
   @Override
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
index 2bffe35..2c6259d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
@@ -49,6 +49,17 @@ import org.slf4j.LoggerFactory;
  *     .store(job);
  * </pre>
  *
+ * Multiple tables can be set by configuring clientProperties once and then calling .table() for
+ * each table. The methods following a call to .table() apply only to that table. For example:
+ *
+ * <pre>
+ * AccumuloInputFormat.configure().clientProperties(props) // set client props once
+ *     .table(table1).auths(auths1).fetchColumns(cols1).batchScan(true) // options for table1
+ *     .table(table2).ranges(range2).auths(auths2).addIterator(iter2) // options for table2
+ *     .table(table3).ranges(range3).auths(auths3).addIterator(iter3) // options for table3
+ *     .store(job); // store all tables in the job when finished
+ * </pre>
+ *
  * For descriptions of all options see
  * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
  *
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
index 4677bda..ddb4deb 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
@@ -62,12 +62,18 @@ public interface InputFormatBuilder {
    */
   interface TableParams<T> {
     /**
-     * Sets the name of the input table, over which this job will scan.
+     * Sets the name of the input table, over which this job will scan. At least one table is
+     * required before calling store(Job).
      *
      * @param tableName
      *          the table to use when the tablename is null in the write call
      */
     InputFormatOptions<T> table(String tableName);
+
+    /**
+     * Finish configuring, verify and serialize options into the JobConf or Job
+     */
+    void store(T j) throws AccumuloException, AccumuloSecurityException;
   }
 
   /**
@@ -75,7 +81,7 @@ public interface InputFormatBuilder {
    *
    * @since 2.0
    */
-  interface InputFormatOptions<T> {
+  interface InputFormatOptions<T> extends TableParams<T> {
     /**
      * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations.
      * By Default, all of the users auths are set.
@@ -217,10 +223,5 @@ public interface InputFormatBuilder {
      * By default, this feature is <b>disabled</b>.
      */
     InputFormatOptions<T> batchScan(boolean value);
-
-    /**
-     * Finish configuring, verify and serialize options into the JobConf or Job
-     */
-    void store(T j) throws AccumuloException, AccumuloSecurityException;
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
index fc34f3f..4d1d4ea 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
@@ -201,35 +201,6 @@ public abstract class AbstractInputFormat {
   }
 
   /**
-   * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @return the {@link InputTableConfig} objects set on the job
-   * @since 1.6.0
-   */
-  protected static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) {
-    return InputConfigurator.getInputTableConfigs(CLASS, job);
-  }
-
-  /**
-   * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific table.
-   *
-   * <p>
-   * null is returned in the event that the table doesn't exist.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param tableName
-   *          the table name for which to grab the config object
-   * @return the {@link InputTableConfig} for the given table
-   * @since 1.6.0
-   */
-  protected static InputTableConfig getInputTableConfig(JobConf job, String tableName) {
-    return InputConfigurator.getInputTableConfig(CLASS, job, tableName);
-  }
-
-  /**
    * An abstract base class to be used to create {@link org.apache.hadoop.mapred.RecordReader}
    * instances that convert from Accumulo
    * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to
@@ -305,7 +276,8 @@ public abstract class AbstractInputFormat {
 
       // in case the table name changed, we can still use the previous name for terms of
       // configuration, but the scanner will use the table id resolved at job setup time
-      InputTableConfig tableConfig = getInputTableConfig(job, baseSplit.getTableName());
+      InputTableConfig tableConfig = InputConfigurator.getInputTableConfig(CLASS, job,
+          baseSplit.getTableName());
 
       log.debug("Created client with user: " + context.whoami());
       log.debug("Creating scanner for table: " + table);
@@ -452,18 +424,13 @@ public abstract class AbstractInputFormat {
   /**
    * Gets the splits of the tables that have been set on the job by reading the metadata table for
    * the specified ranges.
-   *
-   * @return the splits from the tables based on the ranges.
-   * @throws java.io.IOException
-   *           if a table set on the job doesn't exist or an error occurs initializing the tablet
-   *           locator
    */
-  public static InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+  public static InputSplit[] getSplits(JobConf job) throws IOException {
     validateOptions(job);
 
     Random random = new SecureRandom();
     LinkedList<InputSplit> splits = new LinkedList<>();
-    Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job);
+    Map<String,InputTableConfig> tableConfigs = InputConfigurator.getInputTableConfigs(CLASS, job);
     try (AccumuloClient client = createClient(job)) {
       for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
         String tableName = tableConfigEntry.getKey();
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
index c6702b6..b845ff8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
@@ -182,35 +182,6 @@ public abstract class AbstractInputFormat {
     return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
   }
 
-  /**
-   * Fetches all {@link InputTableConfig}s that have been set on the given job.
-   *
-   * @param context
-   *          the Hadoop job instance to be configured
-   * @return the {@link InputTableConfig} objects for the job
-   * @since 1.6.0
-   */
-  public static Map<String,InputTableConfig> getInputTableConfigs(JobContext context) {
-    return InputConfigurator.getInputTableConfigs(CLASS, context.getConfiguration());
-  }
-
-  /**
-   * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific table.
-   *
-   * <p>
-   * null is returned in the event that the table doesn't exist.
-   *
-   * @param context
-   *          the Hadoop job instance to be configured
-   * @param tableName
-   *          the table name for which to grab the config object
-   * @return the {@link InputTableConfig} for the given table
-   * @since 1.6.0
-   */
-  protected static InputTableConfig getInputTableConfig(JobContext context, String tableName) {
-    return InputConfigurator.getInputTableConfig(CLASS, context.getConfiguration(), tableName);
-  }
-
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo
@@ -310,7 +281,8 @@ public abstract class AbstractInputFormat {
       // in case the table name changed, we can still use the previous name for terms of
       // configuration,
       // but the scanner will use the table id resolved at job setup time
-      InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
+      InputTableConfig tableConfig = InputConfigurator.getInputTableConfig(CLASS,
+          attempt.getConfiguration(), split.getTableName());
 
       log.debug("Creating client with user: " + client.whoami());
       log.debug("Creating scanner for table: " + table);
@@ -476,7 +448,8 @@ public abstract class AbstractInputFormat {
     Random random = new SecureRandom();
     LinkedList<InputSplit> splits = new LinkedList<>();
     try (AccumuloClient client = createClient(context)) {
-      Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context);
+      Map<String,InputTableConfig> tableConfigs = InputConfigurator.getInputTableConfigs(CLASS,
+          context.getConfiguration());
       for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
 
         String tableName = tableConfigEntry.getKey();
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
index 3d85205..0af63d8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -19,9 +19,9 @@ package org.apache.accumulo.hadoopImpl.mapreduce;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -46,17 +46,10 @@ public class InputFormatBuilderImpl<T>
     InputFormatBuilder.TableParams<T>, InputFormatBuilder.InputFormatOptions<T> {
 
   Class<?> callingClass;
-  String tableName;
   ClientInfo clientInfo;
-  Authorizations scanAuths;
 
-  Optional<String> context = Optional.empty();
-  Collection<Range> ranges = Collections.emptyList();
-  Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
-  Map<String,IteratorSetting> iterators = Collections.emptyMap();
-  Optional<SamplerConfiguration> samplerConfig = Optional.empty();
-  Map<String,String> hints = Collections.emptyMap();
-  BuilderBooleans bools = new BuilderBooleans();
+  String currentTable;
+  Map<String,InputTableConfig> tableConfigMap = Collections.emptyMap();
 
   public InputFormatBuilderImpl(Class<?> callingClass) {
     this.callingClass = callingClass;
@@ -71,38 +64,44 @@ public class InputFormatBuilderImpl<T>
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> table(String tableName) {
-    this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
+    this.currentTable = Objects.requireNonNull(tableName, "Table name must not be null");
+    if (tableConfigMap.isEmpty())
+      tableConfigMap = new LinkedHashMap<>();
+    tableConfigMap.put(currentTable, new InputTableConfig());
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> auths(Authorizations auths) {
-    this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
+    tableConfigMap.get(currentTable)
+        .setScanAuths(Objects.requireNonNull(auths, "Authorizations must not be null"));
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> classLoaderContext(String context) {
-    this.context = Optional.of(context);
+    tableConfigMap.get(currentTable).setContext(context);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> ranges(Collection<Range> ranges) {
-    this.ranges = ImmutableList
+    List<Range> newRanges = ImmutableList
         .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
-    if (this.ranges.size() == 0)
+    if (newRanges.size() == 0)
       throw new IllegalArgumentException("Specified collection of ranges is empty.");
+    tableConfigMap.get(currentTable).setRanges(newRanges);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> fetchColumns(
       Collection<IteratorSetting.Column> fetchColumns) {
-    this.fetchColumns = ImmutableList
+    Collection<IteratorSetting.Column> newFetchColumns = ImmutableList
         .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
-    if (this.fetchColumns.size() == 0)
+    if (newFetchColumns.size() == 0)
       throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
+    tableConfigMap.get(currentTable).fetchColumns(newFetchColumns);
     return this;
   }
 
@@ -110,57 +109,56 @@ public class InputFormatBuilderImpl<T>
   public InputFormatBuilder.InputFormatOptions<T> addIterator(IteratorSetting cfg) {
     // store iterators by name to prevent duplicates
     Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
-    if (this.iterators.size() == 0)
-      this.iterators = new LinkedHashMap<>();
-    this.iterators.put(cfg.getName(), cfg);
+    tableConfigMap.get(currentTable).addIterator(cfg);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> executionHints(Map<String,String> hints) {
-    this.hints = ImmutableMap
+    Map<String,String> newHints = ImmutableMap
         .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
-    if (hints.size() == 0)
+    if (newHints.size() == 0)
       throw new IllegalArgumentException("Specified map of execution hints is empty.");
+    tableConfigMap.get(currentTable).setExecutionHints(newHints);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> samplerConfiguration(
       SamplerConfiguration samplerConfig) {
-    this.samplerConfig = Optional.of(samplerConfig);
+    tableConfigMap.get(currentTable).setSamplerConfiguration(samplerConfig);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> autoAdjustRanges(boolean value) {
-    bools.autoAdjustRanges = value;
+    tableConfigMap.get(currentTable).setAutoAdjustRanges(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> scanIsolation(boolean value) {
-    bools.scanIsolation = value;
+    tableConfigMap.get(currentTable).setUseIsolatedScanners(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> localIterators(boolean value) {
-    bools.localIters = value;
+    tableConfigMap.get(currentTable).setUseLocalIterators(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> offlineScan(boolean value) {
-    bools.offlineScan = value;
+    tableConfigMap.get(currentTable).setOfflineScan(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> batchScan(boolean value) {
-    bools.batchScan = value;
+    tableConfigMap.get(currentTable).setUseBatchScan(value);
     if (value)
-      bools.autoAdjustRanges = true;
+      tableConfigMap.get(currentTable).setAutoAdjustRanges(true);
     return this;
   }
 
@@ -180,30 +178,40 @@ public class InputFormatBuilderImpl<T>
    */
   private void store(Job job) throws AccumuloException, AccumuloSecurityException {
     AbstractInputFormat.setClientInfo(job, clientInfo);
-    InputFormatBase.setInputTableName(job, tableName);
-
-    scanAuths = getUserAuths(scanAuths, clientInfo);
-    AbstractInputFormat.setScanAuthorizations(job, scanAuths);
-
-    // all optional values
-    if (context.isPresent())
-      AbstractInputFormat.setClassLoaderContext(job, context.get());
-    if (ranges.size() > 0)
-      InputFormatBase.setRanges(job, ranges);
-    if (iterators.size() > 0)
-      InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
-          iterators.values());
-    if (fetchColumns.size() > 0)
-      InputConfigurator.fetchColumns(callingClass, job.getConfiguration(), fetchColumns);
-    if (samplerConfig.isPresent())
-      InputFormatBase.setSamplerConfiguration(job, samplerConfig.get());
-    if (hints.size() > 0)
-      InputFormatBase.setExecutionHints(job, hints);
-    InputFormatBase.setAutoAdjustRanges(job, bools.autoAdjustRanges);
-    InputFormatBase.setScanIsolation(job, bools.scanIsolation);
-    InputFormatBase.setLocalIterators(job, bools.localIters);
-    InputFormatBase.setOfflineTableScan(job, bools.offlineScan);
-    InputFormatBase.setBatchScan(job, bools.batchScan);
+    if (tableConfigMap.size() == 0) {
+      throw new IllegalArgumentException("At least one Table must be configured for job.");
+    }
+    // if only one table use the single table configuration method
+    if (tableConfigMap.size() == 1) {
+      Map.Entry<String,InputTableConfig> entry = tableConfigMap.entrySet().iterator().next();
+      InputFormatBase.setInputTableName(job, entry.getKey());
+      InputTableConfig config = entry.getValue();
+      if (!config.getScanAuths().isPresent())
+        config.setScanAuths(getUserAuths(clientInfo));
+      AbstractInputFormat.setScanAuthorizations(job, config.getScanAuths().get());
+      // all optional values
+      if (config.getContext().isPresent())
+        AbstractInputFormat.setClassLoaderContext(job, config.getContext().get());
+      if (config.getRanges().size() > 0)
+        InputFormatBase.setRanges(job, config.getRanges());
+      if (config.getIterators().size() > 0)
+        InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
+            config.getIterators());
+      if (config.getFetchedColumns().size() > 0)
+        InputConfigurator.fetchColumns(callingClass, job.getConfiguration(),
+            config.getFetchedColumns());
+      if (config.getSamplerConfiguration() != null)
+        InputFormatBase.setSamplerConfiguration(job, config.getSamplerConfiguration());
+      if (config.getExecutionHints().size() > 0)
+        InputFormatBase.setExecutionHints(job, config.getExecutionHints());
+      InputFormatBase.setAutoAdjustRanges(job, config.shouldAutoAdjustRanges());
+      InputFormatBase.setScanIsolation(job, config.shouldUseIsolatedScanners());
+      InputFormatBase.setLocalIterators(job, config.shouldUseLocalIterators());
+      InputFormatBase.setOfflineTableScan(job, config.isOfflineScan());
+      InputFormatBase.setBatchScan(job, config.shouldBatchScan());
+    } else {
+      InputConfigurator.setInputTableConfigs(callingClass, job.getConfiguration(), tableConfigMap);
+    }
   }
 
   /**
@@ -211,52 +219,56 @@ public class InputFormatBuilderImpl<T>
    */
   private void store(JobConf jobConf) throws AccumuloException, AccumuloSecurityException {
     org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo(jobConf, clientInfo);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf, tableName);
-
-    scanAuths = getUserAuths(scanAuths, clientInfo);
-    org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
-        scanAuths);
-
-    // all optional values
-    if (context.isPresent())
-      org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
-          context.get());
-    if (ranges.size() > 0)
-      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf, ranges);
-    if (iterators.size() > 0)
-      InputConfigurator.writeIteratorsToConf(callingClass, jobConf, iterators.values());
-    if (fetchColumns.size() > 0)
-      InputConfigurator.fetchColumns(callingClass, jobConf, fetchColumns);
-    if (samplerConfig.isPresent())
-      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
-          samplerConfig.get());
-    if (hints.size() > 0)
-      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf, hints);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
-        bools.autoAdjustRanges);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
-        bools.scanIsolation);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
-        bools.localIters);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
-        bools.offlineScan);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf, bools.batchScan);
+    if (tableConfigMap.size() == 0) {
+      throw new IllegalArgumentException("At least one Table must be configured for job.");
+    }
+    // if only one table use the single table configuration method
+    if (tableConfigMap.size() == 1) {
+      Map.Entry<String,InputTableConfig> entry = tableConfigMap.entrySet().iterator().next();
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf,
+          entry.getKey());
+      InputTableConfig config = entry.getValue();
+      if (!config.getScanAuths().isPresent())
+        config.setScanAuths(getUserAuths(clientInfo));
+      org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
+          config.getScanAuths().get());
+      // all optional values
+      if (config.getContext().isPresent())
+        org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
+            config.getContext().get());
+      if (config.getRanges().size() > 0)
+        org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf,
+            config.getRanges());
+      if (config.getIterators().size() > 0)
+        InputConfigurator.writeIteratorsToConf(callingClass, jobConf, config.getIterators());
+      if (config.getFetchedColumns().size() > 0)
+        InputConfigurator.fetchColumns(callingClass, jobConf, config.getFetchedColumns());
+      if (config.getSamplerConfiguration() != null)
+        org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
+            config.getSamplerConfiguration());
+      if (config.getExecutionHints().size() > 0)
+        org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf,
+            config.getExecutionHints());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
+          config.shouldAutoAdjustRanges());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
+          config.shouldUseIsolatedScanners());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
+          config.shouldUseLocalIterators());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
+          config.isOfflineScan());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf,
+          config.shouldBatchScan());
+    } else {
+      InputConfigurator.setInputTableConfigs(callingClass, jobConf, tableConfigMap);
+    }
   }
 
-  private Authorizations getUserAuths(Authorizations scanAuths, ClientInfo clientInfo)
+  private Authorizations getUserAuths(ClientInfo clientInfo)
       throws AccumuloSecurityException, AccumuloException {
-    if (scanAuths != null)
-      return scanAuths;
     try (AccumuloClient c = Accumulo.newClient().from(clientInfo.getProperties()).build()) {
       return c.securityOperations().getUserAuthorizations(clientInfo.getPrincipal());
     }
   }
 
-  private static class BuilderBooleans {
-    boolean autoAdjustRanges = true;
-    boolean scanIsolation = false;
-    boolean offlineScan = false;
-    boolean localIters = false;
-    boolean batchScan = false;
-  }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
index c90c92f..9d27d6f 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
@@ -24,15 +24,20 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -43,14 +48,19 @@ import org.apache.hadoop.io.Writable;
  */
 public class InputTableConfig implements Writable {
 
-  private List<IteratorSetting> iterators;
-  private List<Range> ranges;
-  private Collection<IteratorSetting.Column> columns;
+  // store iterators by name to prevent duplicates for addIterator
+  private Map<String,IteratorSetting> iterators = Collections.emptyMap();
+  private List<Range> ranges = Collections.emptyList();
+  private Collection<IteratorSetting.Column> columns = Collections.emptyList();
+
+  private Optional<Authorizations> scanAuths = Optional.empty();
+  private Optional<String> context = Optional.empty();
 
   private boolean autoAdjustRanges = true;
   private boolean useLocalIterators = false;
   private boolean useIsolatedScanners = false;
   private boolean offlineScan = false;
+  private boolean batchScan = false;
   private SamplerConfiguration samplerConfig = null;
   private Map<String,String> executionHints = Collections.emptyMap();
 
@@ -83,7 +93,7 @@ public class InputTableConfig implements Writable {
    * Returns the ranges to be queried in the configuration
    */
   public List<Range> getRanges() {
-    return ranges != null ? ranges : new ArrayList<>();
+    return ranges;
   }
 
   /**
@@ -107,23 +117,17 @@ public class InputTableConfig implements Writable {
     return columns != null ? columns : new HashSet<>();
   }
 
-  /**
-   * Set iterators on to be used in the query.
-   *
-   * @param iterators
-   *          the configurations for the iterators
-   * @since 1.6.0
-   */
-  public InputTableConfig setIterators(List<IteratorSetting> iterators) {
-    this.iterators = iterators;
-    return this;
+  public void addIterator(IteratorSetting cfg) {
+    if (this.iterators.isEmpty())
+      this.iterators = new LinkedHashMap<>();
+    this.iterators.put(cfg.getName(), cfg);
   }
 
   /**
    * Returns the iterators to be set on this configuration
    */
   public List<IteratorSetting> getIterators() {
-    return iterators != null ? iterators : new ArrayList<>();
+    return new LinkedList<>(iterators.values());
   }
 
   /**
@@ -217,7 +221,6 @@ public class InputTableConfig implements Writable {
    *
    * @param offlineScan
    *          the feature is enabled if true, disabled otherwise
-   * @since 1.6.0
    */
   public InputTableConfig setOfflineScan(boolean offlineScan) {
     this.offlineScan = offlineScan;
@@ -228,7 +231,6 @@ public class InputTableConfig implements Writable {
    * Determines whether a configuration has the offline table scan feature enabled.
    *
    * @return true if the feature is enabled, false otherwise
-   * @since 1.6.0
    * @see #setOfflineScan(boolean)
    */
   public boolean isOfflineScan() {
@@ -243,7 +245,6 @@ public class InputTableConfig implements Writable {
    *
    * @param useIsolatedScanners
    *          the feature is enabled if true, disabled otherwise
-   * @since 1.6.0
    */
   public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
     this.useIsolatedScanners = useIsolatedScanners;
@@ -254,55 +255,68 @@ public class InputTableConfig implements Writable {
    * Determines whether a configuration has isolation enabled.
    *
    * @return true if the feature is enabled, false otherwise
-   * @since 1.6.0
    * @see #setUseIsolatedScanners(boolean)
    */
   public boolean shouldUseIsolatedScanners() {
     return useIsolatedScanners;
   }
 
+  public void setUseBatchScan(boolean value) {
+    this.batchScan = value;
+  }
+
+  public boolean shouldBatchScan() {
+    return batchScan;
+  }
+
   /**
    * Set the sampler configuration to use when reading from the data.
    *
    * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
    * @see InputFormatBase#setSamplerConfiguration(org.apache.hadoop.mapreduce.Job,
    *      SamplerConfiguration)
-   *
-   * @since 1.8.0
    */
   public void setSamplerConfiguration(SamplerConfiguration samplerConfiguration) {
     this.samplerConfig = samplerConfiguration;
   }
 
-  /**
-   *
-   * @since 1.8.0
-   */
   public SamplerConfiguration getSamplerConfiguration() {
     return samplerConfig;
   }
 
   /**
    * The execution hints to set on created scanners. See {@link ScannerBase#setExecutionHints(Map)}
-   *
-   * @since 2.0.0
    */
   public void setExecutionHints(Map<String,String> executionHints) {
     this.executionHints = executionHints;
   }
 
-  /**
-   * @since 2.0.0
-   */
   public Map<String,String> getExecutionHints() {
     return executionHints;
   }
 
+  public Optional<Authorizations> getScanAuths() {
+    return scanAuths;
+  }
+
+  public InputTableConfig setScanAuths(Authorizations scanAuths) {
+    this.scanAuths = Optional.of(scanAuths);
+    return this;
+  }
+
+  public Optional<String> getContext() {
+    return context;
+  }
+
+  public void setContext(String context) {
+    this.context = Optional.of(context);
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     if (iterators != null) {
       dataOutput.writeInt(iterators.size());
-      for (IteratorSetting setting : iterators)
+      for (IteratorSetting setting : getIterators())
         setting.write(dataOutput);
     } else {
       dataOutput.writeInt(0);
@@ -340,7 +354,7 @@ public class InputTableConfig implements Writable {
       new SamplerConfigurationImpl(samplerConfig).write(dataOutput);
     }
 
-    if (executionHints == null || executionHints.size() == 0) {
+    if (executionHints.isEmpty()) {
       dataOutput.writeInt(0);
     } else {
       dataOutput.writeInt(executionHints.size());
@@ -356,9 +370,11 @@ public class InputTableConfig implements Writable {
     // load iterators
     long iterSize = dataInput.readInt();
     if (iterSize > 0)
-      iterators = new ArrayList<>();
-    for (int i = 0; i < iterSize; i++)
-      iterators.add(new IteratorSetting(dataInput));
+      iterators = new LinkedHashMap<>();
+    for (int i = 0; i < iterSize; i++) {
+      IteratorSetting newIter = new IteratorSetting(dataInput);
+      iterators.put(newIter.getName(), newIter);
+    }
     // load ranges
     long rangeSize = dataInput.readInt();
     if (rangeSize > 0)
@@ -419,17 +435,15 @@ public class InputTableConfig implements Writable {
       return false;
     if (useLocalIterators != that.useLocalIterators)
       return false;
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null)
+    if (!Objects.equals(columns, that.columns))
       return false;
-    if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null)
+    if (!Objects.equals(iterators, that.iterators))
       return false;
-    if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null)
+    if (!Objects.equals(ranges, that.ranges))
       return false;
-    if (executionHints != null ? !executionHints.equals(that.executionHints)
-        : that.executionHints != null)
+    if (!Objects.equals(executionHints, that.executionHints))
       return false;
-    return samplerConfig != null ? samplerConfig.equals(that.samplerConfig)
-        : that.samplerConfig == null;
+    return Objects.equals(samplerConfig, that.samplerConfig);
   }
 
   @Override
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
index f52b427..e1da2b9 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
@@ -780,7 +780,7 @@ public class InputConfigurator extends ConfiguratorBase {
       InputTableConfig queryConfig = new InputTableConfig();
       List<IteratorSetting> itrs = getIterators(implementingClass, conf);
       if (itrs != null)
-        queryConfig.setIterators(itrs);
+        itrs.forEach(itr -> queryConfig.addIterator(itr));
       Set<IteratorSetting.Column> columns = getFetchedColumns(implementingClass, conf);
       if (columns != null)
         queryConfig.fetchColumns(columns);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java
new file mode 100644
index 0000000..95244fe
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapred.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class MultiTableInputFormatIT extends AccumuloClusterHarness {
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter)
+          throws IOException {
+        try {
+          String tableName = ((RangeInputSplit) reporter.getInputSplit()).getTableName();
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table1> <table2>");
+      }
+
+      String table1 = args[0];
+      String table2 = args[1];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientProperties(getClientInfo().getProperties())
+          .table(table1).table(table2).store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = createAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+        t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table1, i).getBytes()));
+        bw.addMutation(t1m);
+        Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+        t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table2, i).getBytes()));
+        bw2.addMutation(t2m);
+      }
+      bw.close();
+      bw2.close();
+
+      MRTester.main(new String[] {table1, table2});
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java
new file mode 100644
index 0000000..07349dd
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class MultiTableInputFormatIT extends AccumuloClusterHarness {
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static class MRTester extends Configured implements Tool {
+
+    private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          String tableName = ((RangeInputSplit) context.getInputSplit()).getTableName();
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table1> <table2>");
+      }
+
+      String table1 = args[0];
+      String table2 = args[1];
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientProperties(getClientInfo().getProperties())
+          .table(table1).table(table2).store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  /**
+   * Generate incrementing counts and attach table name to the key/value so that order and
+   * multi-table data can be verified.
+   */
+  @Test
+  public void testMap() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = createAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+        t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table1, i).getBytes()));
+        bw.addMutation(t1m);
+        Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+        t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table2, i).getBytes()));
+        bw2.addMutation(t2m);
+      }
+      bw.close();
+      bw2.close();
+
+      MRTester.main(new String[] {table1, table2});
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index 2401477..65e1ddf 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -40,6 +40,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestName;
 
 public class AccumuloInputFormatTest {
@@ -49,6 +50,8 @@ public class AccumuloInputFormatTest {
 
   @Rule
   public TestName test = new TestName();
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
 
   @Before
   public void createJob() {
@@ -61,6 +64,14 @@ public class AccumuloInputFormatTest {
         .setupClientProperties();
   }
 
+  @Test
+  public void testMissingTable() throws Exception {
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    exception.expect(IllegalArgumentException.class);
+    AccumuloInputFormat.configure().clientProperties(clientProps).store(new JobConf());
+  }
+
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
    */
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java
new file mode 100644
index 0000000..775596a
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapred;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class MultiTableInputFormatTest {
+  public static final Class<AccumuloInputFormat> CLASS = AccumuloInputFormat.class;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
+   */
+  @Test
+  public void testStoreTables() throws Exception {
+    String table1Name = testName.getMethodName() + "1";
+    String table2Name = testName.getMethodName() + "2";
+    JobConf job = new JobConf();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    List<Range> ranges = singletonList(new Range("a", "b"));
+    Set<IteratorSetting.Column> cols = singleton(
+        new IteratorSetting.Column(new Text("CF1"), new Text("CQ1")));
+    IteratorSetting iter1 = new IteratorSetting(50, "iter1", "iterclass1");
+    IteratorSetting iter2 = new IteratorSetting(60, "iter2", "iterclass2");
+    List<IteratorSetting> allIters = new ArrayList<>();
+    allIters.add(iter1);
+    allIters.add(iter2);
+
+    // if auths are not set, the client will try to get them from the server; we don't want that
+    Authorizations auths = Authorizations.EMPTY;
+
+    // @formatter:off
+    AccumuloInputFormat.configure().clientProperties(clientProps)
+        .table(table1Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter1)
+        .addIterator(iter2).localIterators(true).offlineScan(true) // end table 1
+        .table(table2Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter2) // end
+        .store(job);
+    // @formatter:on
+
+    InputTableConfig table1 = new InputTableConfig();
+    table1.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).setUseLocalIterators(true)
+        .setOfflineScan(true);
+    allIters.forEach(itr -> table1.addIterator(itr));
+    InputTableConfig table2 = new InputTableConfig();
+    table2.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter2);
+
+    assertEquals(table1, InputConfigurator.getInputTableConfig(CLASS, job, table1Name));
+    assertEquals(table2, InputConfigurator.getInputTableConfig(CLASS, job, table2Name));
+  }
+
+  @Test
+  public void testManyTables() throws Exception {
+    JobConf job = new JobConf();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+
+    // if auths are not set, the client will try to get them from the server; we don't want that
+    Authorizations auths = Authorizations.EMPTY;
+
+    // set the client properties once then loop over tables
+    InputFormatBuilder.TableParams<JobConf> opts = AccumuloInputFormat.configure()
+        .clientProperties(clientProps);
+    for (int i = 0; i < 10_000; i++) {
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      opts.table("table" + i).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter);
+    }
+    opts.store(job);
+
+    // verify
+    Map<String,InputTableConfig> configs = InputConfigurator.getInputTableConfigs(CLASS, job);
+    assertEquals(10_000, configs.size());
+
+    // create objects to test against
+    for (int i = 0; i < 10_000; i++) {
+      InputTableConfig t = new InputTableConfig();
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      t.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter);
+      assertEquals(t, configs.get("table" + i));
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index 227eb84..ac22733 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -39,9 +39,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class AccumuloInputFormatTest {
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   static Properties clientProperties;
 
   @BeforeClass
@@ -59,6 +64,14 @@ public class AccumuloInputFormatTest {
     return cp;
   }
 
+  @Test
+  public void testMissingTable() throws Exception {
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    exception.expect(IllegalArgumentException.class);
+    AccumuloInputFormat.configure().clientProperties(clientProps).store(Job.getInstance());
+  }
+
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
    */
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java
new file mode 100644
index 0000000..8cd353b
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class MultiTableInputFormatTest {
+  public static final Class<AccumuloInputFormat> CLASS = AccumuloInputFormat.class;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
+   */
+  @Test
+  public void testStoreTables() throws Exception {
+    String table1Name = testName.getMethodName() + "1";
+    String table2Name = testName.getMethodName() + "2";
+    Job job = Job.getInstance();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    List<Range> ranges = singletonList(new Range("a", "b"));
+    Set<IteratorSetting.Column> cols = singleton(
+        new IteratorSetting.Column(new Text("CF1"), new Text("CQ1")));
+    IteratorSetting iter1 = new IteratorSetting(50, "iter1", "iterclass1");
+    IteratorSetting iter2 = new IteratorSetting(60, "iter2", "iterclass2");
+    List<IteratorSetting> allIters = new ArrayList<>();
+    allIters.add(iter1);
+    allIters.add(iter2);
+
+    // if auths are not set, the client will try to get them from the server; we don't want that
+    Authorizations auths = Authorizations.EMPTY;
+
+    // @formatter:off
+    AccumuloInputFormat.configure().clientProperties(clientProps)
+        .table(table1Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter1)
+        .addIterator(iter2).localIterators(true).offlineScan(true) // end table 1
+        .table(table2Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter2) // end
+        .store(job);
+    // @formatter:on
+
+    InputTableConfig table1 = new InputTableConfig();
+    table1.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).setUseLocalIterators(true)
+        .setOfflineScan(true);
+    allIters.forEach(itr -> table1.addIterator(itr));
+    InputTableConfig table2 = new InputTableConfig();
+    table2.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter2);
+
+    Configuration jc = job.getConfiguration();
+    assertEquals(table1, InputConfigurator.getInputTableConfig(CLASS, jc, table1Name));
+    assertEquals(table2, InputConfigurator.getInputTableConfig(CLASS, jc, table2Name));
+  }
+
+  @Test
+  public void testManyTables() throws Exception {
+    Job job = Job.getInstance();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+
+    // if auths are not set, the client will try to get them from the server; we don't want that
+    Authorizations auths = Authorizations.EMPTY;
+
+    // set the client properties once then loop over tables
+    InputFormatBuilder.TableParams<Job> opts = AccumuloInputFormat.configure()
+        .clientProperties(clientProps);
+    for (int i = 0; i < 10_000; i++) {
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      opts.table("table" + i).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter);
+    }
+    opts.store(job);
+
+    // verify
+    Map<String,InputTableConfig> configs = InputConfigurator.getInputTableConfigs(CLASS,
+        job.getConfiguration());
+    assertEquals(10_000, configs.size());
+
+    // create objects to test against
+    for (int i = 0; i < 10_000; i++) {
+      InputTableConfig t = new InputTableConfig();
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      t.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter);
+      assertEquals(t, configs.get("table" + i));
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
index 0d25fee..74ab698 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
@@ -97,7 +97,7 @@ public class InputTableConfigTest {
     List<IteratorSetting> settings = new ArrayList<>();
     settings.add(new IteratorSetting(50, "iter", "iterclass"));
     settings.add(new IteratorSetting(55, "iter2", "iterclass2"));
-    tableQueryConfig.setIterators(settings);
+    settings.forEach(itr -> tableQueryConfig.addIterator(itr));
     byte[] serialized = serialize(tableQueryConfig);
     InputTableConfig actualConfig = deserialize(serialized);
     assertEquals(actualConfig.getIterators(), settings);


Mime
View raw message