From: kturner@apache.org
To: commits@accumulo.apache.org
Date: Wed, 23 Oct 2013 02:42:08 -0000
Subject: [3/3] git commit: ACCUMULO-1732 Input format changes : used table id exclusively, fixed issues with table propagation, removed some duplicated code

ACCUMULO-1732 Input format changes : used table id exclusively, fixed issues with table propagation, removed some duplicated code

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7f6e5122
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7f6e5122
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7f6e5122

Branch: refs/heads/master
Commit: 7f6e512278a365c9bbb525c8d8fee57e5d573d24
Parents: 839d689
Author: Keith Turner
Authored: Tue Oct 22 22:38:50 2013 -0400
Committer: Keith Turner
Committed: Tue Oct 22 22:40:24 2013 -0400

----------------------------------------------------------------------
 .../core/client/mapred/AbstractInputFormat.java | 138 +++----------------
 .../client/mapreduce/AbstractInputFormat.java   | 133 +++---------------
 .../mapreduce/lib/util/InputConfigurator.java   | 116 +++++++++++++++-
 .../simple/mapreduce/UniqueColumns.java         |   7 +-
 4 files changed, 152 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index d474c85..c89c5d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -31,12 +31,12 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
@@ -45,12 +45,9 @@ import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.Pair;
@@ -276,8 +273,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    *           if the table name set on the configuration doesn't exist
    * @since 1.6.0
    */
-  protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, job, tableName);
+  protected static TabletLocator getTabletLocator(JobConf job, String tableId) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, job, tableId);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -359,34 +356,23 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(job);
-      String user = getPrincipal(job);
+      String principal = getPrincipal(job);
       AuthenticationToken token = getAuthenticationToken(job);
       Authorizations authorizations = getScanAuthorizations(job);
 
+      // in case the table name changed, we can still use the previous name for terms of configuration,
+      // but the scanner will use the table id resolved at job setup time
       BatchScanConfig tableConfig = getBatchScanConfig(job, split.getTableName());
 
-      // in case the table name changed, we can still use the previous name for terms of configuration,
-      // but for the scanner, we'll need to reference the new table name.
-      String actualNameForId = split.getTableName();
-      if (!(instance instanceof MockInstance)) {
-        try {
-          actualNameForId = Tables.getTableName(instance, split.getTableId());
-          if (!actualNameForId.equals(split.getTableName()))
-            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
-        } catch (TableNotFoundException e) {
-          throw new IOException("The specified table was not found for id=" + split.getTableId());
-        }
-      }
 
       try {
-        log.debug("Creating connector with user: " + user);
-        Connector conn = instance.getConnector(user, token);
+        log.debug("Creating connector with user: " + principal);
         log.debug("Creating scanner for table: " + split.getTableName());
         log.debug("Authorizations are: " + authorizations);
         if (tableConfig.isOfflineScan()) {
-          scanner = new OfflineScanner(instance, new Credentials(user, token), split.getTableId(), authorizations);
+          scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(actualNameForId, authorizations);
+          scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         }
         if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
@@ -396,7 +382,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(job, scanner, split.getTableName());
+        setupIterators(job, scanner, split.getTableId());
       } catch (Exception e) {
         throw new IOException(e);
       }
@@ -439,102 +425,13 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
 
   }
 
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableId, List<Range> ranges) throws TableNotFoundException, AccumuloException,
       AccumuloSecurityException {
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
     Instance instance = getInstance(job);
     Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
-    String tableId = Tables.getTableId(instance, tableName);
-
-    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-      Tables.clearCache(instance);
-      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
-      }
-    }
-
-    for (Range range : ranges) {
-      Text startRow;
-
-      if (range.getStartKey() != null)
-        startRow = range.getStartKey().getRow();
-      else
-        startRow = new Text();
-
-      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
-      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
-      scanner.setRange(metadataRange);
-
-      RowIterator rowIter = new RowIterator(scanner);
-
-      KeyExtent lastExtent = null;
-
-      while (rowIter.hasNext()) {
-        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
-        String last = "";
-        KeyExtent extent = null;
-        String location = null;
-
-        while (row.hasNext()) {
-          Map.Entry<Key,Value> entry = row.next();
-          Key key = entry.getKey();
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
-            last = entry.getValue().toString();
-          }
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
-              || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
-            location = entry.getValue().toString();
-          }
-
-          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
-            extent = new KeyExtent(key.getRow(), entry.getValue());
-          }
-
-        }
-
-        if (location != null)
-          return null;
-
-        if (!extent.getTableId().toString().equals(tableId)) {
-          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
-        }
-
-        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
-          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
-        }
-
-        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
-        if (tabletRanges == null) {
-          tabletRanges = new HashMap<KeyExtent,List<Range>>();
-          binnedRanges.put(last, tabletRanges);
-        }
-
-        List<Range> rangeList = tabletRanges.get(extent);
-        if (rangeList == null) {
-          rangeList = new ArrayList<Range>();
-          tabletRanges.put(extent, rangeList);
-        }
-
-        rangeList.add(range);
-
-        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
-          break;
-        }
-
-        lastExtent = extent;
-      }
-
-    }
-
-    return binnedRanges;
+
+    return InputConfigurator.binOffline(tableId, ranges, instance, conn);
   }
 
   /**
@@ -562,16 +459,18 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
       TabletLocator tl;
       try {
+        // resolve table name to id once, and use id from this point forward
+        tableId = Tables.getTableId(getInstance(job), tableName);
         if (tableConfig.isOfflineScan()) {
-          binnedRanges = binOfflineTable(job, tableName, ranges);
+          binnedRanges = binOfflineTable(job, tableId, ranges);
           while (binnedRanges == null) {
             // Some tablets were still online, try again
            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-            binnedRanges = binOfflineTable(job, tableName, ranges);
+            binnedRanges = binOfflineTable(job, tableId, ranges);
          }
        } else {
          Instance instance = getInstance(job);
-          tl = getTabletLocator(job, tableName);
+          tl = getTabletLocator(job, tableId);
          // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
          tl.invalidateCache();
          Credentials creds = new Credentials(getPrincipal(job), getAuthenticationToken(job));
@@ -582,7 +481,6 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
                throw new TableDeletedException(tableId);
              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
                throw new TableOfflineException(instance, tableId);
-              tableId = Tables.getTableId(instance, tableName);
            }
            binnedRanges.clear();
            log.warn("Unable to locate bins for specified ranges. Retrying.");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 889dcbb..74f8f8b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -35,12 +35,12 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
@@ -53,8 +53,6 @@ import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.Pair;
@@ -374,34 +372,22 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
       log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(attempt);
       String principal = getPrincipal(attempt);
+      AuthenticationToken token = getAuthenticationToken(attempt);
+      Authorizations authorizations = getScanAuthorizations(attempt);
 
+      // in case the table name changed, we can still use the previous name for terms of configuration,
+      // but the scanner will use the table id resolved at job setup time
       BatchScanConfig tableConfig = getBatchScanConfig(attempt, split.getTableName());
 
-      // in case the table name changed, we can still use the previous name for terms of configuration,
-      // but for the scanner, we'll need to reference the new table name.
-      String actualNameForId = split.getTableName();
-      if (!(instance instanceof MockInstance)) {
-        try {
-          actualNameForId = Tables.getTableName(instance, split.getTableId());
-          if (!actualNameForId.equals(split.getTableName()))
-            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
-        } catch (TableNotFoundException e) {
-          throw new IOException("The specified table was not found for id=" + split.getTableId());
-        }
-      }
-      AuthenticationToken token = getAuthenticationToken(attempt);
-      Authorizations authorizations = getScanAuthorizations(attempt);
 
       try {
         log.debug("Creating connector with user: " + principal);
-
-        Connector conn = instance.getConnector(principal, token);
         log.debug("Creating scanner for table: " + split.getTableName());
         log.debug("Authorizations are: " + authorizations);
         if (tableConfig.isOfflineScan()) {
           scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(actualNameForId, authorizations);
+          scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         }
         if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
@@ -411,7 +397,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(attempt, scanner, split.getTableName());
+        setupIterators(attempt, scanner, split.getTableId());
       } catch (Exception e) {
         throw new IOException(e);
       }
@@ -460,102 +446,15 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
     }
   }
 
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableId, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
     Instance instance = getInstance(context);
     Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context));
-    String tableId = Tables.getTableId(instance, tableName);
-
-    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-      Tables.clearCache(instance);
-      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
-        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
-      }
-    }
-
-    for (Range range : ranges) {
-      Text startRow;
-
-      if (range.getStartKey() != null)
-        startRow = range.getStartKey().getRow();
-      else
-        startRow = new Text();
-
-      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
-      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
-      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
-      scanner.setRange(metadataRange);
-
-      RowIterator rowIter = new RowIterator(scanner);
-      KeyExtent lastExtent = null;
-      while (rowIter.hasNext()) {
-        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
-        String last = "";
-        KeyExtent extent = null;
-        String location = null;
-
-        while (row.hasNext()) {
-          Map.Entry<Key,Value> entry = row.next();
-          Key key = entry.getKey();
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
-            last = entry.getValue().toString();
-          }
-
-          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
-              || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
-            location = entry.getValue().toString();
-          }
-
-          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
-            extent = new KeyExtent(key.getRow(), entry.getValue());
-          }
-        }
-
-        if (location != null)
-          return null;
-
-        if (!extent.getTableId().toString().equals(tableId)) {
-          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
-        }
-
-        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
-          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
-        }
-
-        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
-        if (tabletRanges == null) {
-          tabletRanges = new HashMap<KeyExtent,List<Range>>();
-          binnedRanges.put(last, tabletRanges);
-        }
-
-        List<Range> rangeList = tabletRanges.get(extent);
-        if (rangeList == null) {
-          rangeList = new ArrayList<Range>();
-          tabletRanges.put(extent, rangeList);
-        }
-
-        rangeList.add(range);
-
-        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
-          break;
-        }
-
-        lastExtent = extent;
-      }
-
-    }
-
-    return binnedRanges;
+    return InputConfigurator.binOffline(tableId, ranges, instance, conn);
   }
-  
+
   /**
    * Gets the splits of the tables that have been set on the job.
   *
@@ -588,24 +487,25 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
      TabletLocator tl;
      try {
+        // resolve table name to id once, and use id from this point forward
+        tableId = Tables.getTableId(getInstance(context), tableName);
        if (tableConfig.isOfflineScan()) {
-          binnedRanges = binOfflineTable(context, tableName, ranges);
+          binnedRanges = binOfflineTable(context, tableId, ranges);
          while (binnedRanges == null) {
            // Some tablets were still online, try again
            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-            binnedRanges = binOfflineTable(context, tableName, ranges);
+            binnedRanges = binOfflineTable(context, tableId, ranges);
          }
        } else {
          Instance instance = getInstance(context);
-          tl = getTabletLocator(context, tableName);
+          tl = getTabletLocator(context, tableId);
          // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
          tl.invalidateCache();
          Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));

          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
            if (!(instance instanceof MockInstance)) {
-              tableId = Tables.getTableId(instance, tableName);
              if (!Tables.exists(instance, tableId))
                throw new TableDeletedException(tableId);
              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
@@ -680,6 +580,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
       this.setRange(split.getRange());
       this.setLocations(split.getLocations());
       this.setTableName(split.getTableName());
+      this.setTableId(split.getTableId());
     }
 
     protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
@@ -788,6 +689,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
       tableName = in.readUTF();
+      tableId = in.readUTF();
       int numLocs = in.readInt();
       locations = new String[numLocs];
       for (int i = 0; i < numLocs; ++i)
@@ -798,6 +700,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
     public void write(DataOutput out) throws IOException {
       range.write(out);
       out.writeUTF(tableName);
+      out.writeUTF(tableId);
       out.writeInt(locations.length);
       for (int i = 0; i < locations.length; ++i)
         out.writeUTF(locations[i]);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 016efa5..f72c081 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -27,12 +27,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringTokenizer;
 
-import com.google.common.collect.Maps;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -41,6 +42,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.impl.Tables;
@@ -48,8 +50,20 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
 import org.apache.accumulo.core.client.mock.MockTabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.Pair;
@@ -61,6 +75,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.Maps;
+
 /**
  * @since 1.5.0
  */
@@ -561,19 +577,19 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
    *          the Hadoop configuration object to configure
-   * @param tableName
-   *          The table name for which to initialize the {@link TabletLocator}
+   * @param tableId
+   *          The table id for which to initialize the {@link TabletLocator}
    * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableName) throws TableNotFoundException {
+  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableId) throws TableNotFoundException {
     String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
     if ("MockInstance".equals(instanceType))
       return new MockTabletLocator();
     Instance instance = getInstance(implementingClass, conf);
-    return TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
+    return TabletLocator.getLocator(instance, new Text(tableId));
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -668,4 +684,94 @@ public class InputConfigurator extends ConfiguratorBase {
     }
     return null;
   }
+
+  public static Map<String,Map<KeyExtent,List<Range>>> binOffline(String tableId, List<Range> ranges, Instance instance, Connector conn)
+      throws AccumuloException, TableNotFoundException {
+    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+
+    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+      Tables.clearCache(instance);
+      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+        throw new AccumuloException("Table is online tableId:" + tableId + " cannot scan table in offline mode ");
+      }
+    }
+
+    for (Range range : ranges) {
+      Text startRow;
+
+      if (range.getStartKey() != null)
+        startRow = range.getStartKey().getRow();
+      else
+        startRow = new Text();
+
+      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
+      scanner.setRange(metadataRange);
+
+      RowIterator rowIter = new RowIterator(scanner);
+      KeyExtent lastExtent = null;
+      while (rowIter.hasNext()) {
+        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
+        String last = "";
+        KeyExtent extent = null;
+        String location = null;
+
+        while (row.hasNext()) {
+          Map.Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+
+          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
+            last = entry.getValue().toString();
+          }
+
+          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
+              || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
+            location = entry.getValue().toString();
+          }
+
+          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+            extent = new KeyExtent(key.getRow(), entry.getValue());
+          }
+
+        }
+
+        if (location != null)
+          return null;
+
+        if (!extent.getTableId().toString().equals(tableId)) {
+          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+        }
+
+        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
+        }
+
+        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
+        if (tabletRanges == null) {
+          tabletRanges = new HashMap<KeyExtent,List<Range>>();
+          binnedRanges.put(last, tabletRanges);
+        }
+
+        List<Range> rangeList = tabletRanges.get(extent);
+        if (rangeList == null) {
+          rangeList = new ArrayList<Range>();
+          tabletRanges.put(extent, rangeList);
+        }
+
+        rangeList.add(range);
+
+        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
+          break;
+        }
+
+        lastExtent = extent;
+      }
+
+    }
+    return binnedRanges;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
index 11ddf7b..501b7e6 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
@@ -94,6 +94,9 @@ public class UniqueColumns extends Configured implements Tool {
 
     String clone = opts.tableName;
     Connector conn = null;
+
+    opts.setAccumuloConfigs(job);
+
     if (opts.offline) {
       /*
        * this example clones the table and takes it offline. If you plan to run map reduce jobs over a table many times, it may be more efficient to compact the
@@ -106,11 +109,11 @@ public class UniqueColumns extends Configured implements Tool {
       conn.tableOperations().offline(clone);
 
       AccumuloInputFormat.setOfflineTableScan(job, true);
+      AccumuloInputFormat.setInputTableName(job, clone);
     }
 
     job.setInputFormatClass(AccumuloInputFormat.class);
-    opts.setAccumuloConfigs(job);
-
+
     job.setMapperClass(UMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Text.class);
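
----------------------------------------------------------------------

The core of this change is visible in initialize and getSplits above: the configured table name is resolved to a table id once, and the id is what flows into the tablet locator, the offline binning code, the scanner, and the serialized RangeInputSplit. The fragment below is only an illustration of that pattern using the client classes that appear in this diff; it is not code from the commit, and the class and method names (TableIdResolutionSketch, openScannerById) are invented for the example.

import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.Credentials;
import org.apache.hadoop.io.Text;

public class TableIdResolutionSketch {

  // Resolve the table name to its id once, then hand the id to everything downstream,
  // mirroring what getSplits/initialize do after this commit. (Sketch only; assumes a
  // live, non-mock Instance.)
  public static Scanner openScannerById(Instance instance, String principal, AuthenticationToken token,
      String tableName, Authorizations auths) throws TableNotFoundException {
    // name -> id lookup happens exactly once
    String tableId = Tables.getTableId(instance, tableName);

    // the tablet locator is keyed by id, so a later rename of the table does not invalidate it
    TabletLocator locator = TabletLocator.getLocator(instance, new Text(tableId));
    locator.invalidateCache();

    // ScannerImpl takes the table id directly, as AbstractInputFormat.initialize now does
    return new ScannerImpl(instance, new Credentials(principal, token), tableId, auths);
  }
}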
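The UniqueColumns change at the end of the diff reorders the driver so the Accumulo input options are set before the clone is created, and then the input table is switched to the offline clone. Below is a rough, stand-alone sketch of that driver pattern; it is not part of the commit, it assumes the Accumulo 1.6 mapreduce client API used elsewhere in this diff, and the helper name, clone-naming scheme, and use of PasswordToken are made up for illustration.

import java.util.HashMap;
import java.util.HashSet;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.mapreduce.Job;

public class OfflineCloneScanSketch {

  // Configure a job the way the reworked UniqueColumns example does: set the Accumulo
  // input options first, then clone the table, take the clone offline, and point the
  // input format at the clone so the clone's id is resolved at job setup time.
  static void configure(Job job, Connector conn, String table, String principal, PasswordToken token)
      throws Exception {
    AccumuloInputFormat.setConnectorInfo(job, principal, token);
    AccumuloInputFormat.setInputTableName(job, table);
    AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);

    // hypothetical clone name; the real example derives it from a timestamp as well
    String clone = table + "_clone_" + System.currentTimeMillis();
    conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
    conn.tableOperations().offline(clone);

    // scan the offline clone instead of the live table
    AccumuloInputFormat.setOfflineTableScan(job, true);
    AccumuloInputFormat.setInputTableName(job, clone);

    job.setInputFormatClass(AccumuloInputFormat.class);
  }
}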