hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject [12/50] [abbrv] git commit: [HBASE-11254] Improve retry logic in HTableMultiplexer.HTableFlushWorker
Date Thu, 31 Jul 2014 22:07:48 GMT
[HBASE-11254] Improve retry logic in HTableMultiplexer.HTableFlushWorker

Summary:
- Refactored the code slightly to avoid duplication
- Improved retry logic in HTableMultiplexer: every failed put is scheduled on the executor for
retrying. Every time the put is re-added, we increase the delay before it is executed.
Additionally, I introduced a counter that tracks these scheduled retry puts and limits them, so that
we don't run into OOM (the limit is a client-configurable parameter, hbase.client.retried.inQueue).

Test Plan: ran the TestHTableMultiplexer test

Reviewers: liyintang, aaiyer, rshroff, manukranthk, fan, gauravm, daviddeng, elliott

Reviewed By: elliott

Subscribers: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1348746

Tasks: 4245275

git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@42672 e7acf4d4-3532-417f-9e73-7a9ae25a1f51


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d3183638
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d3183638
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d3183638

Branch: refs/heads/0.89-fb
Commit: d318363840c236d94fcc5a26ef22b268fe8aec5d
Parents: a404f16
Author: adela <adela@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Wed Jun 4 22:11:05 2014 +0000
Committer: Elliott Clark <elliott@fb.com>
Committed: Thu Jul 31 14:44:22 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/HTableMultiplexer.java  | 163 ++++++++++++-------
 1 file changed, 101 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d3183638/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index e8d3ad3..9156fe4 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -66,21 +68,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class HTableMultiplexer {
   private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
-  private Map<byte[], HTable> tableNameToHTableMap;
+  private final Map<byte[], HTable> tableNameToHTableMap;
 
   /** The map between each region server to its corresponding buffer queue */
-  private Map<HServerAddress, LinkedBlockingQueue<PutStatus>>
+  private final Map<HServerAddress, LinkedBlockingQueue<PutStatus>>
     serverToBufferQueueMap;
 
   /** The map between each region server to its flush worker */
-  private Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap;
-
-  private Configuration conf;
-  private HConnection connection;
-  private int retryNum;
-  private int perRegionServerBufferQueueSize;
-  private ScheduledExecutorService executor;
-  private long frequency = 100;
+  private final Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap;
+
+  private final Configuration conf;
+  private final HConnection connection;
+  private final int retryNum;
+  // limit of retried puts scheduled for retry
+  private final int retriedInQueueMax;
+  private final int perRegionServerBufferQueueSize;
+  private final ScheduledExecutorService executor;
+  private final long frequency;
   //initial number of threads in the pool
   public static final int INITIAL_NUM_THREADS = 10;
   
@@ -99,6 +103,7 @@ public class HTableMultiplexer {
     this.tableNameToHTableMap = new ConcurrentSkipListMap<byte[], HTable>(
             Bytes.BYTES_COMPARATOR);
     this.retryNum = conf.getInt("hbase.client.retries.number", 10);
+    this.retriedInQueueMax = conf.getInt("hbase.client.retried.inQueue", 10000);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.frequency = conf.getLong("hbase.htablemultiplexer.flush.frequency.ms",
         100);
@@ -230,10 +235,9 @@ public class HTableMultiplexer {
       serverToBufferQueueMap.put(addr, queue);
 
       // Create the flush worker
-      HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
-          this.connection, this, queue);
+      HTableFlushWorker worker = new HTableFlushWorker(addr, queue);
       this.serverToFlushWorkerMap.put(addr, worker);
-      executor.scheduleAtFixedRate(worker, frequency, frequency, TimeUnit.MICROSECONDS);
+      executor.scheduleAtFixedRate(worker, frequency, frequency, TimeUnit.MILLISECONDS);
     }
     return queue;
   }
@@ -559,29 +563,23 @@ public class HTableMultiplexer {
     }
   }
 
