Subject: svn commit: r1585766 - in /hbase/branches/hbase-10070: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/test/java...
Date: Tue, 08 Apr 2014 15:48:18 -0000
From: enis@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org

Author: enis
Date: Tue Apr  8 15:48:17 2014
New Revision: 1585766

URL: http://svn.apache.org/r1585766
Log:
HBASE-10701 Cache invalidation improvements from client side

Modified:
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
    hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
    hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
    hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
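For orientation, the diffs below harden the client path that serves timeline-consistent reads from region replicas. A minimal sketch of how a client might exercise that path, assuming the Consistency/isStale API from the hbase-10070 branch; the table and row names are illustrative, not from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class TimelineGetExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HConnection connection = HConnectionManager.createConnection(conf);
    try {
      // "usertable" and "row1" are illustrative names
      HTableInterface table = connection.getTable(TableName.valueOf("usertable"));
      try {
        Get get = new Get(Bytes.toBytes("row1"));
        // TIMELINE allows the read to be served by a secondary replica if the
        // primary does not answer within the configured timeout
        get.setConsistency(Consistency.TIMELINE);
        Result result = table.get(get);
        // isStale() reports whether a (possibly lagging) secondary answered
        System.out.println("stale=" + result.isStale());
      } finally {
        table.close();
      }
    } finally {
      connection.close();
    }
  }
}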
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java Tue Apr  8 15:48:17 2014
@@ -561,7 +561,9 @@ public class HRegionInfo implements Comp
         break;
       }
     }
-    if(offset == -1) throw new IOException("Invalid regionName format");
+    if (offset == -1) {
+      throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName));
+    }
     byte[] tableName = new byte[offset];
     System.arraycopy(regionName, 0, tableName, 0, offset);
     offset = -1;
@@ -590,7 +592,9 @@ public class HRegionInfo implements Comp
         break;
       }
     }
-    if(offset == -1) throw new IOException("Invalid regionName format");
+    if (offset == -1) {
+      throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName));
+    }
     byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
     if(offset != tableName.length + 1) {
       startKey = new byte[offset - tableName.length - 1];
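The two hunks above add binary-safe context to the parse failure, so a malformed region name is printable even when it contains non-ASCII bytes. A standalone sketch of the same delimiter-scan-plus-context idea, with a simplified stand-in for Bytes.toStringBinary (the real parser is HRegionInfo.parseRegionName; all names here are illustrative):

import java.io.IOException;

final class RegionNameSketch {
  private static final byte DELIMITER = (byte) ',';

  /** Returns the offset of the first delimiter, or throws with printable context. */
  static int findTableNameEnd(byte[] regionName) throws IOException {
    for (int i = 0; i < regionName.length; i++) {
      if (regionName[i] == DELIMITER) {
        return i;
      }
    }
    // include a binary-safe rendering of the bad input in the message,
    // mirroring the Bytes.toStringBinary(...) call added by this commit
    throw new IOException("Invalid regionName format: " + toStringBinary(regionName));
  }

  /** Minimal stand-in for Bytes.toStringBinary: escape non-printable bytes. */
  static String toStringBinary(byte[] b) {
    StringBuilder sb = new StringBuilder();
    for (byte value : b) {
      int ch = value & 0xFF;
      if (ch >= ' ' && ch <= '~' && ch != '\\') {
        sb.append((char) ch);
      } else {
        sb.append(String.format("\\x%02X", ch));
      }
    }
    return sb.toString();
  }
}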
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java Tue Apr  8 15:48:17 2014
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;
 import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -34,27 +33,42 @@ import org.apache.hadoop.hbase.util.Byte
 public class RegionLocations {
 
   private final int numNonNullElements;
+
+  // The locations array contains the HRL objects for known region replicas, indexed by replicaId.
+  // An element can be null if the region replica is not known at all: a null value indicates
+  // that there is a region replica with the index as replicaId, but its location is not known
+  // in the cache.
   private final HRegionLocation[] locations; // replicaId -> HRegionLocation.
 
   /**
    * Constructs the region location list. The locations array should
    * contain all the locations for known replicas for the region, and should be
-   * sorted in replicaId ascending order.
+   * sorted in replicaId ascending order, although it can contain nulls for replicaIds
+   * whose locations are not known.
    * @param locations an array of HRegionLocations for the same region range
    */
   public RegionLocations(HRegionLocation... locations) {
     int numNonNullElements = 0;
     int maxReplicaId = -1;
+    int maxReplicaIdIndex = -1;
+    int index = 0;
     for (HRegionLocation loc : locations) {
       if (loc != null) {
-        numNonNullElements++;
-        if (loc.getRegionInfo().getReplicaId() > maxReplicaId) {
+        if (loc.getServerName() != null) {
+          numNonNullElements++;
+        }
+        if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
           maxReplicaId = loc.getRegionInfo().getReplicaId();
+          maxReplicaIdIndex = index;
         }
       }
+      index++;
     }
     this.numNonNullElements = numNonNullElements;
+
+    // account for the null elements in the array after maxReplicaIdIndex
+    maxReplicaId = maxReplicaId + (locations.length - (maxReplicaIdIndex + 1));
+
     if (maxReplicaId + 1 == locations.length) {
       this.locations = locations;
     } else {
@@ -97,10 +111,10 @@ public class RegionLocations {
   }
 
   /**
-   * Returns a new HRegionLocationList with the locations removed (set to null)
+   * Returns a new RegionLocations with the locations removed (set to null)
    * which have the destination server as given.
    * @param serverName the serverName to remove locations of
-   * @return an HRegionLocationList object with removed locations or the same object
+   * @return a RegionLocations object with removed locations or the same object
    *   if nothing is removed
    */
   public RegionLocations removeByServer(ServerName serverName) {
@@ -123,36 +137,58 @@ public class RegionLocations {
   /**
    * Removes the given location from the list
    * @param location the location to remove
-   * @return an HRegionLocationList object with removed locations or the same object
+   * @return a RegionLocations object with removed locations or the same object
    *   if nothing is removed
    */
   public RegionLocations remove(HRegionLocation location) {
-    HRegionLocation[] newLocations = null;
-    for (int i = 0; i < locations.length; i++) {
-      // check whether something to remove. HRL.compareTo() compares ONLY the
-      // serverName. We want to compare the HRI's as well.
-      if (locations[i] != null
-          && location.getRegionInfo().equals(locations[i].getRegionInfo())
-          && location.equals(locations[i])) {
-        if (newLocations == null) { //first time
-          newLocations = new HRegionLocation[locations.length];
-          System.arraycopy(locations, 0, newLocations, 0, i);
-        }
-        newLocations[i] = null;
-      } else if (newLocations != null) {
-        newLocations[i] = locations[i];
-      }
+    if (location == null) return this;
+    if (location.getRegionInfo() == null) return this;
+    int replicaId = location.getRegionInfo().getReplicaId();
+    if (replicaId >= locations.length) return this;
+
+    // check whether something to remove. HRL.compareTo() compares ONLY the
+    // serverName. We want to compare the HRI's as well.
+    if (locations[replicaId] == null
+        || !location.getRegionInfo().equals(locations[replicaId].getRegionInfo())
+        || !location.equals(locations[replicaId])) {
+      return this;
     }
-    return newLocations == null ? this : new RegionLocations(newLocations);
+
+    HRegionLocation[] newLocations = new HRegionLocation[locations.length];
+    System.arraycopy(locations, 0, newLocations, 0, locations.length);
+    newLocations[replicaId] = null;
+
+    return new RegionLocations(newLocations);
+  }
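The rewritten remove() never mutates the published array: it bails out early when there is nothing to remove, otherwise copies, clears one replicaId slot, and returns a fresh instance, so concurrent readers always see a consistent snapshot. The same idea in a self-contained sketch (simplified types, not the HBase class itself):

import java.util.Arrays;

/** Immutable list of per-replica locations; "remove" returns a new copy. */
final class ReplicaSlots {
  private final String[] slots; // index = replicaId, value = hostname or null

  ReplicaSlots(String... slots) {
    this.slots = slots.clone(); // defensive copy on the way in
  }

  /** Returns this if nothing changes, otherwise a new instance with the slot cleared. */
  ReplicaSlots remove(int replicaId) {
    if (replicaId >= slots.length || slots[replicaId] == null) {
      return this; // nothing to remove; safe to reuse the shared instance
    }
    String[] copy = slots.clone();
    copy[replicaId] = null;
    return new ReplicaSlots(copy);
  }

  @Override
  public String toString() {
    return Arrays.toString(slots);
  }
}

Returning the same instance on a no-op lets callers detect "nothing changed" with a cheap reference comparison, which is exactly how the MetaCache code further down decides whether to publish an update.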
+
+  /**
+   * Removes location of the given replicaId from the list
+   * @param replicaId the replicaId of the location to remove
+   * @return a RegionLocations object with removed locations or the same object
+   *   if nothing is removed
+   */
+  public RegionLocations remove(int replicaId) {
+    if (getRegionLocation(replicaId) == null) {
+      return this;
+    }
+
+    HRegionLocation[] newLocations = new HRegionLocation[locations.length];
+
+    System.arraycopy(locations, 0, newLocations, 0, locations.length);
+    if (replicaId < newLocations.length) {
+      newLocations[replicaId] = null;
+    }
+
+    return new RegionLocations(newLocations);
   }
 
   /**
-   * Merges this HRegionLocation list with the given list assuming
+   * Merges this RegionLocations list with the given list assuming
    * same range, and keeping the most up to date version of the
    * HRegionLocation entries from either list according to seqNum. If seqNums
    * are equal, the location from the argument (other) is taken.
    * @param other the locations to merge with
-   * @return an HRegionLocationList object with merged locations or the same object
+   * @return a RegionLocations object with merged locations or the same object
    *   if nothing is merged
    */
   public RegionLocations mergeLocations(RegionLocations other) {
@@ -160,7 +196,9 @@ public class RegionLocations {
 
     HRegionLocation[] newLocations = null;
 
-    int max = Math.max(this.locations.length, other.locations.length);
+    // Use the length from other, since it is coming from meta. Otherwise,
+    // in case of region replication going down, we might have a leak here.
+    int max = other.locations.length;
 
     for (int i = 0; i < max; i++) {
       HRegionLocation thisLoc = this.getRegionLocation(i);
@@ -207,7 +245,7 @@ public class RegionLocations {
    * @param checkForEquals whether to update the location if seqNums for the
    *   HRegionLocations for the old and new location are the same
    * @param force whether to force update
-   * @return an HRegionLocationList object with updated locations or the same object
+   * @return a RegionLocations object with updated locations or the same object
    *   if nothing is updated
    */
   public RegionLocations updateLocation(HRegionLocation location,
@@ -282,12 +320,10 @@ public class RegionLocations {
   public String toString() {
     StringBuilder builder = new StringBuilder("[");
     for (HRegionLocation loc : locations) {
-      if (loc != null) {
-        if (builder.length() > 1) {
-          builder.append(", ");
-        }
-        builder.append(loc);
+      if (builder.length() > 1) {
+        builder.append(", ");
       }
+      builder.append(loc == null ? "null" : loc);
     }
     builder.append("]");
     return builder.toString();
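mergeLocations() keeps, per replica slot, whichever entry carries the higher seqNum, and bounds the result by the meta-supplied list so stale replica slots are dropped when region replication is reduced. A toy version of that selection rule, under the assumption that ties go to the meta copy as the javadoc above states (illustrative types, not the HBase implementation):

final class SeqNumMergeSketch {
  /** A replica location stamped with the seqNum of the region open. */
  static final class Loc {
    final String server;
    final long seqNum;
    Loc(String server, long seqNum) { this.server = server; this.seqNum = seqNum; }
  }

  /**
   * Keep the fresher entry per replica slot. The result length follows
   * fromMeta (the authoritative list), so slots for replicas that no longer
   * exist are dropped instead of leaking forever in the cache.
   */
  static Loc[] merge(Loc[] cached, Loc[] fromMeta) {
    Loc[] merged = new Loc[fromMeta.length];
    for (int i = 0; i < merged.length; i++) {
      Loc old = i < cached.length ? cached[i] : null;
      Loc fresh = fromMeta[i];
      if (old == null || fresh == null) {
        merged[i] = (fresh != null) ? fresh : old;
      } else {
        // on a seqNum tie the meta copy wins, matching mergeLocations' contract
        merged[i] = (fresh.seqNum >= old.seqNum) ? fresh : old;
      }
    }
    return merged;
  }
}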
"null" : loc); } builder.append("]"); return builder.toString(); Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java?rev=1585766&r1=1585765&r2=1585766&view=diff ============================================================================== --- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java (original) +++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java Tue Apr 8 15:48:17 2014 @@ -237,7 +237,7 @@ public class MetaReader { parsedInfo = parseRegionInfoFromRegionName(regionName); row = getMetaKeyForRegion(parsedInfo); } catch (Exception parseEx) { - LOG.warn("Received parse exception:" + parseEx); + // Ignore. This is used with tableName passed as regionName. } Get get = new Get(row); get.addFamily(HConstants.CATALOG_FAMILY); Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java?rev=1585766&r1=1585765&r2=1585766&view=diff ============================================================================== --- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java (original) +++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java Tue Apr 8 15:48:17 2014 @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.protobuf. interface ClusterConnection extends HConnection { /** @return - true if the master server is running */ + @Override boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException; @@ -53,9 +54,10 @@ interface ClusterConnection extends HCon * @throws IOException * if a remote or network exception occurs */ + @Override boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException; - + /** * Find the location of the region of tableName that row * lives in. @@ -65,12 +67,14 @@ interface ClusterConnection extends HCon * question * @throws IOException if a remote or network exception occurs */ + @Override public HRegionLocation locateRegion(final TableName tableName, final byte [] row) throws IOException; /** * Allows flushing the region cache. */ + @Override void clearRegionCache(); /** @@ -79,12 +83,14 @@ interface ClusterConnection extends HCon * @param tableName Name of the table whose regions we are to remove from * cache. */ + @Override void clearRegionCache(final TableName tableName); /** * Deletes cached locations for the specific region. * @param location The location object for the region, to be purged from cache. */ + @Override void deleteCachedRegionLocation(final HRegionLocation location); /** @@ -96,10 +102,24 @@ interface ClusterConnection extends HCon * question * @throws IOException if a remote or network exception occurs */ + @Override HRegionLocation relocateRegion(final TableName tableName, final byte [] row) throws IOException; /** + * Find the location of the region of tableName that row + * lives in, ignoring any value that might be in the cache. 
+ * @param tableName name of the table row is in + * @param row row key you're trying to find the region of + * @param replicaId the replicaId of the region + * @return HRegionLocation that describes where to find the region in + * question + * @throws IOException if a remote or network exception occurs + */ + HRegionLocation relocateRegion(final TableName tableName, + final byte [] row, int replicaId) throws IOException; + + /** * Update the location cache. This is used internally by HBase, in most cases it should not be * used by the client application. * @param tableName the table name @@ -119,6 +139,7 @@ interface ClusterConnection extends HCon * question * @throws IOException if a remote or network exception occurs */ + @Override HRegionLocation locateRegion(final byte[] regionName) throws IOException; @@ -128,6 +149,7 @@ interface ClusterConnection extends HCon * @return list of region locations for all regions of table * @throws IOException */ + @Override List locateRegions(final TableName tableName) throws IOException; /** @@ -139,6 +161,7 @@ interface ClusterConnection extends HCon * @return list of region locations for all regions of table * @throws IOException */ + @Override List locateRegions(final TableName tableName, final boolean useCache, final boolean offlined) throws IOException; @@ -154,9 +177,24 @@ interface ClusterConnection extends HCon */ RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry) throws IOException; + + /** + * + * @param tableName table to get regions of + * @param row the row + * @param useCache Should we use the cache to retrieve the region information. + * @param retry do we retry + * @param replicaId the replicaId for the region + * @return region locations for this row. + * @throws IOException + */ + RegionLocations locateRegion(TableName tableName, + byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException; + /** * Returns a {@link MasterKeepAliveConnection} to the active master */ + @Override MasterService.BlockingInterface getMaster() throws IOException; @@ -166,6 +204,7 @@ interface ClusterConnection extends HCon * @return proxy for HRegionServer * @throws IOException if a remote or network exception occurs */ + @Override AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException; /** @@ -177,6 +216,7 @@ interface ClusterConnection extends HCon * @throws IOException if a remote or network exception occurs * */ + @Override ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException; /** @@ -187,6 +227,7 @@ interface ClusterConnection extends HCon * @return Location of row. * @throws IOException if a remote or network exception occurs */ + @Override HRegionLocation getRegionLocation(TableName tableName, byte [] row, boolean reload) throws IOException; @@ -195,6 +236,7 @@ interface ClusterConnection extends HCon * Clear any caches that pertain to server name sn. * @param sn A server name */ + @Override void clearCaches(final ServerName sn); /** @@ -203,6 +245,7 @@ interface ClusterConnection extends HCon * @return The shared instance. Never returns null. * @throws MasterNotRunningException */ + @Override @Deprecated MasterKeepAliveConnection getKeepAliveMasterService() throws MasterNotRunningException; @@ -211,12 +254,14 @@ interface ClusterConnection extends HCon * @param serverName * @return true if the server is known as dead, false otherwise. 
* @deprecated internal method, do not use thru HConnection */ + @Override @Deprecated boolean isDeadServer(ServerName serverName); /** * @return Nonce generator for this HConnection; may be null if disabled in configuration. */ + @Override public NonceGenerator getNonceGenerator(); /** @@ -228,4 +273,5 @@ interface ClusterConnection extends HCon * @return All locations for a particular region. */ RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException; + } \ No newline at end of file Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1585766&r1=1585765&r2=1585766&view=diff ============================================================================== --- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (original) +++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Tue Apr 8 15:48:17 2014 @@ -35,9 +35,6 @@ import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -48,7 +45,6 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -79,7 +75,87 @@ import org.apache.hadoop.hbase.protobuf. 
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; +import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse; +import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -354,6 +430,7 @@ class ConnectionManager { * @param conf configuration whose identity is used to find {@link HConnection} instance. * @deprecated */ + @Deprecated public static void deleteConnection(Configuration conf) { deleteConnection(new HConnectionKey(conf), false); } @@ -365,6 +442,7 @@ class ConnectionManager { * @param connection * @deprecated */ + @Deprecated public static void deleteStaleConnection(HConnection connection) { deleteConnection(connection, true); } @@ -375,6 +453,7 @@ class ConnectionManager { * staleConnection to true. * @deprecated */ + @Deprecated public static void deleteAllConnections(boolean staleConnection) { synchronized (CONNECTION_INSTANCES) { Set connectionKeys = new HashSet(); @@ -1003,6 +1082,12 @@ class ConnectionManager { @Override public HRegionLocation relocateRegion(final TableName tableName, final byte [] row) throws IOException{ + return relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + + @Override + public HRegionLocation relocateRegion(final TableName tableName, + final byte [] row, int replicaId) throws IOException{ // Since this is an explicit request not to use any caching, finding // disabled tables should not be desirable. This will ensure that an exception is thrown when // the first time a disabled table is interacted with. @@ -1010,8 +1095,8 @@ class ConnectionManager { throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); } - RegionLocations locations = locateRegion(tableName, row, false, true); - return locations == null ? null : locations.getRegionLocation(); + RegionLocations locations = locateRegion(tableName, row, false, true, replicaId); + return locations == null ? 
null : locations.getRegionLocation(replicaId); } @Override @@ -1020,11 +1105,17 @@ class ConnectionManager { return relocateRegion(TableName.valueOf(tableName), row); } - @Override public RegionLocations locateRegion(final TableName tableName, final byte [] row, boolean useCache, boolean retry) throws IOException { + return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + + @Override + public RegionLocations locateRegion(final TableName tableName, + final byte [] row, boolean useCache, boolean retry, int replicaId) + throws IOException { if (this.closed) throw new IOException(toString() + " closed"); if (tableName== null || tableName.getName().length == 0) { throw new IllegalArgumentException( @@ -1036,7 +1127,7 @@ class ConnectionManager { } else { // Region not in the cache - have to go to the meta RS return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row, - useCache, userRegionLock, retry); + useCache, userRegionLock, retry, replicaId); } } @@ -1100,15 +1191,15 @@ class ConnectionManager { */ private RegionLocations locateRegionInMeta(final TableName parentTable, final TableName tableName, final byte [] row, boolean useCache, - Object regionLockObject, boolean retry) + Object regionLockObject, boolean retry, int replicaId) throws IOException { - RegionLocations location; + RegionLocations locations; // If we are supposed to be using the cache, look in the cache to see if // we already have the region. if (useCache) { - location = getCachedLocation(tableName, row); - if (location != null) { - return location; + locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } } int localNumRetries = retry ? numTries : 1; @@ -1127,7 +1218,7 @@ class ConnectionManager { try { // locate the meta region RegionLocations metaLocations = locateRegion(parentTable, metaKey, true, false); - metaLocation = metaLocations == null ? null : metaLocations.getRegionLocation(); + metaLocation = metaLocations == null ? null : metaLocations.getDefaultRegionLocation(); // If null still, go around again. if (metaLocation == null) continue; ClientService.BlockingInterface service = getClient(metaLocation.getServerName()); @@ -1142,23 +1233,23 @@ class ConnectionManager { synchronized (regionLockObject) { // Check the cache again for a hit in case some other thread made the // same query while we were waiting on the lock. - location = getCachedLocation(tableName, row); - if (location != null) { - return location; + locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } // If the parent table is META, we may want to pre-fetch some // region info into the global region cache for this table. prefetchRegionCache(tableName, row); } } - location = getCachedLocation(tableName, row); - if (location != null) { - return location; + locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } } else { // If we are not supposed to be using the cache, delete any existing cached location // so it won't interfere. - metaCache.clearCache(tableName, row); + metaCache.clearCache(tableName, row, replicaId); } // Query the meta region for the location of the meta region @@ -1171,12 +1262,12 @@ class ConnectionManager { } // convert the row result into the HRegionLocation we need! 
- location = MetaReader.getRegionLocations(regionInfoRow); - if (location == null || location.getRegionLocation() == null) { + locations = MetaReader.getRegionLocations(regionInfoRow); + if (locations == null || locations.getRegionLocation(replicaId) == null) { throw new IOException("HRegionInfo was null in " + parentTable + ", row=" + regionInfoRow); } - HRegionInfo regionInfo = location.getRegionLocation().getRegionInfo(); + HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo(); if (regionInfo == null) { throw new IOException("HRegionInfo was null or empty in " + parentTable + ", row=" + regionInfoRow); @@ -1200,7 +1291,7 @@ class ConnectionManager { regionInfo.getRegionNameAsString()); } - ServerName serverName = location.getRegionLocation().getServerName(); + ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); if (serverName == null) { throw new NoServerForRegionException("No server address listed " + "in " + parentTable + " for region " + @@ -1214,8 +1305,8 @@ class ConnectionManager { ", but it is dead."); } - cacheLocation(tableName, location); - return location; + cacheLocation(tableName, locations); + return locations; } catch (TableNotFoundException e) { // if we got this error, probably means the table just plain doesn't // exist. rethrow the error immediately. this should always be coming @@ -1242,7 +1333,7 @@ class ConnectionManager { // Only relocate the parent region if necessary if(!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) { - relocateRegion(parentTable, metaKey); + relocateRegion(parentTable, metaKey, replicaId); } } try{ Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java?rev=1585766&r1=1585765&r2=1585766&view=diff ============================================================================== --- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java (original) +++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java Tue Apr 8 15:48:17 2014 @@ -114,6 +114,9 @@ public class MetaCache { RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations); boolean isNewCacheEntry = (oldLocations == null); if (isNewCacheEntry) { + if (LOG.isTraceEnabled()) { + LOG.trace("Cached location: " + location); + } addToCachedServers(locations); return; } @@ -131,7 +134,10 @@ public class MetaCache { // an additional counter on top of seqNum would be necessary to handle them all. RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force); if (oldLocations != updatedLocations) { - tableLocations.replace(startKey, oldLocations, updatedLocations); + boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations); + if (replaced && LOG.isTraceEnabled()) { + LOG.trace("Changed cached location to: " + location); + } addToCachedServers(updatedLocations); } } @@ -139,24 +145,30 @@ public class MetaCache { /** * Put a newly discovered HRegionLocation into the cache. * @param tableName The table name. 
- * @param location the new location + * @param locations the new locations */ - public void cacheLocation(final TableName tableName, final RegionLocations location) { - byte [] startKey = location.getRegionLocation().getRegionInfo().getStartKey(); + public void cacheLocation(final TableName tableName, final RegionLocations locations) { + byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey(); ConcurrentMap tableLocations = getTableLocations(tableName); - RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, location); + RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations); boolean isNewCacheEntry = (oldLocation == null); if (isNewCacheEntry) { - addToCachedServers(location); + if (LOG.isTraceEnabled()) { + LOG.trace("Cached location: " + locations); + } + addToCachedServers(locations); return; } // merge old and new locations and add it to the cache // Meta record might be stale - some (probably the same) server has closed the region // with later seqNum and told us about the new location. - RegionLocations mergedLocation = oldLocation.mergeLocations(location); - tableLocations.replace(startKey, oldLocation, mergedLocation); - addToCachedServers(location); + RegionLocations mergedLocation = oldLocation.mergeLocations(locations); + boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation); + if (replaced && LOG.isTraceEnabled()) { + LOG.trace("Merged cached locations: " + mergedLocation); + } + addToCachedServers(locations); } private void addToCachedServers(RegionLocations locations) { @@ -245,12 +257,11 @@ public class MetaCache { RegionLocations regionLocations = e.getValue(); if (regionLocations != null) { RegionLocations updatedLocations = regionLocations.removeByServer(serverName); - deletedSomething |= regionLocations == updatedLocations; if (updatedLocations != regionLocations) { if (updatedLocations.isEmpty()) { - tableLocations.remove(e.getKey(), regionLocations); + deletedSomething |= tableLocations.remove(e.getKey(), regionLocations); } else { - tableLocations.replace(e.getKey(), regionLocations, updatedLocations); + deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations); } } } @@ -258,8 +269,8 @@ public class MetaCache { } this.cachedServers.remove(serverName); } - if (deletedSomething && LOG.isDebugEnabled()) { - LOG.debug("Removed all cached region locations that map to " + serverName); + if (deletedSomething && LOG.isTraceEnabled()) { + LOG.trace("Removed all cached region locations that map to " + serverName); } } @@ -267,6 +278,9 @@ public class MetaCache { * Delete all cached entries of a table. 
*/ public void clearCache(final TableName tableName) { + if (LOG.isTraceEnabled()) { + LOG.trace("Removed all cached region locations for table " + tableName); + } this.cachedRegionLocations.remove(tableName); } @@ -275,6 +289,34 @@ public class MetaCache { * @param tableName tableName * @param row */ + public void clearCache(final TableName tableName, final byte [] row, int replicaId) { + ConcurrentMap tableLocations = getTableLocations(tableName); + + boolean removed = false; + RegionLocations regionLocations = getCachedLocation(tableName, row); + if (regionLocations != null) { + HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId); + RegionLocations updatedLocations = regionLocations.remove(replicaId); + if (updatedLocations != regionLocations) { + byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); + if (updatedLocations.isEmpty()) { + removed = tableLocations.remove(startKey, regionLocations); + } else { + removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + } + } + + if (removed && LOG.isTraceEnabled() && toBeRemoved != null) { + LOG.trace("Removed " + toBeRemoved + " from cache"); + } + } + } + + /** + * Delete a cached location, no matter what it is. Called when we were told to not use cache. + * @param tableName tableName + * @param row + */ public void clearCache(final TableName tableName, final byte [] row) { ConcurrentMap tableLocations = getTableLocations(tableName); @@ -282,8 +324,8 @@ public class MetaCache { if (regionLocations != null) { byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); boolean removed = tableLocations.remove(startKey, regionLocations); - if (removed && LOG.isDebugEnabled()) { - LOG.debug("Removed " + regionLocations + " from cache"); + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed " + regionLocations + " from cache"); } } } @@ -299,10 +341,15 @@ public class MetaCache { RegionLocations updatedLocations = regionLocations.removeByServer(serverName); if (updatedLocations != regionLocations) { byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); + boolean removed = false; if (updatedLocations.isEmpty()) { - tableLocations.remove(startKey, regionLocations); + removed = tableLocations.remove(startKey, regionLocations); } else { - tableLocations.replace(startKey, regionLocations, updatedLocations); + removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + } + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row) + + " mapping to server: " + serverName + " from cache"); } } } @@ -317,12 +364,17 @@ public class MetaCache { RegionLocations regionLocations = tableLocations.get(hri.getStartKey()); if (regionLocations != null) { HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId()); + if (oldLocation == null) return; RegionLocations updatedLocations = regionLocations.remove(oldLocation); + boolean removed = false; if (updatedLocations != regionLocations) { if (updatedLocations.isEmpty()) { - tableLocations.remove(hri.getStartKey(), regionLocations); + removed = tableLocations.remove(hri.getStartKey(), regionLocations); } else { - tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations); + removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations); + } + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed " + oldLocation + " 
from cache"); } } } @@ -332,22 +384,22 @@ public class MetaCache { if (location == null) { return; } - TableName tableName = location.getRegionInfo().getTable(); ConcurrentMap tableLocations = getTableLocations(tableName); - RegionLocations rll = tableLocations.get(location.getRegionInfo().getStartKey()); - if (rll == null) { - return; - } - RegionLocations updatedLocations = rll.remove(location); - if (updatedLocations.isEmpty()) { - tableLocations.remove(location.getRegionInfo().getStartKey(), rll); - } - if (LOG.isDebugEnabled() && (rll == updatedLocations)) { - LOG.debug("Removed " + - location.getRegionInfo().getRegionNameAsString() + - " for tableName=" + tableName + - " from cache"); + RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey()); + if (regionLocations != null) { + RegionLocations updatedLocations = regionLocations.remove(location); + boolean removed = false; + if (updatedLocations != regionLocations) { + if (updatedLocations.isEmpty()) { + removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations); + } else { + removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations); + } + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed " + location + " from cache"); + } + } } } Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java?rev=1585766&r1=1585765&r2=1585766&view=diff ============================================================================== --- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (original) +++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java Tue Apr 8 15:48:17 2014 @@ -21,6 +21,18 @@ package org.apache.hadoop.hbase.client; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,17 +46,6 @@ import org.apache.hadoop.hbase.protobuf. import org.apache.hadoop.hbase.util.BoundedCompletionService; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - /** * Caller that goes to replica if the primary region does no answer within a configurable * timeout. 
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java Tue Apr  8 15:48:17 2014
@@ -21,6 +21,18 @@
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,17 +46,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.util.BoundedCompletionService;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Caller that goes to replica if the primary region does not answer within a configurable
  * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
@@ -104,11 +105,11 @@ public class RpcRetryingCallerWithReadRe
     }
 
     if (reload || location == null) {
-      RegionLocations rl = getRegionLocations(false);
+      RegionLocations rl = getRegionLocations(false, id);
       location = id < rl.size() ? rl.getRegionLocation(id) : null;
     }
 
-    if (location == null) {
+    if (location == null || location.getServerName() == null) {
       // With this exception, there will be a retry. The location can be null for a replica
       //  when the table is created or after a split.
       throw new HBaseIOException("There is no location for replica id #" + id);
@@ -170,30 +171,61 @@ public class RpcRetryingCallerWithReadRe
    */
   public synchronized Result call()
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
-    RegionLocations rl = getRegionLocations(true);
+    RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
     BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
 
-    addCallsForReplica(cs, rl, 0, 0); // primary.
-
+    List<ExecutionException> exceptions = null;
+    int submitted = 0, completed = 0;
+    // submit call for the primary replica.
+    submitted += addCallsForReplica(cs, rl, 0, 0);
     try {
+      // wait for the timeout to see whether the primary responds back
       Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
-      if (f == null) {
-        addCallsForReplica(cs, rl, 1, rl.size() - 1); // secondaries
-        f = cs.take();
+      if (f != null) {
+        return f.get(); //great we got a response
       }
-      return f.get();
     } catch (ExecutionException e) {
-      throwEnrichedException(e);
-      return null; // unreachable
+      // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
+      // but the secondaries might still succeed. Continue on the replica RPCs.
+      exceptions = new ArrayList<ExecutionException>(rl.size());
+      exceptions.add(e);
+      completed++;
+    } catch (CancellationException e) {
+      throw new InterruptedIOException();
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException();
+    }
+
+    // submit calls for all of the secondaries at once
+    // TODO: this may be overkill for large region replication
+    submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1);
+    try {
+      while (completed < submitted) {
+        try {
+          Future<Result> f = cs.take();
+          return f.get(); // great we got an answer
+        } catch (ExecutionException e) {
+          // if not cancel or interrupt, wait until all RPC's are done
+          // one of the tasks failed. Save the exception for later.
+          if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
+          exceptions.add(e);
+          completed++;
+        }
+      }
     } catch (CancellationException e) {
       throw new InterruptedIOException();
     } catch (InterruptedException e) {
       throw new InterruptedIOException();
     } finally {
       // We get there because we were interrupted or because one or more of the
-      // calls succeeded or failed. In all case, we stop all our tasks.
+      // calls succeeded or failed. In all cases, we stop all our tasks.
       cs.cancelAll(true);
     }
+
+    if (exceptions != null && !exceptions.isEmpty()) {
+      throwEnrichedException(exceptions.get(0)); // just rethrow the first exception for now.
+    }
+    return null; // unreachable
   }
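The rewritten call() is a two-phase hedged read: submit the primary, poll briefly, then fan out to all secondaries and return the first success, cancelling the rest and rethrowing only if every submitted call failed. The same control flow in a self-contained sketch using the JDK's ExecutorCompletionService in place of HBase's BoundedCompletionService (all names here are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class HedgedReadSketch {
  static <T> T hedgedCall(ExecutorService pool, long primaryTimeoutMs,
      Callable<T> primary, List<Callable<T>> secondaries) throws Exception {
    CompletionService<T> cs = new ExecutorCompletionService<T>(pool);
    List<Future<T>> futures = new ArrayList<Future<T>>();
    int submitted = 0, failed = 0;
    Exception first = null;
    futures.add(cs.submit(primary));
    submitted++;
    try {
      Future<T> f = cs.poll(primaryTimeoutMs, TimeUnit.MILLISECONDS);
      if (f != null) {
        try {
          return f.get();            // primary answered in time
        } catch (ExecutionException e) {
          first = e;                 // remember it; secondaries may still win
          failed++;
        }
      }
      // primary slow or failed: hedge to every secondary at once
      for (Callable<T> c : secondaries) {
        futures.add(cs.submit(c));
        submitted++;
      }
      while (failed < submitted) {
        try {
          return cs.take().get();    // first success wins
        } catch (ExecutionException e) {
          if (first == null) first = e;
          failed++;
        }
      }
      throw first;                   // every submitted call failed
    } finally {
      for (Future<T> f : futures) {
        f.cancel(true);              // stop the losers
      }
    }
  }
}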
 
   /**
@@ -230,8 +262,9 @@ public class RpcRetryingCallerWithReadRe
    * @param rl - the region locations
    * @param min - the id of the first replica, inclusive
    * @param max - the id of the last replica, inclusive.
+   * @return the number of submitted calls
    */
-  private void addCallsForReplica(BoundedCompletionService<Result> cs,
+  private int addCallsForReplica(BoundedCompletionService<Result> cs,
       RegionLocations rl, int min, int max) {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
       RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
       cs.submit(retryingOnReplica);
     }
+    return max - min + 1;
   }
 
-  private RegionLocations getRegionLocations(boolean useCache)
-      throws RetriesExhaustedException, DoNotRetryIOException {
+  private RegionLocations getRegionLocations(boolean useCache, int replicaId)
+      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
     RegionLocations rl;
     try {
-      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
+      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true, replicaId);
+    } catch (DoNotRetryIOException e) {
+      throw e;
+    } catch (RetriesExhaustedException e) {
+      throw e;
+    } catch (InterruptedIOException e) {
+      throw e;
     } catch (IOException e) {
-      if (e instanceof DoNotRetryIOException) {
-        throw (DoNotRetryIOException) e;
-      } else if (e instanceof RetriesExhaustedException) {
-        throw (RetriesExhaustedException) e;
-      } else {
-        throw new RetriesExhaustedException("Can't get the location", e);
-      }
+      throw new RetriesExhaustedException("Can't get the location", e);
     }
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
@@ -261,4 +295,4 @@ public class RpcRetryingCallerWithReadRe
     return rl;
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java Tue Apr  8 15:48:17 2014
@@ -46,25 +46,25 @@ public class TestRegionLocations {
 
     list = hrll((HRegionLocation)null);
     assertTrue(list.isEmpty());
-    assertEquals(0, list.size());
+    assertEquals(1, list.size());
     assertEquals(0, list.numNonNullElements());
 
     HRegionInfo info0 = hri(0);
     list = hrll(hrl(info0, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(1, list.size());
-    assertEquals(1, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
 
     HRegionInfo info9 = hri(9);
     list = hrll(hrl(info9, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(10, list.size());
-    assertEquals(1, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
 
     list = hrll(hrl(info0, null), hrl(info9, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(10, list.size());
-    assertEquals(2, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
   }
 
   private HRegionInfo hri(int replicaId) {
@@ -100,7 +100,7 @@ public class TestRegionLocations {
     list = hrll(hrl(info0, sn0));
    assertTrue(list == list.removeByServer(sn1));
     list = list.removeByServer(sn0);
-    assertTrue(list.isEmpty());
+    assertEquals(0, list.numNonNullElements());
 
     // test remove from multi element list
     list =
hrll(hrl(info0, sn0), hrl(info1, sn1), hrl(info2, sn2), hrl(info9, sn2)); @@ -226,7 +226,7 @@ public class TestRegionLocations { list1 = list2.mergeLocations(list1); assertEquals(sn0, list1.getRegionLocation(0).getServerName()); assertEquals(sn1, list1.getRegionLocation(1).getServerName()); - assertEquals(sn2, list1.getRegionLocation(2).getServerName()); + assertEquals(2, list1.size()); // the size is taken from the argument list to merge // do the other way merge as well list1 = hrll(hrl(info0, sn0), hrl(info1, sn1)); @@ -240,10 +240,9 @@ public class TestRegionLocations { list1 = hrll(hrl(info0, sn0), hrl(info1, sn1)); list2 = hrll(hrl(info0, sn2), hrl(info1, sn2), hrl(info9, sn3)); list1 = list2.mergeLocations(list1); // list1 should override - assertEquals(10, list1.size()); + assertEquals(2, list1.size()); assertEquals(sn0, list1.getRegionLocation(0).getServerName()); assertEquals(sn1, list1.getRegionLocation(1).getServerName()); - assertEquals(sn3, list1.getRegionLocation(9).getServerName()); // do the other way list1 = hrll(hrl(info0, sn0), hrl(info1, sn1)); @@ -272,4 +271,35 @@ public class TestRegionLocations { assertEquals(sn2, list1.getRegionLocation(1).getServerName()); assertEquals(sn3, list1.getRegionLocation(9).getServerName()); } + + @Test + public void testConstructWithNullElements() { + // RegionLocations can contain null elements as well. These null elements can + + RegionLocations list = new RegionLocations((HRegionLocation)null); + assertTrue(list.isEmpty()); + assertEquals(1, list.size()); + assertEquals(0, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info1, sn0)); + assertFalse(list.isEmpty()); + assertEquals(2, list.size()); + assertEquals(1, list.numNonNullElements()); + + list = new RegionLocations(hrl(info0, sn0), null); + assertEquals(2, list.size()); + assertEquals(1, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0)); + assertEquals(10, list.size()); + assertEquals(2, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null); + assertEquals(11, list.size()); + assertEquals(2, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null, null); + assertEquals(12, list.size()); + assertEquals(2, list.numNonNullElements()); + } } Modified: hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java?rev=1585766&r1=1585765&r2=1585766&view=diff ============================================================================== --- hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (original) +++ hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java Tue Apr 8 15:48:17 2014 @@ -34,11 +34,12 @@ import java.util.concurrent.TimeUnit; * A completion service, close to the one available in the JDK 1.7 * However, this ones keeps the list of the future, and allows to cancel them all. * This means as well that it can be used for a small set of tasks only. + *
Modified: hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (original)
+++ hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java Tue Apr 8 15:48:17 2014
@@ -34,11 +34,12 @@ import java.util.concurrent.TimeUnit;
  * A completion service, close to the one available in JDK 1.7.
  * However, this one keeps the list of the futures, and allows cancelling them all.
  * This means as well that it can be used for a small set of tasks only.
+ * The implementation is not thread safe.
  */
 public class BoundedCompletionService<V> {
   private final Executor executor;
-  private final List<Future<V>> sent; // alls the call we sent
-  private final BlockingQueue<Future<V>> completed; // all the results we got so far.
+  private final List<Future<V>> tasks; // all the tasks
+  private final BlockingQueue<Future<V>> completed; // all the tasks that are completed

   class QueueingFuture extends FutureTask<V> {
@@ -46,6 +47,7 @@
       super(callable);
     }

+    @Override
     protected void done() {
       completed.add(QueueingFuture.this);
     }
@@ -53,7 +55,7 @@
   public BoundedCompletionService(Executor executor, int maxTasks) {
     this.executor = executor;
-    this.sent = new ArrayList<Future<V>>(maxTasks);
+    this.tasks = new ArrayList<Future<V>>(maxTasks);
     this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
   }

@@ -61,7 +63,7 @@
   public Future<V> submit(Callable<V> task) {
     QueueingFuture newFuture = new QueueingFuture(task);
     executor.execute(newFuture);
-    sent.add(newFuture);
+    tasks.add(newFuture);
     return newFuture;
   }

@@ -74,7 +76,7 @@
   }

   public void cancelAll(boolean interrupt) {
-    for (Future future : sent) {
+    for (Future future : tasks) {
       future.cancel(interrupt);
     }
   }
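BoundedCompletionService exists so the read-replica caller can fire the same read at several replicas, keep every Future it submitted, take the first answer, and cancel the rest. The sketch below shows that first-response-wins flow using the JDK's ExecutorCompletionService plus an explicit futures list (the bookkeeping that cancelAll() bundles up); it illustrates the pattern and is not the HBase class itself:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class FirstResponseWins {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> cs = new ExecutorCompletionService<String>(pool);
        List<Future<String>> futures = new ArrayList<Future<String>>();

        for (int replicaId = 0; replicaId < 3; replicaId++) {
          final int id = replicaId;
          futures.add(cs.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
              Thread.sleep(100L * (id + 1)); // replica 0 answers first
              return "result from replica " + id;
            }
          }));
        }

        String first = cs.take().get(); // block for the first completed call
        for (Future<String> f : futures) {
          f.cancel(true); // cancel the slower replicas; no-op for the finished one
        }
        pool.shutdown();
        System.out.println(first);
      }
    }

Keeping the futures list outside the completion service is exactly the gap this class closes: the JDK's ExecutorCompletionService has no cancelAll(), so every caller would otherwise re-implement that list.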
Modified: hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java Tue Apr 8 15:48:17 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;

 /**
  * Thread Utility
@@ -38,6 +39,16 @@ import org.apache.hadoop.util.Reflection
 public class Threads {
   protected static final Log LOG = LogFactory.getLog(Threads.class);
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
+
+  private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
+      new UncaughtExceptionHandler() {
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.warn("Thread:" + t + " exited with Exception:"
+          + StringUtils.stringifyException(e));
+    }
+  };
+
   /**
    * Utility method that sets name, daemon status and starts passed thread.
    * @param t thread to run
@@ -160,15 +171,15 @@ public class Threads {
   }

   /**
-   * Create a new CachedThreadPool with a bounded number as the maximum
+   * Create a new CachedThreadPool with a bounded number as the maximum
    * thread size in the pool.
-   *
+   *
    * @param maxCachedThread the maximum number of threads that can be created in the pool
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @param threadFactory the factory to use when creating new threads
-   * @return threadPoolExecutor the cachedThreadPool with a bounded number
-   * as the maximum thread size in the pool.
+   * @return threadPoolExecutor the cachedThreadPool with a bounded number
+   * as the maximum thread size in the pool.
    */
   public static ThreadPoolExecutor getBoundedCachedThreadPool(
       int maxCachedThread, long timeout, TimeUnit unit,
@@ -180,8 +191,8 @@ public class Threads {
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }
-
-
+
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
    * with a common prefix.
@@ -230,6 +241,8 @@ public class Threads {
         Thread t = namedFactory.newThread(r);
         if (handler != null) {
           t.setUncaughtExceptionHandler(handler);
+        } else {
+          t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
         }
         if (!t.isDaemon()) {
           t.setDaemon(true);
@@ -242,4 +255,11 @@ public class Threads {
       }
     };
   }
+
+  /** Sets an UncaughtExceptionHandler for the thread which logs the
+   * Exception stack if the thread dies.
+   */
+  public static void setLoggingUncaughtExceptionHandler(Thread t) {
+    t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
+  }
 }
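The new default handler matters for the load-test threads changed below: without any handler, a Throwable escaping run() kills the thread with nothing but a stack trace on stderr, which is easy to miss in a long test run. A standalone, JDK-only demo of the mechanism that LOGGING_EXCEPTION_HANDLER hooks into; the thread name and message here are made up for the example:

    public class UncaughtHandlerDemo {
      public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new Runnable() {
          @Override
          public void run() {
            throw new IllegalStateException("boom"); // escapes run(), kills the thread
          }
        }, "worker-1");

        // Must be installed before the exception escapes; the commit routes
        // this to commons-logging via LOG.warn(...) instead of stderr.
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(Thread th, Throwable e) {
            System.err.println("Thread:" + th.getName() + " exited with Exception: " + e);
          }
        });

        t.start();
        t.join();
      }
    }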
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java Tue Apr 8 15:48:17 2014
@@ -274,6 +274,12 @@ class CoprocessorHConnection implements
   }

   @Override
+  public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId)
+      throws IOException {
+    return delegate.relocateRegion(tableName, row, replicaId);
+  }
+
+  @Override
   public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
     return delegate.relocateRegion(tableName, row);
   }
@@ -331,6 +337,12 @@ class CoprocessorHConnection implements
   }

   @Override
+  public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
+      boolean retry, int replicaId) throws IOException {
+    return delegate.locateRegion(tableName, row, useCache, retry, replicaId);
+  }
+
+  @Override
   public List locateRegions(byte[] tableName, boolean useCache, boolean offlined)
       throws IOException {
     return delegate.locateRegions(tableName, useCache, offlined);

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Tue Apr 8 15:48:17 2014
@@ -120,7 +120,9 @@ public class MultiThreadedReader extends
   }

   protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
-    return new HBaseReaderThread(readerId);
+    HBaseReaderThread reader = new HBaseReaderThread(readerId);
+    Threads.setLoggingUncaughtExceptionHandler(reader);
+    return reader;
   }

   public class HBaseReaderThread extends Thread {

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Tue Apr 8 15:48:17 2014
@@ -73,6 +73,7 @@ public class MultiThreadedWriter extends
   protected void createWriterThreads(int numThreads) throws IOException {
     for (int i = 0; i < numThreads; ++i) {
       HBaseWriterThread writer = new HBaseWriterThread(i);
+      Threads.setLoggingUncaughtExceptionHandler(writer);
       writers.add(writer);
     }
   }
@@ -89,6 +90,7 @@ public class MultiThreadedWriter extends
       return new HTable(conf, tableName);
     }

+    @Override
     public void run() {
       try {
         long rowKeyBase;

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java?rev=1585766&r1=1585765&r2=1585766&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java Tue Apr 8 15:48:17 2014
@@ -101,8 +101,8 @@ public abstract class MultiThreadedWrite
     if (cached != null) {
       result = "cached: " + cached.toString();
     }
-    if (real != null) {
-      if (real.equals(cached)) {
+    if (real != null && real.getServerName() != null) {
+      if (cached != null && cached.getServerName() != null && real.equals(cached)) {
         result += "; cache is up to date";
       } else {
         result = (cached != null) ? (result + "; ") : "";