hbase-commits mailing list archives

From ecl...@apache.org
Subject git commit: HBASE-12198 Fix the bug of not updating location cache
Date Thu, 09 Oct 2014 17:17:21 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 2c07372c2 -> 3c1fbd2dd


HBASE-12198 Fix the bug of not updating location cache

Summary:  # Clear the server's location cache when a multi-action request fails, so queued puts can be re-routed to the region's new server.

Test Plan: Add testcase `TestHTableMultiplexerFlushCache` to reproduce the bug.

Differential Revision: https://reviews.facebook.net/D24603

Signed-off-by: Elliott Clark <elliott@fb.com>
Signed-off-by: Elliott Clark <eclark@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3c1fbd2d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3c1fbd2d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3c1fbd2d

Branch: refs/heads/master
Commit: 3c1fbd2ddfb004db875db92feeec44b78dd60adc
Parents: 2c07372
Author: Yi Deng <daviddengcn@gmail.com>
Authored: Tue Oct 7 17:23:11 2014 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Thu Oct 9 10:09:03 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |   8 +-
 .../hadoop/hbase/client/HTableMultiplexer.java  |  47 ++++----
 .../client/TestHTableMultiplexerFlushCache.java | 115 +++++++++++++++++++
 3 files changed, 148 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3c1fbd2d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 2dbe263..3d40dd5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1013,6 +1013,10 @@ class AsyncProcess {
       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
 
+      if (tableName == null) {
+        // tableName is null when we made a cross-table RPC call.
+        hConnection.clearCaches(server);
+      }
       int failed = 0, stopped = 0;
       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
@@ -1021,7 +1025,9 @@ class AsyncProcess {
         // Do not use the exception for updating cache because it might be coming from
         // any of the regions in the MultiAction.
         // TODO: depending on type of exception we might not want to update cache at all?
-        hConnection.updateCachedLocations(tableName, regionName, row, null, server);
+        if (tableName != null) {
+          hConnection.updateCachedLocations(tableName, regionName, row, null, server);
+        }
         for (Action<Row> action : e.getValue()) {
           Retry retry = manageError(
               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
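
The AsyncProcess hunk above carries the heart of the fix: an HTableMultiplexer flush is a cross-table multi-action, so it arrives here with a null tableName, and the old per-table updateCachedLocations() call could not refresh the stale entry. The change drops every cached location for the failed server in that case and keeps the targeted per-region update for single-table calls. Below is a minimal illustrative sketch of that decision, not the committed code: CacheOps is an invented stand-in for the two connection methods the diff calls, and in the real code clearCaches() runs once per failed server before the per-region loop.

  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.TableName;

  final class LocationCacheRefreshSketch {
    /** Invented interface; stands in for the connection calls used above. */
    interface CacheOps {
      void clearCaches(ServerName server);                  // drop all cached locations for a server
      void updateCachedLocations(TableName table, byte[] regionName,
          byte[] row, Object exception, ServerName source); // refresh one region's cached location
    }

    static void onMultiActionFailure(CacheOps conn, TableName tableName,
        byte[] regionName, byte[] row, ServerName server) {
      if (tableName == null) {
        // Cross-table call: the failure cannot be pinned to a single table,
        // so invalidate everything cached for this server.
        conn.clearCaches(server);
      } else {
        // Single-table call: refresh only the affected region's entry.
        conn.updateCachedLocations(tableName, regionName, row, null, server);
      }
    }
  }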

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c1fbd2d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index fae0a94..8d0fbc8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -79,7 +81,7 @@ public class HTableMultiplexer {
   private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
       new ConcurrentHashMap<>();
 
-  private final Configuration conf;
+  private final Configuration workerConf;
   private final ClusterConnection conn;
   private final ExecutorService pool;
   private final int retryNum;
@@ -95,10 +97,9 @@ public class HTableMultiplexer {
    */
   public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
       throws IOException {
-    this.conf = conf;
     this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
     this.pool = HTable.getDefaultExecutor(conf);
-    this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+    this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
@@ -107,6 +108,11 @@ public class HTableMultiplexer {
     this.executor =
         Executors.newScheduledThreadPool(initThreads,
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
+
+    this.workerConf = HBaseConfiguration.create(conf);
+    // We do not do the retry because we need to reassign puts to different queues if regions are
+    // moved.
+    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
   }
 
   /**
@@ -218,7 +224,7 @@ public class HTableMultiplexer {
         worker = serverToFlushWorkerMap.get(addr);
         if (worker == null) {
           // Create the flush worker
-          worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
+          worker = new FlushWorker(workerConf, this.conn, addr, this, perRegionServerBufferQueueSize,
                   pool, executor);
           this.serverToFlushWorkerMap.put(addr, worker);
           executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
@@ -388,32 +394,30 @@ public class HTableMultiplexer {
 
   private static class FlushWorker implements Runnable {
     private final HRegionLocation addr;
-    private final AsyncProcess asyncProc;
     private final LinkedBlockingQueue<PutStatus> queue;
     private final HTableMultiplexer multiplexer;
     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
     private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
     private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
     private final AtomicLong maxLatency = new AtomicLong(0);
-    private final ExecutorService pool;
+
+    private final AsyncProcess ap;
     private final List<PutStatus> processingList = new ArrayList<>();
     private final ScheduledExecutorService executor;
     private final int maxRetryInQueue;
     private final AtomicInteger retryInQueue = new AtomicInteger(0);
-    private final int rpcTimeOutMs;
     
     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
         ExecutorService pool, ScheduledExecutorService executor) {
       this.addr = addr;
-      this.asyncProc = conn.getAsyncProcess();
       this.multiplexer = htableMultiplexer;
       this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
-      this.pool = pool;
+      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
+      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
       this.executor = executor;
       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
-      this.rpcTimeOutMs =
-          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     }
 
     protected LinkedBlockingQueue<PutStatus> getQueue() {
@@ -456,10 +460,11 @@ public class HTableMultiplexer {
       // The currentPut is failed. So get the table name for the currentPut.
       final TableName tableName = ps.regionInfo.getTable();
 
-      // Wait at least RPC timeout time
-      long delayMs = rpcTimeOutMs;
-      delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
-              multiplexer.retryNum - retryCount)));
+      long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
+        multiplexer.retryNum - retryCount - 1);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
+      }
 
       executor.schedule(new Runnable() {
         @Override
@@ -513,8 +518,8 @@ public class HTableMultiplexer {
             Collections.singletonMap(server, actions);
         try {
           AsyncRequestFuture arf =
-              asyncProc.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
-                null, actionsByServer, pool);
+              ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
+                null, actionsByServer, null);
           arf.waitUntilDone();
           if (arf.hasError()) {
            // We just log and ignore the exception here since failed Puts will be resubmit again.
@@ -523,20 +528,20 @@ public class HTableMultiplexer {
           }
         } finally {
           for (int i = 0; i < results.length; i++) {
-            if (results[i] == null) {
+            if (results[i] instanceof Result) {
+              failedCount--;
+            } else {
               if (failed == null) {
                 failed = new ArrayList<PutStatus>();
               }
               failed.add(processingList.get(i));
-            } else {
-              failedCount--;
             }
           }
         }
 
         if (failed != null) {
           // Resubmit failed puts
-          for (PutStatus putStatus : processingList) {
+          for (PutStatus putStatus : failed) {
             if (resubmitFailedPut(putStatus, this.addr)) {
               failedCount--;
             }
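
Two supporting changes in HTableMultiplexer make the refreshed cache actually matter. First, each FlushWorker now builds its own AsyncProcess from a private copy of the configuration with client retries set to 0, so a failed Put comes straight back to the multiplexer and is re-queued against the region's new server instead of being retried in place against a stale location. Second, the resubmission delay is now derived from ConnectionUtils.getPauseTime() rather than an RPC-timeout floor. A hedged sketch of both ideas follows; the backoff multiplier table is only an illustration of a capped, roughly exponential curve and is not copied from HBase.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;

  final class FlushWorkerTuningSketch {
    /** Worker configuration: retries disabled so failures surface immediately. */
    static Configuration workerConf(Configuration conf) {
      Configuration copy = HBaseConfiguration.create(conf);   // copy, do not mutate the caller's conf
      copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
      return copy;
    }

    /** Illustrative backoff multipliers (capped, roughly exponential). */
    private static final int[] BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 200};

    /** Delay before the tries-th resubmission, in the spirit of getPauseTime(). */
    static long pauseTimeMs(long basePauseMs, int tries) {
      int idx = Math.min(Math.max(tries, 0), BACKOFF.length - 1);
      return basePauseMs * BACKOFF[idx];
    }
  }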

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c1fbd2d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
new file mode 100644
index 0000000..2898369
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestHTableMultiplexerFlushCache {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1");
+  private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2");
+  private static byte[] VALUE1 = Bytes.toBytes("testValue1");
+  private static byte[] VALUE2 = Bytes.toBytes("testValue2");
+  private static int SLAVES = 3;
+  private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality,
+      byte[] value) throws Exception {
+    // verify that the Get returns the correct result
+    Result r;
+    Get get = new Get(row);
+    get.addColumn(family, quality);
+    int nbTry = 0;
+    do {
+      assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
+      nbTry++;
+      Thread.sleep(100);
+      r = htable.get(get);
+    } while (r == null || r.getValue(family, quality) == null);
+    assertEquals("value", Bytes.toStringBinary(value),
+      Bytes.toStringBinary(r.getValue(family, quality)));
+  }
+
+  @Test
+  public void testOnRegionChange() throws Exception {
+    TableName TABLE = TableName.valueOf("testOnRegionChange");
+    final int NUM_REGIONS = 10;
+    HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
+      Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+
+    HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 
+      PER_REGIONSERVER_QUEUE_SIZE);
+    
+    byte[][] startRows = htable.getStartKeys();
+    byte[] row = startRows[1];
+    assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
+
+    Put put = new Put(row).add(FAMILY, QUALIFIER1, VALUE1);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+    
+    checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
+
+    // Now let's shutdown the regionserver and let regions moved to other servers.
+    HRegionLocation loc = htable.getRegionLocation(row);
+    MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); 
+    hbaseCluster.stopRegionServer(loc.getServerName());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
+
+    // put with multiplexer.
+    put = new Put(row).add(FAMILY, QUALIFIER2, VALUE2);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+
+    checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
+  }
+}
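
For a sense of the client-facing surface the test exercises, the sketch below shows plain application usage of HTableMultiplexer with the same calls the test relies on: the constructor taking a Configuration and a per-server queue size, and put(TableName, Put) returning whether the Put was queued. Table, family and row names are invented for illustration, and connection/cleanup concerns are omitted.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.HTableMultiplexer;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public class MultiplexerUsageSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();       // reads hbase-site.xml from the classpath
      HTableMultiplexer mux = new HTableMultiplexer(conf, 100000);

      TableName table = TableName.valueOf("example_table");   // invented table name
      Put put = new Put(Bytes.toBytes("row-1"))
          .add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));

      // true means the Put was accepted into the per-server queue; a flush worker
      // sends it asynchronously. With this fix, a failed flush clears the stale
      // location and the Put is re-queued against the region's new server.
      boolean queued = mux.put(table, put);
      System.out.println("queued=" + queued);
    }
  }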

