accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [8/9] accumulo git commit: ACCUMULO-3602 ACCUMULO-3657 Minimize AccumuloInputSplit in API
Date Tue, 21 Apr 2015 22:23:49 GMT
ACCUMULO-3602 ACCUMULO-3657 Minimize AccumuloInputSplit in API


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d1e6e79c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d1e6e79c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d1e6e79c

Branch: refs/heads/master
Commit: d1e6e79cf12ee420dd1d20fd605723f0e5505f68
Parents: c625291
Author: Keith Turner <kturner@apache.org>
Authored: Tue Apr 21 17:30:48 2015 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Tue Apr 21 17:53:32 2015 -0400

----------------------------------------------------------------------
 .../core/client/mapred/AbstractInputFormat.java | 58 ++++++++--------
 .../client/mapreduce/AbstractInputFormat.java   | 56 ++++++++-------
 .../core/client/mapreduce/RangeInputSplit.java  | 24 ++-----
 .../mapreduce/impl/AccumuloInputSplit.java      | 73 +++++++++-----------
 .../client/mapreduce/impl/BatchInputSplit.java  | 21 ++----
 .../core/client/mapreduce/impl/SplitUtils.java  | 59 ++++++++++++++++
 6 files changed, 162 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index b97d4de..f2e3a79 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -29,18 +29,18 @@ import java.util.Random;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
-import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.admin.SecurityOperations;
 import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapred.impl.BatchInputSplit;
 import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
 import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit;
