Subject: svn commit: r1584162 [4/5] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/client/
Date: Wed, 02 Apr 2014 20:49:09 -0000
To: commits@hbase.apache.org
From: liyin@apache.org

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java?rev=1584162&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java Wed Apr 2 20:49:09 2014
@@ -0,0 +1,3512 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.SyncFailedException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HConnectionParams;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.base.Preconditions;
+
+/* Encapsulates finding the servers for an HBase instance */
+public class TableServers implements ServerConnection {
+  static final Log LOG = LogFactory.getLog(TableServers.class);
+  private final Class<? extends HRegionInterface> serverInterfaceClass;
+  private final int prefetchRegionLimit;
+
+  private final Object masterLock = new Object();
+  private volatile boolean closed;
+  private volatile HMasterInterface master;
+  private volatile boolean masterChecked;
+
+  private final Object rootRegionLock = new Object();
+  private final Object metaRegionLock = new Object();
+  private final Object userRegionLock = new Object();
+
+  private volatile Configuration conf;
+  private final HConnectionParams params;
+
+  // Used by master and region servers during safe mode only
+  private volatile HRegionLocation rootRegionLocation;
+
+  private final Map<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>> cachedRegionLocations =
+      new ConcurrentHashMap<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>>();
+
+  // amount of time to wait before we consider a server to be in fast fail
+  // mode
+  protected long fastFailThresholdMilliSec;
+  // Keeps track of failures when we cannot talk to a server. Helps in
+  // fast failing clients if the server is down for a long time.
+  protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap =
+      new ConcurrentHashMap<HServerAddress, FailureInfo>();
+  // We populate repeatedFailuresMap every time there is a failure.
So, to + // keep it + // from growing unbounded, we garbage collect the failure information + // every cleanupInterval. + protected final long failureMapCleanupIntervalMilliSec; + protected volatile long lastFailureMapCleanupTimeMilliSec; + // Amount of time that has to pass, before we clear region -> regionserver + // cache + // again, when in fast fail mode. This is used to clean unused entries. + protected long cacheClearingTimeoutMilliSec; + // clear failure Info. Used to clean out all entries. + // A safety valve, in case the client does not exit the + // fast fail mode for any reason. + private long fastFailClearingTimeMilliSec; + private final boolean recordClientContext; + + private ThreadLocal> operationContextPerThread = new ThreadLocal>(); + + @Override + public void resetOperationContext() { + if (!recordClientContext || this.operationContextPerThread == null) { + return; + } + + List currContext = this.operationContextPerThread.get(); + + if (currContext != null) { + currContext.clear(); + } + } + + @Override + public List getAndResetOperationContext() { + if (!recordClientContext || this.operationContextPerThread == null) { + return null; + } + + List currContext = this.operationContextPerThread.get(); + + if (currContext == null) { + return null; + } + + ArrayList context = new ArrayList( + currContext); + + // Made a copy, clear the context + currContext.clear(); + + return context; + } + + /** + * Keeps track of repeated failures to any region server. + * + * @author amitanand.s + * + */ + protected class FailureInfo { + // The number of consecutive failures. + private final AtomicLong numConsecutiveFailures = new AtomicLong(); + // The time when the server started to become unresponsive + // Once set, this would never be updated. + private long timeOfFirstFailureMilliSec; + // The time when the client last tried to contact the server. + // This is only updated by one client at a time + private volatile long timeOfLatestAttemptMilliSec; + // The time when the client last cleared cache for regions assigned + // to the server. Used to ensure we don't clearCache too often. + private volatile long timeOfLatestCacheClearMilliSec; + // Used to keep track of concurrent attempts to contact the server. + // In Fast fail mode, we want just one client thread to try to connect + // the rest of the client threads will fail fast. + private final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean( + false); + + @Override + public String toString() { + return "FailureInfo: numConsecutiveFailures = " + + numConsecutiveFailures + " timeOfFirstFailureMilliSec = " + + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = " + + timeOfLatestAttemptMilliSec + + " timeOfLatestCacheClearMilliSec = " + + timeOfLatestCacheClearMilliSec + + " exclusivelyRetringInspiteOfFastFail = " + + exclusivelyRetringInspiteOfFastFail.get(); + } + + FailureInfo(long firstFailureTime) { + this.timeOfFirstFailureMilliSec = firstFailureTime; + } + } + + private final ThreadLocal threadRetryingInFastFailMode = new ThreadLocal(); + + // For TESTING purposes only; + public Map getFailureMap() { + return repeatedFailuresMap; + } + + // The presence of a server in the map implies it's likely that there is an + // entry in cachedRegionLocations that map to this server; but the absence + // of a server in this map guarantees that there is no entry in cache that + // maps to the absent server. + private final Set cachedServers = + new HashSet(); + + // region cache prefetch is enabled by default. 
this set contains all + // tables whose region cache prefetch are disabled. + private final Set regionCachePrefetchDisabledTables = + new CopyOnWriteArraySet(); + // keep track of servers that have been updated for batchedLoad + // tablename -> Map + Map> batchedUploadUpdatesMap; + private int batchedUploadSoftFlushRetries; + private long batchedUploadSoftFlushTimeoutMillis; + private final boolean useThrift; + + private ConcurrentSkipListSet initializedTableSet = + new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); + /** + * constructor + * @param conf Configuration object + */ + @SuppressWarnings("unchecked") + public TableServers(Configuration conf) { + this.conf = conf; + params = HConnectionParams.getInstance(conf); + this.useThrift = conf.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT, + HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT); + + String serverClassName = + conf.get(HConstants.REGION_SERVER_CLASS, + HConstants.DEFAULT_REGION_SERVER_CLASS); + + this.closed = false; + + try { + this.serverInterfaceClass = + (Class) Class.forName(serverClassName); + + } catch (ClassNotFoundException e) { + throw new UnsupportedOperationException( + "Unable to find region server interface " + serverClassName, e); + } + + // TODO move parameters below into HConnectionParams + this.cacheClearingTimeoutMilliSec = conf.getLong( + "hbase.client.fastfail.cache.clear.interval", + 10000); // 10 sec + this.fastFailThresholdMilliSec = conf.getLong( + "hbase.client.fastfail.threshold", + 60000); // 1 min + this.failureMapCleanupIntervalMilliSec = conf.getLong( + "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min + this.fastFailClearingTimeMilliSec = conf.getLong( + "hbase.client.fastfail.cleanup.all.millisec", 900000); // 15 mins + + this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit", + 10); + + this.master = null; + this.masterChecked = false; + this.batchedUploadSoftFlushRetries = + conf.getInt("hbase.client.batched-upload.softflush.retries", 10); + this.batchedUploadSoftFlushTimeoutMillis = + conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min + batchedUploadUpdatesMap = new ConcurrentHashMap>(); + + this.recordClientContext = conf.getBoolean("hbase.client.record.context", false); + + } + + // Used by master and region servers during safe mode only + @Override + public void unsetRootRegionLocation() { + this.rootRegionLocation = null; + } + + // Used by master and region servers during safe mode only + @Override + public void setRootRegionLocation(HRegionLocation rootRegion) { + if (rootRegion == null) { + throw new IllegalArgumentException( + "Cannot set root region location to null."); + } + this.rootRegionLocation = rootRegion; + } + + @Override + public HMasterInterface getMaster() throws MasterNotRunningException { + ZooKeeperWrapper zk; + try { + zk = getZooKeeperWrapper(); + } catch (IOException e) { + throw new MasterNotRunningException(e); + } + + HServerAddress masterLocation = null; + synchronized (this.masterLock) { + for (int tries = 0; !this.closed && !this.masterChecked + && this.master == null && tries < params.getNumRetries(); tries++) { + + try { + masterLocation = zk.readMasterAddress(zk); + + if (masterLocation != null) { + HMasterInterface tryMaster = (HMasterInterface) HBaseRPC.getProxy( + HMasterInterface.class, HBaseRPCProtocolVersion.versionID, + masterLocation.getInetSocketAddress(), this.conf, + params.getRpcTimeout(), HBaseRPCOptions.DEFAULT); + + if (tryMaster.isMasterRunning()) { + this.master = tryMaster; + 
this.masterLock.notifyAll(); + break; + } + } + + } catch (IOException e) { + if (tries == params.getNumRetries() - 1) { + // This was our last chance - don't bother sleeping + LOG.info( + "getMaster attempt " + tries + " of " + + params.getNumRetries() + " failed; no more retrying.", + e); + break; + } + LOG.info( + "getMaster attempt " + tries + " of " + params.getNumRetries() + + " failed; retrying after sleep of " + + params.getPauseTime(tries), e); + } + + // Cannot connect to master or it is not running. Sleep & retry + try { + this.masterLock.wait(params.getPauseTime(tries)); + } catch (InterruptedException e) { + // continue + } + } + this.masterChecked = true; + } + if (this.master == null) { + if (masterLocation == null) { + throw new MasterNotRunningException(); + } + throw new MasterNotRunningException(masterLocation.toString()); + } + return this.master; + } + + @Override + public boolean isMasterRunning() { + if (this.master == null) { + try { + getMaster(); + + } catch (MasterNotRunningException e) { + return false; + } + } + return true; + } + + @Override + public boolean tableExists(final byte[] tableName) + throws MasterNotRunningException { + getMaster(); + if (tableName == null) { + throw new IllegalArgumentException("Table name cannot be null"); + } + if (isMetaTableName(tableName)) { + return true; + } + boolean exists = false; + try { + HTableDescriptor[] tables = listTables(); + for (HTableDescriptor table : tables) { + if (Bytes.equals(table.getName(), tableName)) { + exists = true; + } + } + } catch (IOException e) { + LOG.warn("Testing for table existence threw exception", e); + } + return exists; + } + + /* + * @param n + * + * @return Truen if passed tablename n is equal to the name of + * a catalog table. + */ + private static boolean isMetaTableName(final byte[] n) { + return MetaUtils.isMetaTableName(n); + } + + @Override + public HRegionLocation getRegionLocation(final byte[] name, + final byte[] row, boolean reload) throws IOException { + return reload ? 
relocateRegion(name, row) : locateRegion(name, row); + } + + @Override + public HTableDescriptor[] listTables() throws IOException { + getMaster(); + final TreeSet uniqueTables = new TreeSet(); + MetaScannerVisitor visitor = new MetaScannerVisitor() { + @Override + public boolean processRow(Result result) throws IOException { + try { + byte[] value = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + HRegionInfo info = null; + if (value != null) { + info = Writables.getHRegionInfo(value); + } + // Only examine the rows where the startKey is zero length + if (info != null && info.getStartKey().length == 0) { + uniqueTables.add(info.getTableDesc()); + } + return true; + } catch (RuntimeException e) { + LOG.error("Result=" + result); + throw e; + } + } + }; + MetaScanner.metaScan(conf, visitor); + + return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); + } + + @Override + public boolean isTableEnabled(byte[] tableName) throws IOException { + return testTableOnlineState(tableName, true); + } + + @Override + public boolean isTableDisabled(byte[] tableName) throws IOException { + return testTableOnlineState(tableName, false); + } + + @Override + public boolean isTableAvailable(final byte[] tableName) throws IOException { + final AtomicBoolean available = new AtomicBoolean(true); + MetaScannerVisitor visitor = new MetaScannerVisitor() { + @Override + public boolean processRow(Result row) throws IOException { + byte[] value = row.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + HRegionInfo info = Writables.getHRegionInfoOrNull(value); + if (info != null) { + if (Bytes.equals(tableName, info.getTableDesc().getName())) { + value = row.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + if (value == null) { + available.set(false); + return false; + } + } + } + return true; + } + }; + MetaScanner.metaScan(conf, visitor); + return available.get(); + } + + /* + * If online == true Returns true if all regions are online Returns false in + * any other case If online == false Returns true if all regions are offline + * Returns false in any other case + */ + private boolean testTableOnlineState(byte[] tableName, boolean online) + throws IOException { + if (!tableExists(tableName)) { + throw new TableNotFoundException(Bytes.toString(tableName)); + } + if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { + // The root region is always enabled + return true; + } + int rowsScanned = 0; + int rowsOffline = 0; + byte[] startKey = HRegionInfo.createRegionName(tableName, null, + HConstants.ZEROES, false); + byte[] endKey; + HRegionInfo currentRegion; + Scan scan = new Scan(startKey); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + int rows = this.conf.getInt("hbase.meta.scanner.caching", 100); + scan.setCaching(rows); + ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName, + HConstants.META_TABLE_NAME) ? 
HConstants.ROOT_TABLE_NAME + : HConstants.META_TABLE_NAME), scan, HBaseRPCOptions.DEFAULT); + try { + // Open scanner + getRegionServerWithRetries(s); + do { + currentRegion = s.getHRegionInfo(); + Result r; + Result[] rrs; + while ((rrs = getRegionServerWithRetries(s)) != null + && rrs.length > 0) { + r = rrs[0]; + byte[] value = r.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + if (value != null) { + HRegionInfo info = Writables.getHRegionInfoOrNull(value); + if (info != null) { + if (Bytes.equals(info.getTableDesc().getName(), tableName)) { + rowsScanned += 1; + rowsOffline += info.isOffline() ? 1 : 0; + } + } + } + } + endKey = currentRegion.getEndKey(); + } while (!(endKey == null || Bytes.equals(endKey, + HConstants.EMPTY_BYTE_ARRAY))); + } finally { + s.setClose(); + // Doing below will call 'next' again and this will close the scanner + // Without it we leave scanners open. + getRegionServerWithRetries(s); + } + LOG.debug("Rowscanned=" + rowsScanned + ", rowsOffline=" + rowsOffline); + boolean onOffLine = online ? rowsOffline == 0 + : rowsOffline == rowsScanned; + return rowsScanned > 0 && onOffLine; + } + + private static class HTableDescriptorFinder implements + MetaScanner.MetaScannerVisitor { + byte[] tableName; + HTableDescriptor result; + + protected HTableDescriptorFinder(byte[] tableName) { + this.tableName = tableName; + } + + @Override + public boolean processRow(Result rowResult) throws IOException { + HRegionInfo info = Writables.getHRegionInfo(rowResult.getValue( + HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); + HTableDescriptor desc = info.getTableDesc(); + if (Bytes.compareTo(desc.getName(), tableName) == 0) { + result = desc; + return false; + } + return true; + } + + HTableDescriptor getResult() { + return result; + } + } + + @Override + public HTableDescriptor getHTableDescriptor(final byte[] tableName) + throws IOException { + if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { + return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC); + } + if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { + return HTableDescriptor.META_TABLEDESC; + } + TableServers.HTableDescriptorFinder finder = new HTableDescriptorFinder(tableName); + MetaScanner.metaScan(conf, finder, tableName); + HTableDescriptor result = finder.getResult(); + if (result == null) { + throw new TableNotFoundException(Bytes.toString(tableName)); + } + return result; + } + + @Override + public HRegionLocation locateRegion(final byte[] tableName, final byte[] row) + throws IOException { + return locateRegion(tableName, row, true); + } + + @Override + public HRegionLocation relocateRegion(final byte[] tableName, + final byte[] row) throws IOException { + return locateRegion(tableName, row, false); + } + + private HRegionLocation locateRegion(final byte[] tableName, + final byte[] row, boolean useCache) throws IOException { + if (tableName == null || tableName.length == 0) { + throw new IllegalArgumentException( + "table name cannot be null or zero length"); + } + if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { + return locateMetaInRoot(row, useCache, metaRegionLock); + } else if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { + synchronized (rootRegionLock) { + // This block guards against two threads trying to find the root + // region at the same time. One will go do the find while the + // second waits. The second thread will not do find. 
+ + if (!useCache || rootRegionLocation == null + || inFastFailMode(this.rootRegionLocation.getServerAddress())) { + this.rootRegionLocation = locateRootRegion(); + LOG.info("Updated rootRegionLocation from ZK to : " + this.rootRegionLocation); + } + return this.rootRegionLocation; + } + } else { + // Region not in the cache - have to go to the meta RS + return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row, + useCache, userRegionLock); + } + } + + private HRegionLocation prefetchRegionCache(final byte[] tableName, + final byte[] row) { + return prefetchRegionCache(tableName, row, this.prefetchRegionLimit); + } + + /* + * Search .META. for the HRegionLocation info that contains the table and + * row we're seeking. It will prefetch certain number of regions info and + * save them to the global region cache. + */ + private HRegionLocation prefetchRegionCache(final byte[] tableName, + final byte[] row, int prefetchRegionLimit) { + // Implement a new visitor for MetaScanner, and use it to walk through + // the .META. + MetaScannerVisitor visitor = new MetaScannerVisitor() { + @Override + public boolean processRow(Result result) throws IOException { + try { + byte[] value = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + HRegionInfo regionInfo = null; + + if (value != null) { + // convert the row result into the HRegionLocation we need! + regionInfo = Writables.getHRegionInfo(value); + + // possible we got a region of a different table... + if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) { + return false; // stop scanning + } + if (regionInfo.isOffline()) { + // don't cache offline regions + return true; + } + value = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + if (value == null) { + return true; // don't cache it + } + final String serverAddress = Bytes.toString(value); + + value = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.STARTCODE_QUALIFIER); + long serverStartCode = -1; + if (value != null) { + serverStartCode = Bytes.toLong(value); + } + + // instantiate the location + HRegionLocation loc = new HRegionLocation(regionInfo, + new HServerAddress(serverAddress), serverStartCode); + // cache this meta entry + cacheLocation(tableName, loc); + } + return true; + } catch (RuntimeException e) { + throw new IOException(e); + } + } + }; + try { + // pre-fetch certain number of regions info at region cache. + MetaScanner.metaScan(conf, visitor, tableName, row, prefetchRegionLimit); + return getCachedLocation(tableName, row); + } catch (IOException e) { + LOG.warn("Encounted problems when prefetch META table: ", e); + } + return null; + } + + /** + * Search the meta table (.META.) for the HRegionLocation info that + * contains the table and row we're seeking. + */ + private HRegionLocation locateRegionInMeta(final byte [] parentTable, + final byte [] tableName, final byte [] row, boolean useCache, + Object regionLockObject) + throws IOException { + HRegionLocation location; + if (useCache) { + location = getCachedLocation(tableName, row); + if (location != null) { + return location; + } + } + + // If we are supposed to be using the cache, look in the cache to see if + // we already have the region. + + // build the key of the meta region we should be looking for. + // the extra 9's on the end are necessary to allow "exact" matches + // without knowing the precise region names. 
+ byte [] metaKey = HRegionInfo.createRegionName(tableName, row, + HConstants.NINES, false); + for (int tries = 0; true; tries++) { + if (tries >= params.getNumRetries()) { + throw new NoServerForRegionException("Unable to find region for " + + Bytes.toStringBinary(row) + " after " + params.getNumRetries() + + " tries."); + } + + FailureInfo fInfo = null; + HServerAddress server = null; + boolean didTry = false; + boolean couldNotCommunicateWithServer = false; + boolean retryDespiteFastFailMode = false; + try { + // locate the root or meta region + HRegionLocation metaLocation = locateRegion(parentTable, metaKey); + + server = metaLocation.getServerAddress(); + fInfo = repeatedFailuresMap.get(server); + + // Handle the case where .META. is on an unresponsive server. + if (inFastFailMode(server) && + !this.currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. + + retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo); + + if (retryDespiteFastFailMode == false) { // we don't have to retry + throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(), + fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec, server.getHostname()); + } + } + didTry = true; + + HBaseThriftRPC.isMeta.get().push(true); + Result regionInfoRow = null; + try { + // This block guards against two threads trying to load the meta + // region at the same time. The first will load the meta region and + // the second will use the value that the first one found. + synchronized (regionLockObject) { + // Check the cache again for a hit in case some other thread made the + // same query while we were waiting on the lock. If not supposed to + // be using the cache, delete any existing cached location so it won't + // interfere. + if (useCache) { + location = getCachedLocation(tableName, row); + if (location != null) { + return location; + } + } else { + LOG.debug("Deleting the client location cache."); + deleteCachedLocation(tableName, row, null); + } + + // If the parent table is META, we may want to pre-fetch some + // region info into the global region cache for this table. + if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) && + (getRegionCachePrefetch(tableName)) ) { + LOG.debug("Prefetching the client location cache."); + location = prefetchRegionCache(tableName, row); + if (location != null) { + return location; + } + } + + HRegionInterface serverInterface = + getHRegionConnection(metaLocation.getServerAddress()); + + // Query the root or meta region for the location of the meta region + regionInfoRow = serverInterface.getClosestRowBefore( + metaLocation.getRegionInfo().getRegionName(), metaKey, + HConstants.CATALOG_FAMILY); + } + } catch (Exception e) { + throw e; + } finally { + HBaseThriftRPC.isMeta.get().pop(); + } + location = getLocationFromRow(regionInfoRow, tableName, + parentTable, row); + cacheLocation(tableName, location); + return location; + } catch (TableNotFoundException e) { + // if we got this error, probably means the table just plain doesn't + // exist. rethrow the error immediately. this should always be coming + // from the HTable constructor. + throw e; + } catch (PreemptiveFastFailException e) { + // already processed this. Don't process this again. 
+ throw e; + } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler + .decodeRemoteException((RemoteException) e); + } else if (isNetworkException(e)) { + couldNotCommunicateWithServer = true; + handleFailureToServer(server, e); + } + if (tries < params.getNumRetries() - 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("locateRegionInMeta attempt " + tries + " of " + + params.getNumRetries() + + " failed; retrying after sleep of " + + params.getPauseTime(tries) + " because: " + e.getMessage()); + } + } else { + throw e; + } + // Only relocate the parent region if necessary + if (!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) { + relocateRegion(parentTable, row); + } + } catch (Exception e) { + couldNotCommunicateWithServer = true; + handleFailureToServer(server, e); + if (tries < params.getNumRetries() - 1) { + LOG.debug("locateRegionInMeta attempt " + tries + " of " + + params.getNumRetries() + " failed; retrying after sleep of " + + params.getPauseTime(tries) + " because: " + e.getMessage()); + } else { + throw e; + } + } finally { + updateFailureInfoForServer(server, fInfo, didTry, + couldNotCommunicateWithServer, retryDespiteFastFailMode); + } + try { + Thread.sleep(params.getPauseTime(tries)); + } catch (InterruptedException e) { + // continue + } + } +} + +private HRegionLocation getLocationFromRow(Result regionInfoRow, + byte[] tableName, byte[] parentTable, byte[] row) throws IOException { + if (regionInfoRow == null) { + throw new TableNotFoundException(Bytes.toString(tableName)); + } + byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + if (value == null || value.length == 0) { + throw new IOException("HRegionInfo was null or empty in " + + Bytes.toString(parentTable) + ", row=" + regionInfoRow); + } + // convert the row result into the HRegionLocation we need! + HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(value, + new HRegionInfo()); + // possible we got a region of a different table... + if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) { + throw new TableNotFoundException("Table '" + Bytes.toString(tableName) + + "' was not found."); + } + if (regionInfo.isOffline()) { + throw new RegionOfflineException("region offline: " + + regionInfo.getRegionNameAsString()); + } + + value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + String serverAddress = ""; + if (value != null) { + serverAddress = Bytes.toString(value); + } + if (serverAddress.equals("")) { + throw new NoServerForRegionException("No server address listed " + + "in " + Bytes.toString(parentTable) + " for region " + + regionInfo.getRegionNameAsString() + " containing row " + + Bytes.toStringBinary(row)); + } + + value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY, + HConstants.STARTCODE_QUALIFIER); + long serverStartCode = -1; + if (value != null) { + serverStartCode = Bytes.toLong(value); + } + // instantiate the location + HRegionLocation location = new HRegionLocation(regionInfo, + new HServerAddress(serverAddress), serverStartCode); + return location; +} + +/** + * TODO:WARNING!!! This looks like a lot of duplicated code with + * {@link #locateRegionInMeta(byte[], byte[], byte[], boolean, Object)} pls + * fix this! Search the Root Table for the Meta Region. Retries a fixed + * number of times and throws if Region is not found. 
+ */ +private HRegionLocation locateMetaInRoot(final byte[] row, + boolean useCache, Object regionLockObject) throws IOException { + HRegionLocation location; + final byte[] parentTable = HConstants.ROOT_TABLE_NAME; + final byte[] tableName = HConstants.META_TABLE_NAME; + if (useCache) { + location = getCachedLocation(tableName, row); + if (location != null) { + return location; + } + } + // If we are supposed to be using the cache, look in the cache to see if + // we already have the region. + + // build the key of the meta region we should be looking for. + // the extra 9's on the end are necessary to allow "exact" matches + // without knowing the precise region names. + byte[] metaKey = HRegionInfo.createRegionName(tableName, row, + HConstants.NINES, false); + for (int tries = 0; true; tries++) { + if (tries >= params.getNumRetries()) { + throw new NoServerForRegionException("Unable to find region for " + + Bytes.toStringBinary(row) + " after " + params.getNumRetries() + + " tries."); + } + + TableServers.FailureInfo fInfo = null; + HServerAddress server = null; + boolean didTry = false; + boolean couldNotCommunicateWithServer = false; + boolean retryDespiteFastFailMode = false; + try { + // locate the root or meta region + HRegionLocation metaLocation = null; + if (useCache && rootRegionLocation != null + && !inFastFailMode(this.rootRegionLocation.getServerAddress())) { + metaLocation = rootRegionLocation; + } else { + synchronized (rootRegionLock) { + // This block guards against two threads trying to find the root + // region at the same time. One will go do the find while the + // second waits. The second thread will not do find. + + if (!useCache || rootRegionLocation == null + || inFastFailMode(this.rootRegionLocation.getServerAddress())) { + HBaseThriftRPC.isMeta.get().push(true); + try { + this.rootRegionLocation = locateRootRegion(); + } catch (Exception e) { + throw e; + } finally { + HBaseThriftRPC.isMeta.get().pop(); + } + LOG.info("Updated rootRegionLocation from ZK to : " + + this.rootRegionLocation); + } + metaLocation = this.rootRegionLocation; + } + } + Preconditions.checkNotNull(metaLocation); + server = metaLocation.getServerAddress(); + fInfo = repeatedFailuresMap.get(server); + + // Handle the case where .META. is on an unresponsive server. + if (inFastFailMode(server) && !this.currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. + + retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo); + + if (retryDespiteFastFailMode == false) { // we don't have to retry + throw new PreemptiveFastFailException( + fInfo.numConsecutiveFailures.get(), + fInfo.timeOfFirstFailureMilliSec, + fInfo.timeOfLatestAttemptMilliSec, server.getHostname()); + } + } + didTry = true; + HBaseThriftRPC.isMeta.get().push(true); + Result regionInfoRow = null; + // This block guards against two threads trying to load the meta + // region at the same time. The first will load the meta region and + // the second will use the value that the first one found. 
+ synchronized (metaRegionLock) { + if (useCache) { + location = getCachedLocation(tableName, row); + if (location != null) { + return location; + } + } else { + LOG.debug("Deleting the client location cache."); + deleteCachedLocation(tableName, row, null); + } + HRegionInterface serverInterface = getHRegionConnection(metaLocation + .getServerAddress()); + + // Query the root for the location of the meta region + regionInfoRow = serverInterface.getClosestRowBefore(metaLocation + .getRegionInfo().getRegionName(), metaKey, + HConstants.CATALOG_FAMILY); + location = getLocationFromRow(regionInfoRow, tableName, + parentTable, row); + cacheLocation(tableName, location); + } + return location; + } catch (TableNotFoundException e) { + // if we got this error, probably means the table just plain doesn't + // exist. rethrow the error immediately. this should always be coming + // from the HTable constructor. + throw e; + } catch (PreemptiveFastFailException e) { + // already processed this. Don't process this again. + throw e; + } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler + .decodeRemoteException((RemoteException) e); + } else if (isNetworkException(e)) { + couldNotCommunicateWithServer = true; + handleFailureToServer(server, e); + } + if (tries < params.getNumRetries() - 1) { + LOG.debug("IOException locateRegionInMeta attempt " + tries + + " of " + params.getNumRetries() + + " failed; retrying after sleep of " + + params.getPauseTime(tries) + " because: " + e.getMessage()); + } else { + throw e; + } + } catch (Exception e) { + couldNotCommunicateWithServer = true; + handleFailureToServer(server, e); + if (tries < params.getNumRetries() - 1) { + LOG.debug("Exception locateRegionInMeta attempt " + tries + " of " + + params.getNumRetries() + " failed; retrying after sleep of " + + params.getPauseTime(tries) + " because: " + e.getMessage()); + } else { + throw e; + } + } finally { + HBaseThriftRPC.isMeta.get().pop(); + updateFailureInfoForServer(server, fInfo, didTry, + couldNotCommunicateWithServer, retryDespiteFastFailMode); + } + try { + Thread.sleep(params.getPauseTime(tries)); + } catch (InterruptedException e) { + // continue + } + } +} + + + + + /** + * Check if the exception is something that indicates that we cannot + * contact/communicate with the server. + * + * @param e + * @return + */ + private boolean isNetworkException(Throwable e) { + // This list covers most connectivity exceptions but not all. + // For example, in SocketOutputStream a plain IOException is thrown + // at times when the channel is closed. + return (e instanceof SocketTimeoutException || + e instanceof ConnectException || + e instanceof ClosedChannelException || + e instanceof SyncFailedException || + e instanceof EOFException || + e instanceof TTransportException); + } + + @Override + public Collection getCachedHRegionLocations(final byte [] tableName, + boolean forceRefresh) { + if (forceRefresh || !initializedTableSet.contains(tableName)) { + prefetchRegionCache(tableName, null, Integer.MAX_VALUE); + initializedTableSet.add(tableName); + } + + ConcurrentSkipListMap tableLocations = + getTableLocations(tableName); + return tableLocations.values(); + } + + /* + * Search the cache for a location that fits our table and row key. + * Return null if no suitable region is located. TODO: synchronization note + * + *

TODO: This method during writing consumes 15% of CPU doing lookup + * into the Soft Reference SortedMap. Improve. + * + * @param tableName + * @param row + * @return Null or region location found in cache. + */ + HRegionLocation getCachedLocation(final byte [] tableName, + final byte [] row) { + ConcurrentSkipListMap tableLocations = + getTableLocations(tableName); + + // start to examine the cache. we can only do cache actions + // if there's something in the cache for this table. + if (tableLocations.isEmpty()) { + return null; + } + + HRegionLocation rl = tableLocations.get(row); + if (rl != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Cache hit for row <" + Bytes.toStringBinary(row) + + "> in tableName " + Bytes.toString(tableName) + + ": location server " + rl.getServerAddress() + + ", location region name " + + rl.getRegionInfo().getRegionNameAsString()); + } + return rl; + } + + // get the matching region for the row + Entry entry = tableLocations.floorEntry(row); + HRegionLocation possibleRegion = (entry == null) ? null : entry + .getValue(); + + // we need to examine the cached location to verify that it is + // a match by end key as well. + if (possibleRegion != null) { + byte[] endKey = possibleRegion.getRegionInfo().getEndKey(); + + // make sure that the end key is greater than the row we're looking + // for, otherwise the row actually belongs in the next region, not + // this one. the exception case is when the endkey is + // HConstants.EMPTY_START_ROW, signifying that the region we're + // checking is actually the last region in the table. + if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) + || KeyValue.getRowComparator(tableName).compareRows(endKey, 0, + endKey.length, row, 0, row.length) > 0) { + return possibleRegion; + } + } + + // Passed all the way through, so we got nothing - complete cache miss + return null; + } + + /* + * Delete a cached location for the specified table name and row if it is + * located on the (optionally) specified old location. + */ + @Override + public void deleteCachedLocation(final byte[] tableName, final byte[] row, + HServerAddress oldServer) { + synchronized (this.cachedRegionLocations) { + Map tableLocations = getTableLocations(tableName); + + // start to examine the cache. we can only do cache actions + // if there's something in the cache for this table. + if (!tableLocations.isEmpty()) { + HRegionLocation rl = getCachedLocation(tableName, row); + if (rl != null) { + // If oldLocation is specified. deleteLocation only if it is the + // same. + if (oldServer != null && !oldServer.equals(rl.getServerAddress())) + return; // perhaps, some body else cleared and repopulated. + + tableLocations.remove(rl.getRegionInfo().getStartKey()); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() + + " for tableName=" + Bytes.toString(tableName) + + " from cache " + "because of " + Bytes.toStringBinary(row)); + } + } + } + } + } + + /* + * Delete all cached entries of a table that maps to a specific location. 
+ * + * @param tablename + * + * @param server + */ + protected void clearCachedLocationForServer(final String server) { + boolean deletedSomething = false; + synchronized (this.cachedRegionLocations) { + if (!cachedServers.contains(server)) { + return; + } + for (Map tableLocations : cachedRegionLocations + .values()) { + for (Entry e : tableLocations.entrySet()) { + if (e.getValue().getServerAddress().toString().equals(server)) { + tableLocations.remove(e.getKey()); + deletedSomething = true; + } + } + } + cachedServers.remove(server); + } + if (deletedSomething && LOG.isDebugEnabled()) { + LOG.debug("Removed all cached region locations that map to " + server); + } + } + + /* + * @param tableName + * + * @return Map of cached locations for passed tableName + */ + private ConcurrentSkipListMap getTableLocations( + final byte[] tableName) { + // find the map of cached locations for this table + Integer key = Bytes.mapKey(tableName); + ConcurrentSkipListMap result = this.cachedRegionLocations + .get(key); + if (result == null) { + synchronized (this.cachedRegionLocations) { + result = this.cachedRegionLocations.get(key); + if (result == null) { + // if tableLocations for this table isn't built yet, make one + result = new ConcurrentSkipListMap( + Bytes.BYTES_COMPARATOR); + this.cachedRegionLocations.put(key, result); + } + } + } + return result; + } + + /** + * Allows flushing the region cache. + */ + @Override + public void clearRegionCache() { + synchronized (this.cachedRegionLocations) { + cachedRegionLocations.clear(); + cachedServers.clear(); + } + } + + /** + * Put a newly discovered HRegionLocation into the cache. + */ + private void cacheLocation(final byte [] tableName, + final HRegionLocation location) { + byte [] startKey = location.getRegionInfo().getStartKey(); + boolean hasNewCache; + synchronized (this.cachedRegionLocations) { + cachedServers.add(location.getServerAddress().toString()); + hasNewCache = (getTableLocations(tableName).put(startKey, location) == null); + } + if (hasNewCache) { + LOG.debug("Cached location for " + + location.getRegionInfo().getRegionNameAsString() + + " is " + location.getServerAddress().toString()); + } + } + + @SuppressWarnings("resource") + @Override + public HRegionInterface getHRegionConnection( + HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options) + throws IOException { + if (getMaster) { + LOG.debug("Getting master"); + getMaster(); + } + HRegionInterface server = HRegionServer.getMainRS(regionServer); + if (server != null && !this.useThrift) { + return server; + } + + final boolean thriftPortWrittenToMeta = conf.getBoolean( + HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META, + HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META_DEFAULT); + final boolean hadoopPortWrittenToMeta = !thriftPortWrittenToMeta; + + try { + // establish an RPC for this RS + // set hbase.ipc.client.connect.max.retries to retry connection + // attempts + if (this.useThrift) { + Class serverInterface = + ThriftHRegionInterface.class; + if (thriftPortWrittenToMeta) { + try { + server = (HRegionInterface) HBaseThriftRPC.getClient( + regionServer.getInetSocketAddress(), this.conf, + serverInterface, options); + } catch (Exception e) { + LOG.warn("Exception connecting to the region server on" + + "the thrift channel. 
Retrying on the HadoopRPC port", e); + InetSocketAddress addr = new InetSocketAddress(regionServer + .getInetSocketAddress().getHostName(), conf.getInt( + HConstants.REGIONSERVER_PORT, + HConstants.DEFAULT_REGIONSERVER_PORT)); + server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass, + HBaseRPCProtocolVersion.versionID, addr, this.conf, + params.getRpcTimeout(), options); + } + } else { + try { + InetSocketAddress addr = new InetSocketAddress(regionServer + .getInetSocketAddress().getHostName(), conf.getInt( + HConstants.REGIONSERVER_SWIFT_PORT, + HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT)); + server = (HRegionInterface) HBaseThriftRPC.getClient(addr, + this.conf, serverInterface, options); + } catch (Exception e) { + LOG.warn("Exception connecting to the region server on" + + "the thrift channel. Retrying on the HadoopRPC port", e); + server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass, + HBaseRPCProtocolVersion.versionID, + regionServer.getInetSocketAddress(), this.conf, + params.getRpcTimeout(), options); + } + } + } else { + if (hadoopPortWrittenToMeta) { + server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass, + HBaseRPCProtocolVersion.versionID, + regionServer.getInetSocketAddress(), this.conf, + params.getRpcTimeout(), options); + } else { + // The hadoop port is no longer written to Meta (this will happen + // when we are reasonably confident about the thrift service, and + // will soon deprecate RPC). So, try to connect to the default port. + // TODO gauravm: Verify that this works (t2830553) + InetSocketAddress addr = new InetSocketAddress(regionServer + .getInetSocketAddress().getHostName(), conf.getInt( + HConstants.REGIONSERVER_PORT, + HConstants.DEFAULT_REGIONSERVER_PORT)); + server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass, + HBaseRPCProtocolVersion.versionID, addr, this.conf, + params.getRpcTimeout(), options); + } + } + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + + return server; + } + + @Override + public HRegionInterface getHRegionConnection(HServerAddress regionServer, + HBaseRPCOptions options) throws IOException { + return getHRegionConnection(regionServer, false, options); + } + + @Override + public HRegionInterface getHRegionConnection(HServerAddress regionServer, + boolean getMaster) throws IOException { + return getHRegionConnection(regionServer, getMaster, + HBaseRPCOptions.DEFAULT); + } + + @Override + public HRegionInterface getHRegionConnection(HServerAddress regionServer) + throws IOException { + return getHRegionConnection(regionServer, false); + } + + @Override + public synchronized ZooKeeperWrapper getZooKeeperWrapper() + throws IOException { + return HConnectionManager.getClientZKConnection(conf) + .getZooKeeperWrapper(); + } + + /* + * Repeatedly try to find the root region in ZK + * + * @return HRegionLocation for root region if found + * + * @throws NoServerForRegionException - if the root region can not be + * located after retrying + * + * @throws IOException + */ + private HRegionLocation locateRootRegion() throws IOException { + + // We lazily instantiate the ZooKeeper object because we don't want to + // make the constructor have to throw IOException or handle it itself. 
+ ZooKeeperWrapper zk = getZooKeeperWrapper(); + + HServerAddress rootRegionAddress = null; + for (int tries = 0; tries < params.getNumRetries(); tries++) { + int localTimeouts = 0; + // ask the master which server has the root region + while (rootRegionAddress == null + && localTimeouts < params.getNumRetries()) { + // Don't read root region until we're out of safe mode so we know + // that the meta regions have been assigned. + rootRegionAddress = zk.readRootRegionLocation(); + if (rootRegionAddress == null) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping " + params.getPauseTime(tries) + + "ms, waiting for root region."); + } + Thread.sleep(params.getPauseTime(tries)); + } catch (InterruptedException iex) { + // continue + } + localTimeouts++; + } + } + + if (rootRegionAddress == null) { + throw new NoServerForRegionException( + "Timed out trying to locate root region"); + } + + LOG.debug("Trying to get root region at : " + rootRegionAddress); + try { + // Get a connection to the region server + HRegionInterface server = getHRegionConnection(rootRegionAddress); + // if this works, then we're good, and we have an acceptable address, + // so we can stop doing retries and return the result. + server.getRegionInfo(HRegionInfo.ROOT_REGIONINFO.getRegionName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Found ROOT at " + rootRegionAddress); + } + break; + } catch (Throwable t) { + t = translateException(t); + + if (tries == params.getNumRetries() - 1) { + throw new NoServerForRegionException("Timed out trying to locate "+ + "root region because: " + t.getMessage()); + } + + // Sleep and retry finding root region. + try { + LOG.debug("Root region location changed. Sleeping.", t); + Thread.sleep(params.getPauseTime(tries)); + LOG.debug("Wake. Retry finding root region."); + } catch (InterruptedException iex) { + // continue + } + } + + rootRegionAddress = null; + } + + // if the address is null by this point, then the retries have failed, + // and we're sort of sunk + if (rootRegionAddress == null) { + throw new NoServerForRegionException( + "unable to locate root region server"); + } + + // return the region location + return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootRegionAddress); + } + + @Override + public T getRegionServerWithRetries(ServerCallable callable) + throws IOException { + List exceptions = new ArrayList(); + RegionOverloadedException roe = null; + + long callStartTime; + int serverRequestedRetries = 0; + + callStartTime = System.currentTimeMillis(); + long serverRequestedWaitTime = 0; + // do not retry if region cannot be located. There are enough retries + // within instantiateRegionLocation. + callable.instantiateRegionLocation(false /* reload cache? */); + + for (int tries = 0;; tries++) { + // If server requested wait. We will wait for that time, and start + // again. Do not count this time/tries against the client retries. + if (serverRequestedWaitTime > 0) { + serverRequestedRetries++; + + if (serverRequestedRetries > params.getMaxServerRequestedRetries()) { + throw RegionOverloadedException.create(roe, exceptions, + serverRequestedWaitTime); + } + + long pauseTime = serverRequestedWaitTime + callStartTime + - System.currentTimeMillis(); + LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for " + + pauseTime + "ms. 
serverRequestedRetries = " + + serverRequestedRetries); + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + + serverRequestedWaitTime = 0; + tries = 0; + callStartTime = System.currentTimeMillis(); + } + + try { + return getRegionServerWithoutRetries(callable, false); + } catch (DoNotRetryIOException ioe) { + // clear cache if needed + if (ioe.getCause() instanceof NotServingRegionException) { + HRegionLocation prevLoc = callable.location; + if (prevLoc.getRegionInfo() != null) { + deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo() + .getStartKey(), prevLoc.getServerAddress()); + } + } + + // If we are not supposed to retry; Let it pass through. + throw ioe; + } catch (RegionOverloadedException ex) { + roe = ex; + serverRequestedWaitTime = roe.getBackoffTimeMillis(); + continue; + } catch (ClientSideDoNotRetryException exp) { + // Bail out of the retry loop, immediately + throw exp; + } catch (PreemptiveFastFailException pfe) { + // Bail out of the retry loop, if the host has been consistently + // unreachable. + throw pfe; + } catch (Throwable t) { + exceptions.add(t); + + if (tries == params.getNumRetries() - 1) { + throw new RetriesExhaustedException(callable.getServerName(), + callable.getRegionName(), callable.getRow(), tries, exceptions); + } + + HRegionLocation prevLoc = callable.location; + if (prevLoc.getRegionInfo() != null) { + deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo() + .getStartKey(), prevLoc.getServerAddress()); + } + + try { + // do not retry if getting the location throws exception + callable.instantiateRegionLocation(false /* reload cache ? */); + } catch (IOException e) { + exceptions.add(e); + throw new RetriesExhaustedException(callable.getServerName(), + callable.getRegionName(), callable.getRow(), tries, exceptions); + } + if (prevLoc.getServerAddress().equals( + callable.location.getServerAddress())) { + // Bail out of the retry loop if we have to wait too long + long pauseTime = params.getPauseTime(tries); + if ((System.currentTimeMillis() - callStartTime + pauseTime) > params + .getRpcRetryTimeout()) { + throw new RetriesExhaustedException(callable.getServerName(), + callable.getRegionName(), callable.getRow(), tries, + exceptions); + } + LOG.debug("getRegionServerWithRetries failed, sleeping for " + + pauseTime + "ms. tries = " + tries, t); + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + // do not reload cache. While we were sleeping hopefully the cache + // has been re-populated. + callable.instantiateRegionLocation(false); + } else { + LOG.debug("getRegionServerWithRetries failed, " + + "region moved from " + prevLoc + " to " + callable.location + + "retrying immediately tries=" + tries, t); + } + } + } + } + + /** + * Pass in a ServerCallable with your particular bit of logic defined and + * this method will pass it to the defined region server. + * + * @param + * the type of the return value + * @param callable + * callable to run + * @return an object of type T + * @throws IOException + * if a remote or network exception occurs + * @throws RuntimeException + * other unspecified error + * @throws PreemptiveFastFailException + * if the remote host has been known to be unreachable for more + * than this.fastFailThresholdMilliSec. 
+ */ + @Override + public T getRegionServerWithoutRetries(ServerCallable callable, + boolean instantiateRegionLocation) throws IOException, + RuntimeException, PreemptiveFastFailException { + FailureInfo fInfo = null; + HServerAddress server = null; + boolean didTry = false; + MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false); + boolean retryDespiteFastFailMode = false; + try { + if (instantiateRegionLocation) { + callable.instantiateRegionLocation(false); + } + // Logic to fast fail requests to unreachable servers. + server = callable.getServerAddress(); + fInfo = repeatedFailuresMap.get(server); + + if (inFastFailMode(server) && !currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. + retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo); + if (retryDespiteFastFailMode == false) { // we don't have to retry + throw new PreemptiveFastFailException( + fInfo.numConsecutiveFailures.get(), + fInfo.timeOfFirstFailureMilliSec, + fInfo.timeOfLatestAttemptMilliSec, server.getHostname()); + } + } + didTry = true; + callable.instantiateServer(); + return callable.call(); + } catch (PreemptiveFastFailException pfe) { + throw pfe; + } catch (ClientSideDoNotRetryException exp) { + throw exp; + } catch (Throwable t1) { + handleThrowable(t1, callable, couldNotCommunicateWithServer); + return null; + } finally { + updateFailureInfoForServer(server, fInfo, didTry, + couldNotCommunicateWithServer.booleanValue(), + retryDespiteFastFailMode); + } + } + + @Override + public T getRegionServerWithoutRetries(ServerCallable callable) + throws IOException, RuntimeException, PreemptiveFastFailException { + return this.getRegionServerWithoutRetries(callable, true); + } + + private void updateClientContext(final ServerCallable callable, + final Throwable t) { + if (!recordClientContext) { + return; + } + + List currContext = this.operationContextPerThread.get(); + if (currContext == null) { + currContext = new ArrayList(); + this.operationContextPerThread.set(currContext); + } + + currContext.add(new OperationContext(callable.location, t)); + } + + /** + * Handles failures encountered when communicating with a server. + * + * Updates the FailureInfo in repeatedFailuresMap to reflect the failure. + * Throws RepeatedConnectException if the client is in Fast fail mode. + * + * @param server + * @param t + * - the throwable to be handled. + * @throws PreemptiveFastFailException + */ + private void handleFailureToServer(HServerAddress server, Throwable t) { + if (server == null || t == null) + return; + + long currentTime = System.currentTimeMillis(); + FailureInfo fInfo = repeatedFailuresMap.get(server); + if (fInfo == null) { + fInfo = new FailureInfo(currentTime); + FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(server, fInfo); + + if (oldfInfo != null) { + fInfo = oldfInfo; + } + } + fInfo.timeOfLatestAttemptMilliSec = currentTime; + fInfo.numConsecutiveFailures.incrementAndGet(); + + if (inFastFailMode(server)) { + // In FastFail mode, do not clear out the cache if it was done recently. 
+ if (currentTime > fInfo.timeOfLatestCacheClearMilliSec + + cacheClearingTimeoutMilliSec) { + fInfo.timeOfLatestCacheClearMilliSec = currentTime; + clearCachedLocationForServer(server.toString()); + } + LOG.error("Exception in FastFail mode : " + t.toString()); + return; + } + + // If these exceptions are thrown, clear all the cache entries that + // map to the slow/dead server; otherwise, let the cache miss and ask + // .META. again to find the new location. + fInfo.timeOfLatestCacheClearMilliSec = currentTime; + clearCachedLocationForServer(server.toString()); + } + + /** + * Occasionally cleans up unused information in repeatedFailuresMap. + * + * repeatedFailuresMap stores the failure information for all remote hosts + * that had failures. To keep this map from growing indefinitely, + * occasionallyCleanupFailureInformation() clears stale entries once every + * cleanupInterval ms. + */ + private void occasionallyCleanupFailureInformation() { + long now = System.currentTimeMillis(); + if (!(now > lastFailureMapCleanupTimeMilliSec + + failureMapCleanupIntervalMilliSec)) + return; + + // Remove entries that haven't been attempted in a while. + // No synchronization needed. It is okay if multiple threads try to + // remove the entry again and again from a concurrent hash map. + StringBuilder sb = new StringBuilder(); + for (Entry entry : repeatedFailuresMap + .entrySet()) { + if (now > entry.getValue().timeOfLatestAttemptMilliSec + + failureMapCleanupIntervalMilliSec) { // no recent failures + repeatedFailuresMap.remove(entry.getKey()); + } else if (now > entry.getValue().timeOfFirstFailureMilliSec + + this.fastFailClearingTimeMilliSec) { // been failing for a long + // time + LOG.error(entry.getKey() + + " has been failing for a long time. Clearing out. " + + entry.getValue().toString()); + repeatedFailuresMap.remove(entry.getKey()); + } else { + sb.append(entry.getKey().toString() + " failing " + + entry.getValue().toString() + "\n"); + } + } + if (sb.length() > 0 + // If there are multiple threads cleaning up, make sure that only one + // logs the message. + && now > this.lastFailureMapCleanupTimeMilliSec + + this.failureMapCleanupIntervalMilliSec) { + LOG.warn("Preemptive failure enabled for : " + sb.toString()); + } + lastFailureMapCleanupTimeMilliSec = now; + } + + /** + * Checks to see if we are in fast-fail mode for requests to the server. + * + * If a client is unable to contact a server for more than + * fastFailThresholdMilliSec, the client will get into fast-fail mode. + * + * @param server + * @return true if the client is in fast fail mode for the server. + */ + private boolean inFastFailMode(HServerAddress server) { + FailureInfo fInfo = repeatedFailuresMap.get(server); + // If fInfo is null, the server is considered good. + // If the server is bad, wait long enough to believe that the server is + // down. + return (fInfo != null && System.currentTimeMillis() > fInfo.timeOfFirstFailureMilliSec + + this.fastFailThresholdMilliSec); + } + + /** + * Checks to see if the current thread is already in FastFail mode for + * *some* server. + * + * @return true, if the thread is already in FF mode. + */ + private boolean currentThreadInFastFailMode() { + return (this.threadRetryingInFastFailMode.get() != null && this.threadRetryingInFastFailMode + .get().booleanValue() == true); + } + + /** + * Check to see if the client should try to connect to the server, in spite + * of knowing that it is in the fast fail mode.
+ * + * The idea here is that we want just one client thread to be actively + * trying to reconnect, while all the other threads trying to reach the + * server will short circuit. + * + * @param fInfo + * @return true if the client should try to connect to the server. + */ + private boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { + // We believe that the server is down, but we want to have just one + // client + // actively trying to connect. If we are the chosen one, we will retry + // and not throw an exception. + if (fInfo != null + && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, + true)) { + MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode + .get(); + if (threadAlreadyInFF == null) { + threadAlreadyInFF = new MutableBoolean(); + this.threadRetryingInFastFailMode.set(threadAlreadyInFF); + } + threadAlreadyInFF.setValue(true); + + return true; + } else { + return false; + } + } + + /** + * Updates the failure information for the server. + * + * @param server + * @param fInfo + * @param didTry + * @param couldNotCommunicate + * @param retryDespiteFastFailMode + */ + private void updateFailureInfoForServer(HServerAddress server, + FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate, + boolean retryDespiteFastFailMode) { + if (server == null || fInfo == null || didTry == false) + return; + + // If we were able to connect to the server, reset the failure + // information. + if (couldNotCommunicate == false) { + LOG.info("Clearing out PFFE for server " + server.getHostname()); + repeatedFailuresMap.remove(server); + } else { + // update time of last attempt + long currentTime = System.currentTimeMillis(); + fInfo.timeOfLatestAttemptMilliSec = currentTime; + + // Release the lock if we were retrying in spite of FastFail + if (retryDespiteFastFailMode) { + fInfo.exclusivelyRetringInspiteOfFastFail.set(false); + threadRetryingInFastFailMode.get().setValue(false); + } + } + + occasionallyCleanupFailureInformation(); + } + + public void updateFailureInfoForServer(HServerAddress server, + boolean didTry, boolean couldNotCommunicate) { + FailureInfo fInfo = repeatedFailuresMap.get(server); + boolean retryDespiteFastFailMode = false; + if (inFastFailMode(server) && !currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. + retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo); + } + + updateFailureInfoForServer(server, fInfo, didTry, couldNotCommunicate, retryDespiteFastFailMode); + } + + public void handleThrowable(Throwable t1, ServerCallable callable, + MutableBoolean couldNotCommunicateWithServer) + throws IOException { + Throwable t2 = translateException(t1); + boolean isLocalException = !(t2 instanceof RemoteException); + // translateException throws DoNotRetryException or any + // non-IOException.
+ if (isLocalException && isNetworkException(t2)) { + couldNotCommunicateWithServer.setValue(true); + handleFailureToServer(callable.getServerAddress(), t2); + } + + updateClientContext(callable, t2); + if (t2 instanceof IOException) { + throw (IOException) t2; + } else { + throw new RuntimeException(t2); + } + } + + private Callable createGetServerStartCodeCallable( + final HServerAddress address, final HBaseRPCOptions options) { + final HConnection connection = this; + return new Callable() { + @Override + public Long call() throws IOException { + return getRegionServerWithoutRetries(new ServerCallableForBatchOps( + connection, address, options) { + @Override + public Long call() throws IOException { + return server.getStartCode(); + } + }); + } + }; + } + + private Callable createCurrentTimeCallable( + final HServerAddress address, final HBaseRPCOptions options) { + final HConnection connection = this; + return new Callable() { + @Override + public Long call() throws IOException { + return getRegionServerWithoutRetries(new ServerCallableForBatchOps( + connection, address, options) { + @Override + public Long call() throws IOException { + return server.getCurrentTimeMillis(); + } + }); + } + }; + } + + private Callable createGetLastFlushTimesCallable( + final HServerAddress address, final HBaseRPCOptions options) { + final HConnection connection = this; + return new Callable() { + @Override + public MapWritable call() throws IOException { + return getRegionServerWithoutRetries(new ServerCallableForBatchOps( + connection, address, options) { + @Override + public MapWritable call() throws IOException { + return server.getLastFlushTimes(); + } + }); + } + }; + } + + private Callable createFlushCallable(final HServerAddress address, + final HRegionInfo region, final long targetFlushTime, + final HBaseRPCOptions options) { + final HConnection connection = this; + return new Callable() { + @Override + public Void call() throws IOException { + return getRegionServerWithoutRetries(new ServerCallableForBatchOps( + connection, address, options) { + @Override + public Void call() throws IOException { + server.flushRegion(region.getRegionName(), targetFlushTime); + return null; + } + }); + } + }; + } + + private Callable createMultiActionCallable( + final HServerAddress address, final MultiAction multi, + final byte[] tableName, final HBaseRPCOptions options) { + final HConnection connection = this; + // no need to track mutations here. Done at the caller. 
+ return new Callable() { + @Override + public MultiResponse call() throws IOException { + return getRegionServerWithoutRetries( + new ServerCallableForBatchOps(connection, address, + options) { + @Override + public MultiResponse call() throws IOException, + InterruptedException, ExecutionException { + return server.multiAction(multi); + } + }, true); + } + }; + } + + private HRegionLocation getRegionLocationForRowWithRetries( + byte[] tableName, byte[] rowKey, boolean reload) throws IOException { + boolean reloadFlag = reload; + List exceptions = new ArrayList(); + HRegionLocation location = null; + int tries = 0; + for (; tries < params.getNumRetries();) { + try { + location = getRegionLocation(tableName, rowKey, reloadFlag); + } catch (Throwable t) { + exceptions.add(t); + } + if (location != null) { + break; + } + reloadFlag = true; + tries++; + try { + Thread.sleep(params.getPauseTime(tries)); + } catch (InterruptedException e) { + // continue + } + } + if (location == null) { + throw new RetriesExhaustedException( + " -- nothing found, no 'location' returned," + " tableName=" + + Bytes.toString(tableName) + ", reload=" + reload + " --", + HConstants.EMPTY_BYTE_ARRAY, rowKey, tries, exceptions); + } + return location; + } + + private Map splitRequestsByRegionServer( + List workingList, final byte[] tableName, boolean isGets) + throws IOException { + Map actionsByServer = new HashMap(); + for (int i = 0; i < workingList.size(); i++) { + Row row = workingList.get(i); + if (row != null) { + HRegionLocation loc = locateRegion(tableName, row.getRow(), true); + + byte[] regionName = loc.getRegionInfo().getRegionName(); + + MultiAction actions = actionsByServer.get(loc.getServerAddress()); + if (actions == null) { + actions = new MultiAction(); + actionsByServer.put(loc.getServerAddress(), actions); + } + + if (isGets) { + actions.addGet(regionName, (Get) row, i); + } else { + trackMutationsToTable(tableName, loc); + actions.mutate(regionName, (Mutation) row); + } + } + } + return actionsByServer; + } + + private Map> makeServerRequests( + Map actionsByServer, + final byte[] tableName, ExecutorService pool, HBaseRPCOptions options) { + + Map> futures = new HashMap>( + actionsByServer.size()); + + boolean singleServer = (actionsByServer.size() == 1); + for (Entry e : actionsByServer.entrySet()) { + Callable callable = createMultiActionCallable( + e.getKey(), e.getValue(), tableName, options); + Future task; + if (singleServer) { + task = new FutureTask(callable); + ((FutureTask) task).run(); + } else { + task = pool.submit(callable); + } + futures.put(e.getKey(), task); + } + return futures; + } + + /* + * Collects responses from each of the RegionServers. If there are failures, + * a list of failed operations is returned. + */ + private List collectResponsesForMutateFromAllRS(byte[] tableName, + Map actionsByServer, + Map> futures, + Map failureInfo) + throws InterruptedException, IOException { + + List newWorkingList = null; + for (Entry> responsePerServer : futures + .entrySet()) { + HServerAddress address = responsePerServer.getKey(); + MultiAction request = actionsByServer.get(address); + + Future future = responsePerServer.getValue(); + MultiResponse resp = null; + + try { + resp = future.get(); + } catch (InterruptedException ie) { + throw ie; + } catch (ExecutionException e) { + if (e.getCause() instanceof DoNotRetryIOException) + throw (DoNotRetryIOException) e.getCause(); + } + + // If we got a response. 
Let us go through the responses from each + // region and + // process the Puts and Deletes. + // If the response is null, we will add it to newWorkingList here. + if (request.getDeletes() != null) { + newWorkingList = processMutationResponseFromOneRegionServer( + tableName, address, resp, request.getDeletes(), newWorkingList, + true, failureInfo); + } + if (request.getPuts() != null) { + newWorkingList = processMutationResponseFromOneRegionServer( + tableName, address, resp, request.getPuts(), newWorkingList, + false, failureInfo); + } + } + return newWorkingList; + } + + /* + * Collects responses from each of the RegionServers and populates the + * result array. If there are failures, a list of failed operations is + * returned. + */ + private List collectResponsesForGetFromAllRS(byte[] tableName, + Map actionsByServer, + Map> futures, + List orig_list, Result[] results, + Map failureInfo) throws IOException, + InterruptedException { + + List newWorkingList = null; + for (Entry> responsePerServer : futures + .entrySet()) { + HServerAddress address = responsePerServer.getKey(); + MultiAction request = actionsByServer.get(address); + + Future future = responsePerServer.getValue(); + MultiResponse resp = null; + + try { + resp = future.get(); + } catch (InterruptedException ie) { + throw ie; + } catch (ExecutionException e) { + if (e.getCause() instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException) e.getCause(); + } + e.printStackTrace(); + } + + if (resp == null) { + // Entire server failed + LOG.debug("Failed all for server: " + address + + ", removing from cache"); + } + + newWorkingList = processGetResponseFromOneRegionServer(tableName, + address, request, resp, orig_list, newWorkingList, results, + failureInfo); + } + return newWorkingList; + } + + private List processMutationResponseFromOneRegionServer( + byte[] tableName, HServerAddress address, MultiResponse resp, + Map> map, List newWorkingList, + boolean isDelete, Map failureInfo) + throws IOException { + // If we got a response. Let us go through the responses from each region + // and + // process the Puts and Deletes. + for (Map.Entry> e : map.entrySet()) { + byte[] regionName = e.getKey(); + List regionOps = map.get(regionName); + + long result = 0; + try { + if (isDelete) + result = resp.getDeleteResult(regionName); + else + result = resp.getPutResult(regionName); + + if (result != -1) { + if (newWorkingList == null) + newWorkingList = new ArrayList(); + + newWorkingList.addAll(regionOps.subList((int) result, + regionOps.size())); + } + } catch (Exception ex) { + String serverName = address.getHostname(); + String regName = Bytes.toStringBinary(regionName); + // If response is null, we will catch a NPE here. 
+ translateException(ex); + + if (!failureInfo.containsKey(regName)) { + failureInfo.put(regName, new HRegionFailureInfo(regName)); + } + failureInfo.get(regName).addException(ex); + failureInfo.get(regName).setServerName(serverName); + + if (newWorkingList == null) + newWorkingList = new ArrayList(); + + newWorkingList.addAll(regionOps); + // enough to remove from cache one of the rows from the region + deleteCachedLocation(tableName, regionOps.get(0).getRow(), address); + } + } + + return newWorkingList; + } + + private List processGetResponseFromOneRegionServer(byte[] tableName, + HServerAddress address, MultiAction request, MultiResponse resp, + List orig_list, List newWorkingList, Result[] results, + Map failureInfo) throws IOException { + + for (Map.Entry> e : request.getGets().entrySet()) { + byte[] regionName = e.getKey(); + List regionGets = request.getGets().get(regionName); + List origIndices = request.getOriginalIndex().get(regionName); + + Result[] getResult; + try { + getResult = resp.getGetResult(regionName); + + // fill up the result[] array accordingly + assert (origIndices.size() == getResult.length); + for (int i = 0; i < getResult.length; i++) { + results[origIndices.get(i)] = getResult[i]; + } + } catch (Exception ex) { + // If response is null, we will catch a NPE here. + translateException(ex); + + if (newWorkingList == null) + newWorkingList = new ArrayList(orig_list.size()); + + String serverName = address.getHostname(); + String regName = Bytes.toStringBinary(regionName); + + if (!failureInfo.containsKey(regName)) { + failureInfo.put(regName, new HRegionFailureInfo(regName)); + } + failureInfo.get(regName).addException(ex); + failureInfo.get(regName).setServerName(serverName); + + // Add the element to the correct position + for (int i = 0; i < regionGets.size(); i++) { + newWorkingList.add(origIndices.get(i), regionGets.get(i)); + } + + // enough to clear this once for a region + deleteCachedLocation(tableName, regionGets.get(0).getRow(), address); + } + } + + return newWorkingList; + } + + @Override + public void processBatchedMutations(List orig_list, + final byte[] tableName, ExecutorService pool, List failures, + HBaseRPCOptions options) throws IOException, InterruptedException { + + // Keep track of the most recent servers for any given item for better + // exception reporting. We keep HRegionLocation to save on parsing. + // Later below when we use lastServers, we'll pull what we need from + // lastServers. + // Sort the puts based on the row key in order to optimize the row lock + // acquiring + // in the server side. 
+ + Map failureInfo = new HashMap(); + List workingList = orig_list; + Collections.sort(workingList); + + for (int tries = 0; workingList != null && !workingList.isEmpty() + && tries < params.getNumRetries(); ++tries) { + + if (tries >= 1) { + long sleepTime = params.getPauseTime(tries); + LOG.debug("Retry " + tries + ", sleep for " + sleepTime + "ms!"); + Thread.sleep(sleepTime); + } + + // step 1: break up into regionserver-sized chunks and build the data + // structures + Map actionsByServer = splitRequestsByRegionServer( + workingList, tableName, false); + + // step 2: make the requests + Map> futures = makeServerRequests( + actionsByServer, tableName, pool, options); + + // step 3: collect the failures and successes and prepare for retry + workingList = collectResponsesForMutateFromAllRS(tableName, + actionsByServer, futures, failureInfo); + } + + if (workingList != null && !workingList.isEmpty()) { + if (failures != null) + failures.addAll(workingList); + throw new RetriesExhaustedException(failureInfo, workingList.size() + + " mutate operations remaining after " + params.getNumRetries() + + " retries"); + } + } + + @Override + public void processBatchedGets(List orig_list, final byte[] tableName, + ExecutorService pool, Result[] results, HBaseRPCOptions options) + throws IOException, InterruptedException { + + Map failureInfo = new HashMap(); + // If results is not null, + // it must be the same size as the list. + if (results != null && (results.length != orig_list.size())) { + throw new IllegalArgumentException( + "argument results must be the same size as argument list"); + } + + // Keep track of the most recent servers for any given item for better + // exception reporting. We keep HRegionLocation to save on parsing. + // Later below when we use lastServers, we'll pull what we need from + // lastServers. + List workingList = orig_list; + + for (int tries = 0; workingList != null && !workingList.isEmpty() + && tries < params.getNumRetries(); ++tries) { + + if (tries >= 1) { [... 1184 lines stripped ...]
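
The preemptive fast-fail logic spread across getRegionServerWithoutRetries(), inFastFailMode(), shouldRetryInspiteOfFastFail() and updateFailureInfoForServer() reduces to a small pattern: once a server has kept failing for longer than fastFailThresholdMilliSec, every thread except one fails immediately with PreemptiveFastFailException, and the single probing thread is elected with a compareAndSet on the per-server failure record. Below is a minimal, self-contained sketch of just that election; the names FastFailGate and ServerFailureState are illustrative stand-ins, not part of the committed FailureInfo plumbing.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    public class FastFailGate {

      /** Per-server failure record, a simplified stand-in for FailureInfo. */
      static class ServerFailureState {
        final long timeOfFirstFailureMillis;
        final AtomicLong consecutiveFailures = new AtomicLong(0);
        /** True while some thread is already probing this server. */
        final AtomicBoolean probeInProgress = new AtomicBoolean(false);

        ServerFailureState(long timeOfFirstFailureMillis) {
          this.timeOfFirstFailureMillis = timeOfFirstFailureMillis;
        }
      }

      private final ConcurrentMap<String, ServerFailureState> failures =
          new ConcurrentHashMap<String, ServerFailureState>();
      private final long fastFailThresholdMillis;

      public FastFailGate(long fastFailThresholdMillis) {
        this.fastFailThresholdMillis = fastFailThresholdMillis;
      }

      /** Record a failed attempt against the server (cf. handleFailureToServer). */
      public void recordFailure(String server) {
        ServerFailureState state = failures.get(server);
        if (state == null) {
          ServerFailureState fresh = new ServerFailureState(System.currentTimeMillis());
          ServerFailureState existing = failures.putIfAbsent(server, fresh);
          state = (existing != null) ? existing : fresh;
        }
        state.consecutiveFailures.incrementAndGet();
      }

      /** A successful call clears the record (cf. updateFailureInfoForServer). */
      public void recordSuccess(String server) {
        failures.remove(server);
      }

      /** The server is in fast-fail mode once it has been failing past the threshold. */
      public boolean inFastFailMode(String server) {
        ServerFailureState state = failures.get(server);
        return state != null
            && System.currentTimeMillis() >
               state.timeOfFirstFailureMillis + fastFailThresholdMillis;
      }

      /**
       * Decide whether the calling thread may contact a fast-failing server.
       * Exactly one thread wins the compareAndSet and probes; everyone else
       * should fail fast without issuing an RPC.
       */
      public boolean tryBecomeProbingThread(String server) {
        ServerFailureState state = failures.get(server);
        return state != null && state.probeInProgress.compareAndSet(false, true);
      }

      /** The probing thread must release its slot once its attempt finishes. */
      public void releaseProbingThread(String server) {
        ServerFailureState state = failures.get(server);
        if (state != null) {
          state.probeInProgress.set(false);
        }
      }
    }

A caller would consult inFastFailMode() first, attempt tryBecomeProbingThread() only when it returns true, and release the slot in a finally block, mirroring the finally clause of getRegionServerWithoutRetries().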
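
A note on the pauses used in the retry loops above: params.getPauseTime(tries) scales a configured base pause by a per-attempt multiplier so that successive retries back off. The multiplier table itself is defined elsewhere in the client; the following is only a rough, self-contained sketch of the usual shape, and the RETRY_BACKOFF values shown are illustrative rather than the ones this branch actually ships.

    public final class BackoffSketch {
      // Illustrative multipliers only; the real table is part of the client configuration.
      private static final int[] RETRY_BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};

      /** Pause, in milliseconds, before retry number 'tries' (0-based). */
      public static long getPauseTime(long basePauseMillis, int tries) {
        int index = Math.min(tries, RETRY_BACKOFF.length - 1);
        return basePauseMillis * RETRY_BACKOFF[index];
      }

      public static void main(String[] args) {
        // With a 1000 ms base pause: 1s, 1s, 1s, 2s, 2s, 4s, ... capped at 32s.
        for (int t = 0; t < 12; t++) {
          System.out.println("try " + t + " -> " + getPauseTime(1000L, t) + " ms");
        }
      }
    }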
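
processBatchedMutations() and, below it, processBatchedGets() both run the same three-step round: split the outstanding operations by region server, issue one MultiAction per server (through the pool, or inline when only one server is involved), then keep only the operations that failed and retry them after a pause. Stripped of the HBase types, and run sequentially rather than through a thread pool for brevity, the control flow is roughly the sketch below; RetryRounds, GroupFn and SendFn are hypothetical names, not part of this commit.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RetryRounds {

      /** Groups an operation under the server that currently hosts it. */
      interface GroupFn<Op> {
        String serverFor(Op op);
      }

      /** Sends one batch to one server and returns the operations that failed. */
      interface SendFn<Op> {
        List<Op> send(String server, List<Op> batch) throws Exception;
      }

      /**
       * Runs up to maxRetries rounds; each round regroups the remaining work,
       * sends one batch per server, and retries only the failures after a pause.
       */
      public static <Op> List<Op> run(List<Op> ops, GroupFn<Op> groupBy, SendFn<Op> sender,
          int maxRetries, long basePauseMillis) throws Exception {
        List<Op> remaining = new ArrayList<Op>(ops);
        for (int tries = 0; !remaining.isEmpty() && tries < maxRetries; ++tries) {
          if (tries >= 1) {
            Thread.sleep(basePauseMillis * tries);  // stand-in for getPauseTime(tries)
          }
          // Step 1: break the remaining work into per-server batches.
          Map<String, List<Op>> byServer = new HashMap<String, List<Op>>();
          for (Op op : remaining) {
            String server = groupBy.serverFor(op);
            List<Op> batch = byServer.get(server);
            if (batch == null) {
              batch = new ArrayList<Op>();
              byServer.put(server, batch);
            }
            batch.add(op);
          }
          // Steps 2 and 3: send each batch and collect whatever failed.
          List<Op> failed = new ArrayList<Op>();
          for (Map.Entry<String, List<Op>> e : byServer.entrySet()) {
            failed.addAll(sender.send(e.getKey(), e.getValue()));
          }
          remaining = failed;
        }
        return remaining;  // non-empty means retries were exhausted
      }
    }

A non-empty remainder corresponds to the point where the real code raises RetriesExhaustedException with the accumulated per-region failure information.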