Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2FB0C107D5 for ; Sat, 23 Nov 2013 23:51:55 +0000 (UTC) Received: (qmail 64877 invoked by uid 500); 23 Nov 2013 23:51:52 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 64281 invoked by uid 500); 23 Nov 2013 23:51:50 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 64222 invoked by uid 99); 23 Nov 2013 23:51:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Nov 2013 23:51:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4B862904003; Sat, 23 Nov 2013 23:51:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Sat, 23 Nov 2013 23:52:01 -0000 Message-Id: In-Reply-To: <3a03c74a531946da9a279bed5eecbbd3@git.apache.org> References: <3a03c74a531946da9a279bed5eecbbd3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/30] git commit: ACCUMULO-1854 Get the mapred package code working with the new approach as well. ACCUMULO-1854 Get the mapred package code working with the new approach as well. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/03b065d2 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/03b065d2 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/03b065d2 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 03b065d2d42945f4edf50d2510a15f523accd0b5 Parents: ef64992 Author: Josh Elser Authored: Fri Nov 22 12:37:21 2013 -0500 Committer: Josh Elser Committed: Fri Nov 22 12:37:21 2013 -0500 ---------------------------------------------------------------------- .../core/client/mapred/InputFormatBase.java | 132 +++++- .../core/client/mapred/RangeInputSplit.java | 416 +++++++++++++++++++ .../core/client/mapreduce/InputFormatBase.java | 19 +- 3 files changed, 533 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/03b065d2/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java index 8d3d710..0be4706 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java @@ -45,7 +45,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.impl.OfflineScanner; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; -import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; +import org.apache.accumulo.core.client.mapred.RangeInputSplit; import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; @@ -547,8 +547,7 @@ public abstract class InputFormatBase implements InputFormat { * @param scanner * the scanner to configure */ - protected void setupIterators(JobConf job, Scanner scanner) { - List iterators = getIterators(job); + protected void setupIterators(List iterators, Scanner scanner) { for (IteratorSetting iterator : iterators) { scanner.addScanIterator(iterator); } @@ -561,38 +560,91 @@ public abstract class InputFormatBase implements InputFormat { Scanner scanner; split = (RangeInputSplit) inSplit; log.debug("Initializing input split: " + split.getRange()); - Instance instance = getInstance(job); - String user = getPrincipal(job); - String tokenClass = getTokenClass(job); - byte[] password = getToken(job); - Authorizations authorizations = getScanAuthorizations(job); + + Instance instance = split.getInstance(); + if (null == instance) { + instance = getInstance(job); + } + + String principal = split.getPrincipal(); + if (null == principal) { + principal = getPrincipal(job); + } + + AuthenticationToken token = split.getToken(); + if (null == token) { + String tokenClass = getTokenClass(job); + byte[] tokenBytes = getToken(job); + try { + token = CredentialHelper.extractToken(tokenClass, tokenBytes); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + } + + Authorizations authorizations = split.getAuths(); + if (null == authorizations) { + authorizations = getScanAuthorizations(job); + } + + String table = split.getTable(); + if (null == table) { + table = getInputTableName(job); + } + + Boolean isOffline = split.isOffline(); + if (null == isOffline) { + isOffline = isOfflineScan(job); + } + + Boolean isIsolated = split.isIsolatedScan(); + if (null == isIsolated) { + isIsolated = isIsolated(job); + } + + Boolean usesLocalIterators = split.usesLocalIterators(); + if (null == usesLocalIterators) { + usesLocalIterators = usesLocalIterators(job); + } + + List iterators = split.getIterators(); + if (null == iterators) { + iterators = getIterators(job); + } + + Set> columns = split.getFetchedColumns(); + if (null == columns) { + columns = getFetchedColumns(job); + } try { - log.debug("Creating connector with user: " + user); - Connector conn = instance.getConnector(user, CredentialHelper.extractToken(tokenClass, password)); - log.debug("Creating scanner for table: " + getInputTableName(job)); + log.debug("Creating connector with user: " + principal); + Connector conn = instance.getConnector(principal, token); + log.debug("Creating scanner for table: " + table); log.debug("Authorizations are: " + authorizations); if (isOfflineScan(job)) { - scanner = new OfflineScanner(instance, new TCredentials(user, tokenClass, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId( - instance, getInputTableName(job)), authorizations); + String tokenClass = token.getClass().getCanonicalName(); + ByteBuffer tokenBuffer = ByteBuffer.wrap(CredentialHelper.toBytes(token)); + scanner = new OfflineScanner(instance, new TCredentials(principal, tokenClass, tokenBuffer, instance.getInstanceID()), Tables.getTableId( + instance, table), authorizations); } else { - scanner = conn.createScanner(getInputTableName(job), authorizations); + scanner = conn.createScanner(table, authorizations); } - if (isIsolated(job)) { + if (isIsolated) { log.info("Creating isolated scanner"); scanner = new IsolatedScanner(scanner); } - if (usesLocalIterators(job)) { + if (usesLocalIterators) { log.info("Using local iterators"); scanner = new ClientSideIteratorScanner(scanner); } - setupIterators(job, scanner); + setupIterators(iterators, scanner); } catch (Exception e) { throw new IOException(e); } // setup a scanner within the bounds of this split - for (Pair c : getFetchedColumns(job)) { + for (Pair c : columns) { if (c.getSecond() != null) { log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); scanner.fetchColumn(c.getFirst(), c.getSecond()); @@ -732,12 +784,33 @@ public abstract class InputFormatBase implements InputFormat { */ @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - log.setLevel(getLogLevel(job)); + Level logLevel = getLogLevel(job); + log.setLevel(logLevel); + validateOptions(job); String tableName = getInputTableName(job); boolean autoAdjust = getAutoAdjustRanges(job); List ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job); + Instance instance = getInstance(job); + boolean offline = isOfflineScan(job); + boolean isolated = isIsolated(job); + boolean localIterators = usesLocalIterators(job); + boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass())); + Set> fetchedColumns = getFetchedColumns(job); + Authorizations auths = getScanAuthorizations(job); + String principal = getPrincipal(job); + String tokenClass = getTokenClass(job); + byte[] tokenBytes = getToken(job); + + AuthenticationToken token; + try { + token = CredentialHelper.extractToken(tokenClass, tokenBytes); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + + List iterators = getIterators(job); if (ranges.isEmpty()) { ranges = new ArrayList(1); @@ -756,13 +829,11 @@ public abstract class InputFormatBase implements InputFormat { binnedRanges = binOfflineTable(job, tableName, ranges); } } else { - Instance instance = getInstance(job); String tableId = null; tl = getTabletLocator(job); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it tl.invalidateCache(); - while (!tl.binRanges(ranges, binnedRanges, - new TCredentials(getPrincipal(job), getTokenClass(job), ByteBuffer.wrap(getToken(job)), getInstance(job).getInstanceID())).isEmpty()) { + while (!tl.binRanges(ranges, binnedRanges, new TCredentials(principal, tokenClass, ByteBuffer.wrap(tokenBytes), instance.getInstanceID())).isEmpty()) { if (!(instance instanceof MockInstance)) { if (tableId == null) tableId = Tables.getTableId(instance, tableName); @@ -819,6 +890,23 @@ public abstract class InputFormatBase implements InputFormat { if (!autoAdjust) for (Entry> entry : splitsToAdd.entrySet()) splits.add(new RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0]))); + + for (RangeInputSplit split : splits) { + split.setTable(tableName); + split.setOffline(offline); + split.setIsolatedScan(isolated); + split.setUsesLocalIterators(localIterators); + split.setMockInstance(mockInstance); + split.setFetchedColumns(fetchedColumns); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(iterators); + split.setLogLevel(logLevel); + } + return splits.toArray(new InputSplit[splits.size()]); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/03b065d2/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java new file mode 100644 index 0000000..01bf6dc --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.CredentialHelper; +import org.apache.accumulo.core.util.Pair; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.log4j.Level; + +/** + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. + */ +public class RangeInputSplit implements InputSplit, Writable { + private Range range; + private String[] locations; + private String table, instanceName, zooKeepers, principal; + private AuthenticationToken token; + private Boolean offline, mockInstance, isolatedScan, localIterators; + private Authorizations auths; + private Set> fetchedColumns; + private List iterators; + private Level level; + + public RangeInputSplit() { + range = new Range(); + locations = new String[0]; + } + + public RangeInputSplit(Range range, String[] locations) { + this.range = range; + this.locations = locations; + } + + public Range getRange() { + return range; + } + + private static byte[] extractBytes(ByteSequence seq, int numBytes) { + byte[] bytes = new byte[numBytes + 1]; + bytes[0] = 0; + for (int i = 0; i < numBytes; i++) { + if (i >= seq.length()) + bytes[i + 1] = 0; + else + bytes[i + 1] = seq.byteAt(i); + } + return bytes; + } + + public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { + int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); + BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); + BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); + BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); + return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); + } + + public float getProgress(Key currentKey) { + if (currentKey == null) + return 0f; + if (range.getStartKey() != null && range.getEndKey() != null) { + if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { + // just look at the row progress + return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { + // just look at the column family progress + return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { + // just look at the column qualifier progress + return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + } + } + // if we can't figure it out, then claim no progress + return 0f; + } + + /** + * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. + */ + public long getLength() throws IOException { + Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); + Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); + int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); + long diff = 0; + + byte[] start = startRow.getBytes(); + byte[] stop = stopRow.getBytes(); + for (int i = 0; i < maxCommon; ++i) { + diff |= 0xff & (start[i] ^ stop[i]); + diff <<= Byte.SIZE; + } + + if (startRow.getLength() != stopRow.getLength()) + diff |= 0xff; + + return diff + 1; + } + + public String[] getLocations() throws IOException { + return locations; + } + + public void readFields(DataInput in) throws IOException { + range.readFields(in); + int numLocs = in.readInt(); + locations = new String[numLocs]; + for (int i = 0; i < numLocs; ++i) + locations[i] = in.readUTF(); + + if (in.readBoolean()) { + isolatedScan = in.readBoolean(); + } + + if (in.readBoolean()) { + offline = in.readBoolean(); + } + + if (in.readBoolean()) { + localIterators = in.readBoolean(); + } + + if (in.readBoolean()) { + mockInstance = in.readBoolean(); + } + + if (in.readBoolean()) { + int numColumns = in.readInt(); + List columns = new ArrayList(numColumns); + for (int i = 0; i < numColumns; i++) { + columns.add(in.readUTF()); + } + + fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns); + } + + if (in.readBoolean()) { + String strAuths = in.readUTF(); + auths = new Authorizations(strAuths.getBytes(Charset.forName("UTF-8"))); + } + + if (in.readBoolean()) { + principal = in.readUTF(); + } + + if (in.readBoolean()) { + String tokenClass = in.readUTF(); + byte[] base64TokenBytes = in.readUTF().getBytes(Charset.forName("UTF-8")); + byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes); + + try { + token = CredentialHelper.extractToken(tokenClass, tokenBytes); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + } + + if (in.readBoolean()) { + instanceName = in.readUTF(); + } + + if (in.readBoolean()) { + zooKeepers = in.readUTF(); + } + + if (in.readBoolean()) { + level = Level.toLevel(in.readInt()); + } + } + + public void write(DataOutput out) throws IOException { + range.write(out); + out.writeInt(locations.length); + for (int i = 0; i < locations.length; ++i) + out.writeUTF(locations[i]); + + out.writeBoolean(null != isolatedScan); + if (null != isolatedScan) { + out.writeBoolean(isolatedScan); + } + + out.writeBoolean(null != offline); + if (null != offline) { + out.writeBoolean(offline); + } + + out.writeBoolean(null != localIterators); + if (null != localIterators) { + out.writeBoolean(localIterators); + } + + out.writeBoolean(null != mockInstance); + if (null != mockInstance) { + out.writeBoolean(mockInstance); + } + + out.writeBoolean(null != fetchedColumns); + if (null != fetchedColumns) { + String[] cols = InputConfigurator.serializeColumns(fetchedColumns); + out.writeInt(cols.length); + for (String col : cols) { + out.writeUTF(col); + } + } + + out.writeBoolean(null != auths); + if (null != auths) { + out.writeUTF(auths.serialize()); + } + + out.writeBoolean(null != principal); + if (null != principal) { + out.writeUTF(principal); + } + + out.writeBoolean(null != token); + if (null != token) { + out.writeUTF(token.getClass().getCanonicalName()); + try { + out.writeUTF(CredentialHelper.tokenAsBase64(token)); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + } + + out.writeBoolean(null != instanceName); + if (null != instanceName) { + out.writeUTF(instanceName); + } + + out.writeBoolean(null != zooKeepers); + if (null != zooKeepers) { + out.writeUTF(zooKeepers); + } + + out.writeBoolean(null != level); + if (null != level) { + out.writeInt(level.toInt()); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(128); + sb.append("Range: ").append(range); + sb.append(" Locations: ").append(locations); + sb.append(" Table: ").append(table); + // TODO finish building of string + return sb.toString(); + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public Instance getInstance() { + if (null == instanceName) { + return null; + } + + if (isMockInstance()) { + return new MockInstance(getInstanceName()); + } + + if (null == zooKeepers) { + return null; + } + + return new ZooKeeperInstance(getInstanceName(), getZooKeepers()); + } + + public String getInstanceName() { + return instanceName; + } + + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public String getZooKeepers() { + return zooKeepers; + } + + public void setZooKeepers(String zooKeepers) { + this.zooKeepers = zooKeepers; + } + + public String getPrincipal() { + return principal; + } + + public void setPrincipal(String principal) { + this.principal = principal; + } + + public AuthenticationToken getToken() { + return token; + } + + public void setToken(AuthenticationToken token) { + this.token = token;; + } + + public Boolean isOffline() { + return offline; + } + + public void setOffline(Boolean offline) { + this.offline = offline; + } + + public void setLocations(String[] locations) { + this.locations = locations; + } + + public Boolean isMockInstance() { + return mockInstance; + } + + public void setMockInstance(Boolean mockInstance) { + this.mockInstance = mockInstance; + } + + public Boolean isIsolatedScan() { + return isolatedScan; + } + + public void setIsolatedScan(Boolean isolatedScan) { + this.isolatedScan = isolatedScan; + } + + public Authorizations getAuths() { + return auths; + } + + public void setAuths(Authorizations auths) { + this.auths = auths; + } + + public void setRange(Range range) { + this.range = range; + } + + public Boolean usesLocalIterators() { + return localIterators; + } + + public void setUsesLocalIterators(Boolean localIterators) { + this.localIterators = localIterators; + } + + public Set> getFetchedColumns() { + return fetchedColumns; + } + + public void setFetchedColumns(Set> fetchedColumns) { + this.fetchedColumns = fetchedColumns; + } + + public List getIterators() { + return iterators; + } + + public void setIterators(List iterators) { + this.iterators = iterators; + } + + public Level getLogLevel() { + return level; + } + + public void setLogLevel(Level level) { + this.level = level; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/03b065d2/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index 4e5b5a8..5e246c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -16,17 +16,13 @@ */ package org.apache.accumulo.core.client.mapreduce; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.lang.reflect.Method; -import java.math.BigInteger; import java.net.InetAddress; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -58,7 +54,6 @@ import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.PartialKey; @@ -73,7 +68,6 @@ import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -808,7 +802,9 @@ public abstract class InputFormatBase extends InputFormat { */ @Override public List getSplits(JobContext context) throws IOException { - log.setLevel(getLogLevel(context)); + Level logLevel = getLogLevel(context); + log.setLevel(logLevel); + validateOptions(context); String tableName = getInputTableName(context); @@ -822,16 +818,17 @@ public abstract class InputFormatBase extends InputFormat { Set> fetchedColumns = getFetchedColumns(context); Authorizations auths = getScanAuthorizations(context); String principal = getPrincipal(context); + String tokenClass = getTokenClass(context); + byte[] tokenBytes = getToken(context); AuthenticationToken token; try { - token = CredentialHelper.extractToken(getTokenClass(context), getToken(context)); + token = CredentialHelper.extractToken(tokenClass, tokenBytes); } catch (AccumuloSecurityException e) { throw new IOException(e); } List iterators = getIterators(context); - Level logLevel = getLogLevel(context); if (ranges.isEmpty()) { ranges = new ArrayList(1); @@ -854,9 +851,7 @@ public abstract class InputFormatBase extends InputFormat { tl = getTabletLocator(context); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it tl.invalidateCache(); - while (!tl.binRanges(ranges, binnedRanges, - new TCredentials(getPrincipal(context), getTokenClass(context), ByteBuffer.wrap(getToken(context)), getInstance(context).getInstanceID())) - .isEmpty()) { + while (!tl.binRanges(ranges, binnedRanges, new TCredentials(principal, tokenClass, ByteBuffer.wrap(tokenBytes), instance.getInstanceID())).isEmpty()) { if (!(instance instanceof MockInstance)) { if (tableId == null) tableId = Tables.getTableId(instance, tableName);