accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cjno...@apache.org
Subject [4/5] git commit: ACCUMULO-391 AbstractRecordReader created to help common functionality.
Date Wed, 16 Oct 2013 00:22:12 GMT
ACCUMULO-391 AbstractRecordReader created to help common functionality.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ebd11205
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ebd11205
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ebd11205

Branch: refs/heads/master
Commit: ebd112056017d3bbe32c62329ca31fecf6c22fea
Parents: f819767
Author: Corey J. Nolet <cjnolet@gmail.com>
Authored: Mon Oct 14 01:24:37 2013 -0400
Committer: Corey J. Nolet <cjnolet@gmail.com>
Committed: Tue Oct 15 20:22:07 2013 -0400

----------------------------------------------------------------------
 .../client/mapreduce/AbstractInputFormat.java   | 47 +++++++-------------
 .../client/mapreduce/AccumuloInputFormat.java   |  7 ++-
 .../AccumuloMultiTableInputFormat.java          | 13 +++++-
 .../mapreduce/AccumuloRowInputFormat.java       |  3 ++
 .../core/client/mapreduce/BatchScanConfig.java  |  1 -
 .../core/client/mapreduce/InputFormatBase.java  | 17 ++++++-
 6 files changed, 53 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebd11205/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 503718e..d86e111 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -1,7 +1,5 @@
 package org.apache.accumulo.core.client.mapreduce;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -21,7 +19,6 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableDeletedException;
@@ -58,6 +55,11 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+/**
+ * 
+ * @param <K>
+ * @param <V>
+ */
 public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
 
   protected static final Class<?> CLASS = AccumuloInputFormat.class;
@@ -328,27 +330,12 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * <li>int {@link #numKeysRead} (used for progress reporting)</li>
    * </ul>
    */
-  protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V>
{
+  protected abstract static class AbstractRecordReader<K,V> extends RecordReader<K,V>
{
     protected long numKeysRead;
     protected Iterator<Map.Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
 
-    /**
-     * Apply the configured iterators from the configuration to the scanner. This applies
both the default iterators and the per-table iterators.
-     * 
-     * @param context
-     *          the Hadoop context for the configured job
-     * @param scanner
-     *          the scanner to configure
-     * @param tableName
-     *          the table name for which to set up the iterators
-     */
-    protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName)
{
-      BatchScanConfig config = getBatchScanConfig(context, tableName);
-      List<IteratorSetting> iterators = config.getIterators();
-      for (IteratorSetting iterator : iterators)
-        scanner.addScanIterator(iterator);
-    }
+    protected abstract void setupIterators(TaskAttemptContext context, Scanner scanner, String
tableName);
 
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
@@ -546,18 +533,18 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
   /**
    * Gets the splits of the tables that have been set on the job.
    * 
-   * @param conf
+   * @param context
    *          the configuration of the job
    * @return the splits from the tables based on the ranges.
    * @throws java.io.IOException
    *           if a table set on the job doesn't exist or an error occurs initializing the
tablet locator
    */
-  public List<InputSplit> getSplits(JobContext conf) throws IOException {
-    log.setLevel(getLogLevel(conf));
-    validateOptions(conf);
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    log.setLevel(getLogLevel(context));
+    validateOptions(context);
 
     LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
-    List<BatchScanConfig> tableConfigs = getBatchScanConfigs(conf);
+    List<BatchScanConfig> tableConfigs = getBatchScanConfigs(context);
     for (BatchScanConfig tableConfig : tableConfigs) {
 
       boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
@@ -573,19 +560,19 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
       TabletLocator tl;
       try {
         if (tableConfig.isOfflineScan()) {
-          binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
+          binnedRanges = binOfflineTable(context, tableConfig.getTableName(), ranges);
           while (binnedRanges == null) {
             // Some tablets were still online, try again
             UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between
100 and 200 ms
-            binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
+            binnedRanges = binOfflineTable(context, tableConfig.getTableName(), ranges);
 
           }
         } else {
-          Instance instance = getInstance(conf);
-          tl = getTabletLocator(conf, tableConfig.getTableName());
+          Instance instance = getInstance(context);
+          tl = getTabletLocator(context, tableConfig.getTableName());
           // its possible that the cache could contain complete, but old information about
a tables tablets... so clear it
           tl.invalidateCache();
-          Credentials creds = new Credentials(getPrincipal(conf), getAuthenticationToken(conf));
+          Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));
 
           while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
             if (!(instance instanceof MockInstance)) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebd11205/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
index c7dfda5..cf4b376 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
@@ -17,8 +17,11 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -38,19 +41,19 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * 
  * <ul>
  * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
- * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, String)}
  * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
  * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link
AccumuloInputFormat#setMockInstance(Job, String)}
- * <li>{@link AccumuloInputFormat#setTableQueryConfigs(Job, BatchScanConfig...)} 
  * </ul>
  * 
  * Other static methods are optional.
  */
 public class AccumuloInputFormat extends InputFormatBase<Key,Value> {
+
   @Override
   public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext
context) throws IOException, InterruptedException {
     log.setLevel(getLogLevel(context));
     return new RecordReaderBase<Key,Value>() {
+
       @Override
       public boolean nextKeyValue() throws IOException, InterruptedException {
         if (scannerIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebd11205/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
index f92de4b..7f8b47a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
@@ -1,5 +1,7 @@
 package org.apache.accumulo.core.client.mapreduce;
 
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -10,6 +12,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -33,7 +36,7 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value
   @Override
   public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit, TaskAttemptContext
context) throws IOException, InterruptedException {
     log.setLevel(getLogLevel(context));
-    return new RecordReaderBase<Key,Value>() {
+    return new AbstractRecordReader<Key, Value>() {
       @Override
       public boolean nextKeyValue() throws IOException, InterruptedException {
         if (scannerIterator.hasNext()) {
@@ -47,6 +50,14 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value
         }
         return false;
       }
+
+      @Override
+      protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName)
{
+        List<IteratorSetting> iterators = getBatchScanConfig(context, tableName).getIterators();
+        for(IteratorSetting setting : iterators) {
+          scanner.addScanIterator(setting);
+        }
+      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebd11205/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
index 992990d..92ceec1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
@@ -17,9 +17,12 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebd11205/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java
index d084b1a..eae973d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java
@@ -172,7 +172,6 @@ public class BatchScanConfig implements Writable {
    * @return true if the feature is enabled, false otherwise
    * @since 1.6.0
    * @see #setUseLocalIterators(boolean)
-   * @deprecated since 1.6.0
    */
   public boolean shouldUseLocalIterators() {
     return useLocalIterators;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebd11205/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index ad9c454..f7057f1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
@@ -304,9 +305,23 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @throws org.apache.accumulo.core.client.TableNotFoundException
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException
{
     return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS,
getConfiguration(context)));
   }
-
+  
+  protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V>
{
+      @Override
+      protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName)
{
+        setupIterators(context, scanner);
+      }
+      
+      protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
+        List<IteratorSetting> iterators = getIterators(context);
+        for (IteratorSetting iterator : iterators) 
+          scanner.addScanIterator(iterator);
+      }
+  }
 }


Mime
View raw message