hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-18005 read replica: handle the case that region server hosting both primary replica and meta region is down (huaxiang sun)
Date Fri, 02 Jun 2017 16:30:00 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 549171465 -> ef46debde


HBASE-18005 read replica: handle the case that region server hosting both primary replica
and meta region is down (huaxiang sun)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef46debd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef46debd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef46debd

Branch: refs/heads/master
Commit: ef46debde8b5408b57bd0eeec4f1fb292c69d995
Parents: 5491714
Author: tedyu <yuzhihong@gmail.com>
Authored: Fri Jun 2 09:29:51 2017 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Fri Jun 2 09:29:51 2017 -0700

----------------------------------------------------------------------
 .../hbase/client/ConnectionImplementation.java  |   3 +-
 .../apache/hadoop/hbase/client/MetaCache.java   |  34 ++++
 .../RpcRetryingCallerWithReadReplicas.java      |  78 ++++++---
 .../client/ScannerCallableWithReplicas.java     |  24 ++-
 .../hbase/client/TestReplicaWithCluster.java    | 158 +++++++++++++++++--
 5 files changed, 257 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ef46debd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ed69a98..71b0bb3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -826,7 +826,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable
{
       } else {
         // If we are not supposed to be using the cache, delete any existing cached location
         // so it won't interfere.
-        metaCache.clearCache(tableName, row);
+        // We are only supposed to clean the cache for the specific replicaId
+        metaCache.clearCache(tableName, row, replicaId);
       }
 
       // Query the meta region

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef46debd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index 14e0afd..c3b00b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -313,6 +313,40 @@ public class MetaCache {
   }
 
   /**
+   * Delete a cached location with specific replicaId.
+   * @param tableName tableName
+   * @param row row key
+   * @param replicaId region replica id
+   */
+  public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
+    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+
+    RegionLocations regionLocations = getCachedLocation(tableName, row);
+    if (regionLocations != null) {
+      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
+      if (toBeRemoved != null) {
+        RegionLocations updatedLocations = regionLocations.remove(replicaId);
+        byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        boolean removed;
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(startKey, regionLocations);
+        } else {
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+
+        if (removed) {
+          if (metrics != null) {
+            metrics.incrMetaCacheNumClearRegion();
+          }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Removed " + toBeRemoved + " from cache");
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Delete a cached location for a table, row and server
    */
   public void clearCache(final TableName tableName, final byte [] row, ServerName serverName)
{

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef46debd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 91c6344..b5cddde 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -170,9 +170,36 @@ public class RpcRetryingCallerWithReadReplicas {
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
     boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
 
-    RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
-        : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
-   final ResultBoundedCompletionService<Result> cs =
+    RegionLocations rl = null;
+    boolean skipPrimary = false;
+    try {
+      rl = getRegionLocations(true,
+        (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
+        cConnection, tableName, get.getRow());
+    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
+      // When no specific replica id is specified, it just needs to load all replicas.
+      if (isTargetReplicaSpecified) {
+        throw e;
+      } else {
+        // We cannot get the primary replica location, it is possible that the region
+        // server hosting meta is down, it needs to proceed to try cached replicas.
+        if (cConnection instanceof ConnectionImplementation) {
+          rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
+          if (rl == null) {
+            // No cached locations
+            throw e;
+          }
+
+          // Primary replica location is not known, skip primary replica
+          skipPrimary = true;
+        } else {
+          // For completeness
+          throw e;
+        }
+      }
+    }
+
+    final ResultBoundedCompletionService<Result> cs =
         new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
     int startIndex = 0;
     int endIndex = rl.size();
@@ -181,25 +208,30 @@ public class RpcRetryingCallerWithReadReplicas {
       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
       endIndex = 1;
     } else {
-      addCallsForReplica(cs, rl, 0, 0);
-      try {
-        // wait for the timeout to see whether the primary responds back
-        Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes,
microseconds
-        if (f != null) {
-          return f.get(); //great we got a response
-        }
-      } catch (ExecutionException e) {
-        // We ignore the ExecutionException and continue with the secondary replicas
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Primary replica returns " + e.getCause());
-        }
+      if (!skipPrimary) {
+        addCallsForReplica(cs, rl, 0, 0);
+        try {
+          // wait for the timeout to see whether the primary responds back
+          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); //
Yes, microseconds
+          if (f != null) {
+            return f.get(); //great we got a response
+          }
+        } catch (ExecutionException e) {
+          // We ignore the ExecutionException and continue with the secondary replicas
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Primary replica returns " + e.getCause());
+          }
 
-        // Skip the result from the primary as we know that there is something wrong
-        startIndex = 1;
-      } catch (CancellationException e) {
-        throw new InterruptedIOException();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException();
+          // Skip the result from the primary as we know that there is something wrong
+          startIndex = 1;
+        } catch (CancellationException e) {
+          throw new InterruptedIOException();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException();
+        }
+      } else {
+        // Since primary replica is skipped, the endIndex needs to be adjusted accordingly
+        endIndex --;
       }
 
       // submit call for the all of the secondaries at once
@@ -288,10 +320,10 @@ public class RpcRetryingCallerWithReadReplicas {
     } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e)
{
       throw e;
     } catch (IOException e) {
-      throw new RetriesExhaustedException("Can't get the location", e);
+      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId,
e);
     }
     if (rl == null) {
-      throw new RetriesExhaustedException("Can't get the locations");
+      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId);
     }
 
     return rl;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef46debd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 6b6acf0..bcd5d21 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
@@ -142,10 +143,25 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]>
{
     //2. We should close the "losing" scanners (scanners other than the ones we hear back
     //   from first)
     //
-    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
-        RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
-        currentScannerCallable.getRow());
-
+    RegionLocations rl = null;
+    try {
+      rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
+          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
+          currentScannerCallable.getRow());
+    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
+      // We cannot get the primary replica region location, it is possible that the region
server
+      // hosting meta table is down, it needs to proceed to try cached replicas directly.
+      if (cConnection instanceof ConnectionImplementation) {
+        rl = ((ConnectionImplementation) cConnection)
+            .getCachedLocation(tableName, currentScannerCallable.getRow());
+        if (rl == null) {
+          throw e;
+        }
+      } else {
+        // For completeness
+        throw e;
+      }
+    }
     // allocate a boundedcompletion pool of some multiple of number of replicas.
    // We want to accommodate some RPCs for redundant replica scans (that are still in progress)
     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef46debd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 1ad980c..41c5a24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
