accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [3/3] git commit: ACCUMULO-1732 Input format changes : used table id exclusively, fixed issues with table propagation, removed some duplicated code
Date Wed, 23 Oct 2013 02:42:08 GMT
ACCUMULO-1732 Input format changes : used table id exclusively, fixed issues with table propagation, removed some duplicated code


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7f6e5122
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7f6e5122
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7f6e5122

Branch: refs/heads/master
Commit: 7f6e512278a365c9bbb525c8d8fee57e5d573d24
Parents: 839d689
Author: Keith Turner <kturner@apache.org>
Authored: Tue Oct 22 22:38:50 2013 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Tue Oct 22 22:40:24 2013 -0400

----------------------------------------------------------------------
 .../core/client/mapred/AbstractInputFormat.java | 138 +++----------------
 .../client/mapreduce/AbstractInputFormat.java   | 133 +++---------------
 .../mapreduce/lib/util/InputConfigurator.java   | 116 +++++++++++++++-
 .../simple/mapreduce/UniqueColumns.java         |   7 +-
 4 files changed, 152 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index d474c85..c89c5d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -31,12 +31,12 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
@@ -45,12 +45,9 @@ import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.Pair;
@@ -276,8 +273,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
    *           if the table name set on the configuration doesn't exist
    * @since 1.6.0
    */
-  protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException
{
-    return InputConfigurator.getTabletLocator(CLASS, job, tableName);
+  protected static TabletLocator getTabletLocator(JobConf job, String tableId) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, job, tableId);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext
job)
@@ -359,34 +356,23 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(job);
-      String user = getPrincipal(job);
+      String principal = getPrincipal(job);
       AuthenticationToken token = getAuthenticationToken(job);
       Authorizations authorizations = getScanAuthorizations(job);
 
+      // in case the table name changed, we can still use the previous name for terms of configuration,
+      // but the scanner will use the table id resolved at job setup time
       BatchScanConfig tableConfig = getBatchScanConfig(job, split.getTableName());
 
-      // in case the table name changed, we can still use the previous name for terms of
configuration,
-      // but for the scanner, we'll need to reference the new table name.
-      String actualNameForId = split.getTableName();
-      if (!(instance instanceof MockInstance)) {
-        try {
-          actualNameForId = Tables.getTableName(instance, split.getTableId());
-          if (!actualNameForId.equals(split.getTableName()))
-            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
-        } catch (TableNotFoundException e) {
-          throw new IOException("The specified table was not found for id=" + split.getTableId());
-        }
-      }
 
       try {
-        log.debug("Creating connector with user: " + user);
-        Connector conn = instance.getConnector(user, token);
+        log.debug("Creating connector with user: " + principal);
         log.debug("Creating scanner for table: " + split.getTableName());
         log.debug("Authorizations are: " + authorizations);
         if (tableConfig.isOfflineScan()) {
-          scanner = new OfflineScanner(instance, new Credentials(user, token), split.getTableId(),
authorizations);
+          scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(actualNameForId, authorizations);
+          scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         }
         if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
@@ -396,7 +382,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(job, scanner, split.getTableName());
+        setupIterators(job, scanner, split.getTableId());
       } catch (Exception e) {
         throw new IOException(e);
       }
