hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1543051 - /hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Date Mon, 18 Nov 2013 15:02:43 GMT
Author: nkeywal
Date: Mon Nov 18 15:02:43 2013
New Revision: 1543051

URL: http://svn.apache.org/r1543051
Log:
HBASE-9987 Remove some synchronisation points in HConnectionManager

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1543051&r1=1543050&r2=1543051&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
(original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Mon Nov 18 15:02:43 2013
@@ -25,7 +25,6 @@ import java.lang.reflect.UndeclaredThrow
 import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -36,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -606,16 +606,16 @@ public class HConnectionManager {
     /**
       * Map of table to table {@link HRegionLocation}s.
       */
-    private final Map<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>
+    private final ConcurrentMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>
         cachedRegionLocations =
-      new HashMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>();
+      new ConcurrentHashMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>();
 
     // The presence of a server in the map implies it's likely that there is an
     // entry in cachedRegionLocations that map to this server; but the absence
     // of a server in this map guarentees that there is no entry in cache that
     // maps to the absent server.
     // The access to this attribute must be protected by a lock on cachedRegionLocations
-    private final Set<ServerName> cachedServers = new HashSet<ServerName>();
+    private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
 
     // region cache prefetch is enabled by default. this set contains all
     // tables whose region cache prefetch are disabled.
@@ -1352,14 +1352,12 @@ public class HConnectionManager {
      */
     void forceDeleteCachedLocation(final TableName tableName, final byte [] row) {
       HRegionLocation rl = null;
-      synchronized (this.cachedRegionLocations) {
-        Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
-        // start to examine the cache. we can only do cache actions
-        // if there's something in the cache for this table.
-        rl = getCachedLocation(tableName, row);
-        if (rl != null) {
-          tableLocations.remove(rl.getRegionInfo().getStartKey());
-        }
+      Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
+      // start to examine the cache. we can only do cache actions
+      // if there's something in the cache for this table.
+      rl = getCachedLocation(tableName, row);
+      if (rl != null) {
+        tableLocations.remove(rl.getRegionInfo().getStartKey());
       }
       if ((rl != null) && LOG.isDebugEnabled()) {
         LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
@@ -1372,14 +1370,21 @@ public class HConnectionManager {
      * Delete all cached entries of a table that maps to a specific location.
      */
     @Override
-    public void clearCaches(final ServerName serverName){
+    public void clearCaches(final ServerName serverName) {
+      if (!this.cachedServers.contains(serverName)) {
+        return;
+      }
+
       boolean deletedSomething = false;
-      synchronized (this.cachedRegionLocations) {
-        if (!cachedServers.contains(serverName)) {
+      synchronized (this.cachedServers) {
+        // We block here, because if there is an error on a server, it's likely that multiple
+        //  threads will get the error  simultaneously. If there are hundreds of thousand
of
+        //  region location to check, it's better to do this only once. A better pattern
would
+        //  be to check if the server is dead when we get the region location.
+        if (!this.cachedServers.contains(serverName)) {
           return;
         }
-        for (Map<byte[], HRegionLocation> tableLocations :
-            cachedRegionLocations.values()) {
+        for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations.values())
{
           for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
             HRegionLocation value = e.getValue();
             if (value != null
@@ -1389,7 +1394,7 @@ public class HConnectionManager {
             }
           }
         }
-        cachedServers.remove(serverName);
+        this.cachedServers.remove(serverName);
       }
       if (deletedSomething && LOG.isDebugEnabled()) {
         LOG.debug("Removed all cached region locations that map to " + serverName);
@@ -1404,12 +1409,14 @@ public class HConnectionManager {
         final TableName tableName) {
       // find the map of cached locations for this table
       ConcurrentSkipListMap<byte[], HRegionLocation> result;
-      synchronized (this.cachedRegionLocations) {
-        result = this.cachedRegionLocations.get(tableName);
-        // if tableLocations for this table isn't built yet, make one
-        if (result == null) {
-          result = new ConcurrentSkipListMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
-          this.cachedRegionLocations.put(tableName, result);
+      result = this.cachedRegionLocations.get(tableName);
+      // if tableLocations for this table isn't built yet, make one
+      if (result == null) {
+        result = new ConcurrentSkipListMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
+        ConcurrentSkipListMap<byte[], HRegionLocation> old =
+            this.cachedRegionLocations.putIfAbsent(tableName, result);
+        if (old != null) {
+          return old;
         }
       }
       return result;
@@ -1417,17 +1424,13 @@ public class HConnectionManager {
 
     @Override
     public void clearRegionCache() {
-      synchronized(this.cachedRegionLocations) {
-        this.cachedRegionLocations.clear();
-        this.cachedServers.clear();
-      }
+      this.cachedRegionLocations.clear();
+      this.cachedServers.clear();
     }
 
     @Override
     public void clearRegionCache(final TableName tableName) {
-      synchronized (this.cachedRegionLocations) {
-        this.cachedRegionLocations.remove(tableName);
-      }
+      this.cachedRegionLocations.remove(tableName);
     }
 
     @Override
@@ -1445,40 +1448,34 @@ public class HConnectionManager {
         final HRegionLocation location) {
       boolean isFromMeta = (source == null);
       byte [] startKey = location.getRegionInfo().getStartKey();
-      ConcurrentMap<byte[], HRegionLocation> tableLocations =
-        getTableLocations(tableName);
-      boolean isNewCacheEntry = false;
-      boolean isStaleUpdate = false;
-      HRegionLocation oldLocation = null;
-      synchronized (this.cachedRegionLocations) {
-        oldLocation = tableLocations.putIfAbsent(startKey, location);
-        isNewCacheEntry = (oldLocation == null);
-        if (isNewCacheEntry){
-          cachedServers.add(location.getServerName());
-          return;
-        }
-        boolean updateCache;
-        // If the server in cache sends us a redirect, assume it's always valid.
-        if (oldLocation.equals(source)) {
-          updateCache = true;
-        } else {
-          long newLocationSeqNum = location.getSeqNum();
-          // Meta record is stale - some (probably the same) server has closed the region
-          // with later seqNum and told us about the new location.
-          boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() >
newLocationSeqNum);
-          // Same as above for redirect. However, in this case, if the number is equal to
previous
-          // record, the most common case is that first the region was closed with seqNum,
and then
-          // opened with the same seqNum; hence we will ignore the redirect.
-          // There are so many corner cases with various combinations of opens and closes
that
-          // an additional counter on top of seqNum would be necessary to handle them all.
-          boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >=
newLocationSeqNum);
-          isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
-          updateCache = (!isStaleUpdate);
-        }
-        if (updateCache) {
-          tableLocations.put(startKey, location);
-          cachedServers.add(location.getServerName());
-        }
+      ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
+      HRegionLocation oldLocation = tableLocations.putIfAbsent(startKey, location);
+      boolean isNewCacheEntry = (oldLocation == null);
+      if (isNewCacheEntry) {
+        cachedServers.add(location.getServerName());
+        return;
+      }
+      boolean updateCache;
+      // If the server in cache sends us a redirect, assume it's always valid.
+      if (oldLocation.equals(source)) {
+        updateCache = true;
+      } else {
+        long newLocationSeqNum = location.getSeqNum();
+        // Meta record is stale - some (probably the same) server has closed the region
+        // with later seqNum and told us about the new location.
+        boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
+        // Same as above for redirect. However, in this case, if the number is equal to previous
+        // record, the most common case is that first the region was closed with seqNum,
and then
+        // opened with the same seqNum; hence we will ignore the redirect.
+        // There are so many corner cases with various combinations of opens and closes that
+        // an additional counter on top of seqNum would be necessary to handle them all.
+        boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
+        boolean isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
+        updateCache = (!isStaleUpdate);
+      }
+      if (updateCache) {
+        tableLocations.replace(startKey, oldLocation, location);
+        cachedServers.add(location.getServerName());
       }
     }
 
@@ -2194,12 +2191,10 @@ public class HConnectionManager {
     * @param hri The region in question.
     * @param source The source of the error that prompts us to invalidate cache.
     */
-    void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
-      synchronized (this.cachedRegionLocations) {
-        ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
-        tableLocations.remove(hri.getStartKey(), source);
-      }
-    }
+   void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
+     ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
+     tableLocations.remove(hri.getStartKey(), source);
+   }
 
     @Override
     public void deleteCachedRegionLocation(final HRegionLocation location) {
@@ -2209,10 +2204,8 @@ public class HConnectionManager {
 
       HRegionLocation removedLocation;
       TableName tableName = location.getRegionInfo().getTable();
-      synchronized (this.cachedRegionLocations) {
-        Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
-        removedLocation = tableLocations.remove(location.getRegionInfo().getStartKey());
-      }
+      Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
+      removedLocation = tableLocations.remove(location.getRegionInfo().getStartKey());
       if (LOG.isDebugEnabled() && removedLocation != null) {
         LOG.debug("Removed " +
             location.getRegionInfo().getRegionNameAsString() +
@@ -2405,13 +2398,11 @@ public class HConnectionManager {
      * from a unit test.
      */
     int getNumberOfCachedRegionLocations(final TableName tableName) {
-      synchronized (this.cachedRegionLocations) {
-        Map<byte[], HRegionLocation> tableLocs = this.cachedRegionLocations.get(tableName);
-        if (tableLocs == null) {
-          return 0;
-        }
-        return tableLocs.values().size();
+      Map<byte[], HRegionLocation> tableLocs = this.cachedRegionLocations.get(tableName);
+      if (tableLocs == null) {
+        return 0;
       }
+      return tableLocs.values().size();
     }
 
     /**



Mime
View raw message