@@ -162,8 +164,28 @@ public class TestReplicaWithCluster {
   /**
    * This copro is used to slow down the primary meta region scan a bit
    */
-  public static class RegionServerHostingPrimayMetaRegionSlowCopro implements RegionObserver
{
+  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro implements RegionObserver
{
     static boolean slowDownPrimaryMetaScan = false;
+    static boolean throwException = false;
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Get get, final List<Cell> results) throws IOException {
+
+      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
+
+      // Fail for the primary replica, but not for meta
+      if (throwException) {
+        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId
== 0)) {
+          LOG.info("Get, throw Region Server Stopped Exception for region " + e.getEnvironment()
+              .getRegion().getRegionInfo());
+          throw new RegionServerStoppedException("Server " +
+              e.getEnvironment().getRegionServerServices().getServerName() + " not running");
+        }
+      } else {
+        LOG.info("Get, We're replica region " + replicaId);
+      }
+    }
 
     @Override
     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment>
e,
@@ -172,21 +194,34 @@ public class TestReplicaWithCluster {
       int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
 
       // Slow down with the primary meta region scan
-      if (slowDownPrimaryMetaScan && (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion()
-          && (replicaId == 0))) {
-        LOG.info("Scan with primary meta region, slow down a bit");
-        try {
-          Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
-        } catch (InterruptedException ie) {
          // Ignore
+      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId
== 0)) {
+        if (slowDownPrimaryMetaScan) {
+          LOG.info("Scan with primary meta region, slow down a bit");
+          try {
+            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
+          } catch (InterruptedException ie) {
+            // Ignore
+          }
         }
 
+        // Fail for the primary replica
+        if (throwException) {
+          LOG.info("Scan, throw Region Server Stopped Exception for replica " + e.getEnvironment()
+              .getRegion().getRegionInfo());
+
+          throw new RegionServerStoppedException("Server " +
+              e.getEnvironment().getRegionServerServices().getServerName() + " not running");
+        } else {
+          LOG.info("Scan, We're replica region " + replicaId);
+        }
+      } else {
+        LOG.info("Scan, We're replica region " + replicaId);
       }
+
       return null;
     }
   }
 