+import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
@@ -394,7 +395,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
   protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V>
{
     protected long numKeysRead;
     protected Iterator<Map.Entry<Key,Value>> scannerIterator;
-    protected org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit split;
+    protected RangeInputSplit split;
+    private org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit aiSplit;
     protected ScannerBase scannerBase;
 
 
@@ -458,42 +460,42 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
     public void initialize(InputSplit inSplit, JobConf job) throws IOException {
-      split = (AccumuloInputSplit) inSplit;
-      log.debug("Initializing input split: " + split.toString());
+      aiSplit = (AccumuloInputSplit) inSplit;
+      log.debug("Initializing input split: " + aiSplit.toString());
 
-      Instance instance = split.getInstance(getClientConfiguration(job));
+      Instance instance = aiSplit.getInstance(getClientConfiguration(job));
       if (null == instance) {
         instance = getInstance(job);
       }
 
-      String principal = split.getPrincipal();
+      String principal = aiSplit.getPrincipal();
       if (null == principal) {
         principal = getPrincipal(job);
       }
 
-      AuthenticationToken token = split.getToken();
+      AuthenticationToken token = aiSplit.getToken();
       if (null == token) {
         token = getAuthenticationToken(job);
       }
 
-      Authorizations authorizations = split.getAuths();
+      Authorizations authorizations = aiSplit.getAuths();
       if (null == authorizations) {
         authorizations = getScanAuthorizations(job);
       }
 
-      String table = split.getTableName();
+      String table = aiSplit.getTableName();
 
       // in case the table name changed, we can still use the previous name for terms of
configuration,
       // but the scanner will use the table id resolved at job setup time
-      InputTableConfig tableConfig = getInputTableConfig(job, split.getTableName());
+      InputTableConfig tableConfig = getInputTableConfig(job, aiSplit.getTableName());
 
       log.debug("Creating connector with user: " + principal);
       log.debug("Creating scanner for table: " + table);
       log.debug("Authorizations are: " + authorizations);
 
-      if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) {
-        org.apache.accumulo.core.client.mapreduce.RangeInputSplit rangeSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit)
split;
-
+      if (aiSplit instanceof RangeInputSplit) {
+        RangeInputSplit rangeSplit = (RangeInputSplit) aiSplit;
+        split = rangeSplit;
         Boolean isOffline = rangeSplit.isOffline();
         if (null == isOffline) {
           isOffline = tableConfig.isOfflineScan();
@@ -513,13 +515,13 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
 
         try {
           if (isOffline) {
-            scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(),
authorizations);
+            scanner = new OfflineScanner(instance, new Credentials(principal, token), aiSplit.getTableId(),
authorizations);
           } else if (instance instanceof MockInstance) {
-            scanner = instance.getConnector(principal, token).createScanner(split.getTableName(),
authorizations);
+            scanner = instance.getConnector(principal, token).createScanner(aiSplit.getTableName(),
authorizations);
           } else {
             ClientConfiguration clientConf = getClientConfiguration(job);
             ClientContext context = new ClientContext(instance, new Credentials(principal,
token), clientConf);
-            scanner = new ScannerImpl(context, split.getTableId(), authorizations);
+            scanner = new ScannerImpl(context, aiSplit.getTableId(), authorizations);
           }
           if (isIsolated) {
             log.info("Creating isolated scanner");
@@ -529,7 +531,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
             log.info("Using local iterators");
             scanner = new ClientSideIteratorScanner(scanner);
           }
-          setupIterators(job, scanner, split.getTableName(), split);
+          setupIterators(job, scanner, aiSplit.getTableName(), aiSplit);
         } catch (Exception e) {
           throw new IOException(e);
         }
@@ -537,15 +539,15 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
         scanner.setRange(rangeSplit.getRange());
         scannerBase = scanner;
 
-      } else if (split instanceof BatchInputSplit) {
+      } else if (aiSplit instanceof BatchInputSplit) {
         BatchScanner scanner;
-        BatchInputSplit multiRangeSplit = (BatchInputSplit) split;
+        BatchInputSplit multiRangeSplit = (BatchInputSplit) aiSplit;
 
         try{
           // Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
will not span tablets
           int scanThreads = 1;
-          scanner = instance.getConnector(principal, token).createBatchScanner(split.getTableName(),
authorizations, scanThreads);
-          setupIterators(job, scanner, split.getTableName(), split);
+          scanner = instance.getConnector(principal, token).createBatchScanner(aiSplit.getTableName(),
authorizations, scanThreads);
+          setupIterators(job, scanner, aiSplit.getTableName(), aiSplit);
         } catch (Exception e) {
           throw new IOException(e);
         }
@@ -554,10 +556,10 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
         scannerBase = scanner;
 
       } else {
-        throw new IllegalArgumentException("Can not initialize from " + split.getClass().toString());
+        throw new IllegalArgumentException("Can not initialize from " + aiSplit.getClass().toString());
       }
 
-      Collection<Pair<Text,Text>> columns = split.getFetchedColumns();
+      Collection<Pair<Text,Text>> columns = aiSplit.getFetchedColumns();
       if (null == columns) {
         columns = tableConfig.getFetchedColumns();
       }
@@ -593,7 +595,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
-      return split.getProgress(currentKey);
+      return aiSplit.getProgress(currentKey);
     }
 
     protected Key currentKey = null;
@@ -721,7 +723,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
               clippedRanges.add(ke.clip(r));
 
             BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges,
new String[] {location});
-            AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token,
auths, logLevel);
+            SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths,
logLevel);
 
             splits.add(split);
           } else {
@@ -730,7 +732,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
               if (autoAdjust) {
                 // divide ranges into smaller ranges, based on the tablets
                 RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r),
new String[] {location});
-                AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token,
auths, logLevel);
+                SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths,
logLevel);
                 split.setOffline(tableConfig.isOfflineScan());
                 split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
                 split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
@@ -752,7 +754,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
       if (!autoAdjust)
         for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
{
           RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(),
entry.getValue().toArray(new String[0]));
-          AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token,
auths, logLevel);
+          SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
           split.setOffline(tableConfig.isOfflineScan());
           split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
           split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index c7a304c..d402bb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -29,13 +29,14 @@ import java.util.Random;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -52,6 +53,7 @@ import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit;
 import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
+import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
@@ -67,7 +69,6 @@ import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -425,7 +426,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
     protected long numKeysRead;
     protected Iterator<Map.Entry<Key,Value>> scannerIterator;
     protected ScannerBase scannerBase;
