hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject hbase git commit: HBASE-15221 Reload the cache on re-tried puts in HTableMultiplexer and adds a close() method to HTableMultiplexer
Date Mon, 08 Feb 2016 06:44:07 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 0649755d3 -> 01b73e987


HBASE-15221 Reload the cache on re-tried puts in HTableMultiplexer and adds a close() method
to HTableMultiplexer

When a Put fails due to a NotServingRegionException, the cached location
of that Region is never cleared. Thus, subsequent calls to resubmit
the Put will fail in the same way as the original, never determining
the new location of the Region.

If the Connection is not closed by the user before the Multiplexer
is discarded, it will leak resources and could cause resource
issues.

Signed-off-by: Sean Busbey <busbey@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/01b73e98
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/01b73e98
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/01b73e98

Branch: refs/heads/branch-1
Commit: 01b73e9877ccac6faed941fc83351955b642fc09
Parents: 0649755
Author: Josh Elser <elserj@apache.org>
Authored: Fri Feb 5 21:23:37 2016 -0500
Committer: Sean Busbey <busbey@cloudera.com>
Committed: Mon Feb 8 00:07:08 2016 -0600

----------------------------------------------------------------------
 hbase-client/pom.xml                            |   5 +
 .../hadoop/hbase/client/HTableMultiplexer.java  | 111 +++++++++--
 .../client/TestHTableMultiplexerViaMocks.java   | 193 +++++++++++++++++++
 3 files changed, 292 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/01b73e98/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index ef9f30b..a3ba28c 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -193,6 +193,11 @@
       <groupId>com.yammer.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hbase/blob/01b73e98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index f6becc6..744b24b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
@@ -50,8 +53,6 @@ import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
  * Each put will be sharded into different buffer queues based on its destination region server.