@@ -439,102 +425,13 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
 
   }
 
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String
tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableId, List<Range> ranges) throws TableNotFoundException, AccumuloException,
       AccumuloSecurityException {
 
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
     Instance instance = getInstance(job);
     Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
-    String tableId = Tables.getTableId(instance, tableName);
-
-    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-      Tables.clearCache(instance);
-      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot
scan table in offline mode ");
-      }
-    }
-
-    for (Range range : ranges) {
-      Text startRow;
-
-      if (range.getStartKey() != null)
-        startRow = range.getStartKey().getRow();
-      else
-        startRow = new Text();
-
-      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(),
true, null, false);
-      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
-      scanner.setRange(metadataRange);
-
-      RowIterator rowIter = new RowIterator(scanner);
-
-      KeyExtent lastExtent = null;
-
-      while (rowIter.hasNext()) {
-        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
-        String last = "";
-        KeyExtent extent = null;
-        String location = null;
-
-        while (row.hasNext()) {
-          Map.Entry<Key,Value> entry = row.next();
-          Key key = entry.getKey();
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME))
{
-            last = entry.getValue().toString();
-          }
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
-              || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME))
{
-            location = entry.getValue().toString();
-          }
-
-          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key))
{
-            extent = new KeyExtent(key.getRow(), entry.getValue());
-          }
-
-        }
-
-        if (location != null)
-          return null;
-
-        if (!extent.getTableId().toString().equals(tableId)) {
-          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
-        }
-
-        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
-          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
-        }
-
-        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
-        if (tabletRanges == null) {
-          tabletRanges = new HashMap<KeyExtent,List<Range>>();
-          binnedRanges.put(last, tabletRanges);
-        }
-
-        List<Range> rangeList = tabletRanges.get(extent);
-        if (rangeList == null) {
-          rangeList = new ArrayList<Range>();
-          tabletRanges.put(extent, rangeList);
-        }
-
-        rangeList.add(range);
-
-        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW)))
{
-          break;
-        }
-
-        lastExtent = extent;
-      }
-
-    }
-
-    return binnedRanges;
+    
+    return InputConfigurator.binOffline(tableId, ranges, instance, conn);
   }
 
   /**
@@ -562,16 +459,18 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
       TabletLocator tl;
       try {
+        // resolve table name to id once, and use id from this point forward
+        tableId = Tables.getTableId(getInstance(job), tableName);
         if (tableConfig.isOfflineScan()) {
-          binnedRanges = binOfflineTable(job, tableName, ranges);
+          binnedRanges = binOfflineTable(job, tableId, ranges);
           while (binnedRanges == null) {
             // Some tablets were still online, try again
             UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between
100 and 200 ms
-            binnedRanges = binOfflineTable(job, tableName, ranges);
+            binnedRanges = binOfflineTable(job, tableId, ranges);
           }
         } else {
           Instance instance = getInstance(job);
-          tl = getTabletLocator(job, tableName);
+          tl = getTabletLocator(job, tableId);
           // its possible that the cache could contain complete, but old information about
a tables tablets... so clear it
           tl.invalidateCache();
           Credentials creds = new Credentials(getPrincipal(job), getAuthenticationToken(job));
@@ -582,7 +481,6 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
                 throw new TableDeletedException(tableId);
               if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
                 throw new TableOfflineException(instance, tableId);
-              tableId = Tables.getTableId(instance, tableName);
             }
             binnedRanges.clear();
             log.warn("Unable to locate bins for specified ranges. Retrying.");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 889dcbb..74f8f8b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -35,12 +35,12 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
@@ -53,8 +53,6 @@ import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.Pair;
@@ -374,34 +372,22 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
       log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(attempt);
       String principal = getPrincipal(attempt);
+      AuthenticationToken token = getAuthenticationToken(attempt);
+      Authorizations authorizations = getScanAuthorizations(attempt);
 
+      // in case the table name changed, we can still use the previous name for terms of
configuration,
+      // but the scanner will use the table id resolved at job setup time
       BatchScanConfig tableConfig = getBatchScanConfig(attempt, split.getTableName());
 
-      // in case the table name changed, we can still use the previous name for terms of
configuration,
-      // but for the scanner, we'll need to reference the new table name.
-      String actualNameForId = split.getTableName();
-      if (!(instance instanceof MockInstance)) {
-        try {
-          actualNameForId = Tables.getTableName(instance, split.getTableId());
-          if (!actualNameForId.equals(split.getTableName()))
-            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
-        } catch (TableNotFoundException e) {
-          throw new IOException("The specified table was not found for id=" + split.getTableId());
-        }
-      }
 
-      AuthenticationToken token = getAuthenticationToken(attempt);
-      Authorizations authorizations = getScanAuthorizations(attempt);
       try {
         log.debug("Creating connector with user: " + principal);
-
-        Connector conn = instance.getConnector(principal, token);
         log.debug("Creating scanner for table: " + split.getTableName());
         log.debug("Authorizations are: " + authorizations);
         if (tableConfig.isOfflineScan()) {
           scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(),
authorizations);
         } else {
-          scanner = conn.createScanner(actualNameForId, authorizations);
+          scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(),
authorizations);
         }
         if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
@@ -411,7 +397,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(attempt, scanner, split.getTableName());
+        setupIterators(attempt, scanner, split.getTableId());
       } catch (Exception e) {
         throw new IOException(e);
       }
@@ -460,102 +446,15 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
     }
   }
 
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context,
String tableName, List<Range> ranges) throws TableNotFoundException,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context,
String tableId, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
 
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
     Instance instance = getInstance(context);
     Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context));
-    String tableId = Tables.getTableId(instance, tableName);
-
-    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-      Tables.clearCache(instance);
-      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot
scan table in offline mode ");
-      }
-    }
-
-    for (Range range : ranges) {
-      Text startRow;
-
-      if (range.getStartKey() != null)
-        startRow = range.getStartKey().getRow();
-      else
-        startRow = new Text();
-
-      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(),
true, null, false);
-      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
-      scanner.setRange(metadataRange);
-
-      RowIterator rowIter = new RowIterator(scanner);
-      KeyExtent lastExtent = null;
-      while (rowIter.hasNext()) {
-        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
-        String last = "";
-        KeyExtent extent = null;
-        String location = null;
-
-        while (row.hasNext()) {
-          Map.Entry<Key,Value> entry = row.next();
-          Key key = entry.getKey();
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME))
{
-            last = entry.getValue().toString();
-          }
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
-              || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME))
{
-            location = entry.getValue().toString();
-          }
-
-          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key))
{
-            extent = new KeyExtent(key.getRow(), entry.getValue());
-          }
 
-        }
-
-        if (location != null)
-          return null;
-
-        if (!extent.getTableId().toString().equals(tableId)) {
-          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
-        }
-
-        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
-          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
-        }
-
-        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
-        if (tabletRanges == null) {
-          tabletRanges = new HashMap<KeyExtent,List<Range>>();
-          binnedRanges.put(last, tabletRanges);
-        }
-
-        List<Range> rangeList = tabletRanges.get(extent);
-        if (rangeList == null) {
-          rangeList = new ArrayList<Range>();
-          tabletRanges.put(extent, rangeList);
-        }
-
-        rangeList.add(range);
-
-        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW)))
{
-          break;
-        }
-
-        lastExtent = extent;
-      }
-
-    }
-
-    return binnedRanges;
+    return InputConfigurator.binOffline(tableId, ranges, instance, conn);
   }
-
+  
   /**
    * Gets the splits of the tables that have been set on the job.
    * 
@@ -588,24 +487,25 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
       TabletLocator tl;
       try {
+        // resolve table name to id once, and use id from this point forward
+        tableId = Tables.getTableId(getInstance(context), tableName);
         if (tableConfig.isOfflineScan()) {
-          binnedRanges = binOfflineTable(context, tableName, ranges);
+          binnedRanges = binOfflineTable(context, tableId, ranges);
           while (binnedRanges == null) {
             // Some tablets were still online, try again
             UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between
100 and 200 ms
-            binnedRanges = binOfflineTable(context, tableName, ranges);
+            binnedRanges = binOfflineTable(context, tableId, ranges);
 
           }
         } else {
           Instance instance = getInstance(context);
-          tl = getTabletLocator(context, tableName);
+          tl = getTabletLocator(context, tableId);
           // its possible that the cache could contain complete, but old information about
a tables tablets... so clear it
           tl.invalidateCache();
           Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));
 
           while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
             if (!(instance instanceof MockInstance)) {
-              tableId = Tables.getTableId(instance, tableName);
               if (!Tables.exists(instance, tableId))
                 throw new TableDeletedException(tableId);
               if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
@@ -680,6 +580,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
       this.setRange(split.getRange());
       this.setLocations(split.getLocations());
       this.setTableName(split.getTableName());
+      this.setTableId(split.getTableId());
     }
 
     protected RangeInputSplit(String table, String tableId, Range range, String[] locations)
{
@@ -788,6 +689,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
       tableName = in.readUTF();
+      tableId = in.readUTF();
       int numLocs = in.readInt();
       locations = new String[numLocs];
       for (int i = 0; i < numLocs; ++i)
@@ -798,6 +700,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
     public void write(DataOutput out) throws IOException {
       range.write(out);
       out.writeUTF(tableName);
+      out.writeUTF(tableId);
       out.writeInt(locations.length);
       for (int i = 0; i < locations.length; ++i)
         out.writeUTF(locations[i]);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 016efa5..f72c081 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -27,12 +27,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringTokenizer;
 
-import com.google.common.collect.Maps;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -41,6 +42,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.impl.Tables;
@@ -48,8 +50,20 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
 import org.apache.accumulo.core.client.mock.MockTabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.Pair;
@@ -61,6 +75,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.Maps;
+
 /**
  * @since 1.5.0
  */