-    protected AccumuloInputSplit split;
+    protected RangeInputSplit split;
+    private AccumuloInputSplit aiSplit;
 
     /**
      * Extracts Iterators settings from the context to be used by RecordReader.
@@ -489,41 +491,42 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
     @Override
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException
{
 
-      split = (AccumuloInputSplit) inSplit;
-      log.debug("Initializing input split: " + split.toString());
+      aiSplit = (AccumuloInputSplit) inSplit;
+      log.debug("Initializing input split: " + aiSplit.toString());
 
-      Instance instance = split.getInstance(getClientConfiguration(attempt));
+      Instance instance = aiSplit.getInstance(getClientConfiguration(attempt));
       if (null == instance) {
         instance = getInstance(attempt);
       }
 
-      String principal = split.getPrincipal();
+      String principal = aiSplit.getPrincipal();
       if (null == principal) {
         principal = getPrincipal(attempt);
       }
 
-      AuthenticationToken token = split.getToken();
+      AuthenticationToken token = aiSplit.getToken();
       if (null == token) {
         token = getAuthenticationToken(attempt);
       }
 
-      Authorizations authorizations = split.getAuths();
+      Authorizations authorizations = aiSplit.getAuths();
       if (null == authorizations) {
         authorizations = getScanAuthorizations(attempt);
       }
 
-      String table = split.getTableName();
+      String table = aiSplit.getTableName();
 
       // in case the table name changed, we can still use the previous name for terms of
configuration,
       // but the scanner will use the table id resolved at job setup time
-      InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
+      InputTableConfig tableConfig = getInputTableConfig(attempt, aiSplit.getTableName());
 
       log.debug("Creating connector with user: " + principal);
       log.debug("Creating scanner for table: " + table);
       log.debug("Authorizations are: " + authorizations);
 
-      if (split instanceof RangeInputSplit) {
-        RangeInputSplit rangeSplit = (RangeInputSplit) split;
+      if (aiSplit instanceof RangeInputSplit) {
+        RangeInputSplit rangeSplit = (RangeInputSplit) aiSplit;
+        split = rangeSplit;
         Scanner scanner;
 
         Boolean isOffline = rangeSplit.isOffline();
@@ -543,13 +546,13 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
 
         try {
           if (isOffline) {
-            scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(),
authorizations);
+            scanner = new OfflineScanner(instance, new Credentials(principal, token), aiSplit.getTableId(),
authorizations);
           } else if (instance instanceof MockInstance) {
-            scanner = instance.getConnector(principal, token).createScanner(split.getTableName(),
authorizations);
+            scanner = instance.getConnector(principal, token).createScanner(aiSplit.getTableName(),
authorizations);
           } else {
             ClientConfiguration clientConf = getClientConfiguration(attempt);
             ClientContext context = new ClientContext(instance, new Credentials(principal,
token), clientConf);
-            scanner = new ScannerImpl(context, split.getTableId(), authorizations);
+            scanner = new ScannerImpl(context, aiSplit.getTableId(), authorizations);
           }
           if (isIsolated) {
             log.info("Creating isolated scanner");
@@ -560,7 +563,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
             scanner = new ClientSideIteratorScanner(scanner);
           }
 
-          setupIterators(attempt, scanner, split.getTableName(), split);
+          setupIterators(attempt, scanner, aiSplit.getTableName(), aiSplit);
         } catch (Exception e) {
           throw new IOException(e);
         }
@@ -568,16 +571,17 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
         scanner.setRange(rangeSplit.getRange());
         scannerBase = scanner;
 
-      } else  if (split instanceof BatchInputSplit) {
-        BatchInputSplit batchSplit = (BatchInputSplit) split;
+      } else  if (aiSplit instanceof BatchInputSplit) {
+        BatchInputSplit batchSplit = (BatchInputSplit) aiSplit;
 
         BatchScanner scanner;
         try{
           // Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
will not span tablets
           int scanThreads = 1;
-          scanner = instance.getConnector(principal, token).createBatchScanner(split.getTableName(),
authorizations, scanThreads);
-          setupIterators(attempt, scanner, split.getTableName(), split);
+          scanner = instance.getConnector(principal, token).createBatchScanner(aiSplit.getTableName(),
authorizations, scanThreads);
+          setupIterators(attempt, scanner, aiSplit.getTableName(), aiSplit);
         } catch (Exception e) {
+          e.printStackTrace();
           throw new IOException(e);
         }
 
@@ -585,7 +589,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
         scannerBase = scanner;
       }
 
-      Collection<Pair<Text,Text>> columns = split.getFetchedColumns();
+      Collection<Pair<Text,Text>> columns = aiSplit.getFetchedColumns();
       if (null == columns) {
         columns = tableConfig.getFetchedColumns();
       }
@@ -616,7 +620,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
-      return split.getProgress(currentKey);
+      return aiSplit.getProgress(currentKey);
     }
 
     /**
@@ -767,7 +771,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
             for(Range r: extentRanges.getValue())
               clippedRanges.add(ke.clip(r));
             BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges,
new String[] {location});
-            AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token,
auths, logLevel);
+            SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths,
logLevel);
 
             splits.add(split);
           } else {
@@ -776,7 +780,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
               if (autoAdjust) {
                 // divide ranges into smaller ranges, based on the tablets
                 RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r),
new String[] {location});
-                AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token,
auths, logLevel);
+                SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths,
logLevel);
                 split.setOffline(tableConfig.isOfflineScan());
                 split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
                 split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
@@ -798,7 +802,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
       if (!autoAdjust)
         for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
{
           RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(),
entry.getValue().toArray(new String[0]));
-          AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token,
auths, logLevel);
+          SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
           split.setOffline(tableConfig.isOfflineScan());
           split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
           split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index 6c870a0..9851192 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@ -19,9 +19,9 @@ package org.apache.accumulo.core.client.mapreduce;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Arrays;
 
 import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit;
+import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
@@ -47,6 +47,7 @@ public class RangeInputSplit extends AccumuloInputSplit {
     this.range = range;
   }
 
+  @Override
   public float getProgress(Key currentKey) {
     if (currentKey == null)
       return 0f;
@@ -55,13 +56,13 @@ public class RangeInputSplit extends AccumuloInputSplit {
       if (range.getStartKey() != null && range.getEndKey() != null) {
         if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
           // just look at the row progress
-          return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(),
currentKey.getRowData());
+          return SplitUtils.getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(),
currentKey.getRowData());
         } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM)
!= 0) {
           // just look at the column family progress
-          return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(),
currentKey.getColumnFamilyData());
+          return SplitUtils.getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(),
currentKey.getColumnFamilyData());
         } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)
!= 0) {
           // just look at the column qualifier progress
-          return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(),
currentKey.getColumnQualifierData());
+          return SplitUtils.getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(),
currentKey.getColumnQualifierData());
         }
       }
     }
@@ -124,23 +125,10 @@ public class RangeInputSplit extends AccumuloInputSplit {
     StringBuilder sb = new StringBuilder(256);
     sb.append("RangeInputSplit:");
     sb.append(" Range: ").append(range);
-    sb.append(" Locations: ").append(Arrays.asList(locations));
-    sb.append(" Table: ").append(tableName);
-    sb.append(" TableID: ").append(tableId);
-    sb.append(" InstanceName: ").append(instanceName);
-    sb.append(" zooKeepers: ").append(zooKeepers);
-    sb.append(" principal: ").append(principal);
-    sb.append(" tokenSource: ").append(tokenSource);
-    sb.append(" authenticationToken: ").append(token);
-    sb.append(" authenticationTokenFile: ").append(tokenFile);
-    sb.append(" Authorizations: ").append(auths);
+    sb.append(super.toString());
     sb.append(" offlineScan: ").append(offline);
-    sb.append(" mockInstance: ").append(mockInstance);
     sb.append(" isolatedScan: ").append(isolatedScan);
     sb.append(" localIterators: ").append(localIterators);
-    sb.append(" fetchColumns: ").append(fetchedColumns);
-    sb.append(" iterators: ").append(iterators);
-    sb.append(" logLevel: ").append(level);
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
index 94d0026..7f83936 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
@@ -21,7 +21,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,18 +32,17 @@ import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Base64;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.data.Key;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -58,16 +56,16 @@ import org.apache.log4j.Level;
  * @see org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit
  */
 public abstract class AccumuloInputSplit extends InputSplit implements Writable {
-  protected String[] locations;
-  protected String tableId, tableName, instanceName, zooKeepers, principal;
-  protected TokenSource tokenSource;
-  protected String tokenFile;
-  protected AuthenticationToken token;
-  protected Boolean mockInstance;
-  protected Authorizations auths;
-  protected Set<Pair<Text,Text>> fetchedColumns;
-  protected List<IteratorSetting> iterators;
-  protected Level level;
+  private String[] locations;
+  private String tableId, tableName, instanceName, zooKeepers, principal;
+  private TokenSource tokenSource;
+  private String tokenFile;
+  private AuthenticationToken token;
+  private Boolean mockInstance;
+  private Authorizations auths;
+  private Set<Pair<Text,Text>> fetchedColumns;
+  private List<IteratorSetting> iterators;
+  private Level level;
 
   public abstract float getProgress(Key currentKey);
 
@@ -89,26 +87,7 @@ public abstract class AccumuloInputSplit extends InputSplit implements
Writable
     this.tableId = tableId;
   }
 