-
   @BeforeClass
   public static void beforeClass() throws Exception {
     // enable store file refreshing
@@ -216,7 +251,7 @@ public class TestReplicaWithCluster {
 
     // Set system coprocessor so it can be applied to meta regions
     HTU.getConfiguration().set("hbase.coprocessor.region.classes",
-        RegionServerHostingPrimayMetaRegionSlowCopro.class.getName());
+        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
 
     HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT,
         META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
@@ -630,18 +665,117 @@ public class TestReplicaWithCluster {
 
       HTU.createTable(hdt, new byte[][] { f }, null);
 
-      RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = true;
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;
 
       // Get user table location, always get it from the primary meta replica
       RegionLocations url = ((ClusterConnection) HTU.getConnection())
           .locateRegion(hdt.getTableName(), row, false, false);
 
     } finally {
-      RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = false;
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
       ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
       HTU.getAdmin().setBalancerRunning(true, true);
       HTU.getAdmin().disableTable(hdt.getTableName());
       HTU.deleteTable(hdt.getTableName());
     }
   }
+
+
+  // This test is to simulate the case that the meta region and the primary user region
+  // are down, hbase client is able to access user replica regions and return stale data.
+  // Meta replica is enabled to show the case that the meta replica region could be out of
sync
+  // with the primary meta region.
+  @Test
+  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException
{
+    HTU.getAdmin().setBalancerRunning(false, true);
+
+    ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
+
+    // Create table then get the single region for our new table.
+    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
+    hdt.setRegionReplication(2);
+    try {
+
+      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+
+      // Get Meta location
+      RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
+          .locateRegion(TableName.META_TABLE_NAME,
+              HConstants.EMPTY_START_ROW, false, false);
+
+      // Get user table location
+      RegionLocations url = ((ClusterConnection) HTU.getConnection())
+          .locateRegion(hdt.getTableName(), row, false, false);
+
+      // Make sure that user primary region is co-hosted with the meta region
+      if (!url.getDefaultRegionLocation().getServerName().equals(
+          mrl.getDefaultRegionLocation().getServerName())) {
+        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
+            mrl.getDefaultRegionLocation().getServerName());
+      }
+
+      // Make sure that the user replica region is not hosted by the same region server with
+      // primary
+      if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
+          .getServerName())) {
+        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
+            url.getDefaultRegionLocation().getServerName());
+      }
+
+      // Wait until the meta table is updated with new location info
+      while (true) {
+        mrl = ((ClusterConnection) HTU.getConnection())
+            .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
+
+        // Get user table location
+        url = ((ClusterConnection) HTU.getConnection())
+            .locateRegion(hdt.getTableName(), row, false, true);
+
+        LOG.info("meta locations " + mrl);
+        LOG.info("table locations " + url);
+        ServerName a = url.getDefaultRegionLocation().getServerName();
+        ServerName b = mrl.getDefaultRegionLocation().getServerName();
+        if(a.equals(b)) {
+          break;
+        } else {
+          LOG.info("Waiting for new region info to be updated in meta table");
+          Thread.sleep(100);
+        }
+      }
+
+      Put p = new Put(row);
+      p.addColumn(f, row, row);
+      table.put(p);
+
+      // Flush so it can be picked by the replica refresher thread
+      HTU.flush(table.getName());
+
+      // Sleep for some time until data is picked up by replicas
+      try {
+        Thread.sleep(2 * REFRESH_PERIOD);
+      } catch (InterruptedException e1) {
+        LOG.error(e1);
+      }
+
+      // Simulating the RS down
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;
+
+      // The first Get is supposed to succeed
+      Get g = new Get(row);
+      g.setConsistency(Consistency.TIMELINE);
+      Result r = table.get(g);
+      Assert.assertTrue(r.isStale());
+
+      // The second Get will succeed as well
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+
+    } finally {
+      ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
+      HTU.getAdmin().setBalancerRunning(true, true);
+      HTU.getAdmin().disableTable(hdt.getTableName());
+      HTU.deleteTable(hdt.getTableName());
+    }
+  }
 }


Mime
View raw message