@@ -561,19 +577,19 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the class whose name will be used as a prefix for the property configuration
key
    * @param conf
    *          the Hadoop configuration object to configure
-   * @param tableName
-   *          The table name for which to initialize the {@link TabletLocator}
+   * @param tableId
+   *          The table id for which to initialize the {@link TabletLocator}
    * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration
conf, String tableName) throws TableNotFoundException {
+  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableId) throws TableNotFoundException {
     String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
     if ("MockInstance".equals(instanceType))
       return new MockTabletLocator();
     Instance instance = getInstance(implementingClass, conf);
-    return TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
+    return TabletLocator.getLocator(instance, new Text(tableId));
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext
job)
@@ -668,4 +684,94 @@ public class InputConfigurator extends ConfiguratorBase {
     }
     return null;
   }
+
+  public static Map<String,Map<KeyExtent,List<Range>>> binOffline(String tableId, List<Range> ranges, Instance instance, Connector conn)
+      throws AccumuloException, TableNotFoundException {
+    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+  
+    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+      Tables.clearCache(instance);
+      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+        throw new AccumuloException("Table is online tableId:" + tableId + " cannot scan table in offline mode ");
+      }
+    }
+  
+    for (Range range : ranges) {
+      Text startRow;
+  
+      if (range.getStartKey() != null)
+        startRow = range.getStartKey().getRow();
+      else
+        startRow = new Text();
+  
+      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
+      scanner.setRange(metadataRange);
+  
+      RowIterator rowIter = new RowIterator(scanner);
+      KeyExtent lastExtent = null;
+      while (rowIter.hasNext()) {
+        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
+        String last = "";
+        KeyExtent extent = null;
+        String location = null;
+  
+        while (row.hasNext()) {
+          Map.Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+  
+          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
+            last = entry.getValue().toString();
+          }
+  
+          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
+              || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
+            location = entry.getValue().toString();
+          }
+  
+          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+            extent = new KeyExtent(key.getRow(), entry.getValue());
+          }
+  
+        }
+  
+        if (location != null)
+          return null;
+  
+        if (!extent.getTableId().toString().equals(tableId)) {
+          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+        }
+  
+        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
+        }
+  
+        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
+        if (tabletRanges == null) {
+          tabletRanges = new HashMap<KeyExtent,List<Range>>();
+          binnedRanges.put(last, tabletRanges);
+        }
+  
+        List<Range> rangeList = tabletRanges.get(extent);
+        if (rangeList == null) {
+          rangeList = new ArrayList<Range>();
+          tabletRanges.put(extent, rangeList);
+        }
+  
+        rangeList.add(range);
+  
+        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
+          break;
+        }
+  
+        lastExtent = extent;
+      }
+  
+    }
+    return binnedRanges;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
index 11ddf7b..501b7e6 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
@@ -94,6 +94,9 @@ public class UniqueColumns extends Configured implements Tool {
     
     String clone = opts.tableName;
     Connector conn = null;
+    
+    opts.setAccumuloConfigs(job);
+
     if (opts.offline) {
       /*
        * this example clones the table and takes it offline. If you plan to run map reduce
jobs over a table many times, it may be more efficient to compact the
@@ -106,11 +109,11 @@ public class UniqueColumns extends Configured implements Tool {
       conn.tableOperations().offline(clone);
       
       AccumuloInputFormat.setOfflineTableScan(job, true);
+      AccumuloInputFormat.setInputTableName(job, clone);
     }
     
     job.setInputFormatClass(AccumuloInputFormat.class);
-    opts.setAccumuloConfigs(job);
-    
+
     job.setMapperClass(UMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Text.class);


Mime
View raw message