-  /**
-   * Central place to set common split configuration not handled by split constructors.
-   * The intention is to make it harder to miss optional setters in future refactor.
-   */
-  public static void updateSplit(AccumuloInputSplit split,  Instance instance, InputTableConfig
tableConfig,
-                                  String principal, AuthenticationToken token, Authorizations
auths, Level logLevel) {
-    split.setInstanceName(instance.getInstanceName());
-    split.setZooKeepers(instance.getZooKeepers());
-    split.setMockInstance(instance instanceof MockInstance);
-
-    split.setPrincipal(principal);
-    split.setToken(token);
-    split.setAuths(auths);
-
-    split.setFetchedColumns(tableConfig.getFetchedColumns());
-    split.setIterators(tableConfig.getIterators());
-    split.setLogLevel(logLevel);
-  }
-
-  private static byte[] extractBytes(ByteSequence seq, int numBytes) {
+  static byte[] extractBytes(ByteSequence seq, int numBytes) {
     byte[] bytes = new byte[numBytes + 1];
     bytes[0] = 0;
     for (int i = 0; i < numBytes; i++) {
@@ -120,14 +99,6 @@ public abstract class AccumuloInputSplit extends InputSplit implements
Writable
     return bytes;
   }
 
-  public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position)
{
-    int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
-    BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
-    BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
-    BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
-    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
-  }
-
   public long getRangeLength(Range range) throws IOException {
     Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) :
range.getStartKey().getRow();
     Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
@@ -442,4 +413,24 @@ public abstract class AccumuloInputSplit extends InputSplit implements
Writable
   public void setLogLevel(Level level) {
     this.level = level;
   }
+
+  @Override
+  public String toString(){
+    StringBuilder sb = new StringBuilder(256);
+    sb.append(" Locations: ").append(Arrays.asList(locations));
+    sb.append(" Table: ").append(tableName);
+    sb.append(" TableID: ").append(tableId);
+    sb.append(" InstanceName: ").append(instanceName);
+    sb.append(" zooKeepers: ").append(zooKeepers);
+    sb.append(" principal: ").append(principal);
+    sb.append(" tokenSource: ").append(tokenSource);
+    sb.append(" authenticationToken: ").append(token);
+    sb.append(" authenticationTokenFile: ").append(tokenFile);
+    sb.append(" Authorizations: ").append(auths);
+    sb.append(" mockInstance: ").append(mockInstance);
+    sb.append(" fetchColumns: ").append(fetchedColumns);
+    sb.append(" iterators: ").append(iterators);
+    sb.append(" logLevel: ").append(level);
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
index 269622a..24b9ef3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
@@ -53,6 +53,7 @@ public class BatchInputSplit extends AccumuloInputSplit {
   /**
    * Save progress on each call to this function, implied by value of currentKey, and return
average ranges in the split
    */
+  @Override
   public float getProgress(Key currentKey) {
     if (null == rangeProgress)
       rangeProgress = new float[ranges.size()];
@@ -70,13 +71,13 @@ public class BatchInputSplit extends AccumuloInputSplit {
           if (range.getStartKey() != null && range.getEndKey() != null) {
             if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
               // just look at the row progress
-              rangeProgress[i] = getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(),
currentKey.getRowData());
+              rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getRowData(),
range.getEndKey().getRowData(), currentKey.getRowData());
             } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM)
!= 0) {
               // just look at the column family progress
-              rangeProgress[i] = getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(),
currentKey.getColumnFamilyData());
+              rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getColumnFamilyData(),
range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
             } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)