-  private static class HTableFlushWorker implements Runnable {
-    private HServerAddress addr;
-    private Configuration conf;
-    private LinkedBlockingQueue<PutStatus> queue;
-    private HConnection connection;
-    private HTableMultiplexer htableMultiplexer;
-    private AtomicLong totalFailedPutCount;
-    private AtomicLong totalSucceededPutCount;
-    private AtomicLong totalRetriedPutCount;
-    private AtomicInteger currentProcessingPutCount;
-    private AtomicInteger minProcessingPutCount;
-    private AtomicInteger maxProcessingPutCount;
-    private AtomicAverageCounter avgProcessingPutCount;
-    private AtomicAverageCounter averageLatency;
-    private AtomicLong maxLatency;
-
-    public HTableFlushWorker(Configuration conf, HServerAddress addr,
-        HConnection connection, HTableMultiplexer htableMultiplexer,
-        LinkedBlockingQueue<PutStatus> queue) {
+  private class HTableFlushWorker implements Runnable {
+    private final HServerAddress addr;
+    private final LinkedBlockingQueue<PutStatus> queue;
+    private final AtomicLong totalFailedPutCount;
+    private final AtomicLong totalSucceededPutCount;
+    private final AtomicLong totalRetriedPutCount;
+    private final AtomicInteger currentProcessingPutCount;
+    private final AtomicInteger minProcessingPutCount;
+    private final AtomicInteger maxProcessingPutCount;
+    private final AtomicAverageCounter avgProcessingPutCount;
+    private final AtomicAverageCounter averageLatency;
+    private final AtomicLong maxLatency;
+    // how many retried puts we have inserted in the queue
+    private final AtomicInteger retriedPutsInQueue;
+
+    public HTableFlushWorker(HServerAddress addr, LinkedBlockingQueue<PutStatus> queue)
{
       this.addr = addr;
-      this.conf = conf;
-      this.connection = connection;
-      this.htableMultiplexer = htableMultiplexer;
       this.queue = queue;
       this.totalFailedPutCount = new AtomicLong(0);
       this.totalSucceededPutCount = new AtomicLong(0);
@@ -592,6 +590,7 @@ public class HTableMultiplexer {
       this.avgProcessingPutCount = new AtomicAverageCounter();
       this.averageLatency = new AtomicAverageCounter();
       this.maxLatency = new AtomicLong(0);
+      this.retriedPutsInQueue = new AtomicInteger(0);
     }
 
     public long getTotalFailedCount() {
@@ -628,6 +627,12 @@ public class HTableMultiplexer {
       return this.maxLatency.getAndSet(0);
     }
 
+    /**
+     * Resubmit logic for a failed put. Returns false if the retry count is
+     * exhausted or too many retried puts are already scheduled; otherwise the
+     * put is scheduled on the executor for resubmission and true is returned.
+     * The more times a put has failed, the longer the delay before its retry.
+     */
     private boolean resubmitFailedPut(PutStatus failedPutStatus, HServerAddress oldLoc) throws
IOException{
       Put failedPut = failedPutStatus.getPut();
       // The currentPut is failed. So get the table name for the currentPut.
@@ -639,9 +644,40 @@ public class HTableMultiplexer {
         // Update the failed counter and no retry any more.
         return false;
       } else {
-        // Retry one more time
         HBaseRPCOptions options = failedPutStatus.getOptions();
-        return this.htableMultiplexer.put(tableName, failedPut, retryCount, options);
+        // schedule the retry of the failed put
+        int currPuts = retriedPutsInQueue.incrementAndGet();
+        // only schedule the put if we are below limit of allowed scheduled-retriable puts
+        if (currPuts <= HTableMultiplexer.this.retriedInQueueMax) {
+          Runnable actuallyReinsert = new Runnable() {
+            @Override
+            public void run() {
+              retriedPutsInQueue.decrementAndGet();
+              try {
+                HTableMultiplexer.this.put(tableName, failedPut, retryCount,
+                    options);
+              } catch (IOException e) {
+                // Log all the exceptions and move on
+                LOG.debug("Caught some exceptions " + e
+                    + " when reinserting puts to region server "
+                    + addr.getHostNameWithPort());
+              }
+            }
+          };
+          // Wait at most DEFAULT_HBASE_RPC_TIMEOUT
+          long waitTimeMs = Math.max(conf.getInt(
+              HConstants.HBASE_RPC_TIMEOUT_KEY,
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT), (long) (frequency * Math
+              .pow(2, HTableMultiplexer.this.retryNum - retryCount)));
+          executor.schedule(actuallyReinsert,
+              waitTimeMs,
+              TimeUnit.MILLISECONDS);
+          return true;
+        } else {
+          // limit for allowed scheduled-retriable puts is reached, put is failed.
+          retriedPutsInQueue.decrementAndGet(); // decrement since we optimistically incremented
+          return false;
+        }
       }
     }
 
@@ -686,43 +722,45 @@ public class HTableMultiplexer {
             }
           }
 
+          List<PutStatus> putsForResubmit;
+
           // Process this multiput request
-          List<Put> failed = null;
           Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, HRegionFailureInfo>();
           try {
-            failed = connection.processListOfMultiPut(Arrays.asList(mput),
+            List<Put> failed = connection.processListOfMultiPut(Arrays.asList(mput),
                 null, options, failureInfo);
+            if (failed != null) {
+              if (failed.size() == processingList.size()) {
+                // All the puts for this region server are failed. Going to retry
+                // it later
+                putsForResubmit = processingList;
+              } else {
+                putsForResubmit = new ArrayList<>();
+                Set<Put> failedPutSet = new HashSet<Put>(failed);
+                for (PutStatus putStatus : processingList) {
+                  if (failedPutSet.contains(putStatus.getPut())) {
+                    putsForResubmit.add(putStatus);
+                  }
+                }
+              }
+            } else {
+              putsForResubmit = Collections.emptyList();
+            }
           } catch (PreemptiveFastFailException e) {
             // Client is not blocking on us. So, let us treat this
             // as a normal failure, and retry.
-            for (PutStatus putStatus : processingList) {
-              if (!resubmitFailedPut(putStatus, this.addr)) {
-                completelyFailed++;
-              }
-            }
+            putsForResubmit = processingList;
           }
 
           long putsToRetry = 0;
-          if (failed != null) {
-            if (failed.size() == processingList.size()) {
-              // All the puts for this region server are failed. Going to retry
-              // it later
-              for (PutStatus putStatus : processingList) {
-                if (!resubmitFailedPut(putStatus, this.addr)) {
-                  completelyFailed++;
-                }
-              }
-            } else {
-              Set<Put> failedPutSet = new HashSet<Put>(failed);
-              for (PutStatus putStatus : processingList) {
-                if (failedPutSet.contains(putStatus.getPut())
-                    && !resubmitFailedPut(putStatus, this.addr)) {
-                  completelyFailed++;
-                }
-              }
+          for (PutStatus putStatus : putsForResubmit) {
+            if (!resubmitFailedPut(putStatus, this.addr)) {
+              completelyFailed++;
             }
-            putsToRetry = failed.size() - completelyFailed;
           }
+          putsToRetry = putsForResubmit.size() - completelyFailed;
+
+
           // Update the totalFailedCount
           this.totalFailedPutCount.addAndGet(completelyFailed);
           // Update the totalSucceededPutCount
@@ -756,5 +794,6 @@ public class HTableMultiplexer {
             + addr.getHostNameWithPort());
       }
     }
+
   }
 }


Mime
View raw message