@@ -97,7 +98,18 @@ public class HTableMultiplexer {
    */
   public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
       throws IOException {
-    this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
+    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
+  }
+
+  /**
+   * @param conn The HBase connection.
+   * @param conf The HBase configuration
+   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
+   *          each region server before dropping the request.
+   */
+  public HTableMultiplexer(Connection conn, Configuration conf,
+      int perRegionServerBufferQueueSize) {
+    this.conn = (ClusterConnection) conn;
     this.pool = HTable.getDefaultExecutor(conf);
     this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -116,6 +128,18 @@ public class HTableMultiplexer {
   }
 
   /**
+   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
+   * been closed.
+   * @throws IOException If there is an error closing the connection.
+   */
+  @SuppressWarnings("deprecation")
+  public synchronized void close() throws IOException {
+    if (!getConnection().isClosed()) {
+      getConnection().close();
+    }
+  }
+
+  /**
    * The put request will be buffered by its corresponding buffer queue. Return false if the queue
    * is already full.
    * @param tableName
@@ -169,13 +193,28 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer queue.
    */
   public boolean put(final TableName tableName, final Put put, int retry) {
+    return _put(tableName, put, retry, false);
+  }
+
+  /**
+   * Internal "put" which exposes a boolean flag to control whether or not the region location
+   * cache should be reloaded when trying to queue the {@link Put}.
+   * @param tableName Destination table for the Put
+   * @param put The Put to send
+   * @param retry Number of attempts to retry the {@code put}
+   * @param reloadCache Should the region location cache be reloaded
+   * @return true if the request was accepted in the queue, otherwise false
+   */
+  boolean _put(final TableName tableName, final Put put, int retry, boolean reloadCache) {
     if (retry <= 0) {
       return false;
     }
 
     try {
       HTable.validatePut(put, maxKeyValueSize);
-      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
+      // Allow mocking to get at the connection, but don't expose the connection to users.
+      ClusterConnection conn = (ClusterConnection) getConnection();
+      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
@@ -214,7 +253,8 @@ public class HTableMultiplexer {
     return new HTableMultiplexerStatus(serverToFlushWorkerMap);
   }
 
-  private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
+  @VisibleForTesting
+  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
     FlushWorker worker = serverToFlushWorkerMap.get(addr);
     if (worker == null) {
       synchronized (this.serverToFlushWorkerMap) {
@@ -231,6 +271,11 @@ public class HTableMultiplexer {
     return worker.getQueue();
   }
 
+  @VisibleForTesting
+  ClusterConnection getConnection() {
+    return this.conn;
+  }
+
   /**
    * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
    * report the number of buffered requests and the number of the failed (dropped) requests
@@ -339,7 +384,8 @@ public class HTableMultiplexer {
     }
   }
 
-  private static class PutStatus {
+  @VisibleForTesting
+  static class PutStatus {
     public final HRegionInfo regionInfo;
     public final Put put;
     public final int retryCount;
@@ -391,7 +437,8 @@ public class HTableMultiplexer {
     }
   }
 
-  private static class FlushWorker implements Runnable {
+  @VisibleForTesting
+  static class FlushWorker implements Runnable {
     private final HRegionLocation addr;
     private final LinkedBlockingQueue<PutStatus> queue;
     private final HTableMultiplexer multiplexer;
@@ -439,7 +486,7 @@ public class HTableMultiplexer {
       return this.maxLatency.getAndSet(0);
     }
 
-    private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
+    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
       // Decrease the retry count
       final int retryCount = ps.retryCount - 1;
 
@@ -448,10 +495,10 @@ public class HTableMultiplexer {
         return false;
       }
 
-      int cnt = retryInQueue.incrementAndGet();
-      if (cnt > maxRetryInQueue) {
+      int cnt = getRetryInQueue().incrementAndGet();
+      if (cnt > getMaxRetryInQueue()) {
         // Too many Puts in queue for resubmit, give up this
-        retryInQueue.decrementAndGet();
+        getRetryInQueue().decrementAndGet();
         return false;
       }
 
@@ -459,22 +506,21 @@ public class HTableMultiplexer {
       // The currentPut is failed. So get the table name for the currentPut.
       final TableName tableName = ps.regionInfo.getTable();
 
-      long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
-        multiplexer.retryNum - retryCount - 1);
+      long delayMs = getNextDelay(retryCount);
       if (LOG.isDebugEnabled()) {
         LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
       }
 
-      executor.schedule(new Runnable() {
+      getExecutor().schedule(new Runnable() {
         @Override
         public void run() {
           boolean succ = false;
           try {
-            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
+            succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
           } finally {
-            FlushWorker.this.retryInQueue.decrementAndGet();
+            FlushWorker.this.getRetryInQueue().decrementAndGet();
             if (!succ) {
-              FlushWorker.this.totalFailedPutCount.incrementAndGet();
+              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
             }
           }
         }
@@ -482,6 +528,37 @@ public class HTableMultiplexer {
       return true;
     }
 
+    @VisibleForTesting
+    long getNextDelay(int retryCount) {
+      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
+          multiplexer.retryNum - retryCount - 1);
+    }
+
+    @VisibleForTesting
+    AtomicInteger getRetryInQueue() {
+      return this.retryInQueue;
+    }
+
+    @VisibleForTesting
+    int getMaxRetryInQueue() {
+      return this.maxRetryInQueue;
+    }
+
+    @VisibleForTesting
+    AtomicLong getTotalFailedPutCount() {
+      return this.totalFailedPutCount;
+    }
+
+    @VisibleForTesting
+    HTableMultiplexer getMultiplexer() {
+      return this.multiplexer;
+    }
+
+    @VisibleForTesting
+    ScheduledExecutorService getExecutor() {
+      return this.executor;
+    }
+
     @Override
     public void run() {
       int failedCount = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/01b73e98/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
new file mode 100644
index 0000000..38ddeb9
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker;
+import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@Category(SmallTests.class)
+public class TestHTableMultiplexerViaMocks {
+
+  private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
+  private HTableMultiplexer mockMultiplexer;
+  private ClusterConnection mockConnection;
+  private HRegionLocation mockRegionLocation;
+  private HRegionInfo mockRegionInfo;
+
+  private TableName tableName;
+  private Put put;
+
+  @Before
+  public void setupTest() {
+    mockMultiplexer = mock(HTableMultiplexer.class);
+    mockConnection = mock(ClusterConnection.class);
+    mockRegionLocation = mock(HRegionLocation.class);
+    mockRegionInfo = mock(HRegionInfo.class);
+
+    tableName = TableName.valueOf("my_table");
+    put = new Put(getBytes("row1"));
+    put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11"));
+    put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12"));
+    put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21"));
+
+    // Call the real put(TableName, Put, int) method
+    when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod();
+
+    // Return the mocked ClusterConnection
+    when(mockMultiplexer.getConnection()).thenReturn(mockConnection);
+
+    // Return the regionInfo from the region location
+    when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
+
+    // Make sure this RegionInfo points to our table
+    when(mockRegionInfo.getTable()).thenReturn(tableName);
+  }
+
+  @Test public void useCacheOnInitialPut() throws Exception {
+    mockMultiplexer.put(tableName, put, NUM_RETRIES);
+
+    verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false);
+  }
+
+  @Test public void nonNullLocationQueuesPut() throws Exception {
+    final LinkedBlockingQueue<PutStatus> queue = new LinkedBlockingQueue<>();
+
+    // Call the real method for _put(TableName, Put, int, boolean)
+    when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod();
+
+    // Return a region location
+    when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation);
+    when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue);
+
+    assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES));
+
+    assertEquals(1, queue.size());
+    final PutStatus ps = queue.take();
+    assertEquals(put, ps.put);
+    assertEquals(mockRegionInfo, ps.regionInfo);
+  }
+
+  @Test public void ignoreCacheOnRetriedPut() throws Exception {
+    FlushWorker mockFlushWorker = mock(FlushWorker.class);
+    ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
+    final AtomicInteger retryInQueue = new AtomicInteger(0);
+    final AtomicLong totalFailedPuts = new AtomicLong(0L);
+    final int maxRetryInQueue = 20;
+    final long delay = 100L;
+
+    final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES);
+
+    // Call the real resubmitFailedPut(PutStatus, HRegionLocation) method
+    when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod();
+    // Succeed on the re-submit without caching
+    when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true);
+
+    // Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation)
+    when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor);
+    when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay);
+    when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer);
+    when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue);
+    when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue);
+    when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts);
+
+    // When a Runnable is scheduled, run that Runnable
+    when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // Before we run this, should have one retry in progress.
+            assertEquals(1L, retryInQueue.get());
+
+            Object[] args = invocation.getArguments();
+            assertEquals(3, args.length);
+            assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable);
+            Runnable runnable = (Runnable) args[0];
+            runnable.run();
+            return null;
+          }
+        });
+
+    // The put should be rescheduled
+    assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation));
+
+    verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true);
+    assertEquals(0L, totalFailedPuts.get());
+    // Net result should be zero (added one before rerunning, subtracted one after running).
+    assertEquals(0L, retryInQueue.get());
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test public void testConnectionClosing() throws IOException {
+    doCallRealMethod().when(mockMultiplexer).close();
+    // If the connection is not closed
+    when(mockConnection.isClosed()).thenReturn(false);
+
+    mockMultiplexer.close();
+
+    // We should close it
+    verify(mockConnection).close();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test public void testClosingAlreadyClosedConnection() throws IOException {
+    doCallRealMethod().when(mockMultiplexer).close();
+    // If the connection is already closed
+    when(mockConnection.isClosed()).thenReturn(true);
+
+    mockMultiplexer.close();
+
+    // We should not close it again
+    verify(mockConnection, times(0)).close();
+  }
+
+  /**
+   * @return UTF-8 byte representation for {@code str}
+   */
+  private static byte[] getBytes(String str) {
+    return str.getBytes(UTF_8);
+  }
+}


Mime
View raw message