!= 0) {
               // just look at the column qualifier progress
-              rangeProgress[i] = getProgress(range.getStartKey().getColumnQualifierData(),
range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
+              rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getColumnQualifierData(),
range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
             }
           }
           total += rangeProgress[i];
@@ -126,19 +127,7 @@ public class BatchInputSplit extends AccumuloInputSplit {
     StringBuilder sb = new StringBuilder(256);
     sb.append("BatchInputSplit:");
     sb.append(" Ranges: ").append(Arrays.asList(ranges));
-    sb.append(" Location: ").append(Arrays.asList(locations));
-    sb.append(" Table: ").append(tableName);
-    sb.append(" TableID: ").append(tableId);
-    sb.append(" InstanceName: ").append(instanceName);
-    sb.append(" zooKeepers: ").append(zooKeepers);
-    sb.append(" principal: ").append(principal);
-    sb.append(" tokenSource: ").append(tokenSource);
-    sb.append(" authenticationToken: ").append(token);
-    sb.append(" authenticationTokenFile: ").append(tokenFile);
-    sb.append(" Authorizations: ").append(auths);
-    sb.append(" fetchColumns: ").append(fetchedColumns);
-    sb.append(" iterators: ").append(iterators);
-    sb.append(" logLevel: ").append(level);
+    sb.append(super.toString());
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java
new file mode 100644
index 0000000..0aee665
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.mapreduce.impl;
+
+import java.math.BigInteger;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.log4j.Level;
+
+public class SplitUtils {
+
+  /**
+   * Central place to set common split configuration not handled by split constructors.
+   * The intention is to make it harder to miss optional setters in future refactor.
+   */
+  public static void updateSplit(AccumuloInputSplit split,  Instance instance, InputTableConfig
tableConfig,
+                                  String principal, AuthenticationToken token, Authorizations
auths, Level logLevel) {
+    split.setInstanceName(instance.getInstanceName());
+    split.setZooKeepers(instance.getZooKeepers());
+    split.setMockInstance(instance instanceof MockInstance);
+
+    split.setPrincipal(principal);
+    split.setToken(token);
+    split.setAuths(auths);
+
+    split.setFetchedColumns(tableConfig.getFetchedColumns());
+    split.setIterators(tableConfig.getIterators());
+    split.setLogLevel(logLevel);
+  }
+
+  public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position)
{
+    int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
+    BigInteger startBI = new BigInteger(AccumuloInputSplit.extractBytes(start, maxDepth));
+    BigInteger endBI = new BigInteger(AccumuloInputSplit.extractBytes(end, maxDepth));
+    BigInteger positionBI = new BigInteger(AccumuloInputSplit.extractBytes(position, maxDepth));
+    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
+  }
+
+}


Mime
View raw message